91 lines
2.8 KiB
Python
91 lines
2.8 KiB
Python
import pickle
|
|
import threading
|
|
import time
|
|
from datetime import datetime
|
|
|
|
from circuitbreaker import circuit
|
|
|
|
# Default switching time in seconds
|
|
from shared import TrafficLightColor
|
|
from shared.TrafficLightColor import TrafficLightColor
|
|
from shared.message_broker_wrapper import MBWrapper
|
|
|
|
SWITCHING_TIME = 120
|
|
# Speed-up factor for the simulation: the pause between switches is switching_time / SCALING seconds
|
|
SCALING = 100
|
|
|
|
|
|
class TrafficLight:
    """Simulated traffic light that cycles through ``TrafficLightColor``.

    A background thread switches the colour, records the switch time and
    publishes the new state through an ``MBWrapper`` that is created with
    ``exchange_name='traffic_light_state_<tlid>'``.
    """

    # id of traffic light
    tlid: str
    # time between switches in seconds (divided by SCALING when waiting)
    switching_time: int
    # timestamp of last switch; first assigned by the runner thread
    last_switch: datetime
    # currently lit color; first assigned by the runner thread
    current_color: TrafficLightColor
    # marks if traffic light started
    running: bool = False
    # stores starting color
    _starting_color: TrafficLightColor
    # stores runner thread
    _t: threading.Thread
    # set by stop() to wake the current runner thread out of its wait
    _stop_event: threading.Event

    # outgoing traffic light state MBWrapper
    _tl_mb: MBWrapper

    def __init__(self,
                 tlid: str,
                 switching_time: int = SWITCHING_TIME,
                 starting_color: TrafficLightColor = TrafficLightColor.RED):
        """Store the configuration and set up the outgoing message sender.

        Args:
            tlid: Identifier of this light; also used in the exchange name.
            switching_time: Seconds between colour switches, before the
                module-level SCALING divisor is applied.
            starting_color: First colour shown once ``start()`` is called.
        """
        self.tlid = tlid
        self.switching_time = switching_time
        self._starting_color = starting_color
        self._stop_event = threading.Event()

        self._tl_mb = MBWrapper(exchange_name='traffic_light_state_{}'.format(self.tlid))
        self._tl_mb.setup_sender()

    def start(self):
        """Launch the runner thread that switches colours and publishes them.

        The first state (the starting colour) is published immediately; the
        thread then waits ``switching_time / SCALING`` seconds between
        switches. Colours advance in ascending enum-value order and wrap
        around after the last one.

        Calling ``start()`` while a runner thread is still active does
        nothing, so a light never ends up with two competing runners.
        """
        active_runner = getattr(self, '_t', None)
        if self.running and active_runner is not None and active_runner.is_alive():
            return

        # Local import keeps the module's import block untouched; the
        # package is already a dependency of this file (see ``circuit``).
        from circuitbreaker import CircuitBreakerError

        # Retire any runner left over from an earlier start() and give the
        # new runner its own event, so a late-waking old thread cannot
        # mistake the fresh ``running = True`` for permission to continue.
        self._stop_event.set()
        stop_event = threading.Event()
        self._stop_event = stop_event
        self.running = True

        def switcher():
            # Cycle over the members sorted by value instead of doing
            # arithmetic on ``.value``: same order when the values are
            # 0..n-1, and still valid when they are not.
            cycle = sorted(TrafficLightColor, key=lambda color: color.value)
            position = cycle.index(self._starting_color)
            while self.running and not stop_event.is_set():
                self.current_color = cycle[position]
                self.last_switch = datetime.now()
                try:
                    self.send_status_update()
                except (ConnectionError, CircuitBreakerError) as error:
                    # A failed publish must not kill the runner: keep
                    # switching so the breaker can count failures and later
                    # updates can go through once the connection recovers.
                    print(self.tlid, 'status update failed:', error)
                position = (position + 1) % len(cycle)
                # Event.wait instead of time.sleep so stop() takes effect
                # immediately rather than after the full interval.
                stop_event.wait(self.switching_time / SCALING)

        self._t = threading.Thread(target=switcher)
        self._t.start()

    def stop(self):
        """Ask the runner thread to finish; does not wait for it to exit."""
        self.running = False
        self._stop_event.set()

    @circuit(failure_threshold=10, expected_exception=ConnectionError)
    def send_status_update(self):
        """Print the current state and publish it via the message broker.

        The payload is a pickled dict with the keys ``tlid``, ``color`` and
        ``last_switch``. The method is wrapped by ``circuit`` with
        ``failure_threshold=10`` and ``expected_exception=ConnectionError``.

        Raises:
            AttributeError: If called before the runner thread has assigned
                ``current_color`` and ``last_switch``.
            ConnectionError: Whatever ``MBWrapper.send`` raises passes
                through the decorator (presumably ``CircuitBreakerError``
                once the breaker is open - confirm against the
                circuitbreaker docs).
        """
        print(self.tlid, self.current_color, self.last_switch)
        # NOTE(review): the payload is pickled, so consumers have to
        # unpickle it - only safe while the broker is a trusted channel.
        self._tl_mb.send(pickle.dumps({
            'tlid': self.tlid,
            'color': self.current_color,
            'last_switch': self.last_switch
        }))
|
|
|
|
|
|
if __name__ == '__main__':
    # TODO fetch and use data from Entity Ident

    # (tlid, switching_time) of every simulated light, in launch order
    fleet = (
        ('traffic-light-1', 120),
        ('traffic-light-2', 240),
        ('traffic-light-3', 360),
    )

    # keep references to the launched lights for the lifetime of the script
    lights = []
    for position, (light_id, period) in enumerate(fleet):
        if position:
            # stagger the launches by one second
            time.sleep(1)
        light = TrafficLight(tlid=light_id, switching_time=period)
        light.start()
        lights.append(light)
|