import datetime
import unittest
from unittest.mock import patch

from dse_shared_libs.mock.response import MyResponse
from dse_shared_libs.traffic_light_color import TrafficLightColor
from orchestrator import Orchestrator


class TestOrchestrator(unittest.TestCase):
    """Tests for the target velocity returned by ``Orchestrator._compute_velocity``.

    Each test optionally seeds ``self.orc.tls`` with the state of a single
    traffic light, passes a geo query result (a dict with a ``'cursor'``
    list) together with the current velocity, and checks the velocity that
    comes back.
    """

    # Identifier shared by the traffic-light state and the geo result entry.
    LIGHT_ID = '1'

    def setUp(self) -> None:
        """Record a reference timestamp and build the orchestrator under test.

        ``Session.get`` is replaced by ``MyResponse`` while the constructor
        runs (presumably so that no real HTTP request is issued -- confirm
        against ``Orchestrator.__init__``).
        """
        # NOTE(review): utcnow() yields a naive datetime and is deprecated
        # since Python 3.12. It is kept as-is because whether Orchestrator
        # works with naive or aware timestamps is not visible from here.
        self.timestamp = datetime.datetime.utcnow()
        with patch('requests.sessions.Session.get', MyResponse):
            self.orc = Orchestrator()

    def _single_light_state(self, color, switching_time):
        """Return a ``tls`` mapping for one light last switched at ``self.timestamp``."""
        return {
            self.LIGHT_ID: {
                'color': color,
                'switching_time': switching_time,
                'last_switch': self.timestamp,
            },
        }

    def _single_light_in_range(self, calculated_range):
        """Return a geo result whose cursor holds one light at ``calculated_range``."""
        return {
            'cursor': [
                {'id': self.LIGHT_ID, 'calculatedRange': calculated_range},
            ],
        }

    def test_full_speed_if_nothing_in_range(self):
        """An empty cursor is expected to yield a target velocity of 130."""
        nothing_in_range = {'cursor': []}
        velocity_now = 55.0

        velocity_target = self.orc._compute_velocity(nothing_in_range, velocity_now)

        self.assertEqual(130, velocity_target)

    def test_keep_speed_if_far_away_and_green(self):
        """A green light at range 1000 is expected to leave the velocity unchanged."""
        self.orc.tls = self._single_light_state(TrafficLightColor.GREEN, 1000)
        geo_result = self._single_light_in_range(1000)
        velocity_now = 55.0

        velocity_target = self.orc._compute_velocity(geo_result, velocity_now)

        self.assertEqual(velocity_now, velocity_target)

    def test_slow_down_if_passing_RED_not_possible(self):
        """A red light at range 1 with switching_time 100000 is expected to yield 0."""
        self.orc.tls = self._single_light_state(TrafficLightColor.RED, 100000)
        geo_result = self._single_light_in_range(1)
        velocity_now = 130.0

        velocity_target = self.orc._compute_velocity(geo_result, velocity_now)

        self.assertNotEqual(velocity_now, velocity_target)
        self.assertEqual(0, velocity_target)

    def test_adjust_speed_to_get_over_on_green_if_currently_red(self):
        """A red light at range 1000 with switching_time 100 is expected to yield 36.0."""
        self.orc.tls = self._single_light_state(TrafficLightColor.RED, 100)
        geo_result = self._single_light_in_range(1000)
        velocity_now = 130.0

        velocity_target = self.orc._compute_velocity(geo_result, velocity_now)

        self.assertNotEqual(velocity_now, velocity_target)
        self.assertEqual(36.0, velocity_target)