diff --git a/components/shared/dse_shared_libs/message_broker_wrapper.py b/components/shared/dse_shared_libs/message_broker_wrapper.py index 8ae2e65..bb48033 100644 --- a/components/shared/dse_shared_libs/message_broker_wrapper.py +++ b/components/shared/dse_shared_libs/message_broker_wrapper.py @@ -1,7 +1,9 @@ +import time from threading import Thread from typing import Union import pika +from pika.exceptions import AMQPConnectionError class MBWrapper: @@ -79,10 +81,17 @@ class MBWrapper: self._connection.close() def _setup_channel(self): - self._connection = pika.BlockingConnection( - pika.ConnectionParameters(host=self.host)) - self._channel = self._connection.channel() + connect_succeeded = False + while not connect_succeeded: + try: + self._connection = pika.BlockingConnection( + pika.ConnectionParameters(host=self.host)) + connect_succeeded = True + except pika.exceptions.AMQPConnectionError: + print('Could not connect, retry') + time.sleep(1) + self._channel = self._connection.channel() self._channel.exchange_declare(exchange=self.exchange_name, exchange_type=self.exchange_type) def _receive(self, ch, method, properties, body):