vehicle sends daf to orchestrator via rabbitMQ broker

vehicle receives new target velocity from orchestrator as response;
at this point, this velocity is just a random float between 0 and 130;
This commit is contained in:
Marco Zeisler 2021-05-24 17:17:50 +02:00
parent 8f9109f89f
commit 349a38fa9f
6 changed files with 91 additions and 14 deletions

View File

@ -1,2 +1,3 @@
circuitbreaker # fault tolerance circuitbreaker # fault tolerance
geopy geopy
pika

View File

@ -1,3 +1,4 @@
import pickle
import threading import threading
import time import time
from datetime import datetime from datetime import datetime
@ -6,7 +7,9 @@ from typing import Union
import geopy import geopy
import geopy.distance import geopy.distance
from circuitbreaker import circuit from circuitbreaker import circuit
from daf import DAF from wrappers.daf import DAF
from wrappers.message_broker_wrapper import MBWrapper
# Lat, Long # Lat, Long
STARTING_POINT = geopy.Point(48.853, 2.349) STARTING_POINT = geopy.Point(48.853, 2.349)
@ -17,7 +20,7 @@ BEARING = 0
# Scale speed of vehicles by factor x # Scale speed of vehicles by factor x
SCALING = 100 SCALING = 100
# Interval between status updates in seconds (is not scaled) # Interval between status updates in seconds (is not scaled)
UPDATE_INTERVAL = 1 UPDATE_INTERVAL = 2
# At x km the NCE shall happen # At x km the NCE shall happen
NCE_KM = 30 NCE_KM = 30
# Time in seconds to recover from NCE (will be scaled) # Time in seconds to recover from NCE (will be scaled)
@ -47,11 +50,17 @@ class Vehicle:
_nce_possible = True _nce_possible = True
# stores if nce already happened, it only happens (up to) once # stores if nce already happened, it only happens (up to) once
_nce_happened = False _nce_happened = False
# continuous driving thread # continuous driving thread
_t: threading.Thread _t: threading.Thread
# NCE recovery thread # NCE recovery thread
_rt: threading.Thread _rt: threading.Thread
# outgoing daf info MBWrapper
_daf_mb: MBWrapper
# incoming target velocity MBWrapper
_velocity_mb: MBWrapper
def __init__(self, def __init__(self,
vin: str, vin: str,
starting_point: geopy.Point = STARTING_POINT, starting_point: geopy.Point = STARTING_POINT,
@ -61,6 +70,12 @@ class Vehicle:
self._gps_location = starting_point self._gps_location = starting_point
self.velocity = starting_velocity self.velocity = starting_velocity
self._daf_mb = MBWrapper(exchange_name='vehicle_daf_{}'.format(vin))
self._daf_mb.setup_sender()
self._velocity_mb = MBWrapper(exchange_name='vehicle_velocity_{}'.format(self.vin), callback=self.new_velocity)
self._velocity_mb.setup_receiver()
@property @property
def nce(self): def nce(self):
""" """
@ -148,7 +163,7 @@ class Vehicle:
Starts a thread responsible for driving. This thread continuously updates internal values and Starts a thread responsible for driving. This thread continuously updates internal values and
informs the message broker about the current state (DAF) of the vehicle. informs the message broker about the current state (DAF) of the vehicle.
""" """
print('Start driving ... SCALING: x{}\n\n'.format(SCALING)) print('{} starts driving ... SCALING: x{}\n\n'.format(self.vin, SCALING))
self.last_update = datetime.now() self.last_update = datetime.now()
self._driven_kms = 0 self._driven_kms = 0
@ -187,15 +202,20 @@ class Vehicle:
@circuit(failure_threshold=10, expected_exception=ConnectionError) @circuit(failure_threshold=10, expected_exception=ConnectionError)
def send_status_update(self): def send_status_update(self):
# TODO inform the message broker about the current status
print(self.driven_kms, '\t', self.daf) print(self.driven_kms, '\t', self.daf)
self._daf_mb.send(pickle.dumps(self.daf))
# TODO adapt the vehicle's velocity to the suggestion of the orchestration service def new_velocity(self, velocity: bytes):
def new_velocity(self): velocity = float(velocity)
... print('Received new velocity {}'.format(velocity))
self.velocity = velocity
if __name__ == "__main__": if __name__ == "__main__":
v1 = Vehicle(vin='SB164ABN1PE082986') v1 = Vehicle(vin='SB164ABN1PE082000')
v1.start_driving() v1.start_driving()
time.sleep(1)
v2 = Vehicle(vin='SB999ABN1PE082111')
v2.start_driving()
# stop manually # stop manually

View File

@ -1,4 +1,7 @@
import threading
from flask import Flask from flask import Flask
from orchestrator import Orchestrator
app = Flask(__name__) app = Flask(__name__)
@ -19,4 +22,6 @@ def api2():
if __name__ == '__main__': if __name__ == '__main__':
app.run() orc = Orchestrator()
threading.Thread(target=orc.setup_msg_queues).start()
threading.Thread(target=app.run).start()

View File

@ -0,0 +1,45 @@
import pickle
import sys
from random import randrange
from wrappers import daf
from wrappers.message_broker_wrapper import MBWrapper
sys.modules['daf'] = daf
class Orchestrator:
vins = []
tls = []
_daf_mbs = {}
_velocity_mbs = {}
def __init__(self):
# TODO fetch available car VINs from Entity Ident
self.vins.append('SB164ABN1PE082000')
self.vins.append('SB999ABN1PE082111')
# TODO fetch available TLs from Entity Ident
self.tls.append('traffic-light-1')
self.tls.append('traffic-light-2')
self.tls.append('traffic-light-3')
def setup_msg_queues(self):
for vin in self.vins:
daf_channel = MBWrapper(exchange_name='vehicle_daf_{}'.format(vin), callback=self.handle_daf_receive)
daf_channel.setup_receiver()
self._daf_mbs[vin] = daf_channel
velocity_channel = MBWrapper(exchange_name='vehicle_velocity_{}'.format(vin))
velocity_channel.setup_sender()
self._velocity_mbs[vin] = velocity_channel
def handle_daf_receive(self, msg):
received_daf_object = pickle.loads(msg)
print(received_daf_object)
# TODO use the data from the traffic lights to transmit a new target velocity for this vehicle to achieve
# a green wave
response_channel = self._velocity_mbs[received_daf_object.vehicle_identification_number]
response_channel.send(randrange(0, 130))

View File

@ -10,7 +10,7 @@ class MBWrapper:
exchange_name: str exchange_name: str
callback: callable callback: callable
_type: str _type: str = None
_connection: pika.BlockingConnection _connection: pika.BlockingConnection
_channel: pika.BlockingConnection.channel _channel: pika.BlockingConnection.channel
@ -18,13 +18,15 @@ class MBWrapper:
host: str = 'localhost', host: str = 'localhost',
exchange_type: str = 'fanout', exchange_type: str = 'fanout',
exchange_name: str = None, exchange_name: str = None,
callback: callable = None): callback: callable = None,
verbose: bool = False):
assert exchange_name, 'Please define an exchange name' assert exchange_name, 'Please define an exchange name'
self.host = host self.host = host
self.exchange_type = exchange_type self.exchange_type = exchange_type
self.exchange_name = exchange_name self.exchange_name = exchange_name
self.callback = callback self.callback = callback
self.verbose = verbose
def setup_sender(self): def setup_sender(self):
assert self._type != 'receiver', 'MBWrapper is already a receiver. Use another MBWrapper.' assert self._type != 'receiver', 'MBWrapper is already a receiver. Use another MBWrapper.'
@ -33,7 +35,7 @@ class MBWrapper:
def setup_receiver(self): def setup_receiver(self):
assert self._type != 'sender', 'MBWrapper is already a sender. Use another MBWrapper.' assert self._type != 'sender', 'MBWrapper is already a sender. Use another MBWrapper.'
assert callback, 'Please setup MBWrapper with "on response" self.callback which can handle a byte string as input.' assert self.callback, 'Please setup MBWrapper with "on response" self.callback which can handle a byte string as input.'
def consumer(): def consumer():
self._setup_channel() self._setup_channel()
@ -42,7 +44,7 @@ class MBWrapper:
self._channel.queue_bind(exchange=self.exchange_name, queue=queue_name) self._channel.queue_bind(exchange=self.exchange_name, queue=queue_name)
print(' [*] Waiting for messages. To exit press CTRL+C') self.print(' [*] Waiting for messages. To exit press CTRL+C')
self._channel.basic_consume( self._channel.basic_consume(
queue=queue_name, on_message_callback=self._receive, auto_ack=True) queue=queue_name, on_message_callback=self._receive, auto_ack=True)
@ -55,11 +57,15 @@ class MBWrapper:
if type(message) is not bytes: if type(message) is not bytes:
message = str(message).encode() message = str(message).encode()
self._channel.basic_publish(exchange=self.exchange_name, routing_key='', body=message) self._channel.basic_publish(exchange=self.exchange_name, routing_key='', body=message)
print(" [x] Sent %r" % message) self.print(" [x] Sent %r" % message)
def close(self): def close(self):
self._connection.close() self._connection.close()
def print(self, *msg):
if self.verbose:
print(*msg)
def _setup_channel(self): def _setup_channel(self):
self._connection = pika.BlockingConnection( self._connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host)) pika.ConnectionParameters(host=self.host))