dse-assignment/components/shared/dse_shared_libs/message_broker_wrapper.py
Marco Zeisler b0efd9d35c update doc
2021-06-11 17:46:45 +02:00

150 lines
5.4 KiB
Python

import time
from threading import Thread
from typing import Union
import pika
from pika.exceptions import AMQPConnectionError
class MBWrapper:
    """
    Thin wrapper around a pika (RabbitMQ) blocking connection bound to one exchange.

    An instance acts either as a sender (``setup_sender`` + ``send``) or as a receiver
    (``setup_receiver``), never both. Every wrapper whose exchange is not named "logger"
    owns a second wrapper publishing to the "logger" exchange and forwards each sent
    message to it as well.
    """

    host: str
    exchange_type: str
    exchange_name: str
    callback: callable
    verbose: bool
    # forwarding wrapper for the "logger" exchange; None if this wrapper IS the logger
    _logger: Union['MBWrapper', None]
    # role of this wrapper: None (not set up yet), 'sender' or 'receiver'
    _type: Union[str, None] = None
    # both stay None until _setup_channel() succeeded
    _connection: Union['pika.BlockingConnection', None] = None
    _channel: Union['pika.adapters.blocking_connection.BlockingChannel', None] = None

    def __init__(self,
                 host: str = 'rabbitmq',
                 exchange_type: str = 'fanout',
                 exchange_name: Union[str, None] = None,
                 callback: callable = None,
                 verbose: bool = False):
        """
        Initializes a new Message Broker Wrapper (MBWrapper). Initialize this object afterwards with
        self.setup_sender() or self.setup_receiver() if messages will be published to or received from the queue.

        If the exchange_name is not "logger", the MBWrapper will also initialize a sending wrapper which forwards
        EVERY sent message to the "logger" exchange, too. Therefore, every message sent via a MBWrapper is logged,
        except if it is the logger MBWrapper itself. Creating that logger wrapper connects to the broker right
        away, i.e. the constructor blocks until the broker is reachable.

        :param host: of running rabbitMQ instance
        :param exchange_type: rabbitMQ exchange type to use
        :param exchange_name: name of exchange (mandatory)
        :param callback: callable callback to execute if message receiver will be set up, can also be set later on.
        :param verbose: print handled messages to console
        :raises AssertionError: if no exchange name is given
        """
        assert exchange_name, 'Please define an exchange name'

        self.host = host
        self.exchange_type = exchange_type
        self.exchange_name = exchange_name
        self.callback = callback
        self.verbose = verbose

        # append a connection to the logging broker, if it isn't the logging broker itself
        if exchange_name != 'logger':
            # the logger exchange lives on the same broker, so reuse the configured host
            self._logger = MBWrapper(host=host, exchange_name='logger')
            self._logger.setup_sender()
        else:
            self._logger = None

    def print(self, *msg):
        """
        Modified print which prints to stdout only if self.verbose is set.

        :param msg: to print
        """
        if self.verbose:
            print(*msg)

    def setup_sender(self):
        """
        Setup the MBWrapper as sender: mark the role and open connection + channel.

        :raises AssertionError: if this wrapper was already set up as a receiver
        """
        assert self._type != 'receiver', 'MBWrapper is already a receiver. Use another MBWrapper.'
        self._type = 'sender'
        self._setup_channel()

    def setup_receiver(self, callback: callable = None):
        """
        Setup the MBWrapper as a receiver.
        A callback method which can handle the response bytes as input is mandatory.
        Set it here if not already done on init.

        The consumer runs in its own thread; this method returns immediately after starting it.

        :param callback: optional callback replacing the one given on init
        :raises AssertionError: if this wrapper is already a sender or no callback is available
        """
        if callback:
            self.callback = callback

        assert self._type != 'sender', 'MBWrapper is already a sender. Use another MBWrapper.'
        assert self.callback, \
            'Please setup MBWrapper with "on response" self.callback which can handle a byte string as input.'

        # record the role so that a later setup_sender() on this wrapper is rejected
        self._type = 'receiver'

        def consumer():
            """
            Consumer thread body: binds a fresh exclusive queue to the exchange and waits for
            incoming messages, handing each one to self._receive.
            """
            self._setup_channel()
            # empty queue name -> the broker generates one; read it back from the declare result
            declared = self._channel.queue_declare(queue='', exclusive=True)
            queue_name = declared.method.queue
            self._channel.queue_bind(exchange=self.exchange_name, queue=queue_name)

            self.print(f'[{self.exchange_name}] Waiting for messages.')
            self._channel.basic_consume(
                queue=queue_name, on_message_callback=self._receive, auto_ack=True)
            self._channel.start_consuming()

        Thread(target=consumer).start()

    def send(self, message: bytes):
        """
        Send the message to the exchange. Forward it also to the logger exchange, if there is one.

        :param message: msg to send; anything that is not bytes is converted via str(message).encode()
        """
        if not isinstance(message, bytes):
            message = str(message).encode()

        self._channel.basic_publish(exchange=self.exchange_name, routing_key='', body=message)
        self.print(f'[{self.exchange_name}] Sent {message}')

        # has a logger IFF not 'logger' as exchange name -> forwards everything to logger exchange, too
        if self._logger is not None:
            self._logger.send(message)

    def close(self):
        """
        Closes the connection of this wrapper and of its logger wrapper (if any).

        Does nothing for connections which were never opened or are already closed.
        """
        try:
            connection = self._connection
            if connection is not None and connection.is_open:
                connection.close()
        finally:
            # release the logger's connection even if closing our own one failed
            if self._logger is not None:
                self._logger.close()

    def _setup_channel(self):
        """
        Setup the rabbitMQ connection and channel and declare the exchange. Retry if the connection
        attempt did not succeed (allow the dependencies to boot). Wait one second between retries.
        """
        while True:
            try:
                self._connection = pika.BlockingConnection(
                    pika.ConnectionParameters(host=self.host))
                break
            except pika.exceptions.AMQPConnectionError:
                print('Could not connect, retry')
                time.sleep(1)

        self._channel = self._connection.channel()
        self._channel.exchange_declare(exchange=self.exchange_name, exchange_type=self.exchange_type)

    def _receive(self, ch, method, properties, body):
        """
        on_message_callback for the consumer. Reduce complexity, only forward the message body to the
        callback method. The other arguments are not necessary for our application example.

        :param ch: channel the message arrived on (unused)
        :param method: delivery method frame (unused)
        :param properties: message properties (unused)
        :param body: message body, handed to self.callback
        """
        self.callback(body)