implemented NCE recovery;

This commit is contained in:
Marco Zeisler 2021-05-15 17:27:58 +02:00
parent b8ca4e6bfa
commit 2eed541718

View File

@ -1,3 +1,4 @@
import threading
import time
from datetime import datetime
from typing import Union
@ -14,16 +15,23 @@ STARTING_VELOCITY = 130
# Driving direction in degrees: 0=N, 90=E, 180=S, 270=W
BEARING = 0
# Scale speed of vehicles by factor x
SCALING = 1
SCALING = 100
# At x km the NCE shall happen
NCE_KM = 0.1
NCE_KM = 30
# Time in seconds to recover from NCE (will be scaled)
TIME_TO_RECOVER = 100
# Resets vehicle at km x
RESET_KM = 50
class Vehicle:
t: threading.Thread
vin: str
velocity: float
_last_velocity: float
timestamp: Union[datetime, None]
_driven_kms: int
_starting_point: geopy.Point
_gps_location: geopy.Point
_nce_happened = False
@ -32,18 +40,31 @@ class Vehicle:
starting_point: geopy.Point = STARTING_POINT,
starting_velocity: float = STARTING_VELOCITY):
self.vin = vin
self._starting_point = starting_point
self._gps_location = starting_point
self.velocity = starting_velocity
@property
def nce(self):
nce = self._nce_happened
if not nce:
if not self._nce_happened:
if self._driven_kms >= NCE_KM:
nce = True
self._nce_happened = nce
self._nce_happened = True
self._last_velocity = self.velocity
self.velocity = 0
return nce
threading.Thread(target=self._sleep_and_recover_from_nce).start()
return True
return False
def _sleep_and_recover_from_nce(self):
recover_in = TIME_TO_RECOVER / SCALING
print('\nNCE !!! Recovering in {} (scaled) seconds.\n'.format(recover_in))
time.sleep(recover_in)
print('\nRecovered.\n')
self.velocity = self._last_velocity
@nce.setter
def nce(self, val):
self._nce_happened = val
@property
def daf(self):
@ -92,20 +113,38 @@ class Vehicle:
return '{}km'.format(round(self._driven_kms, 2))
def start_driving(self):
print('Start driving ... SCALING: x{}\n\n'.format(SCALING))
self.timestamp = datetime.now()
self._driven_kms = 0
self.t = threading.Thread(target=self.drive)
self.t.start()
def drive(self):
while self.timestamp:
self.check_reset()
self.external_call()
time.sleep(1)
def stop_driving(self):
self._gps_location = self._starting_point
self._driven_kms = 0
self.timestamp = None
def check_reset(self):
if self._driven_kms >= RESET_KM:
print('\n\nEnd of route reached ... resetting and restarting vehicle')
self._gps_location = self._starting_point
self._driven_kms = 0
self.timestamp = datetime.now()
self.nce = False
@circuit(failure_threshold=10, expected_exception=ConnectionError)
def external_call(self):
...
print(v1.driven_kms, '\t', v1.daf)
if __name__ == "__main__":
v1 = Vehicle(vin='SB164ABN1PE082986')
v1.start_driving()
while True:
print(v1.daf)
time.sleep(1)
# stop manually