outsource _compute_velocity to make it more easily testable

This commit is contained in:
Marco Zeisler 2021-06-17 22:18:51 +02:00
parent c1ee6c55a0
commit 528bf1b370

View File

@ -88,7 +88,7 @@ class Orchestrator:
""" """
Gets the daf object's pickle binary dump. Gets the daf object's pickle binary dump.
Unpickle, calculate new target velocity based on daf and current tl data and Unpickle, calculate new target velocity based on daf and current tl data and
respond new target velicity for this vehicle. respond new target velocity for this vehicle.
:param pickle_binary: daf object pickle binary dump :param pickle_binary: daf object pickle binary dump
""" """
@ -100,6 +100,31 @@ class Orchestrator:
params={'lat': loc.latitude, 'lon': loc.longitude}) params={'lat': loc.latitude, 'lon': loc.longitude})
traffic_lights_geo = response.json() traffic_lights_geo = response.json()
current_vel = received_daf_object.velocity current_vel = received_daf_object.velocity
target_vel = self._compute_velocity(traffic_lights_geo, current_vel)
response_channel = self._velocity_mbs[received_daf_object.vehicle_identification_number]
print('Target velocity: {}'.format(target_vel))
response_channel.send(pickle.dumps(
TargetVelocity(vin=received_daf_object.vehicle_identification_number, target_velocity=target_vel,
timestamp=datetime.now())))
def handle_tl_state_receive(self, msg):
"""
Gets a dict full of traffic light state information:
{
'tlid': str,
'color': TrafficLightColor,
'last_switch': datetime
}
Updates internal information about available TLs
"""
tl_state: TrafficLightState = pickle.loads(msg)
self.tls[tl_state.tlid]['color'] = tl_state.color
self.tls[tl_state.tlid]['last_switch'] = tl_state.last_switch
print(tl_state)
def _compute_velocity(self, traffic_lights_geo, current_vel):
target_vel = 130 * SCALING target_vel = 130 * SCALING
print('Nearest traffic lights: {}'.format(traffic_lights_geo['cursor'])) print('Nearest traffic lights: {}'.format(traffic_lights_geo['cursor']))
@ -148,23 +173,4 @@ class Orchestrator:
i = i + 2 i = i + 2
target_vel = floor(speed_needed_max) target_vel = floor(speed_needed_max)
response_channel = self._velocity_mbs[received_daf_object.vehicle_identification_number] return target_vel
print('Target velocity: {}'.format(target_vel))
response_channel.send(pickle.dumps(
TargetVelocity(vin=received_daf_object.vehicle_identification_number, target_velocity=target_vel,
timestamp=datetime.now())))
def handle_tl_state_receive(self, msg):
"""
Gets a dict full of traffic light state information:
{
'tlid': str,
'color': TrafficLightColor,
'last_switch': datetime
}
Updates internal information about available TLs
"""
tl_state: TrafficLightState = pickle.loads(msg)
self.tls[tl_state.tlid]['color'] = tl_state.color
self.tls[tl_state.tlid]['last_switch'] = tl_state.last_switch
print(tl_state)