import time from threading import Thread from typing import Union import pika from pika.exceptions import AMQPConnectionError class MBWrapper: host: str exchange_type: str exchange_name: str callback: callable _logger: Union['MBWrapper', None] _type: str = None _connection: pika.BlockingConnection _channel: pika.BlockingConnection.channel def __init__(self, host: str = 'rabbitmq', exchange_type: str = 'fanout', exchange_name: str = None, callback: callable = None, verbose: bool = False): """ Initializes a new Message Broker Wrapper (MBWrapper). Initialize this object afterwards with self.setup_sender() or self.setup_receiver() if messages will be published to or received from the queue. If the exchange_name is not "logger", the MBWrapper will also initialize an sending queue which forwards EVERY request to this exchange, too. Therefore, every request routed via a MBWrapper is logged. Except if it is the logger MBWrapper. This one is initialized in the EventStore Service and receives all messages. :param host: of running rabbitMQ instance :param exchange_type: rabbitMQ exchange type to use :param exchange_name: name of exchange :param callback: callable callback to execute if message receiver will be set up, can also be set later on. :param verbose: print handled messages to console """ assert exchange_name, 'Please define an exchange name' # append a connection to the logging broker, if it isn't the logging broker itself if exchange_name != 'logger': self._logger = MBWrapper(exchange_name='logger') self._logger.setup_sender() else: self._logger = None self.host = host self.exchange_type = exchange_type self.exchange_name = exchange_name self.callback = callback self.verbose = verbose def print(self, *msg): """ Modified print which prints only to stout with self.verbose. :param msg: to print """ if self.verbose: print(*msg) def setup_sender(self): """ Setup the MBWrapper as sender. """ assert self._type != 'receiver', 'MBWrapper is already a receiver. Use another MBWrapper.' self._type = 'sender' self._setup_channel() def setup_receiver(self, callback: callable = None): """ Setup the MBWrapper as a receiver. A callback method which can handle the response bytes as input is mandatory. Set it here if not already done on init. """ if callback: self.callback = callback assert self._type != 'sender', 'MBWrapper is already a sender. Use another MBWrapper.' assert self.callback, \ 'Please setup MBWrapper with "on response" self.callback which can handle a byte string as input.' def consumer(): """ If initialized as receiver: Consumer thread which waits for incoming messages, invokes self._receive """ self._setup_channel() result = self._channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue self._channel.queue_bind(exchange=self.exchange_name, queue=queue_name) self.print('[{}] Waiting for messages.'.format(self.exchange_name)) self._channel.basic_consume( queue=queue_name, on_message_callback=self._receive, auto_ack=True) self._channel.start_consuming() Thread(target=consumer).start() def send(self, message: bytes): """ Send the message to the queue. Forward also to logger. :param message: msg to send """ if type(message) is not bytes: message = str(message).encode() self._channel.basic_publish(exchange=self.exchange_name, routing_key='', body=message) self.print("[{}] Sent {}".format(self.exchange_name, message)) # has a logger IFF not 'logger' as exchange name -> forwards everything to logger exchange, too if self.__getattribute__('_logger'): # print('logging') self._logger.send(message) def close(self): """ Closes the connection. """ self._connection.close() def _setup_channel(self): """ Setup the rabbitMQ channel. Retry if not succeeded (Allow the dependencies to boot). Wait one second between retries. """ connect_succeeded = False while not connect_succeeded: try: self._connection = pika.BlockingConnection( pika.ConnectionParameters(host=self.host)) connect_succeeded = True except pika.exceptions.AMQPConnectionError: print('Could not connect, retry') time.sleep(1) self._channel = self._connection.channel() self._channel.exchange_declare(exchange=self.exchange_name, exchange_type=self.exchange_type) def _receive(self, ch, method, properties, body): """ Reduce complexity, only forward the message body to the callback method. The others are not necessary for our application example. """ self.callback(body)