import sys, signal, time, redis, pika, json, os from random import randint from haversine import haversine def main(): if len(sys.argv) != 2: print('Region not set, exiting...') sys.exit(1) global timer if sys.argv[1] == 'at_linz': timer = randint(1, 2) elif sys.argv[1] == 'at_vienna': timer = randint(3, 5) elif sys.argv[1] == 'de_berlin': timer = randint(8, 11) else: print('Region not known, exiting...') sys.exit(1) queue = 'dst.' + sys.argv[1] credentials = pika.PlainCredentials('dst', 'dst') parameters = pika.ConnectionParameters('192.168.99.99', '5672', '/', credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare(queue=queue) channel.exchange_declare(exchange='dst.workers', exchange_type='topic', durable=False) # Do not give more than one message to one worker channel.basic_qos(prefetch_count=1) channel.basic_consume(on_message_callback=on_message, queue=queue) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() def on_message(ch, method_frame, header_frame, body): print(' [x] Received %r' % body.decode()) tic = time.perf_counter_ns() request = json.loads(body) print('Request: %s' % request) pickup_lon = float(request['pickup']['longitude']) pickup_lat = float(request['pickup']['latitude']) drivers_locations = r.hgetall('drivers:' + sys.argv[1]) closest_driver = '' # Loop over drivers, get closest to pickup for key, value in drivers_locations.items(): driver_lat = float(value.split()[0]) driver_lon = float(value.split()[1]) current_driver_distance = haversine((driver_lat, driver_lon), (pickup_lat, pickup_lon)) if closest_driver == '': closest_driver = key shortest_distance = current_driver_distance else: if current_driver_distance < shortest_distance: closest_driver = key shortest_distance = current_driver_distance # Remove driver from hashmap r.hdel('drivers:' + sys.argv[1], closest_driver) # Simulate additional processing time.sleep(timer) toc = time.perf_counter_ns() # Calculate processing time duration = (toc - tic) // 1000000 response = {"requestId": str(request['id']), "driverId": closest_driver.decode(), "processingTime": str(duration)} response = json.dumps(response) print(response) ch.basic_publish(exchange='dst.workers', routing_key='requests.' + sys.argv[1], body=response) # After finishing processing, send ack ch.basic_ack(delivery_tag=method_frame.delivery_tag) if __name__ == '__main__': try: timer = 0 # Connect to redis running at localhost:6379 r = redis.Redis(host='192.168.99.99') main() except KeyboardInterrupt: print('SIGTERM received, exiting...') try: sys.exit(0) except SystemExit: os._exit(0)