diff --git a/ass3-worker/worker.py b/ass3-worker/worker.py index f87f5c1..0522807 100644 --- a/ass3-worker/worker.py +++ b/ass3-worker/worker.py @@ -1 +1,87 @@ -# TODO \ No newline at end of file +import sys, signal, time, redis, pika, json, os +from random import randint +from haversine import haversine + + +def main(): + if len(sys.argv) != 2: + print('Region not set, exiting...') + sys.exit(1) + + global timer + if sys.argv[1] == 'at_linz': + timer = randint(1, 2) + elif sys.argv[1] == 'at_vienna': + timer = randint(3, 5) + elif sys.argv[1] == 'de_berlin': + timer = randint(8, 11) + else: + print('Region not known, exiting...') + sys.exit(1) + + queue = 'dst.' + sys.argv[1] + + credentials = pika.PlainCredentials('dst', 'dst') + parameters = pika.ConnectionParameters('192.168.99.99', '5672', '/', credentials=credentials) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + + channel.queue_declare(queue=queue) + channel.exchange_declare(exchange='dst.workers', exchange_type='topic', durable=False) + # Do not give more than one message to one worker + channel.basic_qos(prefetch_count=1) + + channel.basic_consume(on_message_callback=on_message, queue=queue) + + print(' [*] Waiting for messages. To exit press CTRL+C') + channel.start_consuming() + + +def on_message(ch, method_frame, header_frame, body): + print(' [x] Received %r' % body.decode()) + tic = time.perf_counter_ns() + request = json.loads(body) + print('Request: %s' % request) + pickup_lon = float(request['pickup']['longitude']) + pickup_lat = float(request['pickup']['latitude']) + drivers_locations = r.hgetall('drivers:' + sys.argv[1]) + closest_driver = '' + # Loop over drivers, get closest to pickup + for key, value in drivers_locations.items(): + driver_lat = float(value.split()[0]) + driver_lon = float(value.split()[1]) + current_driver_distance = haversine((driver_lat, driver_lon), (pickup_lat, pickup_lon)) + if closest_driver == '': + closest_driver = key + shortest_distance = current_driver_distance + else: + if current_driver_distance < shortest_distance: + closest_driver = key + shortest_distance = current_driver_distance + # Remove driver from hashmap + r.hdel('drivers:' + sys.argv[1], closest_driver) + # Simulate additional processing + time.sleep(timer) + toc = time.perf_counter_ns() + # Calculate processing time + duration = (toc - tic) // 1000000 + response = {"requestId": str(request['id']), "driverId": closest_driver.decode(), "processingTime": str(duration)} + response = json.dumps(response) + print(response) + ch.basic_publish(exchange='dst.workers', routing_key='requests.' + sys.argv[1], body=response) + # After finishing processing, send ack + ch.basic_ack(delivery_tag=method_frame.delivery_tag) + + +if __name__ == '__main__': + try: + timer = 0 + # Connect to redis running at localhost:6379 + r = redis.Redis(host='192.168.99.99') + main() + except KeyboardInterrupt: + print('SIGTERM received, exiting...') + try: + sys.exit(0) + except SystemExit: + os._exit(0)