88 lines
3.0 KiB
Python

import sys, signal, time, redis, pika, json, os
from random import randint
from haversine import haversine
def main():
if len(sys.argv) != 2:
print('Region not set, exiting...')
sys.exit(1)
global timer
if sys.argv[1] == 'at_linz':
timer = randint(1, 2)
elif sys.argv[1] == 'at_vienna':
timer = randint(3, 5)
elif sys.argv[1] == 'de_berlin':
timer = randint(8, 11)
else:
print('Region not known, exiting...')
sys.exit(1)
queue = 'dst.' + sys.argv[1]
credentials = pika.PlainCredentials('dst', 'dst')
parameters = pika.ConnectionParameters('192.168.99.99', '5672', '/', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=queue)
channel.exchange_declare(exchange='dst.workers', exchange_type='topic', durable=False)
# Do not give more than one message to one worker
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_message_callback=on_message, queue=queue)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
def on_message(ch, method_frame, header_frame, body):
print(' [x] Received %r' % body.decode())
tic = time.perf_counter_ns()
request = json.loads(body)
print('Request: %s' % request)
pickup_lon = float(request['pickup']['longitude'])
pickup_lat = float(request['pickup']['latitude'])
drivers_locations = r.hgetall('drivers:' + sys.argv[1])
closest_driver = ''
# Loop over drivers, get closest to pickup
for key, value in drivers_locations.items():
driver_lat = float(value.split()[0])
driver_lon = float(value.split()[1])
current_driver_distance = haversine((driver_lat, driver_lon), (pickup_lat, pickup_lon))
if closest_driver == '':
closest_driver = key
shortest_distance = current_driver_distance
else:
if current_driver_distance < shortest_distance:
closest_driver = key
shortest_distance = current_driver_distance
# Remove driver from hashmap
r.hdel('drivers:' + sys.argv[1], closest_driver)
# Simulate additional processing
time.sleep(timer)
toc = time.perf_counter_ns()
# Calculate processing time
duration = (toc - tic) // 1000000
response = {"requestId": str(request['id']), "driverId": closest_driver.decode(), "processingTime": str(duration)}
response = json.dumps(response)
print(response)
ch.basic_publish(exchange='dst.workers', routing_key='requests.' + sys.argv[1], body=response)
# After finishing processing, send ack
ch.basic_ack(delivery_tag=method_frame.delivery_tag)
if __name__ == '__main__':
try:
timer = 0
# Connect to redis running at localhost:6379
r = redis.Redis(host='192.168.99.99')
main()
except KeyboardInterrupt:
print('SIGTERM received, exiting...')
try:
sys.exit(0)
except SystemExit:
os._exit(0)