diff --git a/components/event_store/service/event_logger.py b/components/event_store/service/event_logger.py index ed284fa..d886372 100644 --- a/components/event_store/service/event_logger.py +++ b/components/event_store/service/event_logger.py @@ -1,21 +1,44 @@ import pickle -from flask_redis import Redis +from redis import StrictRedis +from dse_shared_libs.daf import DAF +from dse_shared_libs.target_velocity import TargetVelocity +from dse_shared_libs.traffic_light_state import TrafficLightState from dse_shared_libs.message_broker_wrapper import MBWrapper class EventLogger: - redis: Redis - - def __init__(self, redis): + def __init__(self, redis, log_to_redis, verbose): self.redis = redis + self.log_to_redis = log_to_redis + self.verbose = verbose - @staticmethod - def log(msg): + def log(self, msg): try: - print("{}".format(pickle.loads(msg))) + msg = pickle.loads(msg) except Exception as e: print("Could not unpickle [{}]: {}", msg, e) + self.redis: StrictRedis + self.redis.append('ERRORS', str(e)) + + key: str + if type(msg) == DAF: + msg: DAF + key = 'DAF:{}'.format(msg.vehicle_identification_number) + elif type(msg) == TrafficLightState: + msg: TrafficLightState + key = 'TL:{}'.format(msg.tlid) + elif type(msg) == TargetVelocity: + msg: TargetVelocity + key = 'TV:{}'.format(msg.vin) + else: + key = 'UNKNOWN' + + to_log = str(msg) + if self.log_to_redis: + self.redis.append(key, "{}
".format(to_log)) + if self.verbose: + print(to_log) def setup_msq_queue(self): mb = MBWrapper(exchange_name='logger', callback=self.log) diff --git a/components/event_store/service/event_store_server.py b/components/event_store/service/event_store_server.py index 6cbeba5..d79920b 100644 --- a/components/event_store/service/event_store_server.py +++ b/components/event_store/service/event_store_server.py @@ -1,8 +1,14 @@ +import json import threading -from event_logger import EventLogger from flask import Flask from flask_redis import Redis +from redis import StrictRedis + +from event_logger import EventLogger + +LOG_TO_REDIS = True +VERBOSE = True # make sure redis (container) is running and accessible @@ -12,19 +18,32 @@ from flask_redis import Redis app = Flask(__name__) app.config["REDIS_HOST"] = "redis" app.config["REDIS_PORT"] = 6379 -redis = Redis(app) +redis: StrictRedis = Redis(app) + + +@app.route("/api/keys") +def api_get_keys(): + return json.dumps([key.decode() for key in redis.keys()]) + + +@app.route("/api/keys//") +def api_get_of_key(key): + key = redis.get(key) + return key.decode() if key else json.dumps("") @app.route("/") -def home_page(): - redis.set('tl1', 'sending change red -> green
') - redis.append('tl1', 'sending change green -> red
') - redis.set('v1', 'receiving vel 100 km/h
') - - return redis.get('tl1') + redis.get('v1') +def get_keys(): + keys = redis.keys() + keys.sort() + return 'Existing keys:

{}'.format( + ['{0}'.format(key.decode()) for key in keys]) + \ + '

Access Api via

' \ + 'GET /api/keys
' \ + 'GET /api/keys/<key>
' if __name__ == '__main__': - el = EventLogger(redis) + el = EventLogger(redis, LOG_TO_REDIS, VERBOSE) threading.Thread(target=el.setup_msq_queue).start() threading.Thread(target=app.run, args=('0.0.0.0', 5001)).start()