use shared libs;

added event store which intercepts the message broker wrapper if exchange name is not logger
This commit is contained in:
Marco Zeisler 2021-05-27 19:38:49 +02:00
parent 4e1666ec5a
commit dc89332bd5
5 changed files with 61 additions and 31 deletions

View File

@ -0,0 +1,16 @@
import pickle
from shared.message_broker_wrapper import MBWrapper
class EventLogger:
@staticmethod
def log(msg):
try:
print("{}".format(pickle.loads(msg)))
except Exception as e:
print("Could not unpickle [{}]: {}", msg, e)
def setup_msq_queue(self):
mb = MBWrapper(exchange_name='logger', callback=self.log)
mb.setup_receiver()

View File

@ -1,3 +1,6 @@
import threading
from event_store.event_logger import EventLogger
from flask import Flask from flask import Flask
from flask_redis import Redis from flask_redis import Redis
@ -22,4 +25,6 @@ def home_page():
if __name__ == '__main__': if __name__ == '__main__':
app.run() el = EventLogger()
threading.Thread(target=el.setup_msq_queue).start()
threading.Thread(target=app.run).start()

View File

@ -6,11 +6,12 @@ from datetime import datetime
from circuitbreaker import circuit from circuitbreaker import circuit
# Default switching time in seconds # Default switching time in seconds
from shared import TrafficLightColor from shared import traffic_light_color
from shared.TrafficLightColor import TrafficLightColor from shared.traffic_light_color import TrafficLightColor
from shared.message_broker_wrapper import MBWrapper from shared.message_broker_wrapper import MBWrapper
from shared.traffic_light_state import TrafficLightState
SWITCHING_TIME = 120 SWITCHING_TIME = 500
# Scale speed of switching by factor x # Scale speed of switching by factor x
SCALING = 100 SCALING = 100
@ -68,23 +69,20 @@ class TrafficLight:
@circuit(failure_threshold=10, expected_exception=ConnectionError) @circuit(failure_threshold=10, expected_exception=ConnectionError)
def send_status_update(self): def send_status_update(self):
print(self.tlid, self.current_color, self.last_switch) print(self.tlid, self.current_color, self.last_switch)
self._tl_mb.send(pickle.dumps({ self._tl_mb.send(pickle.dumps(
'tlid': self.tlid, TrafficLightState(tlid=self.tlid, color=self.current_color, last_switch=self.last_switch)))
'color': self.current_color,
'last_switch': self.last_switch
}))
if __name__ == '__main__': if __name__ == '__main__':
# TODO fetch and use data from Entity Ident # TODO fetch and use data from Entity Ident
tl1 = TrafficLight(tlid='traffic-light-1', switching_time=120) tl1 = TrafficLight(tlid='traffic-light-1', switching_time=SWITCHING_TIME)
tl1.start() tl1.start()
time.sleep(1) # time.sleep(1)
tl2 = TrafficLight(tlid='traffic-light-2', switching_time=240) # tl2 = TrafficLight(tlid='traffic-light-2', switching_time=240)
tl2.start() # tl2.start()
#
time.sleep(1) # time.sleep(1)
tl3 = TrafficLight(tlid='traffic-light-3', switching_time=360) # tl3 = TrafficLight(tlid='traffic-light-3', switching_time=360)
tl3.start() # tl3.start()

View File

@ -12,6 +12,8 @@ from shared.daf import DAF
from shared.message_broker_wrapper import MBWrapper from shared.message_broker_wrapper import MBWrapper
# Lat, Long # Lat, Long
from shared.target_velocity import TargetVelocity
STARTING_POINT = geopy.Point(48.853, 2.349) STARTING_POINT = geopy.Point(48.853, 2.349)
# in km/h # in km/h
STARTING_VELOCITY = 130 STARTING_VELOCITY = 130
@ -24,7 +26,7 @@ UPDATE_INTERVAL = 2
# At x km the NCE shall happen # At x km the NCE shall happen
NCE_KM = 30 NCE_KM = 30
# Time in seconds to recover from NCE (will be scaled) # Time in seconds to recover from NCE (will be scaled)
TIME_TO_RECOVER = 100 TIME_TO_RECOVER = 500
# Resets vehicle at km x # Resets vehicle at km x
RESET_KM = 50 RESET_KM = 50
@ -205,8 +207,9 @@ class Vehicle:
print(self.driven_kms, '\t', self.daf) print(self.driven_kms, '\t', self.daf)
self._daf_mb.send(pickle.dumps(self.daf)) self._daf_mb.send(pickle.dumps(self.daf))
def new_velocity(self, velocity: bytes): def new_velocity(self, response: bytes):
velocity = float(velocity) response: TargetVelocity = pickle.loads(response)
velocity = response.target_velocity
print('Received new velocity {}'.format(velocity)) print('Received new velocity {}'.format(velocity))
self.velocity = velocity self.velocity = velocity
@ -217,10 +220,10 @@ if __name__ == "__main__":
v1 = Vehicle(vin='SB164ABN1PE082000') v1 = Vehicle(vin='SB164ABN1PE082000')
v1.start_driving() v1.start_driving()
time.sleep(1) # time.sleep(1)
v2 = Vehicle(vin='SB999ABN1PE082111') # v2 = Vehicle(vin='SB999ABN1PE082111')
v2.start_driving() # v2.start_driving()
#
time.sleep(0.5) # time.sleep(0.5)
v3 = Vehicle(vin='SB555ABN1PE082555') # v3 = Vehicle(vin='SB555ABN1PE082555')
v3.start_driving() # v3.start_driving()

View File

@ -4,10 +4,13 @@ from random import randrange
from typing import List, Dict from typing import List, Dict
from shared import daf from shared import daf
from shared.TrafficLightColor import TrafficLightColor from shared.traffic_light_color import TrafficLightColor
from shared.message_broker_wrapper import MBWrapper from shared.message_broker_wrapper import MBWrapper
# necessary to unpickle daf object # necessary to unpickle daf object
from shared.target_velocity import TargetVelocity
from shared.traffic_light_state import TrafficLightState
sys.modules['daf'] = daf sys.modules['daf'] = daf
@ -61,10 +64,15 @@ class Orchestrator:
received_daf_object = pickle.loads(pickle_binary) received_daf_object = pickle.loads(pickle_binary)
print(received_daf_object) print(received_daf_object)
# TODO ask entity ident if tl in range?!
# TODO use the data from the traffic lights (self.tls) # TODO use the data from the traffic lights (self.tls)
# to transmit a new target velocity for this vehicle to achieve a green wave # to transmit a new target velocity for this vehicle to achieve a green wave
response_channel = self._velocity_mbs[received_daf_object.vehicle_identification_number] response_channel = self._velocity_mbs[received_daf_object.vehicle_identification_number]
response_channel.send(str(randrange(0, 130)).encode()) target_vel = randrange(0, 130)
print('Target velocity: {}'.format(target_vel))
response_channel.send(pickle.dumps(
TargetVelocity(vin=received_daf_object.vehicle_identification_number, target_velocity=target_vel)))
def handle_tl_state_receive(self, msg): def handle_tl_state_receive(self, msg):
""" """
@ -76,6 +84,6 @@ class Orchestrator:
} }
Updates internal information about available TLs Updates internal information about available TLs
""" """
tl_state = pickle.loads(msg) tl_state: TrafficLightState = pickle.loads(msg)
self.tls[tl_state['tlid']]['color'] = tl_state['color'] self.tls[tl_state.tlid]['color'] = tl_state.color
print(tl_state) print(tl_state)