import time from datetime import datetime from typing import Union import geopy import geopy.distance from circuitbreaker import circuit from daf import DAF # Lat, Long STARTING_POINT = geopy.Point(48.853, 2.349) # in km/h STARTING_VELOCITY = 130 # Driving direction in degrees: 0=N, 90=E, 180=S, 270=W BEARING = 90 # Scale speed of vehicles by factor x SCALING = 1 # At x km the NCE shall happen NCE_KM = 0.1 class Vehicle: vin: str velocity: float timestamp: Union[datetime, None] _driven_kms: int _gps_location: geopy.Point _nce_happened = False def __init__(self, vin: str, starting_point: geopy.Point = STARTING_POINT, starting_velocity: float = STARTING_VELOCITY): self.vin = vin self._gps_location = starting_point self.velocity = starting_velocity @property def nce(self): nce = self._nce_happened if not nce: if self._driven_kms >= NCE_KM: nce = True self._nce_happened = nce self.velocity = 0 return nce @property def daf(self): return DAF(vehicle_identification_number=self.vin, gps_location=self.gps_location, velocity=self.velocity, timestamp=self.timestamp, near_crash_event=self.nce ) @property def gps_location(self): """ Update self.gps_location with given speed in km/h and the driven time in seconds :param velocity: in km/h :param time: in seconds :param bearing: direction in degrees: 0=N, 90=E, 180=S, 270=W """ # Define starting point. start = self._gps_location # Get old and updated timestamps old_timestamp = self.timestamp updated_timestamp = datetime.now() self.timestamp = updated_timestamp # get driving time between timestamps (in seconds) driving_time = (updated_timestamp - old_timestamp).total_seconds() # reached distance in kilometers: convert km/h to km/s and multiply by driving time kilometers = self.velocity / 3600 * driving_time * SCALING # Define a general distance object, initialized with a distance of 1 km. d = geopy.distance.distance(kilometers=kilometers) # Use the `destination` method with a bearing of bearing degrees (90 is east) # in order to go from point `start` to kilometers km bearing. self._gps_location = d.destination(point=start, bearing=BEARING) # update driven kilometers self._driven_kms += kilometers return self._gps_location @property def driven_kms(self): return '{}km'.format(round(self._driven_kms, 2)) def start_driving(self): self.timestamp = datetime.now() self._driven_kms = 0 def stop_driving(self): self.timestamp = None @circuit(failure_threshold=10, expected_exception=ConnectionError) def external_call(self): ... if __name__ == "__main__": v1 = Vehicle(vin='SB164ABN1PE082986') v1.start_driving() while True: print(v1.gps_location, v1.driven_kms, v1.nce) time.sleep(1)