"""Simulated traffic light that periodically publishes its state to a message broker."""
import logging
import pickle
import threading
import time
from datetime import datetime

from circuitbreaker import CircuitBreakerError, circuit
from pika.exceptions import AMQPConnectionError

from dse_shared_libs.message_broker_wrapper import MBWrapper
from dse_shared_libs.traffic_light_color import TrafficLightColor
from dse_shared_libs.traffic_light_state import TrafficLightState

logger = logging.getLogger(__name__)

# Default switching time in seconds (simulated time, before SCALING is applied)
SWITCHING_TIME = 500
# Scale speed of switching by factor x: the real wait is switching_time / SCALING
SCALING = 100


class TrafficLight:
    """Cycle through ``TrafficLightColor`` on a background thread.

    After every switch the new state is pickled as a ``TrafficLightState`` and
    sent through an ``MBWrapper`` bound to the exchange
    ``traffic_light_state_<tlid>``.
    """

    # id of traffic light
    tlid: str
    # time between switches in seconds (divided by SCALING for the real wait)
    switching_time: int
    # timestamp of last switch; only set once the switcher thread has run
    last_switch: datetime
    # currently lit color; only set once the switcher thread has run
    current_color: TrafficLightColor
    # marks if traffic light started
    running: bool = False
    # stores starting color
    _starting_color: TrafficLightColor
    # stores runner thread
    _t: threading.Thread
    # signals the current runner thread to stop without waiting out its sleep
    _stop_event: threading.Event
    # outgoing traffic light state MBWrapper
    _tl_mb: MBWrapper

    def __init__(self, tlid: str, switching_time: int = SWITCHING_TIME,
                 starting_color: TrafficLightColor = TrafficLightColor.RED):
        """Store the configuration and set up the outgoing message-broker sender.

        Args:
            tlid: Identifier of this traffic light; also used in the exchange name.
            switching_time: Time between switches in seconds, before scaling.
            starting_color: First color shown (and published) after ``start()``.
        """
        self.tlid = tlid
        self.switching_time = switching_time
        self._starting_color = starting_color
        # Created here so that stop() is safe even before the first start().
        self._stop_event = threading.Event()
        self._tl_mb = MBWrapper(exchange_name=f'traffic_light_state_{self.tlid}')
        self._tl_mb.setup_sender()

    def start(self):
        """Start the background thread that switches colors and publishes them.

        Calling ``start()`` again (with or without a ``stop()`` in between)
        retires the previous switcher thread and begins a fresh cycle from the
        starting color, so at most one switcher keeps running.
        """
        # Retire a previous run, if any, so two switchers never publish in parallel.
        self._stop_event.set()
        # Each run gets its own event: a late stop() of an old run cannot be
        # undone by a newer start(), and vice versa.
        stop_event = threading.Event()
        self._stop_event = stop_event
        self.running = True

        def switcher():
            num_colors = len(TrafficLightColor)
            # set current color to the one before starting color, because switch immediately happens in loop
            # therefore it sleeps on the end of the loop and first status is immediately sent to msg broker
            # NOTE(review): this arithmetic assumes the enum values are the
            # consecutive integers 0..num_colors-1 - confirm in TrafficLightColor.
            self.current_color = TrafficLightColor((self._starting_color.value - 1) % num_colors)
            while self.running and not stop_event.is_set():
                self.current_color = TrafficLightColor((self.current_color.value + 1) % num_colors)
                self.last_switch = datetime.now()
                try:
                    self.send_status_update()
                except (AMQPConnectionError, CircuitBreakerError) as exc:
                    # A broker outage must not end the switching loop; without
                    # this the first failure kills the thread and the circuit
                    # breaker never gets to count up to its threshold.
                    logger.warning('Traffic light %s could not publish its state: %s',
                                   self.tlid, exc)
                # Same delay as a plain sleep, but stop() wakes it up immediately.
                stop_event.wait(self.switching_time / SCALING)

        self._t = threading.Thread(target=switcher)
        self._t.start()

    def stop(self):
        """Ask the switcher thread to finish; it exits without waiting out its delay."""
        self.running = False
        self._stop_event.set()

    @circuit(failure_threshold=10, expected_exception=AMQPConnectionError)
    def send_status_update(self):
        """Print the current state and send it, pickled, through the message broker.

        Raises:
            AMQPConnectionError: If sending fails because of the broker connection.
            CircuitBreakerError: While the circuit breaker is open after
                repeated connection failures.
        """
        print(self.tlid, self.current_color, self.last_switch)
        self._tl_mb.send(pickle.dumps(
            TrafficLightState(tlid=self.tlid, color=self.current_color, last_switch=self.last_switch)))


if __name__ == '__main__':
    ...