import datetime  # NOTE: shadowed by the `from datetime import ...` line below
import os
import pickle
import sys
import time
from datetime import datetime, timedelta
from math import floor
from typing import Dict, List

import requests
from requests import Session

from dse_shared_libs import daf, traffic_light_state, traffic_light_color, target_velocity
from dse_shared_libs.daf import DAF
from dse_shared_libs.message_broker_wrapper import MBWrapper
from dse_shared_libs.target_velocity import TargetVelocity
from dse_shared_libs.traffic_light_state import TrafficLightState
from dse_shared_libs.traffic_light_color import TrafficLightColor

# necessary to unpickle daf object (and the other shared types): the modules
# are registered under their short names so pickle can resolve them.
sys.modules['daf'] = daf
sys.modules['traffic_light_state'] = traffic_light_state
sys.modules['traffic_light_color'] = traffic_light_color
sys.modules['target_velocity'] = target_velocity

ENTITY_IDENT_URL = 'http://entityident:5002/api/v1/resources/'
SCALING = int(os.environ.get('DSE2021_SCALING', 1))

# Unscaled upper bound for any target velocity; always used as `130 * SCALING`.
_BASE_SPEED_LIMIT = 130
# Pause between attempts while waiting for the entity ident server to come up.
_RETRY_DELAY_SECONDS = 1.0


