added docu

This commit is contained in:
Marco Zeisler 2021-05-15 17:46:34 +02:00
parent 2eed541718
commit 220f4674e5

View File

@ -16,6 +16,8 @@ STARTING_VELOCITY = 130
BEARING = 0 BEARING = 0
# Scale speed of vehicles by factor x # Scale speed of vehicles by factor x
SCALING = 100 SCALING = 100
# Interval between status updates in seconds (is not scaled)
UPDATE_INTERVAL = 1
# At x km the NCE shall happen # At x km the NCE shall happen
NCE_KM = 30 NCE_KM = 30
# Time in seconds to recover from NCE (will be scaled) # Time in seconds to recover from NCE (will be scaled)
@ -25,15 +27,30 @@ RESET_KM = 50
class Vehicle: class Vehicle:
t: threading.Thread # vehicle identification number
vin: str vin: str
# velocity in km/h
velocity: float velocity: float
# stores the timestamp most recently used to calculate driving distance
last_update: Union[datetime, None]
# marks if vehicle is currently driving
driving: bool = False
# stores last velocity before NCE, is used to recover after NCE
_last_velocity: float _last_velocity: float
timestamp: Union[datetime, None] # stores the so-far driven kilometers
_driven_kms: int _driven_kms: int
# stores the starting point for resetting the vehicle
_starting_point: geopy.Point _starting_point: geopy.Point
# stores the gps_location, shall be accessed by gps_location, where the actual value is calculated and returned
_gps_location: geopy.Point _gps_location: geopy.Point
# stores if nce possible
_nce_possible = True
# stores if nce already happened, it only happens (up to) once
_nce_happened = False _nce_happened = False
# continuous driving thread
_t: threading.Thread
# NCE recovery thread
_rt: threading.Thread
def __init__(self, def __init__(self,
vin: str, vin: str,
@ -46,33 +63,43 @@ class Vehicle:
@property @property
def nce(self): def nce(self):
if not self._nce_happened: """
On accessing this property, it is calculated if the NCE shall happen. NCE only happens up to once per route.
:return: True if NCE invoked, otherwise False
"""
if self._nce_possible and not self._nce_happened:
if self._driven_kms >= NCE_KM: if self._driven_kms >= NCE_KM:
self._nce_happened = True self._nce_happened = True
self._last_velocity = self.velocity self._last_velocity = self.velocity
self.velocity = 0 self.velocity = 0
threading.Thread(target=self._sleep_and_recover_from_nce).start()
return True
return False
def _sleep_and_recover_from_nce(self): def _sleep_and_recover():
recover_in = TIME_TO_RECOVER / SCALING recover_in = TIME_TO_RECOVER / SCALING
print('\nNCE !!! Recovering in {} (scaled) seconds.\n'.format(recover_in)) print('\nNCE !!! Recovering in {} (scaled) seconds.\n'.format(recover_in))
time.sleep(recover_in) time.sleep(recover_in)
print('\nRecovered.\n') print('\nRecovered.\n')
self.velocity = self._last_velocity self.velocity = self._last_velocity
self._rt = threading.Thread(target=_sleep_and_recover)
self._rt.start()
return True
return False
@nce.setter @nce.setter
def nce(self, val): def nce(self, val):
self._nce_happened = val self._nce_happened = val
@property @property
def daf(self): def daf(self):
"""
:return: "Datenaufzeichnung für automatisiertes Fahren" (DAF) object
"""
return DAF(vehicle_identification_number=self.vin, return DAF(vehicle_identification_number=self.vin,
gps_location=self.gps_location, gps_location=self.gps_location,
timestamp=self.timestamp,
near_crash_event=self.nce, near_crash_event=self.nce,
velocity=self.velocity, velocity=self.velocity,
timestamp=self.last_update,
) )
@property @property
@ -87,9 +114,9 @@ class Vehicle:
start = self._gps_location start = self._gps_location
# Get old and updated timestamps # Get old and updated timestamps
old_timestamp = self.timestamp old_timestamp = self.last_update
updated_timestamp = datetime.now() updated_timestamp = datetime.now()
self.timestamp = updated_timestamp self.last_update = updated_timestamp
# get driving time between timestamps (in seconds) # get driving time between timestamps (in seconds)
driving_time = (updated_timestamp - old_timestamp).total_seconds() driving_time = (updated_timestamp - old_timestamp).total_seconds()
@ -113,36 +140,56 @@ class Vehicle:
return '{}km'.format(round(self._driven_kms, 2)) return '{}km'.format(round(self._driven_kms, 2))
def start_driving(self): def start_driving(self):
"""
Starts a thread responsible for driving. This thread continuously updates internal values and
informs the message broker about the current state (DAF) of the vehicle.
"""
print('Start driving ... SCALING: x{}\n\n'.format(SCALING)) print('Start driving ... SCALING: x{}\n\n'.format(SCALING))
self.timestamp = datetime.now() self.last_update = datetime.now()
self._driven_kms = 0 self._driven_kms = 0
self.t = threading.Thread(target=self.drive) self._t = threading.Thread(target=self.drive)
self.t.start() self._t.start()
def drive(self): def drive(self):
while self.timestamp: """
Drive until not self.driving == update status every UPDATE_INTERVAL seconds.
"""
self.driving = True
while self.driving:
self.check_reset() self.check_reset()
self.external_call() self.external_call()
time.sleep(1) time.sleep(UPDATE_INTERVAL)
def stop_driving(self): def stop_driving(self):
"""
Set self.driving to stop driving thread gracefully.
"""
self.driving = False
self._gps_location = self._starting_point self._gps_location = self._starting_point
self._driven_kms = 0 self._driven_kms = 0
self.timestamp = None
def check_reset(self): def check_reset(self):
"""
Checks if end of route is reached and resets vehicle to starting conditions.
Afterwards, the vehicle is still driving, but again from start.
"""
if self._driven_kms >= RESET_KM: if self._driven_kms >= RESET_KM:
print('\n\nEnd of route reached ... resetting and restarting vehicle') print('\n\nEnd of route reached ... resetting and restarting vehicle')
self._gps_location = self._starting_point self._gps_location = self._starting_point
self._driven_kms = 0 self._driven_kms = 0
self.timestamp = datetime.now() self.last_update = datetime.now()
self.nce = False self.nce = False
@circuit(failure_threshold=10, expected_exception=ConnectionError) @circuit(failure_threshold=10, expected_exception=ConnectionError)
def external_call(self): def external_call(self):
# TODO inform the message broker about the current status
print(v1.driven_kms, '\t', v1.daf) print(v1.driven_kms, '\t', v1.daf)
# TODO adapt the vehicle's velocity to the suggestion of the orchestration service
def new_velocity(self):
...
if __name__ == "__main__": if __name__ == "__main__":
v1 = Vehicle(vin='SB164ABN1PE082986') v1 = Vehicle(vin='SB164ABN1PE082986')