outsource _switcher to make it easily testable;
test vehicle props; test traffic light _switcher;
This commit is contained in:
parent
8cb45d9b23
commit
643f98f121
@ -56,23 +56,10 @@ class TrafficLight:
|
|||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
"""
|
"""
|
||||||
Starts the traffic light by spawning a new thread. It toggles its state every self.switching_time / SCALING
|
Starts the traffic light by spawning a new thread..
|
||||||
seconds. When it switches, it notifies the orchestrator with self.send_status_update().
|
|
||||||
"""
|
"""
|
||||||
self.running = True
|
self.running = True
|
||||||
|
self._t = threading.Thread(target=self._switcher)
|
||||||
def switcher():
|
|
||||||
num_colors = len(TrafficLightColor)
|
|
||||||
# set current color to the one before starting color, because switch immediately happens in loop
|
|
||||||
# therefore it sleeps on the end of the loop and first status is immediately sent to msg broker
|
|
||||||
self.current_color = TrafficLightColor((self._starting_color.value - 1) % num_colors)
|
|
||||||
while self.running:
|
|
||||||
self.current_color = TrafficLightColor((self.current_color.value + 1) % num_colors)
|
|
||||||
self.last_switch = datetime.now()
|
|
||||||
self.send_status_update()
|
|
||||||
time.sleep(self.switching_time / SCALING)
|
|
||||||
|
|
||||||
self._t = threading.Thread(target=switcher)
|
|
||||||
self._t.start()
|
self._t.start()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
@ -91,6 +78,21 @@ class TrafficLight:
|
|||||||
self._tl_mb.send(pickle.dumps(
|
self._tl_mb.send(pickle.dumps(
|
||||||
TrafficLightState(tlid=self.tlid, color=self.current_color, last_switch=self.last_switch)))
|
TrafficLightState(tlid=self.tlid, color=self.current_color, last_switch=self.last_switch)))
|
||||||
|
|
||||||
|
def _switcher(self):
|
||||||
|
"""
|
||||||
|
Toggles the traffic lights state every self.switching_time / SCALING seconds.
|
||||||
|
When it switches, it notifies the orchestrator with self.send_status_update()
|
||||||
|
"""
|
||||||
|
num_colors = len(TrafficLightColor)
|
||||||
|
# set current color to the one before starting color, because switch immediately happens in loop
|
||||||
|
# therefore it sleeps on the end of the loop and first status is immediately sent to msg broker
|
||||||
|
self.current_color = TrafficLightColor((self._starting_color.value - 1) % num_colors)
|
||||||
|
while self.running:
|
||||||
|
self.current_color = TrafficLightColor((self.current_color.value + 1) % num_colors)
|
||||||
|
self.last_switch = datetime.now()
|
||||||
|
self.send_status_update()
|
||||||
|
time.sleep(self.switching_time / SCALING)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
...
|
...
|
||||||
|
|||||||
103
components/i_feed/test_ifeed.py
Normal file
103
components/i_feed/test_ifeed.py
Normal file
@ -0,0 +1,103 @@
|
|||||||
|
import datetime
|
||||||
|
import pickle
|
||||||
|
import time
|
||||||
|
import unittest
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import geopy
|
||||||
|
from dse_shared_libs.daf import DAF
|
||||||
|
from dse_shared_libs.target_velocity import TargetVelocity
|
||||||
|
from dse_shared_libs.traffic_light_color import TrafficLightColor
|
||||||
|
|
||||||
|
from components.i_feed.devices.traffic_light import TrafficLight
|
||||||
|
from devices.vehicle import Vehicle
|
||||||
|
from dse_shared_libs.mock.rabbit_mq_mocks import MyBlockingConnection
|
||||||
|
|
||||||
|
|
||||||
|
class TestVehicle(unittest.TestCase):
|
||||||
|
def setUp(self) -> None:
|
||||||
|
with patch('pika.BlockingConnection', MyBlockingConnection):
|
||||||
|
self.vin = 'my_vin'
|
||||||
|
self.starting_velocity = 130
|
||||||
|
self.starting_point = geopy.Point(0, 0, 0)
|
||||||
|
self.timestamp = datetime.datetime.now()
|
||||||
|
|
||||||
|
self.v = Vehicle(
|
||||||
|
vin=self.vin,
|
||||||
|
starting_point=self.starting_point,
|
||||||
|
starting_velocity=self.starting_velocity
|
||||||
|
)
|
||||||
|
|
||||||
|
self.v.last_update = self.timestamp
|
||||||
|
self.v._driven_kms = 0
|
||||||
|
|
||||||
|
def test_nce_prop(self):
|
||||||
|
with patch('time.sleep', return_value=None):
|
||||||
|
# initially false
|
||||||
|
self.assertEqual(self.v.nce, False)
|
||||||
|
# get past nce km
|
||||||
|
self.v._driven_kms = 3
|
||||||
|
# now nce should fire
|
||||||
|
self.assertEqual(self.v.nce, True)
|
||||||
|
# second call is false again
|
||||||
|
self.assertEqual(self.v.nce, False)
|
||||||
|
|
||||||
|
def test_daf_prop(self):
|
||||||
|
# test if daf object created properly
|
||||||
|
daf = DAF(vehicle_identification_number=self.vin,
|
||||||
|
gps_location=self.starting_point,
|
||||||
|
velocity=self.starting_velocity,
|
||||||
|
timestamp=self.timestamp,
|
||||||
|
near_crash_event=False)
|
||||||
|
self.assertEqual(self.v.daf, daf)
|
||||||
|
|
||||||
|
def test_current_location_prop(self):
|
||||||
|
# initial position should be starting point
|
||||||
|
self.assertEqual(self.v.gps_location, self.starting_point)
|
||||||
|
|
||||||
|
# lets say last time was about an hour ago
|
||||||
|
self.v.last_update = self.timestamp - datetime.timedelta(hours=1)
|
||||||
|
|
||||||
|
# we drive "almost" directly north
|
||||||
|
loc = self.v.gps_location
|
||||||
|
self.assertEqual(round(loc.latitude, 2), 1.17)
|
||||||
|
self.assertEqual(round(loc.longitude, 2), 0.04)
|
||||||
|
|
||||||
|
def test_driven_kms_prop(self):
|
||||||
|
# initial driven kms are 0
|
||||||
|
self.assertEqual(self.v.driven_kms, '0km')
|
||||||
|
|
||||||
|
# alter driven kms
|
||||||
|
self.v._driven_kms = 44.24552
|
||||||
|
# will be rounded to 2 digits
|
||||||
|
self.assertEqual(self.v.driven_kms, '44.25km')
|
||||||
|
|
||||||
|
def test_new_velocity(self):
|
||||||
|
# starting velocity is set to 130
|
||||||
|
self.assertEqual(self.v.velocity, 130)
|
||||||
|
|
||||||
|
# we get a new velocity, lets say 55 ...
|
||||||
|
tv = TargetVelocity(vin='my_vin', target_velocity=55, timestamp=self.timestamp)
|
||||||
|
self.v.new_velocity(pickle.dumps(tv))
|
||||||
|
# then the vehicle should comply with that
|
||||||
|
self.assertEqual(self.v.velocity, 55)
|
||||||
|
|
||||||
|
|
||||||
|
class TestTrafficLight(unittest.TestCase):
|
||||||
|
def setUp(self) -> None:
|
||||||
|
with patch('pika.BlockingConnection', MyBlockingConnection):
|
||||||
|
self.tl = TrafficLight(tlid='my_tl',
|
||||||
|
switching_time=1,
|
||||||
|
starting_color=TrafficLightColor.RED)
|
||||||
|
|
||||||
|
def tearDown(self) -> None:
|
||||||
|
self.tl.stop()
|
||||||
|
|
||||||
|
def test_switching(self):
|
||||||
|
self.assertFalse(hasattr(self.tl, 'current_color'))
|
||||||
|
self.tl.start()
|
||||||
|
self.assertEqual(self.tl.current_color, TrafficLightColor.RED)
|
||||||
|
time.sleep(1.2)
|
||||||
|
self.assertEqual(self.tl.current_color, TrafficLightColor.GREEN)
|
||||||
|
time.sleep(1)
|
||||||
|
self.assertEqual(self.tl.current_color, TrafficLightColor.RED)
|
||||||
Loading…
x
Reference in New Issue
Block a user