150 lines
5.4 KiB
Python
150 lines
5.4 KiB
Python
import time
|
|
from threading import Thread
|
|
from typing import Union
|
|
|
|
import pika
|
|
from pika.exceptions import AMQPConnectionError
|
|
|
|
|
|
class MBWrapper:
    """
    Message Broker Wrapper around a single RabbitMQ exchange, accessed through pika.

    An instance is created first and then turned into exactly one role by calling either
    ``setup_sender()`` or ``setup_receiver()``. Every wrapper whose exchange is not named
    ``'logger'`` owns a second wrapper that publishes to the ``'logger'`` exchange, and
    ``send()`` forwards each message to it as well.
    """

    host: str
    exchange_type: str
    exchange_name: str
    callback: callable
    verbose: bool

    # Wrapper publishing to the 'logger' exchange; None when this wrapper IS the logger.
    _logger: Union['MBWrapper', None] = None

    # Role of this wrapper: None until set up, afterwards 'sender' or 'receiver'.
    _type: Union[str, None] = None

    # Open connection; None while not connected (before setup and after close()).
    _connection: Union['pika.BlockingConnection', None] = None

    # Channel opened on _connection by _setup_channel().
    _channel: 'pika.adapters.blocking_connection.BlockingChannel'

    def __init__(self,
                 host: str = 'rabbitmq',
                 exchange_type: str = 'fanout',
                 exchange_name: str = None,
                 callback: callable = None,
                 verbose: bool = False):
        """
        Initializes a new Message Broker Wrapper (MBWrapper). Call self.setup_sender() or
        self.setup_receiver() afterwards, depending on whether messages will be published to
        or received from the exchange.

        If the exchange_name is not "logger", the MBWrapper also creates a sending wrapper for
        the "logger" exchange on the same host and connects it immediately. This blocks until
        the broker is reachable.

        :param host: of running rabbitMQ instance
        :param exchange_type: rabbitMQ exchange type to use
        :param exchange_name: name of exchange (mandatory)
        :param callback: callable executed with the message body if a receiver is set up, can also be set later on.
        :param verbose: print handled messages to console
        :raises AssertionError: if no exchange name is given
        """
        # Raised explicitly (instead of ``assert``) so the check survives ``python -O``.
        if not exchange_name:
            raise AssertionError('Please define an exchange name')

        self.host = host
        self.exchange_type = exchange_type
        self.exchange_name = exchange_name
        self.callback = callback
        self.verbose = verbose

        # Append a connection to the logging broker, if it isn't the logging broker itself.
        # The logger must use the same host; otherwise it would retry forever on the default one.
        if exchange_name != 'logger':
            self._logger = MBWrapper(host=host, exchange_name='logger')
            self._logger.setup_sender()
        else:
            self._logger = None

    def print(self, *msg):
        """
        Modified print which writes to stdout only if self.verbose is set.

        :param msg: values to print
        """
        if self.verbose:
            print(*msg)

    def setup_sender(self):
        """
        Setup the MBWrapper as sender and open its connection and channel.

        :raises AssertionError: if this wrapper is already a receiver
        """
        if self._type == 'receiver':
            raise AssertionError('MBWrapper is already a receiver. Use another MBWrapper.')
        self._type = 'sender'
        self._setup_channel()

    def setup_receiver(self, callback: callable = None):
        """
        Setup the MBWrapper as a receiver and start consuming in a new thread.

        A callback which can handle the message body (bytes) as input is mandatory.
        Set it here if not already done on init.

        :param callback: optional callback replacing the one given on init
        :raises AssertionError: if this wrapper is already a sender or no callback is set
        """
        if callback:
            self.callback = callback

        if self._type == 'sender':
            raise AssertionError('MBWrapper is already a sender. Use another MBWrapper.')
        if not self.callback:
            raise AssertionError(
                'Please setup MBWrapper with "on response" self.callback which can handle a byte string as input.')

        # Record the role so that a later setup_sender() on this wrapper is rejected.
        self._type = 'receiver'
        Thread(target=self._consume).start()

    def _consume(self):
        """
        Consumer thread body: connect, bind a fresh exclusive queue to the exchange and
        hand every incoming message to self._receive until consuming stops.
        """
        self._setup_channel()

        # Empty queue name: the generated name is read back from the declare result.
        declared = self._channel.queue_declare(queue='', exclusive=True)
        queue_name = declared.method.queue
        self._channel.queue_bind(exchange=self.exchange_name, queue=queue_name)

        self.print('[{}] Waiting for messages.'.format(self.exchange_name))

        self._channel.basic_consume(
            queue=queue_name, on_message_callback=self._receive, auto_ack=True)
        self._channel.start_consuming()

    def send(self, message: bytes):
        """
        Publish the message to the exchange and forward it to the logger, if there is one.

        :param message: msg to send; anything that is not bytes is sent as str(message).encode()
        """
        if not isinstance(message, bytes):
            message = str(message).encode()
        self._channel.basic_publish(exchange=self.exchange_name, routing_key='', body=message)
        self.print("[{}] Sent {}".format(self.exchange_name, message))

        # has a logger IFF not 'logger' as exchange name -> forwards everything to logger exchange, too
        if self._logger is not None:
            self._logger.send(message)

    def close(self):
        """
        Closes the connection of this wrapper and of its logger wrapper.

        Safe to call before any setup and more than once: a wrapper without an open
        connection is skipped.

        NOTE(review): for a receiver the connection is used by the consumer thread; confirm
        against the pika documentation whether closing it from another thread is supported.
        """
        connection, self._connection = self._connection, None
        try:
            if connection is not None:
                connection.close()
        finally:
            # Release the logger connection even if closing our own one failed.
            if self._logger is not None:
                self._logger.close()

    def _setup_channel(self):
        """
        Setup the rabbitMQ connection, channel and exchange. Retries until the connection
        succeeds (allows the dependencies to boot), waiting one second between retries.
        """
        while True:
            try:
                self._connection = pika.BlockingConnection(
                    pika.ConnectionParameters(host=self.host))
            except AMQPConnectionError:
                # Deliberately the builtin print: connection problems are shown even if not verbose.
                print('Could not connect, retry')
                time.sleep(1)
            else:
                break

        self._channel = self._connection.channel()
        self._channel.exchange_declare(exchange=self.exchange_name, exchange_type=self.exchange_type)

    def _receive(self, ch, method, properties, body):
        """
        Consume callback registered with the channel. Reduces complexity by forwarding only
        the message body to self.callback; the other arguments are ignored.

        :param ch: unused
        :param method: unused
        :param properties: unused
        :param body: message body passed on to self.callback
        """
        self.callback(body)
|