import pickle import threading import time from datetime import datetime from typing import Union import geopy import geopy.distance from circuitbreaker import circuit from dse_shared_libs.daf import DAF from dse_shared_libs.message_broker_wrapper import MBWrapper # Lat, Long from dse_shared_libs.target_velocity import TargetVelocity from pika.exceptions import AMQPConnectionError STARTING_POINT = geopy.Point(47.89053, 16.20703) # in km/h STARTING_VELOCITY = 130 # Driving direction in degrees: 0=N, 90=E, 180=S, 270=W BEARING = 0 # Scale speed of vehicles by factor x SCALING = 100 # Interval between status updates in seconds (is not scaled) UPDATE_INTERVAL = 2 # At x km the NCE shall happen NCE_KM = 30 # Time in seconds to recover from NCE (will be scaled) TIME_TO_RECOVER = 500 # Resets vehicle at km x RESET_KM = 50 class Vehicle: # vehicle identification number vin: str # velocity in km/h velocity: float # stores the timestamp most recently used to calculate driving distance last_update: Union[datetime, None] # marks if vehicle is currently driving, a new one isn't, has to be started via self.start_driving() driving: bool = False # stores last velocity before NCE, is used to recover after NCE _last_velocity: float # stores the so-far driven kilometers _driven_kms: int # stores the starting point for resetting the vehicle _starting_point: geopy.Point # stores the gps_location, shall be accessed by gps_location, where the actual value is calculated and returned _gps_location: geopy.Point # stores if nce possible _nce_possible = True # stores if nce already happened, it only happens (up to) once _nce_happened = False # stores if we are still recovering from NCE _recovering = False # continuous driving thread _t: threading.Thread # NCE recovery thread _rt: threading.Thread # outgoing daf info MBWrapper _daf_mb: MBWrapper # incoming target velocity MBWrapper _velocity_mb: MBWrapper def __init__(self, vin: str, starting_point: geopy.Point = STARTING_POINT, starting_velocity: float = STARTING_VELOCITY): self.vin = vin self._starting_point = starting_point self._gps_location = starting_point self.velocity = starting_velocity self._daf_mb = MBWrapper(exchange_name='vehicle_daf_{}'.format(vin)) self._daf_mb.setup_sender() self._velocity_mb = MBWrapper(exchange_name='vehicle_velocity_{}'.format(self.vin), callback=self.new_velocity) self._velocity_mb.setup_receiver() @property def nce(self): """ On accessing this property, it is calculated if the NCE shall happen. NCE only happens up to once per route. :return: True if NCE invoked, otherwise False """ if self._nce_possible and not self._nce_happened: if self._driven_kms >= NCE_KM: self._nce_happened = True self._last_velocity = self.velocity self.velocity = 0 def _sleep_and_recover(): recover_in = TIME_TO_RECOVER / SCALING self._recovering = True print('\nNCE !!! Recovering in {} (scaled) seconds.\n'.format(recover_in)) time.sleep(recover_in) print('\nRecovered.\n') self._recovering = False self.velocity = self._last_velocity self._rt = threading.Thread(target=_sleep_and_recover) self._rt.start() return True return False @nce.setter def nce(self, val): self._nce_happened = val @property def daf(self): """ :return: "Datenaufzeichnung für automatisiertes Fahren" (DAF) object """ # ATTENTION: ORDER MANDATORY return DAF(vehicle_identification_number=self.vin, # first deduce nce - is calculated, sets velocity to 0 if NCE near_crash_event=self.nce, # then get current location based on last location and current speed gps_location=self.gps_location, # finally get last_update, is updated by gps_location deduction timestamp=self.last_update, # is set to 0 if nce velocity=self.velocity, ) @property def gps_location(self): """ Update self.gps_location with given speed in km/h and the driven time in seconds :param velocity: in km/h :param time: in seconds :param bearing: direction in degrees: 0=N, 90=E, 180=S, 270=W """ # Define starting point. start = self._gps_location # Get old and updated timestamps old_timestamp = self.last_update updated_timestamp = datetime.now() self.last_update = updated_timestamp # get driving time between timestamps (in seconds) driving_time = (updated_timestamp - old_timestamp).total_seconds() # reached distance in kilometers: convert km/h to km/s and multiply by driving time kilometers = self.velocity / 3600 * driving_time * SCALING # Define a general distance object, initialized with a distance of k km. d = geopy.distance.distance(kilometers=kilometers) # Use the `destination` method with a bearing of BEARING degrees # in order to move from point `start` to kilometers km bearing. self._gps_location = d.destination(point=start, bearing=BEARING) # update driven kilometers self._driven_kms += kilometers return self._gps_location @property def driven_kms(self): return '{}km'.format(round(self._driven_kms, 2)) def start_driving(self): """ Starts a thread responsible for driving. This thread continuously updates internal values and informs the message broker about the current state (DAF) of the vehicle. """ print('{} starts driving ... SCALING: x{}\n\n'.format(self.vin, SCALING)) self.last_update = datetime.now() self._driven_kms = 0 self._t = threading.Thread(target=self.drive) self._t.start() def drive(self): """ Drive until not self.driving == update status every UPDATE_INTERVAL seconds. """ self.driving = True while self.driving: self.check_reset() self.send_status_update() time.sleep(UPDATE_INTERVAL) def stop_driving(self): """ Set self.driving to stop driving thread gracefully. """ self.driving = False self._gps_location = self._starting_point self._driven_kms = 0 def check_reset(self): """ Checks if end of route is reached and resets vehicle to starting conditions. Afterwards, the vehicle is still driving, but again from start. """ if self._driven_kms >= RESET_KM: print('\n\nEnd of route reached ... resetting and restarting vehicle') self._gps_location = self._starting_point self._driven_kms = 0 self.last_update = datetime.now() self.nce = False @circuit(failure_threshold=10, expected_exception=AMQPConnectionError) def send_status_update(self): print(self.driven_kms, '\t', self.daf) self._daf_mb.send(pickle.dumps(self.daf)) def new_velocity(self, response: bytes): """ Will be invoked if new target velocity message received :param response: pickled TargetVelocity object """ response: TargetVelocity = pickle.loads(response) velocity = response.target_velocity print('Received new velocity {}'.format(velocity)) if not self._recovering: self.velocity = velocity else: print('We are still recovering ... ignoring new target velocity.') if __name__ == "__main__": ...