From 480eff011f790b614f5a7a8fef791b1e31bfd4d5 Mon Sep 17 00:00:00 2001 From: Marco Zeisler Date: Wed, 2 Jun 2021 21:51:16 +0200 Subject: [PATCH] move ident request to orchestration_server.py; import sys modules for proper pickling; --- .../event_store/service/event_logger.py | 7 ++++ components/i_feed/i_feed.py | 26 ++++++++++----- .../orchestration/orchestration_server.py | 14 +++++++- components/orchestration/orchestrator.py | 33 +++++++------------ 4 files changed, 49 insertions(+), 31 deletions(-) diff --git a/components/event_store/service/event_logger.py b/components/event_store/service/event_logger.py index 8dfca4f..456d9af 100644 --- a/components/event_store/service/event_logger.py +++ b/components/event_store/service/event_logger.py @@ -1,12 +1,19 @@ import json import pickle +import sys +from dse_shared_libs import daf, traffic_light_state, traffic_light_color, target_velocity from dse_shared_libs.daf import DAF from dse_shared_libs.message_broker_wrapper import MBWrapper from dse_shared_libs.target_velocity import TargetVelocity from dse_shared_libs.traffic_light_state import TrafficLightState from redis import StrictRedis +sys.modules['daf'] = daf +sys.modules['traffic_light_state'] = traffic_light_state +sys.modules['traffic_light_color'] = traffic_light_color +sys.modules['target_velocity'] = target_velocity + class EventLogger: def __init__(self, redis, log_to_redis, verbose): diff --git a/components/i_feed/i_feed.py b/components/i_feed/i_feed.py index 4c9dff2..108dd01 100644 --- a/components/i_feed/i_feed.py +++ b/components/i_feed/i_feed.py @@ -1,23 +1,33 @@ +import sys import time -import requests -from devices.traffic_light import TrafficLight, SWITCHING_TIME +import requests +from dse_shared_libs import daf, traffic_light_state, traffic_light_color, target_velocity + +from devices.traffic_light import TrafficLight from devices.vehicle import Vehicle +sys.modules['daf'] = daf +sys.modules['traffic_light_state'] = traffic_light_state +sys.modules['traffic_light_color'] = traffic_light_color +sys.modules['target_velocity'] = target_velocity + """ USED AS ENTRYPOINT FOR DOCKERFILE Start all vehicles / traffic lights here """ if __name__ == '__main__': - ENTITY_IDENT_URL = 'http://entityident:5002/api/v1/resources/' + try: + response = requests.get(ENTITY_IDENT_URL + 'cars') + cars = response.json() - response = requests.get(ENTITY_IDENT_URL + 'cars') - cars = response.json() - - response = requests.get(ENTITY_IDENT_URL + 'traffic_lights') - traffic_lights = response.json() + response = requests.get(ENTITY_IDENT_URL + 'traffic_lights') + traffic_lights = response.json() + except requests.exceptions.ConnectionError as e: + print("Is the entity_ident_server running and reachable?") + raise e print('Traffic lights', traffic_lights['cursor']) for traffic_light in traffic_lights['cursor']: diff --git a/components/orchestration/orchestration_server.py b/components/orchestration/orchestration_server.py index bbd8045..7c0a87d 100644 --- a/components/orchestration/orchestration_server.py +++ b/components/orchestration/orchestration_server.py @@ -1,5 +1,6 @@ import threading +import requests from flask import Flask from orchestrator import Orchestrator @@ -22,6 +23,17 @@ def api2(): if __name__ == '__main__': - orc = Orchestrator() + ENTITY_IDENT_URL = 'http://entityident:5002/api/v1/resources/' + try: + response = requests.get(ENTITY_IDENT_URL + 'cars') + cars = response.json() + + response = requests.get(ENTITY_IDENT_URL + 'traffic_lights') + traffic_lights = response.json() + except requests.exceptions.ConnectionError as e: + print("Is the entity_ident_server running and reachable?") + raise e + + orc = Orchestrator(cars, traffic_lights) threading.Thread(target=orc.setup_msg_queues).start() threading.Thread(target=app.run, args=('0.0.0.0', 5003)).start() diff --git a/components/orchestration/orchestrator.py b/components/orchestration/orchestrator.py index 2b04a73..2259bcd 100644 --- a/components/orchestration/orchestrator.py +++ b/components/orchestration/orchestrator.py @@ -3,21 +3,21 @@ import sys from random import randrange from typing import List, Dict -import requests -from dse_shared_libs import daf -from dse_shared_libs.traffic_light_color import TrafficLightColor +from dse_shared_libs import daf, traffic_light_state, traffic_light_color, target_velocity from dse_shared_libs.message_broker_wrapper import MBWrapper - # necessary to unpickle daf object from dse_shared_libs.target_velocity import TargetVelocity from dse_shared_libs.traffic_light_state import TrafficLightState sys.modules['daf'] = daf +sys.modules['traffic_light_state'] = traffic_light_state +sys.modules['traffic_light_color'] = traffic_light_color +sys.modules['target_velocity'] = target_velocity class Orchestrator: # vehicle ids - vins: List[str] = [] + vins: List[str] = set() # traffic lights {tlid: {color: color, switching_time: in_seconds}} tls: Dict[str, Dict] = {} @@ -25,25 +25,13 @@ class Orchestrator: _daf_mbs: Dict[str, MBWrapper] = {} _velocity_mbs: Dict[str, MBWrapper] = {} - def __init__(self): - - ENTITY_IDENT_URL = 'http://entityident:5002/api/v1/resources/' - - response = requests.get(ENTITY_IDENT_URL + 'cars') - cars = response.json() - - response = requests.get(ENTITY_IDENT_URL + 'traffic_lights') - traffic_lights = response.json() - - for traffic_light in traffic_lights['cursor']: - self.tls[traffic_light['id']] = {'color': traffic_light['initialColor'], 'switching_time': traffic_light['switchingTime']} - - print('tls', self.tls) - + def __init__(self, cars, traffic_lights): for car in cars['cursor']: self.vins.append(car['vin']) - print('vins', self.vins) + for traffic_light in traffic_lights['cursor']: + self.tls[traffic_light['id']] = {'color': traffic_light['initialColor'], + 'switching_time': traffic_light['switchingTime']} def setup_msg_queues(self): # spawn the vehicle related message broker channels @@ -58,7 +46,8 @@ class Orchestrator: # spawn the traffic light related mb channels for tl in self.tls: - tl_channel = MBWrapper(exchange_name='traffic_light_state_{}'.format(tl), callback=self.handle_tl_state_receive) + tl_channel = MBWrapper(exchange_name='traffic_light_state_{}'.format(tl), + callback=self.handle_tl_state_receive) tl_channel.setup_receiver() def handle_daf_receive(self, pickle_binary):