diff --git a/components/shared/dse_shared_libs/message_broker_wrapper.py b/components/shared/dse_shared_libs/message_broker_wrapper.py index 07b3a79..334591f 100644 --- a/components/shared/dse_shared_libs/message_broker_wrapper.py +++ b/components/shared/dse_shared_libs/message_broker_wrapper.py @@ -24,6 +24,20 @@ class MBWrapper: exchange_name: str = None, callback: callable = None, verbose: bool = False): + """ + Initializes a new Message Broker Wrapper (MBWrapper). Initialize this object afterwards with + self.setup_sender() or self.setup_receiver() if messages will be published to or received from the queue. + + If the exchange_name is not "logger", the MBWrapper will also initialize an sending queue which forwards + EVERY request to this exchange, too. Therefore, every request routed via a MBWrapper is logged. Except if it is + the logger MBWrapper. This one is initialized in the EventStore Service and receives all messages. + + :param host: of running rabbitMQ instance + :param exchange_type: rabbitMQ exchange type to use + :param exchange_name: name of exchange + :param callback: callable callback to execute if message receiver will be set up, can also be set later on. + :param verbose: print handled messages to console + """ assert exchange_name, 'Please define an exchange name' # append a connection to the logging broker, if it isn't the logging broker itself @@ -40,22 +54,37 @@ class MBWrapper: self.verbose = verbose def print(self, *msg): + """ + Modified print which prints only to stout with self.verbose. + :param msg: to print + """ if self.verbose: print(*msg) def setup_sender(self): + """ + Setup the MBWrapper as sender. + """ assert self._type != 'receiver', 'MBWrapper is already a receiver. Use another MBWrapper.' self._type = 'sender' self._setup_channel() - def setup_receiver(self): + def setup_receiver(self, callback: callable = None): + """ + Setup the MBWrapper as a receiver. + A callback method which can handle the response bytes as input is mandatory. + Set it here if not already done on init. + """ + if callback: + self.callback = callback + assert self._type != 'sender', 'MBWrapper is already a sender. Use another MBWrapper.' assert self.callback, \ 'Please setup MBWrapper with "on response" self.callback which can handle a byte string as input.' def consumer(): """ - Consumer thread which waits for incoming messages, invokes self._receive + If initialized as receiver: Consumer thread which waits for incoming messages, invokes self._receive """ self._setup_channel() result = self._channel.queue_declare(queue='', exclusive=True) @@ -73,6 +102,11 @@ class MBWrapper: Thread(target=consumer).start() def send(self, message: bytes): + """ + Send the message to the queue. Forward also to logger. + + :param message: msg to send + """ if type(message) is not bytes: message = str(message).encode() self._channel.basic_publish(exchange=self.exchange_name, routing_key='', body=message) @@ -84,9 +118,16 @@ class MBWrapper: self._logger.send(message) def close(self): + """ + Closes the connection. + """ self._connection.close() def _setup_channel(self): + """ + Setup the rabbitMQ channel. Retry if not succeeded (Allow the dependencies to + boot). Wait one second between retries. + """ connect_succeeded = False while not connect_succeeded: try: @@ -102,6 +143,7 @@ class MBWrapper: def _receive(self, ch, method, properties, body): """ - Reduce complexity, only forward the message body to the callback method + Reduce complexity, only forward the message body to the callback method. The others are not necessary for our + application example. """ self.callback(body)