class Orchestrator:
    """Compute target velocities for vehicles from traffic light state.

    On construction the known vehicles and traffic lights are fetched from
    the entity ident server. After ``setup_msg_queues`` has been called,
    every DAF message received from a vehicle is answered with a
    ``TargetVelocity`` on that vehicle's velocity channel.
    """

    # vehicle ids
    vins: List[str]
    # traffic lights {tlid: {color: color, switching_time: in_seconds,
    #                        last_switch: datetime}}
    tls: Dict[str, Dict]
    # cache all active incoming daf connections
    _daf_mbs: Dict[str, MBWrapper]
    _velocity_mbs: Dict[str, MBWrapper]
    _session: Session

    def __init__(self):
        """Fetch vehicles and traffic lights, retrying until the server answers."""
        # Per-instance containers: mutable class-level defaults would be
        # shared (and keep growing) across all Orchestrator instances.
        self.vins = []
        self.tls = {}
        self._daf_mbs = {}
        self._velocity_mbs = {}

        # re-use http session, is faster than issuing a fresh request every time
        self._session = requests.Session()

        while True:
            try:
                cars = self._session.get(ENTITY_IDENT_URL + 'cars').json()
                traffic_lights = self._session.get(
                    ENTITY_IDENT_URL + 'traffic_lights').json()
            except requests.exceptions.ConnectionError:
                print("Is the entity_ident_server running and reachable?")
                # Back off instead of busy-looping against a server that is down.
                time.sleep(_RETRY_DELAY_SECONDS)
                continue
            break

        self.vins.extend(car['vin'] for car in cars['cursor'])

        for traffic_light in traffic_lights['cursor']:
            # NOTE(review): 'color' here is whatever the JSON payload holds; if
            # that is not a TrafficLightColor member, the identity check in
            # _compute_velocity is False until the first state message arrives.
            self.tls[traffic_light['id']] = {
                'color': traffic_light['color'],
                'switching_time': traffic_light['switchingTime'],
                # UTC, because _compute_velocity compares against utcnow().
                'last_switch': datetime.utcnow(),
            }

    def setup_msg_queues(self):
        """
        Setup the message queues for communicating with
        vehicles and traffic lights.
        A receiving and a sending queue for vehicles.
        A receiving queue for traffic lights.
        """
        # spawn the vehicle related message broker channels
        for vin in self.vins:
            daf_channel = MBWrapper(exchange_name='vehicle_daf_{}'.format(vin),
                                    callback=self.handle_daf_receive)
            daf_channel.setup_receiver()
            self._daf_mbs[vin] = daf_channel

            velocity_channel = MBWrapper(exchange_name='vehicle_velocity_{}'.format(vin))
            velocity_channel.setup_sender()
            self._velocity_mbs[vin] = velocity_channel

        # spawn the traffic light related mb channels
        for tl in self.tls:
            tl_channel = MBWrapper(exchange_name='traffic_light_state_{}'.format(tl),
                                   callback=self.handle_tl_state_receive)
            tl_channel.setup_receiver()

    def handle_daf_receive(self, pickle_binary):
        """
        Gets the daf object's pickle binary dump. Unpickle, calculate new target velocity based on daf and current
        tl data and respond new target velocity for this vehicle.

        :param pickle_binary: daf object pickle binary dump
        """
        # SECURITY: pickle.loads executes arbitrary code from the payload; this
        # is only acceptable while every publisher on the broker is trusted.
        received_daf_object: DAF = pickle.loads(pickle_binary)
        loc = received_daf_object.gps_location
        print('Received DAF object: {}'.format(received_daf_object))

        response = self._session.get(ENTITY_IDENT_URL + 'traffic_lights_geo',
                                     params={'lat': loc.latitude, 'lon': loc.longitude})
        traffic_lights_geo = response.json()

        current_vel = received_daf_object.velocity
        target_vel = self._compute_velocity(traffic_lights_geo, current_vel)

        vin = received_daf_object.vehicle_identification_number
        response_channel = self._velocity_mbs[vin]
        print('Target velocity: {}'.format(target_vel))
        response_channel.send(pickle.dumps(
            TargetVelocity(vin=vin,
                           target_velocity=target_vel,
                           timestamp=datetime.now())))

    def handle_tl_state_receive(self, msg):
        """
        Gets the pickle binary dump of a TrafficLightState and stores its
        ``color`` and ``last_switch`` for the traffic light named by ``tlid``.

        :param msg: TrafficLightState pickle binary dump
        """
        # SECURITY: see handle_daf_receive - pickle payloads must be trusted.
        tl_state: TrafficLightState = pickle.loads(msg)
        known_tl = self.tls[tl_state.tlid]
        known_tl['color'] = tl_state.color
        known_tl['last_switch'] = tl_state.last_switch
        print(tl_state)

    @staticmethod
    def _required_speed(distance, seconds):
        """Return the speed needed to cover ``distance`` in ``seconds``.

        The factor 3.6 converts m/s to km/h (assumes ``distance`` is in
        metres - TODO confirm). A non-positive time budget cannot be met, so
        it is reported as infinity instead of raising ZeroDivisionError or
        yielding a negative speed.
        """
        if seconds <= 0:
            return float('inf')
        return (distance / float(seconds)) * 3.6

    @staticmethod
    def _speed_for_later_green(distance, time_until_switch, switching_time,
                               first_phase, speed, speed_limit):
        """Return the floored speed for the first switch that fits the limit.

        Starting from ``speed``, candidate arrival times
        ``time_until_switch + switching_time * phase`` are tried for
        ``phase = first_phase, first_phase + 2, ...`` until the required speed
        no longer exceeds ``speed_limit``.
        """
        if switching_time <= 0:
            # Candidate arrival times would never advance and the search
            # below would never terminate; fall back to the limit.
            return speed_limit

        phase = first_phase
        while speed > speed_limit:
            arrival_in = time_until_switch + switching_time * phase
            speed = Orchestrator._required_speed(distance, arrival_in)
            phase += 2
        return floor(speed)

    def _compute_velocity(self, traffic_lights_geo, current_vel):
        """Compute the target velocity for a vehicle approaching a traffic light.

        :param traffic_lights_geo: response dict whose 'cursor' entry lists the
            nearby traffic lights (each with 'id' and 'calculatedRange')
        :param current_vel: the vehicle's current velocity
        :return: the target velocity; ``130 * SCALING`` when no light is listed
        """
        speed_limit = _BASE_SPEED_LIMIT * SCALING
        target_vel = speed_limit
        print('Nearest traffic lights: {}'.format(traffic_lights_geo['cursor']))

        for traffic_light in traffic_lights_geo['cursor']:
            # Should only ever contain one traffic light (the next in line)
            tl_id = traffic_light['id']
            distance = traffic_light['calculatedRange']
            known_tl = self.tls[tl_id]
            # Time until next switch must be scaled accordingly
            switching_time = known_tl['switching_time'] / float(SCALING)
            next_switch_time = known_tl['last_switch'] + timedelta(seconds=switching_time)
            time_until_switch = (next_switch_time - datetime.utcnow()).total_seconds()
            print('Distance to TL: {}'.format(distance))
            print('Time until switch in seconds: {}'.format(time_until_switch))

            # An overdue switch (state update not yet received) is treated as
            # happening right now; a negative budget would otherwise produce
            # negative target velocities.
            time_until_switch = max(time_until_switch, 0.0)
            speed_needed = self._required_speed(distance, time_until_switch)

            if known_tl['color'] is TrafficLightColor.RED:
                # speed_needed is the fastest speed that still arrives after
                # the light has left red.
                if speed_needed < speed_limit:
                    if current_vel < speed_needed:
                        target_vel = current_vel
                    else:
                        target_vel = floor(speed_needed)
                else:
                    # Cannot make it on next green
                    target_vel = self._speed_for_later_green(
                        distance, time_until_switch, switching_time,
                        2, speed_needed, speed_limit)
            else:
                # Check if we can reach TL in time; speed_needed is the slowest
                # speed that still arrives before the next switch.
                if speed_needed < speed_limit:
                    if current_vel < speed_needed:
                        target_vel = speed_limit
                    else:
                        target_vel = current_vel
                else:
                    # Start above the limit so that at least one later phase
                    # is evaluated.
                    target_vel = self._speed_for_later_green(
                        distance, time_until_switch, switching_time,
                        1, 132 * SCALING, speed_limit)

        return target_vel