added simple traffic_light

This commit is contained in:
Marco Zeisler 2021-05-17 18:14:29 +02:00
parent 8d3d08e28e
commit 731d6a564a
2 changed files with 63 additions and 4 deletions

View File

@ -1,6 +1,65 @@
import threading
import time
from datetime import datetime
from enum import Enum
from circuitbreaker import circuit
# Default switching time in seconds
SWITCHING_TIME = 120
# Scale speed of switching by factor x
SCALING = 100
class TrafficLight:
class Color(Enum):
GREEN = 0
RED = 1
# time between switches in seconds
switching_time: int
# timestamp of last switch
last_switch: datetime
# currently lit color
current_color: Color
# marks if traffic light started
running: bool = False
# stores starting color
_starting_color: Color
# stores runner thread
_t: threading.Thread
def __init__(self, switching_time: int = SWITCHING_TIME, starting_color: Color = Color.RED):
self.switching_time = switching_time
self._starting_color = starting_color
def start(self):
self.running = True
def switcher():
num_colors = len(self.Color)
# set current color to the one before starting color, because switch immediately happens in loop
# therefore it sleeps on the end of the loop and first status is immediately sent to msg broker
self.current_color = TrafficLight.Color((self._starting_color.value - 1) % num_colors)
while self.running:
self.current_color = self.Color((self.current_color.value + 1) % num_colors)
self.last_switch = datetime.now()
self.send_status_update()
time.sleep(self.switching_time / SCALING)
self._t = threading.Thread(target=switcher)
self._t.start()
def stop(self):
self.running = False
@circuit(failure_threshold=10, expected_exception=ConnectionError)
def send_status_update(self):
# TODO inform the message broker about the current status
print(self.current_color, self.last_switch)
if __name__ == '__main__':
tl1 = TrafficLight(3)
tl1.start()
@circuit(failure_threshold=10, expected_exception=ConnectionError)
def external_call():
...

View File

@ -184,7 +184,7 @@ class Vehicle:
@circuit(failure_threshold=10, expected_exception=ConnectionError)
def send_status_update(self):
# TODO inform the message broker about the current status
print(v1.driven_kms, '\t', v1.daf)
print(self.driven_kms, '\t', self.daf)
# TODO adapt the vehicle's velocity to the suggestion of the orchestration service
def new_velocity(self):