import threading import time from datetime import datetime from enum import Enum from circuitbreaker import circuit # Default switching time in seconds SWITCHING_TIME = 120 # Scale speed of switching by factor x SCALING = 100 class TrafficLight: class Color(Enum): GREEN = 0 RED = 1 # id of traffic light tlid: str # time between switches in seconds switching_time: int # timestamp of last switch last_switch: datetime # currently lit color current_color: Color # marks if traffic light started running: bool = False # stores starting color _starting_color: Color # stores runner thread _t: threading.Thread def __init__(self, tlid: str, switching_time: int = SWITCHING_TIME, starting_color: Color = Color.RED): self.tlid = tlid self.switching_time = switching_time self._starting_color = starting_color def start(self): self.running = True def switcher(): num_colors = len(self.Color) # set current color to the one before starting color, because switch immediately happens in loop # therefore it sleeps on the end of the loop and first status is immediately sent to msg broker self.current_color = TrafficLight.Color((self._starting_color.value - 1) % num_colors) while self.running: self.current_color = TrafficLight.Color((self.current_color.value + 1) % num_colors) self.last_switch = datetime.now() self.send_status_update() time.sleep(self.switching_time / SCALING) self._t = threading.Thread(target=switcher) self._t.start() def stop(self): self.running = False @circuit(failure_threshold=10, expected_exception=ConnectionError) def send_status_update(self): # TODO inform the message broker about the current status print(self.tlid, self.current_color, self.last_switch) if __name__ == '__main__': tl1 = TrafficLight(tlid='tl1') tl1.start()