From dc89332bd5ffe1c8d4f7283925837b0670b90dcc Mon Sep 17 00:00:00 2001 From: Marco Zeisler Date: Thu, 27 May 2021 19:38:49 +0200 Subject: [PATCH] use shared libs; added event store which intercepts the message broker wrapper if exchange name is not logger --- components/event_store/event_logger.py | 16 ++++++++++ .../{event_store.py => event_store_server.py} | 7 ++++- components/i_feed/traffic_light.py | 30 +++++++++---------- components/i_feed/vehicle.py | 23 +++++++------- components/orchestration/orchestrator.py | 16 +++++++--- 5 files changed, 61 insertions(+), 31 deletions(-) create mode 100644 components/event_store/event_logger.py rename components/event_store/{event_store.py => event_store_server.py} (76%) diff --git a/components/event_store/event_logger.py b/components/event_store/event_logger.py new file mode 100644 index 0000000..a3ba3fe --- /dev/null +++ b/components/event_store/event_logger.py @@ -0,0 +1,16 @@ +import pickle + +from shared.message_broker_wrapper import MBWrapper + + +class EventLogger: + @staticmethod + def log(msg): + try: + print("{}".format(pickle.loads(msg))) + except Exception as e: + print("Could not unpickle [{}]: {}", msg, e) + + def setup_msq_queue(self): + mb = MBWrapper(exchange_name='logger', callback=self.log) + mb.setup_receiver() diff --git a/components/event_store/event_store.py b/components/event_store/event_store_server.py similarity index 76% rename from components/event_store/event_store.py rename to components/event_store/event_store_server.py index 3b620c2..2cce47f 100644 --- a/components/event_store/event_store.py +++ b/components/event_store/event_store_server.py @@ -1,3 +1,6 @@ +import threading + +from event_store.event_logger import EventLogger from flask import Flask from flask_redis import Redis @@ -22,4 +25,6 @@ def home_page(): if __name__ == '__main__': - app.run() + el = EventLogger() + threading.Thread(target=el.setup_msq_queue).start() + threading.Thread(target=app.run).start() diff --git a/components/i_feed/traffic_light.py b/components/i_feed/traffic_light.py index 44d47e2..a7ab929 100644 --- a/components/i_feed/traffic_light.py +++ b/components/i_feed/traffic_light.py @@ -6,11 +6,12 @@ from datetime import datetime from circuitbreaker import circuit # Default switching time in seconds -from shared import TrafficLightColor -from shared.TrafficLightColor import TrafficLightColor +from shared import traffic_light_color +from shared.traffic_light_color import TrafficLightColor from shared.message_broker_wrapper import MBWrapper +from shared.traffic_light_state import TrafficLightState -SWITCHING_TIME = 120 +SWITCHING_TIME = 500 # Scale speed of switching by factor x SCALING = 100 @@ -68,23 +69,20 @@ class TrafficLight: @circuit(failure_threshold=10, expected_exception=ConnectionError) def send_status_update(self): print(self.tlid, self.current_color, self.last_switch) - self._tl_mb.send(pickle.dumps({ - 'tlid': self.tlid, - 'color': self.current_color, - 'last_switch': self.last_switch - })) + self._tl_mb.send(pickle.dumps( + TrafficLightState(tlid=self.tlid, color=self.current_color, last_switch=self.last_switch))) if __name__ == '__main__': # TODO fetch and use data from Entity Ident - tl1 = TrafficLight(tlid='traffic-light-1', switching_time=120) + tl1 = TrafficLight(tlid='traffic-light-1', switching_time=SWITCHING_TIME) tl1.start() - time.sleep(1) - tl2 = TrafficLight(tlid='traffic-light-2', switching_time=240) - tl2.start() - - time.sleep(1) - tl3 = TrafficLight(tlid='traffic-light-3', switching_time=360) - tl3.start() + # time.sleep(1) + # tl2 = TrafficLight(tlid='traffic-light-2', switching_time=240) + # tl2.start() + # + # time.sleep(1) + # tl3 = TrafficLight(tlid='traffic-light-3', switching_time=360) + # tl3.start() diff --git a/components/i_feed/vehicle.py b/components/i_feed/vehicle.py index 248aef2..da2c216 100644 --- a/components/i_feed/vehicle.py +++ b/components/i_feed/vehicle.py @@ -12,6 +12,8 @@ from shared.daf import DAF from shared.message_broker_wrapper import MBWrapper # Lat, Long +from shared.target_velocity import TargetVelocity + STARTING_POINT = geopy.Point(48.853, 2.349) # in km/h STARTING_VELOCITY = 130 @@ -24,7 +26,7 @@ UPDATE_INTERVAL = 2 # At x km the NCE shall happen NCE_KM = 30 # Time in seconds to recover from NCE (will be scaled) -TIME_TO_RECOVER = 100 +TIME_TO_RECOVER = 500 # Resets vehicle at km x RESET_KM = 50 @@ -205,8 +207,9 @@ class Vehicle: print(self.driven_kms, '\t', self.daf) self._daf_mb.send(pickle.dumps(self.daf)) - def new_velocity(self, velocity: bytes): - velocity = float(velocity) + def new_velocity(self, response: bytes): + response: TargetVelocity = pickle.loads(response) + velocity = response.target_velocity print('Received new velocity {}'.format(velocity)) self.velocity = velocity @@ -217,10 +220,10 @@ if __name__ == "__main__": v1 = Vehicle(vin='SB164ABN1PE082000') v1.start_driving() - time.sleep(1) - v2 = Vehicle(vin='SB999ABN1PE082111') - v2.start_driving() - - time.sleep(0.5) - v3 = Vehicle(vin='SB555ABN1PE082555') - v3.start_driving() + # time.sleep(1) + # v2 = Vehicle(vin='SB999ABN1PE082111') + # v2.start_driving() + # + # time.sleep(0.5) + # v3 = Vehicle(vin='SB555ABN1PE082555') + # v3.start_driving() diff --git a/components/orchestration/orchestrator.py b/components/orchestration/orchestrator.py index 825f127..f33468d 100644 --- a/components/orchestration/orchestrator.py +++ b/components/orchestration/orchestrator.py @@ -4,10 +4,13 @@ from random import randrange from typing import List, Dict from shared import daf -from shared.TrafficLightColor import TrafficLightColor +from shared.traffic_light_color import TrafficLightColor from shared.message_broker_wrapper import MBWrapper # necessary to unpickle daf object +from shared.target_velocity import TargetVelocity +from shared.traffic_light_state import TrafficLightState + sys.modules['daf'] = daf @@ -61,10 +64,15 @@ class Orchestrator: received_daf_object = pickle.loads(pickle_binary) print(received_daf_object) + # TODO ask entity ident if tl in range?! # TODO use the data from the traffic lights (self.tls) # to transmit a new target velocity for this vehicle to achieve a green wave + response_channel = self._velocity_mbs[received_daf_object.vehicle_identification_number] - response_channel.send(str(randrange(0, 130)).encode()) + target_vel = randrange(0, 130) + print('Target velocity: {}'.format(target_vel)) + response_channel.send(pickle.dumps( + TargetVelocity(vin=received_daf_object.vehicle_identification_number, target_velocity=target_vel))) def handle_tl_state_receive(self, msg): """ @@ -76,6 +84,6 @@ class Orchestrator: } Updates internal information about available TLs """ - tl_state = pickle.loads(msg) - self.tls[tl_state['tlid']]['color'] = tl_state['color'] + tl_state: TrafficLightState = pickle.loads(msg) + self.tls[tl_state.tlid]['color'] = tl_state.color print(tl_state)