From d9fe77818646467b425da5e6222d0ebbda4628d2 Mon Sep 17 00:00:00 2001 From: Marco Zeisler Date: Fri, 28 May 2021 19:03:56 +0200 Subject: [PATCH] retry connection setup until succeeds; --- .../dse_shared_libs/message_broker_wrapper.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/components/shared/dse_shared_libs/message_broker_wrapper.py b/components/shared/dse_shared_libs/message_broker_wrapper.py index 8ae2e65..bb48033 100644 --- a/components/shared/dse_shared_libs/message_broker_wrapper.py +++ b/components/shared/dse_shared_libs/message_broker_wrapper.py @@ -1,7 +1,9 @@ +import time from threading import Thread from typing import Union import pika +from pika.exceptions import AMQPConnectionError class MBWrapper: @@ -79,10 +81,17 @@ class MBWrapper: self._connection.close() def _setup_channel(self): - self._connection = pika.BlockingConnection( - pika.ConnectionParameters(host=self.host)) - self._channel = self._connection.channel() + connect_succeeded = False + while not connect_succeeded: + try: + self._connection = pika.BlockingConnection( + pika.ConnectionParameters(host=self.host)) + connect_succeeded = True + except pika.exceptions.AMQPConnectionError: + print('Could not connect, retry') + time.sleep(1) + self._channel = self._connection.channel() self._channel.exchange_declare(exchange=self.exchange_name, exchange_type=self.exchange_type) def _receive(self, ch, method, properties, body):