Implement python worker (3.2.1)
This commit is contained in:
parent
a707f963a5
commit
0733b04894
@ -1 +1,87 @@
|
||||
# TODO
|
||||
import sys, signal, time, redis, pika, json, os
|
||||
from random import randint
|
||||
from haversine import haversine
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) != 2:
|
||||
print('Region not set, exiting...')
|
||||
sys.exit(1)
|
||||
|
||||
global timer
|
||||
if sys.argv[1] == 'at_linz':
|
||||
timer = randint(1, 2)
|
||||
elif sys.argv[1] == 'at_vienna':
|
||||
timer = randint(3, 5)
|
||||
elif sys.argv[1] == 'de_berlin':
|
||||
timer = randint(8, 11)
|
||||
else:
|
||||
print('Region not known, exiting...')
|
||||
sys.exit(1)
|
||||
|
||||
queue = 'dst.' + sys.argv[1]
|
||||
|
||||
credentials = pika.PlainCredentials('dst', 'dst')
|
||||
parameters = pika.ConnectionParameters('192.168.99.99', '5672', '/', credentials=credentials)
|
||||
connection = pika.BlockingConnection(parameters)
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue=queue)
|
||||
channel.exchange_declare(exchange='dst.workers', exchange_type='topic', durable=False)
|
||||
# Do not give more than one message to one worker
|
||||
channel.basic_qos(prefetch_count=1)
|
||||
|
||||
channel.basic_consume(on_message_callback=on_message, queue=queue)
|
||||
|
||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
||||
channel.start_consuming()
|
||||
|
||||
|
||||
def on_message(ch, method_frame, header_frame, body):
|
||||
print(' [x] Received %r' % body.decode())
|
||||
tic = time.perf_counter_ns()
|
||||
request = json.loads(body)
|
||||
print('Request: %s' % request)
|
||||
pickup_lon = float(request['pickup']['longitude'])
|
||||
pickup_lat = float(request['pickup']['latitude'])
|
||||
drivers_locations = r.hgetall('drivers:' + sys.argv[1])
|
||||
closest_driver = ''
|
||||
# Loop over drivers, get closest to pickup
|
||||
for key, value in drivers_locations.items():
|
||||
driver_lat = float(value.split()[0])
|
||||
driver_lon = float(value.split()[1])
|
||||
current_driver_distance = haversine((driver_lat, driver_lon), (pickup_lat, pickup_lon))
|
||||
if closest_driver == '':
|
||||
closest_driver = key
|
||||
shortest_distance = current_driver_distance
|
||||
else:
|
||||
if current_driver_distance < shortest_distance:
|
||||
closest_driver = key
|
||||
shortest_distance = current_driver_distance
|
||||
# Remove driver from hashmap
|
||||
r.hdel('drivers:' + sys.argv[1], closest_driver)
|
||||
# Simulate additional processing
|
||||
time.sleep(timer)
|
||||
toc = time.perf_counter_ns()
|
||||
# Calculate processing time
|
||||
duration = (toc - tic) // 1000000
|
||||
response = {"requestId": str(request['id']), "driverId": closest_driver.decode(), "processingTime": str(duration)}
|
||||
response = json.dumps(response)
|
||||
print(response)
|
||||
ch.basic_publish(exchange='dst.workers', routing_key='requests.' + sys.argv[1], body=response)
|
||||
# After finishing processing, send ack
|
||||
ch.basic_ack(delivery_tag=method_frame.delivery_tag)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
timer = 0
|
||||
# Connect to redis running at localhost:6379
|
||||
r = redis.Redis(host='192.168.99.99')
|
||||
main()
|
||||
except KeyboardInterrupt:
|
||||
print('SIGTERM received, exiting...')
|
||||
try:
|
||||
sys.exit(0)
|
||||
except SystemExit:
|
||||
os._exit(0)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user