change datetime.now to datetime.utcnow;
fix test_ifeed test;
This commit is contained in:
parent
1b12dd40b6
commit
bf8d5c450e
@ -16,7 +16,7 @@ from event_logger import EventLogger
|
||||
class TestEventLogger(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.el = EventLogger(StrictRedis(), False, False)
|
||||
self.timestamp = datetime.datetime.now()
|
||||
self.timestamp = datetime.datetime.utcnow()
|
||||
|
||||
def test_unpack_daf(self):
|
||||
daf = DAF(vehicle_identification_number='my_vin',
|
||||
@ -60,7 +60,7 @@ class TestEventLogger(unittest.TestCase):
|
||||
self.assertEqual(message, json.dumps(unknown))
|
||||
|
||||
def test_unpack_unknown_object(self):
|
||||
obj = datetime.datetime.now()
|
||||
obj = datetime.datetime.utcnow()
|
||||
|
||||
key, message = self.el._unpack_message_to_log(pickle.dumps(obj))
|
||||
|
||||
|
||||
@ -90,7 +90,7 @@ class TrafficLight:
|
||||
self.current_color = TrafficLightColor((self._starting_color.value - 1) % num_colors)
|
||||
while self.running:
|
||||
self.current_color = TrafficLightColor((self.current_color.value + 1) % num_colors)
|
||||
self.last_switch = datetime.now()
|
||||
self.last_switch = datetime.utcnow()
|
||||
self.send_status_update()
|
||||
time.sleep(self.switching_time / SCALING)
|
||||
|
||||
|
||||
@ -168,7 +168,7 @@ class Vehicle:
|
||||
|
||||
# Get old and updated timestamps
|
||||
old_timestamp = self.last_update
|
||||
updated_timestamp = datetime.now()
|
||||
updated_timestamp = datetime.utcnow()
|
||||
self.last_update = updated_timestamp
|
||||
|
||||
# get driving time between timestamps (in seconds)
|
||||
@ -201,7 +201,7 @@ class Vehicle:
|
||||
informs the message broker about the current state (DAF) of the vehicle.
|
||||
"""
|
||||
print('{} starts driving ... SCALING: x{}\n\n'.format(self.vin, SCALING))
|
||||
self.last_update = datetime.now()
|
||||
self.last_update = datetime.utcnow()
|
||||
self._driven_kms = 0
|
||||
|
||||
self._t = threading.Thread(target=self.drive)
|
||||
@ -266,7 +266,7 @@ class Vehicle:
|
||||
print('\n\nEnd of route reached ... resetting and restarting vehicle')
|
||||
self._gps_location = self._starting_point
|
||||
self._driven_kms = 0
|
||||
self.last_update = datetime.now()
|
||||
self.last_update = datetime.utcnow()
|
||||
self.nce = False
|
||||
self.velocity = STARTING_VELOCITY
|
||||
|
||||
|
||||
@ -21,7 +21,7 @@ class TestVehicle(unittest.TestCase):
|
||||
self.vin = 'my_vin'
|
||||
self.starting_velocity = 130
|
||||
self.starting_point = geopy.Point(0, 0, 0)
|
||||
self.timestamp = datetime.datetime.now()
|
||||
self.timestamp = datetime.datetime.utcnow()
|
||||
|
||||
self.v = Vehicle(
|
||||
vin=self.vin,
|
||||
@ -40,7 +40,7 @@ class TestVehicle(unittest.TestCase):
|
||||
# initially false
|
||||
self.assertEqual(self.v.nce, False)
|
||||
# get past nce km
|
||||
self.v._driven_kms = 3
|
||||
self.v._gps_location.latitude = 48
|
||||
# now nce should fire
|
||||
self.assertEqual(self.v.nce, True)
|
||||
# second call is false again
|
||||
|
||||
@ -61,7 +61,7 @@ class Orchestrator:
|
||||
for traffic_light in traffic_lights['cursor']:
|
||||
self.tls[traffic_light['id']] = {'color': traffic_light['color'],
|
||||
'switching_time': traffic_light['switchingTime'],
|
||||
'last_switch': datetime.now()}
|
||||
'last_switch': datetime.utcnow()}
|
||||
|
||||
def setup_msg_queues(self):
|
||||
"""
|
||||
@ -108,7 +108,7 @@ class Orchestrator:
|
||||
print('Target velocity: {}'.format(target_vel))
|
||||
response_channel.send(pickle.dumps(
|
||||
TargetVelocity(vin=received_daf_object.vehicle_identification_number, target_velocity=target_vel,
|
||||
timestamp=datetime.now())))
|
||||
timestamp=datetime.utcnow())))
|
||||
|
||||
def handle_tl_state_receive(self, msg):
|
||||
"""
|
||||
|
||||
@ -9,7 +9,7 @@ from orchestrator import Orchestrator
|
||||
|
||||
class TestOrchestrator(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.timestamp = datetime.datetime.now()
|
||||
self.timestamp = datetime.datetime.utcnow()
|
||||
with patch('requests.sessions.Session.get', MyResponse):
|
||||
self.orc = Orchestrator()
|
||||
|
||||
@ -18,7 +18,18 @@ class TestOrchestrator(unittest.TestCase):
|
||||
tl_geo = {'cursor': [
|
||||
{'id': '1', 'calculatedRange': 1000}
|
||||
]}
|
||||
current_vel = 130.0
|
||||
current_vel = 55.0
|
||||
|
||||
target_vel = self.orc._compute_velocity(tl_geo, current_vel)
|
||||
self.assertEqual(current_vel, target_vel)
|
||||
|
||||
def test_slow_down_if_passing_not_possible(self):
|
||||
self.orc.tls = {'1': {'color': 'RED', 'switching_time': 1, 'last_switch': self.timestamp}}
|
||||
tl_geo = {'cursor': [
|
||||
{'id': '1', 'calculatedRange': 1000}
|
||||
]}
|
||||
current_vel = 50.0
|
||||
|
||||
target_vel = self.orc._compute_velocity(tl_geo, current_vel)
|
||||
self.assertEqual(current_vel, target_vel)
|
||||
print(target_vel)
|
||||
|
||||
@ -6,6 +6,5 @@ class MyDate:
|
||||
cls.timestamp = timestamp
|
||||
|
||||
@classmethod
|
||||
def now(cls):
|
||||
def utcnow(cls):
|
||||
return cls.timestamp
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user