144 lines
6.4 KiB
Python

import datetime
import pickle
import sys
from random import randrange
from typing import List, Dict
from datetime import datetime, timedelta
from math import floor, ceil
import requests
from dse_shared_libs import daf, traffic_light_state, traffic_light_color, target_velocity
from dse_shared_libs.daf import DAF
from dse_shared_libs.message_broker_wrapper import MBWrapper
# necessary to unpickle daf object
from dse_shared_libs.target_velocity import TargetVelocity
from dse_shared_libs.traffic_light_state import TrafficLightState
from dse_shared_libs.traffic_light_color import TrafficLightColor
from requests import Session
sys.modules['daf'] = daf
sys.modules['traffic_light_state'] = traffic_light_state
sys.modules['traffic_light_color'] = traffic_light_color
sys.modules['target_velocity'] = target_velocity
ENTITY_IDENT_URL = 'http://entityident:5002/api/v1/resources/'
class Orchestrator:
# vehicle ids
vins: List[str] = []
# traffic lights {tlid: {color: color, switching_time: in_seconds}}
tls: Dict[str, Dict] = {}
# cache all active incoming daf connections
_daf_mbs: Dict[str, MBWrapper] = {}
_velocity_mbs: Dict[str, MBWrapper] = {}
_session: Session
def __init__(self):
# re-use http session, is faster then request.ing everytime
self._session = requests.Session()
try:
response = self._session.get(ENTITY_IDENT_URL + 'cars')
cars = response.json()
response = self._session.get(ENTITY_IDENT_URL + 'traffic_lights')
traffic_lights = response.json()
except requests.exceptions.ConnectionError as e:
print("Is the entity_ident_server running and reachable?")
raise e
for car in cars['cursor']:
self.vins.append(car['vin'])
for traffic_light in traffic_lights['cursor']:
self.tls[traffic_light['id']] = {'color': traffic_light['initialColor'],
'switching_time': traffic_light['switchingTime'],
'last_switch': datetime.now()}
def setup_msg_queues(self):
# spawn the vehicle related message broker channels
for vin in self.vins:
daf_channel = MBWrapper(exchange_name='vehicle_daf_{}'.format(vin), callback=self.handle_daf_receive)
daf_channel.setup_receiver()
self._daf_mbs[vin] = daf_channel
velocity_channel = MBWrapper(exchange_name='vehicle_velocity_{}'.format(vin))
velocity_channel.setup_sender()
self._velocity_mbs[vin] = velocity_channel
# spawn the traffic light related mb channels
for tl in self.tls:
tl_channel = MBWrapper(exchange_name='traffic_light_state_{}'.format(tl),
callback=self.handle_tl_state_receive)
tl_channel.setup_receiver()
def handle_daf_receive(self, pickle_binary):
"""
Gets the daf object's pickle binary dump.
Unpickle, calculate new target velocity based on daf and current tl data and
respond new target velicity for this vehicle.
:param pickle_binary: daf object pickle binary dump
"""
received_daf_object: DAF = pickle.loads(pickle_binary)
loc = received_daf_object.gps_location
print('Received DAF object: {}'.format(received_daf_object))
response = self._session.get(ENTITY_IDENT_URL + 'traffic_lights_geo',
params={'lat': loc.latitude, 'lon': loc.longitude})
traffic_lights_geo = response.json()
current_vel = received_daf_object.velocity
target_vel = 130
print('Nearest traffic lights: {}'.format(traffic_lights_geo['cursor']))
print('Nearest traffic light count: {}'.format(len(traffic_lights_geo['cursor'])))
for traffic_light in traffic_lights_geo['cursor']:
# Should only ever contain one traffic light
tl_id = traffic_light['id']
distance = traffic_light['calculatedRange']
next_switch_time = self.tls[tl_id]['last_switch'] + timedelta(seconds=self.tls[tl_id]['switching_time'])
time_until_switch = next_switch_time - datetime.now()
print('Distance to TL: {}'.format(distance))
print('Time until switch in seconds: {}'.format(time_until_switch.total_seconds()))
if self.tls[tl_id]['color'] is TrafficLightColor.RED:
# Ceil because we want to get there AFTER the light changed
speed_needed = (distance / float(time_until_switch.total_seconds())) * 3.6
if current_vel > speed_needed:
print('Current velocity too high ({}), adjusting to {}'.format(current_vel, speed_needed))
target_vel = speed_needed
else:
print('Current velocity lower ({})'.format(current_vel))
target_vel = current_vel
else:
# Check if we can cross in time with max speed (130)
speed_needed = (distance / float(time_until_switch.total_seconds())) * 3.6
if speed_needed >= 130:
# Cannot make it in time, wait for next green
print('Wait on green -> red -> green switch')
time_until_green = time_until_switch + timedelta(seconds=self.tls[tl_id]['switching_time'])
target_vel = (distance / float(time_until_green.total_seconds())) * 3.6
print('Speed should be way lower than 130: {}'.format(target_vel))
response_channel = self._velocity_mbs[received_daf_object.vehicle_identification_number]
print('Target velocity: {}'.format(target_vel))
response_channel.send(pickle.dumps(
TargetVelocity(vin=received_daf_object.vehicle_identification_number, target_velocity=target_vel,
timestamp=datetime.now())))
def handle_tl_state_receive(self, msg):
"""
Gets a dict full of traffic light state information:
{
'tlid': str,
'color': TrafficLightColor,
'last_switch': datetime
}
Updates internal information about available TLs
"""
tl_state: TrafficLightState = pickle.loads(msg)
self.tls[tl_state.tlid]['color'] = tl_state.color
self.tls[tl_state.tlid]['last_switch'] = tl_state.last_switch
print(tl_state)