diff --git a/components/i_feed/devices/traffic_light.py b/components/i_feed/devices/traffic_light.py index 3b3c7e8..710379c 100644 --- a/components/i_feed/devices/traffic_light.py +++ b/components/i_feed/devices/traffic_light.py @@ -56,23 +56,10 @@ class TrafficLight: def start(self): """ - Starts the traffic light by spawning a new thread. It toggles its state every self.switching_time / SCALING - seconds. When it switches, it notifies the orchestrator with self.send_status_update(). + Starts the traffic light by spawning a new thread.. """ self.running = True - - def switcher(): - num_colors = len(TrafficLightColor) - # set current color to the one before starting color, because switch immediately happens in loop - # therefore it sleeps on the end of the loop and first status is immediately sent to msg broker - self.current_color = TrafficLightColor((self._starting_color.value - 1) % num_colors) - while self.running: - self.current_color = TrafficLightColor((self.current_color.value + 1) % num_colors) - self.last_switch = datetime.now() - self.send_status_update() - time.sleep(self.switching_time / SCALING) - - self._t = threading.Thread(target=switcher) + self._t = threading.Thread(target=self._switcher) self._t.start() def stop(self): @@ -91,6 +78,21 @@ class TrafficLight: self._tl_mb.send(pickle.dumps( TrafficLightState(tlid=self.tlid, color=self.current_color, last_switch=self.last_switch))) + def _switcher(self): + """ + Toggles the traffic lights state every self.switching_time / SCALING seconds. + When it switches, it notifies the orchestrator with self.send_status_update() + """ + num_colors = len(TrafficLightColor) + # set current color to the one before starting color, because switch immediately happens in loop + # therefore it sleeps on the end of the loop and first status is immediately sent to msg broker + self.current_color = TrafficLightColor((self._starting_color.value - 1) % num_colors) + while self.running: + self.current_color = TrafficLightColor((self.current_color.value + 1) % num_colors) + self.last_switch = datetime.now() + self.send_status_update() + time.sleep(self.switching_time / SCALING) + if __name__ == '__main__': ... diff --git a/components/i_feed/test_ifeed.py b/components/i_feed/test_ifeed.py new file mode 100644 index 0000000..45f5011 --- /dev/null +++ b/components/i_feed/test_ifeed.py @@ -0,0 +1,103 @@ +import datetime +import pickle +import time +import unittest +from unittest.mock import patch + +import geopy +from dse_shared_libs.daf import DAF +from dse_shared_libs.target_velocity import TargetVelocity +from dse_shared_libs.traffic_light_color import TrafficLightColor + +from components.i_feed.devices.traffic_light import TrafficLight +from devices.vehicle import Vehicle +from dse_shared_libs.mock.rabbit_mq_mocks import MyBlockingConnection + + +class TestVehicle(unittest.TestCase): + def setUp(self) -> None: + with patch('pika.BlockingConnection', MyBlockingConnection): + self.vin = 'my_vin' + self.starting_velocity = 130 + self.starting_point = geopy.Point(0, 0, 0) + self.timestamp = datetime.datetime.now() + + self.v = Vehicle( + vin=self.vin, + starting_point=self.starting_point, + starting_velocity=self.starting_velocity + ) + + self.v.last_update = self.timestamp + self.v._driven_kms = 0 + + def test_nce_prop(self): + with patch('time.sleep', return_value=None): + # initially false + self.assertEqual(self.v.nce, False) + # get past nce km + self.v._driven_kms = 3 + # now nce should fire + self.assertEqual(self.v.nce, True) + # second call is false again + self.assertEqual(self.v.nce, False) + + def test_daf_prop(self): + # test if daf object created properly + daf = DAF(vehicle_identification_number=self.vin, + gps_location=self.starting_point, + velocity=self.starting_velocity, + timestamp=self.timestamp, + near_crash_event=False) + self.assertEqual(self.v.daf, daf) + + def test_current_location_prop(self): + # initial position should be starting point + self.assertEqual(self.v.gps_location, self.starting_point) + + # lets say last time was about an hour ago + self.v.last_update = self.timestamp - datetime.timedelta(hours=1) + + # we drive "almost" directly north + loc = self.v.gps_location + self.assertEqual(round(loc.latitude, 2), 1.17) + self.assertEqual(round(loc.longitude, 2), 0.04) + + def test_driven_kms_prop(self): + # initial driven kms are 0 + self.assertEqual(self.v.driven_kms, '0km') + + # alter driven kms + self.v._driven_kms = 44.24552 + # will be rounded to 2 digits + self.assertEqual(self.v.driven_kms, '44.25km') + + def test_new_velocity(self): + # starting velocity is set to 130 + self.assertEqual(self.v.velocity, 130) + + # we get a new velocity, lets say 55 ... + tv = TargetVelocity(vin='my_vin', target_velocity=55, timestamp=self.timestamp) + self.v.new_velocity(pickle.dumps(tv)) + # then the vehicle should comply with that + self.assertEqual(self.v.velocity, 55) + + +class TestTrafficLight(unittest.TestCase): + def setUp(self) -> None: + with patch('pika.BlockingConnection', MyBlockingConnection): + self.tl = TrafficLight(tlid='my_tl', + switching_time=1, + starting_color=TrafficLightColor.RED) + + def tearDown(self) -> None: + self.tl.stop() + + def test_switching(self): + self.assertFalse(hasattr(self.tl, 'current_color')) + self.tl.start() + self.assertEqual(self.tl.current_color, TrafficLightColor.RED) + time.sleep(1.2) + self.assertEqual(self.tl.current_color, TrafficLightColor.GREEN) + time.sleep(1) + self.assertEqual(self.tl.current_color, TrafficLightColor.RED)