import pickle import threading import time from datetime import datetime from typing import Union import geopy import geopy.distance from circuitbreaker import circuit from dse_shared_libs.daf import DAF from dse_shared_libs.message_broker_wrapper import MBWrapper # Lat, Long from dse_shared_libs.target_velocity import TargetVelocity from pika.exceptions import AMQPConnectionError STARTING_POINT = geopy.Point(47.89053, 16.20703) # Driving direction in degrees: 0=N, 90=E, 180=S, 270=W BEARING = 0 # Scale speed of vehicles by factor x SCALING = 1 # in km/h STARTING_VELOCITY = 130 * SCALING # Interval between status updates in seconds (is not scaled) UPDATE_INTERVAL = 1 # At x km the NCE shall happen NCE_KM = 2.4 # Time in seconds to recover from NCE (will be scaled) TIME_TO_RECOVER = 5 # Resets vehicle at km x RESET_KM = 4 class Vehicle: # vehicle identification number vin: str # velocity in km/h velocity: float # stores the timestamp most recently used to calculate driving distance last_update: Union[datetime, None] # marks if vehicle is currently driving, a new one isn't, has to be started via self.start_driving() driving: bool = False # stores last velocity before NCE, is used to recover after NCE _last_velocity: float # stores the so-far driven kilometers _driven_kms: int # stores the starting point for resetting the vehicle _starting_point: geopy.Point # stores the gps_location, shall be accessed by gps_location, where the actual value is calculated and returned _gps_location: geopy.Point # stores if nce possible _nce_possible = True # stores if nce already happened, it only happens (up to) once _nce_happened = False # stores if we are still recovering from NCE _recovering = False # continuous driving thread _t: threading.Thread # NCE recovery thread _rt: threading.Thread # outgoing daf info MBWrapper _daf_mb: MBWrapper # incoming target velocity MBWrapper _velocity_mb: MBWrapper def __init__(self, vin: str, starting_point: geopy.Point = STARTING_POINT, starting_velocity: float = STARTING_VELOCITY): """ Initialize a new vehicle client. Already initializes the corresponding sending and receiving message queue. A vehicle sends every UPDATE_INTERVAL seconds its current DAF to the orchestrator. It receives as response the new TargetVelocity to achieve a possible green wave. After a fixed amount of kilometers, the NCE event is happening. The car then stands still for TIME_TO_RECOVER seconds. Afterwards, it starts driving with the last known velocity. While it is recovering, it ignores the target velocity responses from the orchestrator. A vehicle drives a full route between start and end. After reaching the end, it restores the internal state and starts again from the beginning. :param vin: Vehicle Identification Number :param starting_point: point on globe to start at :param starting_velocity: velocity to start driving with """ self.vin = vin self._starting_point = starting_point self._gps_location = starting_point self.velocity = starting_velocity self._daf_mb = MBWrapper(exchange_name='vehicle_daf_{}'.format(vin)) self._daf_mb.setup_sender() self._velocity_mb = MBWrapper(exchange_name='vehicle_velocity_{}'.format(self.vin), callback=self.new_velocity) self._velocity_mb.setup_receiver() @property def nce(self): """ On accessing this property, it is calculated if the NCE shall happen. NCE only happens up to once per route. If the NCE happens, the car stops driving (velocity = 0) for TIME_TO_RECOVER seconds. In this time, responses from the orchestrator are ignored. After recovery, the vehicle starts driving again with the last known velocity. :return: True if NCE invoked, otherwise False """ if self._nce_possible and not self._nce_happened: if self._driven_kms >= NCE_KM: self._nce_happened = True self._last_velocity = self.velocity self.velocity = 0 def _sleep_and_recover(): recover_in = TIME_TO_RECOVER / SCALING self._recovering = True print('\nNCE !!! Recovering in {} (scaled) seconds.\n'.format(recover_in)) time.sleep(recover_in) print('\nRecovered.\n') self._recovering = False self.velocity = self._last_velocity self._rt = threading.Thread(target=_sleep_and_recover) self._rt.start() return True return False @nce.setter def nce(self, val): self._nce_happened = val @property def daf(self): """ Return the DAF object of the vehicle. The properties are always evaluated on calling. That guarantees accurate values. :return: current "Datenaufzeichnung für automatisiertes Fahren" (DAF) object """ # ATTENTION: ORDER MANDATORY (except for static vin) return DAF(vehicle_identification_number=self.vin, # first deduce nce - is calculated, sets velocity to 0 if NCE near_crash_event=self.nce, # then get current location based on last location and current speed gps_location=self.gps_location, # finally get last_update, is updated by gps_location deduction timestamp=self.last_update, # is set to 0 if nce velocity=self.velocity, ) @property def gps_location(self): """ An accurate current position of the vehicle at the moment the property is called is retunred. The gps location is derived from the last position this property was called at and the time the vehicle was driving since then. Therefore, it is not necessary to call this function exactly at a given time span, because it calculates the driven kms relative to the calling time. """ # Define starting point. start = self._gps_location # Get old and updated timestamps old_timestamp = self.last_update updated_timestamp = datetime.now() self.last_update = updated_timestamp # get driving time between timestamps (in seconds) driving_time = (updated_timestamp - old_timestamp).total_seconds() # reached distance in kilometers: convert km/h to km/s and multiply by driving time kilometers = self.velocity / 3600 * driving_time * SCALING # Define a general distance object, initialized with a distance of k km. d = geopy.distance.distance(kilometers=kilometers) # Use the `destination` method with a bearing of BEARING degrees # in order to move from point `start` to kilometers km bearing. self._gps_location = d.destination(point=start, bearing=BEARING) # update driven kilometers self._driven_kms += kilometers return self._gps_location @property def driven_kms(self): """ :returns: a string representation of the driven kms """ return '{}km'.format(round(self._driven_kms, 2)) def start_driving(self): """ Starts a thread responsible for driving. This thread continuously updates internal values and informs the message broker about the current state (DAF) of the vehicle. """ print('{} starts driving ... SCALING: x{}\n\n'.format(self.vin, SCALING)) self.last_update = datetime.now() self._driven_kms = 0 self._t = threading.Thread(target=self.drive) self._t.start() def drive(self): """ Drive until not self.driving == update status every UPDATE_INTERVAL seconds. """ self.driving = True while self.driving: self.check_reset() self.send_status_update() time.sleep(UPDATE_INTERVAL) def stop_driving(self): """ Set self.driving to stop driving thread gracefully. """ self.driving = False self._gps_location = self._starting_point self._driven_kms = 0 def check_reset(self): """ Checks if end of route is reached and resets vehicle to starting conditions. Afterwards, the vehicle is still driving, but again from start. """ if self._driven_kms >= RESET_KM: print('\n\nEnd of route reached ... resetting and restarting vehicle') self._gps_location = self._starting_point self._driven_kms = 0 self.last_update = datetime.now() self.nce = False @circuit(failure_threshold=10, expected_exception=AMQPConnectionError) def send_status_update(self): """ Sends the current DAF to the orchestrator. The orchestrator will then respond asynchronously on the response queue. """ print(self.driven_kms, '\t', self.daf) self._daf_mb.send(pickle.dumps(self.daf)) def new_velocity(self, response: bytes): """ Will be invoked if new target velocity message received from orchestrator. :param response: pickled TargetVelocity object """ response: TargetVelocity = pickle.loads(response) velocity = response.target_velocity print('Received new velocity {}'.format(velocity)) if not self._recovering: self.velocity = velocity else: print('We are still recovering ... ignoring new target velocity.') if __name__ == "__main__": ...