From 349a38fa9f7ce8648a204bf9a9502428da7ba05c Mon Sep 17 00:00:00 2001 From: Marco Zeisler Date: Mon, 24 May 2021 17:17:50 +0200 Subject: [PATCH] vehicle sends daf to orchestrator via rabbitMQ broker vehicle receives new target velocity from orchestrator as response; at this point, this velocity is just a random float between 0 and 130; --- components/i_feed/requirements.txt | 1 + components/i_feed/vehicle.py | 36 +++++++++++---- ...chestration.py => orchestration_server.py} | 7 ++- components/orchestration/orchestrator.py | 45 +++++++++++++++++++ components/{i_feed => wrappers}/daf.py | 0 .../message_broker_wrapper.py | 16 ++++--- 6 files changed, 91 insertions(+), 14 deletions(-) rename components/orchestration/{orchestration.py => orchestration_server.py} (56%) create mode 100644 components/orchestration/orchestrator.py rename components/{i_feed => wrappers}/daf.py (100%) rename components/{i_feed => wrappers}/message_broker_wrapper.py (81%) diff --git a/components/i_feed/requirements.txt b/components/i_feed/requirements.txt index 1cf2ae5..f884b43 100644 --- a/components/i_feed/requirements.txt +++ b/components/i_feed/requirements.txt @@ -1,2 +1,3 @@ circuitbreaker # fault tolerance geopy +pika diff --git a/components/i_feed/vehicle.py b/components/i_feed/vehicle.py index ba5fbc1..e854311 100644 --- a/components/i_feed/vehicle.py +++ b/components/i_feed/vehicle.py @@ -1,3 +1,4 @@ +import pickle import threading import time from datetime import datetime @@ -6,7 +7,9 @@ from typing import Union import geopy import geopy.distance from circuitbreaker import circuit -from daf import DAF +from wrappers.daf import DAF + +from wrappers.message_broker_wrapper import MBWrapper # Lat, Long STARTING_POINT = geopy.Point(48.853, 2.349) @@ -17,7 +20,7 @@ BEARING = 0 # Scale speed of vehicles by factor x SCALING = 100 # Interval between status updates in seconds (is not scaled) -UPDATE_INTERVAL = 1 +UPDATE_INTERVAL = 2 # At x km the NCE shall happen NCE_KM = 30 # Time in seconds to recover from NCE (will be scaled) @@ -47,11 +50,17 @@ class Vehicle: _nce_possible = True # stores if nce already happened, it only happens (up to) once _nce_happened = False + # continuous driving thread _t: threading.Thread # NCE recovery thread _rt: threading.Thread + # outgoing daf info MBWrapper + _daf_mb: MBWrapper + # incoming target velocity MBWrapper + _velocity_mb: MBWrapper + def __init__(self, vin: str, starting_point: geopy.Point = STARTING_POINT, @@ -61,6 +70,12 @@ class Vehicle: self._gps_location = starting_point self.velocity = starting_velocity + self._daf_mb = MBWrapper(exchange_name='vehicle_daf_{}'.format(vin)) + self._daf_mb.setup_sender() + + self._velocity_mb = MBWrapper(exchange_name='vehicle_velocity_{}'.format(self.vin), callback=self.new_velocity) + self._velocity_mb.setup_receiver() + @property def nce(self): """ @@ -148,7 +163,7 @@ class Vehicle: Starts a thread responsible for driving. This thread continuously updates internal values and informs the message broker about the current state (DAF) of the vehicle. """ - print('Start driving ... SCALING: x{}\n\n'.format(SCALING)) + print('{} starts driving ... SCALING: x{}\n\n'.format(self.vin, SCALING)) self.last_update = datetime.now() self._driven_kms = 0 @@ -187,15 +202,20 @@ class Vehicle: @circuit(failure_threshold=10, expected_exception=ConnectionError) def send_status_update(self): - # TODO inform the message broker about the current status print(self.driven_kms, '\t', self.daf) + self._daf_mb.send(pickle.dumps(self.daf)) - # TODO adapt the vehicle's velocity to the suggestion of the orchestration service - def new_velocity(self): - ... + def new_velocity(self, velocity: bytes): + velocity = float(velocity) + print('Received new velocity {}'.format(velocity)) + self.velocity = velocity if __name__ == "__main__": - v1 = Vehicle(vin='SB164ABN1PE082986') + v1 = Vehicle(vin='SB164ABN1PE082000') v1.start_driving() + + time.sleep(1) + v2 = Vehicle(vin='SB999ABN1PE082111') + v2.start_driving() # stop manually diff --git a/components/orchestration/orchestration.py b/components/orchestration/orchestration_server.py similarity index 56% rename from components/orchestration/orchestration.py rename to components/orchestration/orchestration_server.py index 3dfc734..e6c555e 100644 --- a/components/orchestration/orchestration.py +++ b/components/orchestration/orchestration_server.py @@ -1,4 +1,7 @@ +import threading + from flask import Flask +from orchestrator import Orchestrator app = Flask(__name__) @@ -19,4 +22,6 @@ def api2(): if __name__ == '__main__': - app.run() + orc = Orchestrator() + threading.Thread(target=orc.setup_msg_queues).start() + threading.Thread(target=app.run).start() diff --git a/components/orchestration/orchestrator.py b/components/orchestration/orchestrator.py new file mode 100644 index 0000000..d338e15 --- /dev/null +++ b/components/orchestration/orchestrator.py @@ -0,0 +1,45 @@ +import pickle +import sys +from random import randrange + +from wrappers import daf +from wrappers.message_broker_wrapper import MBWrapper + +sys.modules['daf'] = daf + + +class Orchestrator: + vins = [] + tls = [] + + _daf_mbs = {} + _velocity_mbs = {} + + def __init__(self): + # TODO fetch available car VINs from Entity Ident + self.vins.append('SB164ABN1PE082000') + self.vins.append('SB999ABN1PE082111') + + # TODO fetch available TLs from Entity Ident + self.tls.append('traffic-light-1') + self.tls.append('traffic-light-2') + self.tls.append('traffic-light-3') + + def setup_msg_queues(self): + for vin in self.vins: + daf_channel = MBWrapper(exchange_name='vehicle_daf_{}'.format(vin), callback=self.handle_daf_receive) + daf_channel.setup_receiver() + self._daf_mbs[vin] = daf_channel + + velocity_channel = MBWrapper(exchange_name='vehicle_velocity_{}'.format(vin)) + velocity_channel.setup_sender() + self._velocity_mbs[vin] = velocity_channel + + def handle_daf_receive(self, msg): + received_daf_object = pickle.loads(msg) + print(received_daf_object) + + # TODO use the data from the traffic lights to transmit a new target velocity for this vehicle to achieve + # a green wave + response_channel = self._velocity_mbs[received_daf_object.vehicle_identification_number] + response_channel.send(randrange(0, 130)) diff --git a/components/i_feed/daf.py b/components/wrappers/daf.py similarity index 100% rename from components/i_feed/daf.py rename to components/wrappers/daf.py diff --git a/components/i_feed/message_broker_wrapper.py b/components/wrappers/message_broker_wrapper.py similarity index 81% rename from components/i_feed/message_broker_wrapper.py rename to components/wrappers/message_broker_wrapper.py index d6c04e7..7a749a0 100644 --- a/components/i_feed/message_broker_wrapper.py +++ b/components/wrappers/message_broker_wrapper.py @@ -10,7 +10,7 @@ class MBWrapper: exchange_name: str callback: callable - _type: str + _type: str = None _connection: pika.BlockingConnection _channel: pika.BlockingConnection.channel @@ -18,13 +18,15 @@ class MBWrapper: host: str = 'localhost', exchange_type: str = 'fanout', exchange_name: str = None, - callback: callable = None): + callback: callable = None, + verbose: bool = False): assert exchange_name, 'Please define an exchange name' self.host = host self.exchange_type = exchange_type self.exchange_name = exchange_name self.callback = callback + self.verbose = verbose def setup_sender(self): assert self._type != 'receiver', 'MBWrapper is already a receiver. Use another MBWrapper.' @@ -33,7 +35,7 @@ class MBWrapper: def setup_receiver(self): assert self._type != 'sender', 'MBWrapper is already a sender. Use another MBWrapper.' - assert callback, 'Please setup MBWrapper with "on response" self.callback which can handle a byte string as input.' + assert self.callback, 'Please setup MBWrapper with "on response" self.callback which can handle a byte string as input.' def consumer(): self._setup_channel() @@ -42,7 +44,7 @@ class MBWrapper: self._channel.queue_bind(exchange=self.exchange_name, queue=queue_name) - print(' [*] Waiting for messages. To exit press CTRL+C') + self.print(' [*] Waiting for messages. To exit press CTRL+C') self._channel.basic_consume( queue=queue_name, on_message_callback=self._receive, auto_ack=True) @@ -55,11 +57,15 @@ class MBWrapper: if type(message) is not bytes: message = str(message).encode() self._channel.basic_publish(exchange=self.exchange_name, routing_key='', body=message) - print(" [x] Sent %r" % message) + self.print(" [x] Sent %r" % message) def close(self): self._connection.close() + def print(self, *msg): + if self.verbose: + print(*msg) + def _setup_channel(self): self._connection = pika.BlockingConnection( pika.ConnectionParameters(host=self.host))