"""Event logger: stores messages received from the message broker in redis lists."""
import json
|
|
import pickle
|
|
import sys
|
|
|
|
from dse_shared_libs import daf, traffic_light_state, traffic_light_color, target_velocity
|
|
from dse_shared_libs.daf import DAF
|
|
from dse_shared_libs.message_broker_wrapper import MBWrapper
|
|
from dse_shared_libs.target_velocity import TargetVelocity
|
|
from dse_shared_libs.traffic_light_state import TrafficLightState
|
|
from redis import StrictRedis
|
|
|
|
# Register the dse_shared_libs submodules under their bare top-level names too.
# NOTE(review): presumably this lets pickle.loads() resolve objects that were
# pickled where these modules were importable as top-level modules -- confirm
# against the message producers.
sys.modules.update(
    daf=daf,
    traffic_light_state=traffic_light_state,
    traffic_light_color=traffic_light_color,
    target_velocity=target_velocity,
)
|
|
|
|
|
|
class EventLogger:
    """
    Logs messages received from the message broker to redis and/or stdout and
    offers read access to what has been logged.

    Every message is pushed to the head of a redis list, so index 0 is always
    the most recent entry of a key.
    """

    def __init__(self, redis: "StrictRedis", log_to_redis: bool, verbose: bool):
        """
        :param redis: redis client used for reading and writing the log entries
        :param log_to_redis: if True, :meth:`log` pushes every message to redis
        :param verbose: if True, :meth:`log` prints every message to stdout
        """
        self.redis = redis
        self.log_to_redis = log_to_redis
        self.verbose = verbose

    @staticmethod
    def _to_text(value):
        """Return ``value`` as ``str``, decoding it first if redis returned bytes."""
        return value.decode() if isinstance(value, bytes) else value

    def setup_msq_queue(self):
        """
        Sets up the message queue which calls self.log on msg receive.
        """
        mb = MBWrapper(exchange_name='logger', callback=self.log)
        mb.setup_receiver()

    def get_keys(self):
        """
        :returns: the available redis keys as a JSON array string.
        """
        return json.dumps([self._to_text(key) for key in self.redis.keys()])

    def get_list_of(self, key: str):
        """
        Returns all elements stored under a key. Most recent elements are stored first.

        :param key: DAF:<VIN> or TV:<VIN> for either DAF events or TV events
        :returns: JSON array string of the stored elements (each element is the
            string that was logged), or the JSON empty string ``'""'`` if the
            key holds no elements
        """
        # lrange returns a list, so every element has to be decoded on its own;
        # calling .decode() on the list itself raises AttributeError.
        elements = self.redis.lrange(key, 0, -1)
        if not elements:
            return json.dumps("")
        return json.dumps([self._to_text(element) for element in elements])

    def get_index_of(self, key: str, index: int):
        """
        Returns element matching index in key. Most recent elements are stored first.

        :param key: DAF:<VIN> or TV:<VIN> for either DAF events or TV events
        :param index: 0 for most recent
        :returns: the stored element as str, or the JSON empty string ``'""'``
            if there is no (non-empty) element at that index
        """
        element = self.redis.lindex(key, index)
        return self._to_text(element) if element else json.dumps("")

    def log(self, msg: bytes):
        """
        Logging callback. Logs the msg depending on its type to the respective redis key.
        Types can be DAF, TrafficLightState or TargetVelocity. If type is unknown, message is still logged as key
        UNKNOWN.

        If the payload cannot be unpickled, the error text is appended to the
        redis key ERRORS and the raw payload is logged under UNKNOWN.

        :param bytes msg: pickled msg binary to log
        """
        try:
            # SECURITY: pickle.loads can execute arbitrary code. This is only
            # acceptable as long as every producer on the 'logger' exchange is
            # trusted.
            msg = pickle.loads(msg)
        except Exception as e:
            print("Could not unpickle [{}]: {}".format(msg, e))
            self.redis.append('ERRORS', str(e))
            # Deliberately no return: msg is still the raw bytes here and is
            # recorded under UNKNOWN below.

        if isinstance(msg, DAF):
            key = 'DAF:{}'.format(msg.vehicle_identification_number)
        elif isinstance(msg, TrafficLightState):
            key = 'TL:{}'.format(msg.tlid)
        elif isinstance(msg, TargetVelocity):
            key = 'TV:{}'.format(msg.vin)
        else:
            key = 'UNKNOWN'

        try:
            to_log = json.dumps(msg.to_dict())
        except AttributeError:
            # Objects without to_dict() (e.g. raw bytes) are logged via str().
            to_log = str(msg)

        if self.log_to_redis:
            # lpush keeps the newest entry at index 0.
            self.redis.lpush(key, to_log)
        if self.verbose:
            print(to_log)
|