Marco Zeisler 8cb45d9b23 make message unpacking testable;
added message unpacking tests;
2021-06-17 19:00:26 +02:00

107 lines
3.4 KiB
Python

import json
import pickle
import sys
from dse_shared_libs import daf, traffic_light_state, traffic_light_color, target_velocity
from dse_shared_libs.daf import DAF
from dse_shared_libs.message_broker_wrapper import MBWrapper
from dse_shared_libs.target_velocity import TargetVelocity
from dse_shared_libs.traffic_light_state import TrafficLightState
from redis import StrictRedis
# Register the shared-lib modules in sys.modules under their bare top-level names as well,
# so that `daf`, `traffic_light_state`, `traffic_light_color` and `target_velocity` resolve as modules.
# NOTE(review): presumably needed so pickle.loads can resolve messages whose classes were pickled
# under these top-level module names on the sender side - confirm against the senders.
sys.modules['daf'] = daf
sys.modules['traffic_light_state'] = traffic_light_state
sys.modules['traffic_light_color'] = traffic_light_color
sys.modules['target_velocity'] = target_velocity
class EventLogger:
    """
    Logs pickled messages to redis and/or stdout.

    Every message is unpickled and mapped to a redis key depending on its type (DAF:<VIN>, TL:<TLID>, TV:<VIN> or
    UNKNOWN). The string representation of the message is pushed to the front of the list stored under that key.
    """

    def __init__(self, redis: "StrictRedis", log_to_redis: bool, verbose: bool):
        """
        :param redis: redis client used to store and query the logged messages
        :param log_to_redis: if True, every logged message is pushed to redis
        :param verbose: if True, every logged message is printed to stdout
        """
        self.redis = redis
        self.log_to_redis = log_to_redis
        self.verbose = verbose

    def setup_msq_queue(self):
        """
        Sets up the message queue which calls self.log on msg receive.
        """
        mb = MBWrapper(exchange_name='logger', callback=self.log)
        mb.setup_receiver()

    @staticmethod
    def _to_text(value):
        """
        Returns value as str. Redis clients return bytes unless they are configured to decode responses
        themselves, in which case the value already is a str and is returned unchanged.
        """
        return value.decode() if isinstance(value, bytes) else value

    def get_keys(self):
        """
        :returns: the available redis keys as JSON array string.
        """
        return json.dumps([self._to_text(key) for key in self.redis.keys()])

    def get_list_of(self, key: str):
        """
        Returns a list of all elements inside a key. Most recent elements are stored first.

        :param key: DAF:<VIN> or TV:<VIN> for either DAF events or TV events
        :returns: JSON array string of the elements, or the JSON empty string ('""') if the key holds no elements
        """
        # lrange returns a list, so every element has to be decoded on its own
        elements = self.redis.lrange(key, 0, -1)
        if not elements:
            return json.dumps("")
        return json.dumps([self._to_text(element) for element in elements])

    def get_index_of(self, key: str, index: int):
        """
        Returns element matching index in key. Most recent elements are stored first.

        :param key: DAF:<VIN> or TV:<VIN> for either DAF events or TV events
        :param index: 0 for most recent
        :returns: the element as str, or the JSON empty string ('""') if there is no (non-empty) element at index
        """
        element = self.redis.lindex(key, index)
        return self._to_text(element) if element else json.dumps("")

    def log(self, msg: bytes):
        """
        Logging callback. Logs the msg depending on its type to the respective redis key.
        Types can be DAF, TrafficLightState or TargetVelocity. If type is unknown, message is still logged as key
        UNKNOWN.

        :param bytes msg: pickled msg binary to log
        """
        key, to_log = self._unpack_message_to_log(msg)
        if self.log_to_redis:
            # lpush stores the newest entry at index 0
            self.redis.lpush(key, to_log)
        if self.verbose:
            print(to_log)

    def _unpack_message_to_log(self, msg: bytes):
        """
        Unpickles msg and derives the redis key and the string to log from it.

        If unpickling fails, the error is printed and appended to the redis key ERRORS (independent of
        log_to_redis); the raw message is then logged under the key UNKNOWN.

        :param bytes msg: pickled msg binary
        :returns: tuple (key, to_log) where to_log is the JSON dump of the message if possible, else str(msg)
        """
        try:
            # NOTE(review): pickle.loads can execute arbitrary code contained in the payload - this is only safe
            # as long as every producer on the message broker is trusted.
            msg = pickle.loads(msg)
        except Exception as e:
            print("Could not unpickle [{}]: {}".format(msg, e))
            self.redis.append('ERRORS', str(e))

        if isinstance(msg, DAF):
            key = 'DAF:{}'.format(msg.vehicle_identification_number)
        elif isinstance(msg, TrafficLightState):
            key = 'TL:{}'.format(msg.tlid)
        elif isinstance(msg, TargetVelocity):
            key = 'TV:{}'.format(msg.vin)
        else:
            key = 'UNKNOWN'

        # prefer the dict representation of the message, fall back to the message itself
        try:
            payload = msg.to_dict()
        except AttributeError:
            payload = msg

        try:
            to_log = json.dumps(payload)
        except (TypeError, ValueError):
            # json.dumps raises TypeError for unsupported types and ValueError for circular references
            to_log = str(msg)
        return key, to_log