Marco Zeisler dc89332bd5 use shared libs;
added event store which intercepts the message broker wrapper if exchange name is not logger
2021-05-27 19:38:49 +02:00

230 lines
7.7 KiB
Python

import pickle
import threading
import time
from datetime import datetime
from typing import Union
import geopy
import geopy.distance
from circuitbreaker import circuit
from shared.daf import DAF
from shared.message_broker_wrapper import MBWrapper
# Lat, Long
from shared.target_velocity import TargetVelocity
STARTING_POINT = geopy.Point(48.853, 2.349)
# in km/h
STARTING_VELOCITY = 130
# Driving direction in degrees: 0=N, 90=E, 180=S, 270=W
BEARING = 0
# Scale speed of vehicles by factor x
SCALING = 100
# Interval between status updates in seconds (is not scaled)
UPDATE_INTERVAL = 2
# At x km the NCE shall happen
NCE_KM = 30
# Time in seconds to recover from NCE (will be scaled)
TIME_TO_RECOVER = 500
# Resets vehicle at km x
RESET_KM = 50
class Vehicle:
# vehicle identification number
vin: str
# velocity in km/h
velocity: float
# stores the timestamp most recently used to calculate driving distance
last_update: Union[datetime, None]
# marks if vehicle is currently driving, a new one isn't, has to be started via self.start_driving()
driving: bool = False
# stores last velocity before NCE, is used to recover after NCE
_last_velocity: float
# stores the so-far driven kilometers
_driven_kms: int
# stores the starting point for resetting the vehicle
_starting_point: geopy.Point
# stores the gps_location, shall be accessed by gps_location, where the actual value is calculated and returned
_gps_location: geopy.Point
# stores if nce possible
_nce_possible = True
# stores if nce already happened, it only happens (up to) once
_nce_happened = False
# continuous driving thread
_t: threading.Thread
# NCE recovery thread
_rt: threading.Thread
# outgoing daf info MBWrapper
_daf_mb: MBWrapper
# incoming target velocity MBWrapper
_velocity_mb: MBWrapper
def __init__(self,
vin: str,
starting_point: geopy.Point = STARTING_POINT,
starting_velocity: float = STARTING_VELOCITY):
self.vin = vin
self._starting_point = starting_point
self._gps_location = starting_point
self.velocity = starting_velocity
self._daf_mb = MBWrapper(exchange_name='vehicle_daf_{}'.format(vin))
self._daf_mb.setup_sender()
self._velocity_mb = MBWrapper(exchange_name='vehicle_velocity_{}'.format(self.vin), callback=self.new_velocity)
self._velocity_mb.setup_receiver()
@property
def nce(self):
"""
On accessing this property, it is calculated if the NCE shall happen. NCE only happens up to once per route.
:return: True if NCE invoked, otherwise False
"""
if self._nce_possible and not self._nce_happened:
if self._driven_kms >= NCE_KM:
self._nce_happened = True
self._last_velocity = self.velocity
self.velocity = 0
def _sleep_and_recover():
recover_in = TIME_TO_RECOVER / SCALING
print('\nNCE !!! Recovering in {} (scaled) seconds.\n'.format(recover_in))
time.sleep(recover_in)
print('\nRecovered.\n')
self.velocity = self._last_velocity
self._rt = threading.Thread(target=_sleep_and_recover)
self._rt.start()
return True
return False
@nce.setter
def nce(self, val):
self._nce_happened = val
@property
def daf(self):
"""
:return: "Datenaufzeichnung für automatisiertes Fahren" (DAF) object
"""
return DAF(vehicle_identification_number=self.vin,
# first deduce nce - is calculated, sets velocity to 0 if NCE
near_crash_event=self.nce,
# then get current location based on last location and current speed
gps_location=self.gps_location,
# finally get last_update, is updated by gps_location deduction
timestamp=self.last_update,
# is set to 0 if nce
velocity=self.velocity,
)
@property
def gps_location(self):
"""
Update self.gps_location with given speed in km/h and the driven time in seconds
:param velocity: in km/h
:param time: in seconds
:param bearing: direction in degrees: 0=N, 90=E, 180=S, 270=W
"""
# Define starting point.
start = self._gps_location
# Get old and updated timestamps
old_timestamp = self.last_update
updated_timestamp = datetime.now()
self.last_update = updated_timestamp
# get driving time between timestamps (in seconds)
driving_time = (updated_timestamp - old_timestamp).total_seconds()
# reached distance in kilometers: convert km/h to km/s and multiply by driving time
kilometers = self.velocity / 3600 * driving_time * SCALING
# Define a general distance object, initialized with a distance of k km.
d = geopy.distance.distance(kilometers=kilometers)
# Use the `destination` method with a bearing of BEARING degrees
# in order to move from point `start` to kilometers km bearing.
self._gps_location = d.destination(point=start, bearing=BEARING)
# update driven kilometers
self._driven_kms += kilometers
return self._gps_location
@property
def driven_kms(self):
return '{}km'.format(round(self._driven_kms, 2))
def start_driving(self):
"""
Starts a thread responsible for driving. This thread continuously updates internal values and
informs the message broker about the current state (DAF) of the vehicle.
"""
print('{} starts driving ... SCALING: x{}\n\n'.format(self.vin, SCALING))
self.last_update = datetime.now()
self._driven_kms = 0
self._t = threading.Thread(target=self.drive)
self._t.start()
def drive(self):
"""
Drive until not self.driving == update status every UPDATE_INTERVAL seconds.
"""
self.driving = True
while self.driving:
self.check_reset()
self.send_status_update()
time.sleep(UPDATE_INTERVAL)
def stop_driving(self):
"""
Set self.driving to stop driving thread gracefully.
"""
self.driving = False
self._gps_location = self._starting_point
self._driven_kms = 0
def check_reset(self):
"""
Checks if end of route is reached and resets vehicle to starting conditions.
Afterwards, the vehicle is still driving, but again from start.
"""
if self._driven_kms >= RESET_KM:
print('\n\nEnd of route reached ... resetting and restarting vehicle')
self._gps_location = self._starting_point
self._driven_kms = 0
self.last_update = datetime.now()
self.nce = False
@circuit(failure_threshold=10, expected_exception=ConnectionError)
def send_status_update(self):
print(self.driven_kms, '\t', self.daf)
self._daf_mb.send(pickle.dumps(self.daf))
def new_velocity(self, response: bytes):
response: TargetVelocity = pickle.loads(response)
velocity = response.target_velocity
print('Received new velocity {}'.format(velocity))
self.velocity = velocity
if __name__ == "__main__":
# TODO fetch and use data from Entity Ident
v1 = Vehicle(vin='SB164ABN1PE082000')
v1.start_driving()
# time.sleep(1)
# v2 = Vehicle(vin='SB999ABN1PE082111')
# v2.start_driving()
#
# time.sleep(0.5)
# v3 = Vehicle(vin='SB555ABN1PE082555')
# v3.start_driving()