import pickle import threading import time from datetime import datetime from circuitbreaker import circuit # Default switching time in seconds from dse_shared_libs.traffic_light_color import TrafficLightColor from dse_shared_libs.message_broker_wrapper import MBWrapper from dse_shared_libs.traffic_light_state import TrafficLightState from pika.exceptions import AMQPConnectionError SWITCHING_TIME = 15 # Scale speed of switching by factor x SCALING = 6 class TrafficLight: # id of traffic light tlid: str # time between switches in seconds switching_time: int # timestamp of last switch last_switch: datetime # currently lit color current_color: TrafficLightColor # marks if traffic light started running: bool = False # stores starting color _starting_color: TrafficLightColor # stores runner thread _t: threading.Thread # outgoing traffic light state MBWrapper _tl_mb: MBWrapper def __init__(self, tlid: str, switching_time: int = SWITCHING_TIME, starting_color: TrafficLightColor = TrafficLightColor.RED): """ Init a new Traffic Light client. This will already initialize its message queue. Start it with self.start(). :param tlid: ID of the traffic light :param switching_time: time in seconds to switch between states (TrafficLightColor.RED and .GREEN) :param starting_color: TrafficLightColor to start with """ self.tlid = tlid self.switching_time = switching_time self._starting_color = starting_color self._tl_mb = MBWrapper(exchange_name='traffic_light_state_{}'.format(self.tlid)) self._tl_mb.setup_sender() def start(self): """ Starts the traffic light by spawning a new thread. It toggles its state every self.switching_time / SCALING seconds. When it switches, it notifies the orchestrator with self.send_status_update(). """ self.running = True def switcher(): num_colors = len(TrafficLightColor) # set current color to the one before starting color, because switch immediately happens in loop # therefore it sleeps on the end of the loop and first status is immediately sent to msg broker self.current_color = TrafficLightColor((self._starting_color.value - 1) % num_colors) while self.running: self.current_color = TrafficLightColor((self.current_color.value + 1) % num_colors) self.last_switch = datetime.now() self.send_status_update() time.sleep(self.switching_time / SCALING) self._t = threading.Thread(target=switcher) self._t.start() def stop(self): """ The running thread will stop its execution at the next evaluation of while self.running. """ self.running = False @circuit(failure_threshold=10, expected_exception=AMQPConnectionError) def send_status_update(self): """ Sends a status update containing an updated TrafficLightState object to the orchestrator. This update contains the tlid, current_color and timestamp of the last switch. """ print(self.tlid, self.current_color, self.last_switch) self._tl_mb.send(pickle.dumps( TrafficLightState(tlid=self.tlid, color=self.current_color, last_switch=self.last_switch))) if __name__ == '__main__': ...