276 lines
10 KiB
Python

import pickle
import threading
import time
import os
from datetime import datetime
from typing import Union
import geopy
import geopy.distance
from circuitbreaker import circuit
from dse_shared_libs.daf import DAF
from dse_shared_libs.message_broker_wrapper import MBWrapper
# Lat, Long
from dse_shared_libs.target_velocity import TargetVelocity
from pika.exceptions import AMQPConnectionError
STARTING_POINT = geopy.Point(47.89053, 16.20703)
# Driving direction in degrees: 0=N, 90=E, 180=S, 270=W
BEARING = 2
# Scale speed of vehicles by factor x
SCALING = int(os.environ['DSE2021_SCALING'])
# in km/h
STARTING_VELOCITY = 130 * SCALING
# Interval between status updates in seconds (is not scaled)
UPDATE_INTERVAL = 1
# At x km the NCE shall happen
NCE_KM = 2.4
# Time in seconds to recover from NCE (will be scaled)
TIME_TO_RECOVER = 15
# Resets vehicle at km x
RESET_KM = 4.3
class Vehicle:
# vehicle identification number
vin: str
# velocity in km/h
velocity: float
# stores the timestamp most recently used to calculate driving distance
last_update: Union[datetime, None]
# marks if vehicle is currently driving, a new one isn't, has to be started via self.start_driving()
driving: bool = False
# stores last velocity before NCE, is used to recover after NCE
_last_velocity: float
# stores the so-far driven kilometers
_driven_kms: int
# stores the starting point for resetting the vehicle
_starting_point: geopy.Point
# stores the gps_location, shall be accessed by gps_location, where the actual value is calculated and returned
_gps_location: geopy.Point
# stores if nce possible
_nce_possible = True
# stores if nce already happened, it only happens (up to) once
_nce_happened = False
# stores if we are still recovering from NCE
_recovering = False
# continuous driving thread
_t: threading.Thread
# NCE recovery thread
_rt: threading.Thread
# outgoing daf info MBWrapper
_daf_mb: MBWrapper
# incoming target velocity MBWrapper
_velocity_mb: MBWrapper
def __init__(self,
vin: str,
starting_point: geopy.Point = STARTING_POINT,
starting_velocity: float = STARTING_VELOCITY):
"""
Initialize a new vehicle client. Already initializes the corresponding sending and receiving message queue.
A vehicle sends every UPDATE_INTERVAL seconds its current DAF to the orchestrator. It receives as response the
new TargetVelocity to achieve a possible green wave. After a fixed amount of kilometers, the NCE event is
happening. The car then stands still for TIME_TO_RECOVER seconds. Afterwards, it starts driving with the last
known velocity. While it is recovering, it ignores the target velocity responses from the orchestrator.
A vehicle drives a full route between start and end. After reaching the end, it restores the internal state and
starts again from the beginning.
:param vin: Vehicle Identification Number
:param starting_point: point on globe to start at
:param starting_velocity: velocity to start driving with
"""
self.vin = vin
self._starting_point = starting_point
self._gps_location = starting_point
self.velocity = starting_velocity
self._daf_mb = MBWrapper(exchange_name='vehicle_daf_{}'.format(vin))
self._daf_mb.setup_sender()
self._velocity_mb = MBWrapper(exchange_name='vehicle_velocity_{}'.format(self.vin), callback=self.new_velocity)
self._velocity_mb.setup_receiver()
@property
def nce(self):
"""
On accessing this property, it is calculated if the NCE shall happen. NCE only happens up to once per route.
If the NCE happens, the car stops driving (velocity = 0) for TIME_TO_RECOVER seconds. In this time,
responses from the orchestrator are ignored. After recovery, the vehicle starts driving again with the last
known velocity.
:return: True if NCE invoked, otherwise False
"""
d = geopy.distance.distance(kilometers=0.4)
tl2_loc = geopy.Point(47.90937, 16.20814)
# Calculate point 400m south of traffic light 2
nce_point = d.destination(point=tl2_loc, bearing=182)
if self._nce_possible and not self._nce_happened:
if self._gps_location.latitude >= nce_point.latitude:
self._nce_happened = True
self._last_velocity = self.velocity
self.velocity = 0
def _sleep_and_recover():
recover_in = TIME_TO_RECOVER / SCALING
self._recovering = True
print('\nNCE !!! Recovering in {} (scaled) seconds.\n'.format(recover_in))
time.sleep(recover_in)
print('\nRecovered.\n')
self._recovering = False
self.velocity = self._last_velocity
self._rt = threading.Thread(target=_sleep_and_recover)
self._rt.start()
return True
return False
@nce.setter
def nce(self, val):
self._nce_happened = val
@property
def daf(self):
"""
Return the DAF object of the vehicle. The properties are always evaluated on calling. That guarantees accurate
values.
:return: current "Datenaufzeichnung für automatisiertes Fahren" (DAF) object
"""
# ATTENTION: ORDER MANDATORY (except for static vin)
return DAF(vehicle_identification_number=self.vin,
# first deduce nce - is calculated, sets velocity to 0 if NCE
near_crash_event=self.nce,
# then get current location based on last location and current speed
gps_location=self.gps_location,
# finally get last_update, is updated by gps_location deduction
timestamp=self.last_update,
# is set to 0 if nce
velocity=self.velocity,
)
@property
def gps_location(self):
"""
An accurate current position of the vehicle at the moment the property is called is retunred.
The gps location is derived from the last position this property was called at and the
time the vehicle was driving since then. Therefore, it is not necessary to call this function exactly at a given
time span, because it calculates the driven kms relative to the calling time.
"""
# Define starting point.
start = self._gps_location
# Get old and updated timestamps
old_timestamp = self.last_update
updated_timestamp = datetime.now()
self.last_update = updated_timestamp
# get driving time between timestamps (in seconds)
driving_time = (updated_timestamp - old_timestamp).total_seconds()
# reached distance in kilometers: convert km/h to km/s and multiply by driving time
kilometers = self.velocity / 3600 * driving_time
# Define a general distance object, initialized with a distance of k km.
d = geopy.distance.distance(kilometers=kilometers)
# Use the `destination` method with a bearing of BEARING degrees
# in order to move from point `start` to kilometers km bearing.
self._gps_location = d.destination(point=start, bearing=BEARING)
# update driven kilometers
self._driven_kms += kilometers
return self._gps_location
@property
def driven_kms(self):
"""
:returns: a string representation of the driven kms
"""
return '{}km'.format(round(self._driven_kms, 2))
def start_driving(self):
"""
Starts a thread responsible for driving. This thread continuously updates internal values and
informs the message broker about the current state (DAF) of the vehicle.
"""
print('{} starts driving ... SCALING: x{}\n\n'.format(self.vin, SCALING))
self.last_update = datetime.now()
self._driven_kms = 0
self._t = threading.Thread(target=self.drive)
self._t.start()
def drive(self):
"""
Drive until not self.driving == update status every UPDATE_INTERVAL seconds.
"""
self.driving = True
while self.driving:
self.check_reset()
self.send_status_update()
time.sleep(UPDATE_INTERVAL)
def stop_driving(self):
"""
Set self.driving to stop driving thread gracefully.
"""
self.driving = False
self._gps_location = self._starting_point
self._driven_kms = 0
def check_reset(self):
"""
Checks if end of route is reached and resets vehicle to starting conditions.
It also resets on malicious coordinates.
Afterwards, the vehicle is still driving, but again from start.
"""
if self._driven_kms >= RESET_KM:
self._reset()
if self._gps_location.latitude < self._starting_point.latitude or \
self._gps_location.longitude < self._starting_point.longitude:
self._reset()
@circuit(failure_threshold=10, expected_exception=AMQPConnectionError)
def send_status_update(self):
"""
Sends the current DAF to the orchestrator. The orchestrator will then respond asynchronously on the response
queue.
"""
print(self.driven_kms, '\t', self.daf)
self._daf_mb.send(pickle.dumps(self.daf))
def new_velocity(self, response: bytes):
"""
Will be invoked if new target velocity message received from orchestrator.
:param response: pickled TargetVelocity object
"""
response: TargetVelocity = pickle.loads(response)
velocity = response.target_velocity
print('Received new velocity {}'.format(velocity))
if not self._recovering:
self.velocity = velocity
else:
print('We are still recovering ... ignoring new target velocity.')
def _reset(self):
print('\n\nEnd of route reached ... resetting and restarting vehicle')
self._gps_location = self._starting_point
self._driven_kms = 0
self.last_update = datetime.now()
self.nce = False
self.velocity = STARTING_VELOCITY
if __name__ == "__main__":
...