make message unpacking testable;
added message unpacking tests;
This commit is contained in:
parent
4a38a3a4a7
commit
8cb45d9b23
@ -67,6 +67,14 @@ class EventLogger:
|
||||
|
||||
:param bytes msg: pickled msg binary to log
|
||||
"""
|
||||
key, to_log = self._unpack_message_to_log(msg)
|
||||
|
||||
if self.log_to_redis:
|
||||
self.redis.lpush(key, "{}".format(to_log))
|
||||
if self.verbose:
|
||||
print(to_log)
|
||||
|
||||
def _unpack_message_to_log(self, msg: bytes):
|
||||
try:
|
||||
msg = pickle.loads(msg)
|
||||
except Exception as e:
|
||||
@ -90,9 +98,9 @@ class EventLogger:
|
||||
try:
|
||||
to_log = json.dumps(msg.to_dict())
|
||||
except AttributeError:
|
||||
try:
|
||||
to_log = json.dumps(msg)
|
||||
except (json.decoder.JSONDecodeError, TypeError):
|
||||
to_log = str(msg)
|
||||
|
||||
if self.log_to_redis:
|
||||
self.redis.lpush(key, "{}".format(to_log))
|
||||
if self.verbose:
|
||||
print(to_log)
|
||||
return key, to_log
|
||||
|
||||
68
components/event_store/service/test_event_logger.py
Normal file
68
components/event_store/service/test_event_logger.py
Normal file
@ -0,0 +1,68 @@
|
||||
import datetime
|
||||
import json
|
||||
import pickle
|
||||
import unittest
|
||||
|
||||
import geopy
|
||||
from dse_shared_libs.daf import DAF
|
||||
from dse_shared_libs.target_velocity import TargetVelocity
|
||||
from dse_shared_libs.traffic_light_color import TrafficLightColor
|
||||
from dse_shared_libs.traffic_light_state import TrafficLightState
|
||||
from redis import StrictRedis
|
||||
|
||||
from event_logger import EventLogger
|
||||
|
||||
|
||||
class TestEventLogger(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.el = EventLogger(StrictRedis(), False, False)
|
||||
self.timestamp = datetime.datetime.now()
|
||||
|
||||
def test_unpack_daf(self):
|
||||
daf = DAF(vehicle_identification_number='my_vin',
|
||||
gps_location=geopy.Point(0, 0, 0),
|
||||
velocity=130.0,
|
||||
near_crash_event=False,
|
||||
timestamp=self.timestamp,
|
||||
)
|
||||
|
||||
key, message = self.el._unpack_message_to_log(pickle.dumps(daf))
|
||||
|
||||
self.assertEqual(key, 'DAF:my_vin')
|
||||
self.assertEqual(message, json.dumps(daf.to_dict()))
|
||||
|
||||
def test_unpack_target_velocity(self):
|
||||
tv = TargetVelocity(vin='my_other_vin',
|
||||
target_velocity=120.0,
|
||||
timestamp=self.timestamp)
|
||||
|
||||
key, message = self.el._unpack_message_to_log(pickle.dumps(tv))
|
||||
|
||||
self.assertEqual(key, 'TV:my_other_vin')
|
||||
self.assertEqual(message, json.dumps(tv.to_dict()))
|
||||
|
||||
def test_unpack_traffic_light_state(self):
|
||||
tls = TrafficLightState(tlid='my_traffic_light',
|
||||
color=TrafficLightColor.GREEN,
|
||||
last_switch=self.timestamp)
|
||||
|
||||
key, message = self.el._unpack_message_to_log(pickle.dumps(tls))
|
||||
|
||||
self.assertEqual(key, 'TL:my_traffic_light')
|
||||
self.assertEqual(message, json.dumps(tls.to_dict()))
|
||||
|
||||
def test_unpack_unknown_dict(self):
|
||||
unknown = dict(foo='foo', bar='bar')
|
||||
|
||||
key, message = self.el._unpack_message_to_log(pickle.dumps(unknown))
|
||||
|
||||
self.assertEqual(key, 'UNKNOWN')
|
||||
self.assertEqual(message, json.dumps(unknown))
|
||||
|
||||
def test_unpack_unknown_object(self):
|
||||
obj = datetime.datetime.now()
|
||||
|
||||
key, message = self.el._unpack_message_to_log(pickle.dumps(obj))
|
||||
|
||||
self.assertEqual(key, 'UNKNOWN')
|
||||
self.assertEqual(message, str(obj))
|
||||
Loading…
x
Reference in New Issue
Block a user