import pickle
import sys
from random import randrange
from typing import List, Dict

from shared import daf
from shared.TrafficLightColor import TrafficLightColor
from shared.message_broker_wrapper import MBWrapper

# necessary to unpickle daf object
sys.modules['daf'] = daf


class Orchestrator:
    """
    Keeps track of the known vehicles and traffic lights and wires up the
    message broker channels between them.

    For every vehicle a daf receiver channel and a velocity sender channel is
    created; for every traffic light a state receiver channel is created.
    Incoming daf messages are answered with a target velocity, incoming
    traffic light messages update the cached traffic light colour.
    """

    # All containers are created per instance in __init__. Declaring them as
    # mutable class-level defaults would share one list/dict between all
    # instances, so every additional Orchestrator() would append the same
    # VINs to the same list again.

    # vehicle ids
    vins: List[str]
    # traffic lights {tlid: {color: color, switching_time: in_seconds}}
    tls: Dict[str, Dict]
    # cache all active incoming daf connections
    _daf_mbs: Dict[str, MBWrapper]
    # cache all outgoing velocity connections, keyed by VIN
    _velocity_mbs: Dict[str, MBWrapper]
    # cache all active incoming traffic light state connections, keyed by tlid
    _tl_mbs: Dict[str, MBWrapper]

    def __init__(self):
        """Register the (currently hard-coded) vehicles and traffic lights."""
        # TODO fetch available car VINs from Entity Ident
        self.vins = [
            'SB164ABN1PE082000',
            'SB999ABN1PE082111',
            'SB555ABN1PE082555',
        ]
        print('vins', self.vins)

        # TODO fetch available TLs from Entity Ident
        self.tls = {
            'traffic-light-1': {'color': TrafficLightColor.RED, 'switching_time': 120},
            'traffic-light-2': {'color': TrafficLightColor.GREEN, 'switching_time': 240},
            'traffic-light-3': {'color': TrafficLightColor.RED, 'switching_time': 360},
        }
        print('tls', self.tls)

        self._daf_mbs = {}
        self._velocity_mbs = {}
        self._tl_mbs = {}

    def setup_msg_queues(self):
        """
        Create the message broker channels for all known vehicles and
        traffic lights.

        Per vehicle: a receiver on ``vehicle_daf_<vin>`` (handled by
        ``handle_daf_receive``) and a sender on ``vehicle_velocity_<vin>``.
        Per traffic light: a receiver on ``traffic_light_state_<tlid>``
        (handled by ``handle_tl_state_receive``).
        """
        # spawn the vehicle related message broker channels
        for vin in self.vins:
            daf_channel = MBWrapper(exchange_name=f'vehicle_daf_{vin}',
                                    callback=self.handle_daf_receive)
            daf_channel.setup_receiver()
            self._daf_mbs[vin] = daf_channel

            velocity_channel = MBWrapper(exchange_name=f'vehicle_velocity_{vin}')
            velocity_channel.setup_sender()
            self._velocity_mbs[vin] = velocity_channel

        # spawn the traffic light related mb channels
        for tl in self.tls:
            tl_channel = MBWrapper(exchange_name=f'traffic_light_state_{tl}',
                                   callback=self.handle_tl_state_receive)
            tl_channel.setup_receiver()
            # Keep a reference like for the vehicle channels, so the wrapper
            # is not dropped once this loop variable is rebound.
            # NOTE(review): whether MBWrapper needs this to stay alive is not
            # visible from this file - confirm against its implementation.
            self._tl_mbs[tl] = tl_channel

    def handle_daf_receive(self, pickle_binary):
        """
        Gets the daf object's pickle binary dump. Unpickle, calculate new
        target velocity based on daf and current tl data and respond new
        target velocity for this vehicle.

        Messages for a VIN without a velocity channel are reported and
        skipped instead of raising inside the broker callback.

        :param pickle_binary: daf object pickle binary dump
        """
        # SECURITY: pickle.loads can execute arbitrary code while loading.
        # This is only acceptable if every publisher on the broker is trusted.
        received_daf_object = pickle.loads(pickle_binary)
        print(received_daf_object)

        vin = received_daf_object.vehicle_identification_number
        response_channel = self._velocity_mbs.get(vin)
        if response_channel is None:
            print('no velocity channel for vin', vin)
            return

        # TODO use the data from the traffic lights (self.tls)
        # to transmit a new target velocity for this vehicle to achieve a green wave
        # Placeholder until then: a random integer in [0, 130), sent as its
        # encoded decimal string.
        response_channel.send(str(randrange(0, 130)).encode())

    def handle_tl_state_receive(self, msg):
        """
        Gets a pickled dict full of traffic light state information:
        {
            'tlid': str,
            'color': TrafficLightColor,
            'last_switch': datetime
        }
        Updates internal information about available TLs. Only 'color' is
        stored; 'last_switch' is currently not used.

        Messages for a traffic light id that is not in ``self.tls`` are
        reported and skipped instead of raising inside the broker callback.

        :param msg: pickle binary dump of the state dict
        """
        # SECURITY: pickle.loads can execute arbitrary code while loading.
        # This is only acceptable if every publisher on the broker is trusted.
        tl_state = pickle.loads(msg)

        known_tl = self.tls.get(tl_state['tlid'])
        if known_tl is None:
            print('unknown traffic light', tl_state)
            return

        known_tl['color'] = tl_state['color']
        print(tl_state)