first implementation of vehicle
This commit is contained in:
parent
df5084bf21
commit
ca3ac6fa9b
12
components/i_feed/daf.py
Normal file
12
components/i_feed/daf.py
Normal file
@ -0,0 +1,12 @@
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
import geopy.distance
|
||||
|
||||
|
||||
@dataclass
|
||||
class DAF:
|
||||
vehicle_identification_number: str
|
||||
gps_location: geopy.Point
|
||||
velocity: float
|
||||
near_crash_event: bool
|
||||
timestamp: datetime
|
||||
@ -1,2 +1,2 @@
|
||||
circuitbreaker # fault tolerance
|
||||
|
||||
geopy
|
||||
|
||||
@ -1,6 +1,79 @@
|
||||
from circuitbreaker import circuit
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
import geopy
|
||||
import geopy.distance
|
||||
from circuitbreaker import circuit
|
||||
from daf import DAF
|
||||
|
||||
# Lat, Long
|
||||
STARTING_POINT = geopy.Point(48.853, 2.349)
|
||||
# in km/h
|
||||
STARTING_VELOCITY = 130
|
||||
# Driving direction in degrees: 0=N, 90=E, 180=S, 270=W
|
||||
BEARING = 90
|
||||
|
||||
|
||||
class Vehicle:
|
||||
vin: str
|
||||
gps_location: geopy.Point
|
||||
velocity: float
|
||||
timestamp: datetime
|
||||
|
||||
def __init__(self,
|
||||
vin: str,
|
||||
starting_point: geopy.Point = STARTING_POINT,
|
||||
starting_velocity: float = STARTING_VELOCITY):
|
||||
self.vin = vin
|
||||
self.gps_location = starting_point
|
||||
self.velocity = starting_velocity
|
||||
|
||||
def get_daf(self, invoke_nce=False):
|
||||
return DAF(vehicle_identification_number=self.vin,
|
||||
gps_location=self.gps_location,
|
||||
velocity=self.velocity,
|
||||
timestamp=self.timestamp,
|
||||
near_crash_event=invoke_nce
|
||||
)
|
||||
|
||||
def update_pos(self):
|
||||
"""
|
||||
Update self.gps_location with given speed in km/h and the driven time in seconds
|
||||
:param velocity: in km/h
|
||||
:param time: in seconds
|
||||
:param bearing: direction in degrees: 0=N, 90=E, 180=S, 270=W
|
||||
"""
|
||||
# Define starting point.
|
||||
start = self.gps_location
|
||||
|
||||
# Get old and updated timestamps
|
||||
old_timestamp = self.timestamp
|
||||
updated_timestamp = datetime.now()
|
||||
self.timestamp = updated_timestamp
|
||||
|
||||
# get driving time between timestamps (in seconds)
|
||||
driving_time = (old_timestamp - updated_timestamp).total_seconds()
|
||||
|
||||
# reached distance in kilometers: convert km/h to km/s and multiply by driving time
|
||||
kilometers = self.velocity / 3600 * driving_time
|
||||
|
||||
# Define a general distance object, initialized with a distance of 1 km.
|
||||
d = geopy.distance.distance(kilometers=kilometers)
|
||||
|
||||
# Use the `destination` method with a bearing of bearing degrees (90 is east)
|
||||
# in order to go from point `start` to kilometers km bearing.
|
||||
self.gps_location = d.destination(point=start, bearing=BEARING)
|
||||
return self.gps_location
|
||||
|
||||
@circuit(failure_threshold=10, expected_exception=ConnectionError)
|
||||
def external_call():
|
||||
def external_call(self):
|
||||
...
|
||||
|
||||
def start_driving(self):
|
||||
self.timestamp = datetime.now()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
v1 = Vehicle(vin='SB164ABN1PE082986')
|
||||
v1.start_driving()
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user