update doc
This commit is contained in:
parent
d15659827c
commit
b0efd9d35c
@ -24,6 +24,20 @@ class MBWrapper:
|
|||||||
exchange_name: str = None,
|
exchange_name: str = None,
|
||||||
callback: callable = None,
|
callback: callable = None,
|
||||||
verbose: bool = False):
|
verbose: bool = False):
|
||||||
|
"""
|
||||||
|
Initializes a new Message Broker Wrapper (MBWrapper). Initialize this object afterwards with
|
||||||
|
self.setup_sender() or self.setup_receiver() if messages will be published to or received from the queue.
|
||||||
|
|
||||||
|
If the exchange_name is not "logger", the MBWrapper will also initialize an sending queue which forwards
|
||||||
|
EVERY request to this exchange, too. Therefore, every request routed via a MBWrapper is logged. Except if it is
|
||||||
|
the logger MBWrapper. This one is initialized in the EventStore Service and receives all messages.
|
||||||
|
|
||||||
|
:param host: of running rabbitMQ instance
|
||||||
|
:param exchange_type: rabbitMQ exchange type to use
|
||||||
|
:param exchange_name: name of exchange
|
||||||
|
:param callback: callable callback to execute if message receiver will be set up, can also be set later on.
|
||||||
|
:param verbose: print handled messages to console
|
||||||
|
"""
|
||||||
assert exchange_name, 'Please define an exchange name'
|
assert exchange_name, 'Please define an exchange name'
|
||||||
|
|
||||||
# append a connection to the logging broker, if it isn't the logging broker itself
|
# append a connection to the logging broker, if it isn't the logging broker itself
|
||||||
@ -40,22 +54,37 @@ class MBWrapper:
|
|||||||
self.verbose = verbose
|
self.verbose = verbose
|
||||||
|
|
||||||
def print(self, *msg):
|
def print(self, *msg):
|
||||||
|
"""
|
||||||
|
Modified print which prints only to stout with self.verbose.
|
||||||
|
:param msg: to print
|
||||||
|
"""
|
||||||
if self.verbose:
|
if self.verbose:
|
||||||
print(*msg)
|
print(*msg)
|
||||||
|
|
||||||
def setup_sender(self):
|
def setup_sender(self):
|
||||||
|
"""
|
||||||
|
Setup the MBWrapper as sender.
|
||||||
|
"""
|
||||||
assert self._type != 'receiver', 'MBWrapper is already a receiver. Use another MBWrapper.'
|
assert self._type != 'receiver', 'MBWrapper is already a receiver. Use another MBWrapper.'
|
||||||
self._type = 'sender'
|
self._type = 'sender'
|
||||||
self._setup_channel()
|
self._setup_channel()
|
||||||
|
|
||||||
def setup_receiver(self):
|
def setup_receiver(self, callback: callable = None):
|
||||||
|
"""
|
||||||
|
Setup the MBWrapper as a receiver.
|
||||||
|
A callback method which can handle the response bytes as input is mandatory.
|
||||||
|
Set it here if not already done on init.
|
||||||
|
"""
|
||||||
|
if callback:
|
||||||
|
self.callback = callback
|
||||||
|
|
||||||
assert self._type != 'sender', 'MBWrapper is already a sender. Use another MBWrapper.'
|
assert self._type != 'sender', 'MBWrapper is already a sender. Use another MBWrapper.'
|
||||||
assert self.callback, \
|
assert self.callback, \
|
||||||
'Please setup MBWrapper with "on response" self.callback which can handle a byte string as input.'
|
'Please setup MBWrapper with "on response" self.callback which can handle a byte string as input.'
|
||||||
|
|
||||||
def consumer():
|
def consumer():
|
||||||
"""
|
"""
|
||||||
Consumer thread which waits for incoming messages, invokes self._receive
|
If initialized as receiver: Consumer thread which waits for incoming messages, invokes self._receive
|
||||||
"""
|
"""
|
||||||
self._setup_channel()
|
self._setup_channel()
|
||||||
result = self._channel.queue_declare(queue='', exclusive=True)
|
result = self._channel.queue_declare(queue='', exclusive=True)
|
||||||
@ -73,6 +102,11 @@ class MBWrapper:
|
|||||||
Thread(target=consumer).start()
|
Thread(target=consumer).start()
|
||||||
|
|
||||||
def send(self, message: bytes):
|
def send(self, message: bytes):
|
||||||
|
"""
|
||||||
|
Send the message to the queue. Forward also to logger.
|
||||||
|
|
||||||
|
:param message: msg to send
|
||||||
|
"""
|
||||||
if type(message) is not bytes:
|
if type(message) is not bytes:
|
||||||
message = str(message).encode()
|
message = str(message).encode()
|
||||||
self._channel.basic_publish(exchange=self.exchange_name, routing_key='', body=message)
|
self._channel.basic_publish(exchange=self.exchange_name, routing_key='', body=message)
|
||||||
@ -84,9 +118,16 @@ class MBWrapper:
|
|||||||
self._logger.send(message)
|
self._logger.send(message)
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
|
"""
|
||||||
|
Closes the connection.
|
||||||
|
"""
|
||||||
self._connection.close()
|
self._connection.close()
|
||||||
|
|
||||||
def _setup_channel(self):
|
def _setup_channel(self):
|
||||||
|
"""
|
||||||
|
Setup the rabbitMQ channel. Retry if not succeeded (Allow the dependencies to
|
||||||
|
boot). Wait one second between retries.
|
||||||
|
"""
|
||||||
connect_succeeded = False
|
connect_succeeded = False
|
||||||
while not connect_succeeded:
|
while not connect_succeeded:
|
||||||
try:
|
try:
|
||||||
@ -102,6 +143,7 @@ class MBWrapper:
|
|||||||
|
|
||||||
def _receive(self, ch, method, properties, body):
|
def _receive(self, ch, method, properties, body):
|
||||||
"""
|
"""
|
||||||
Reduce complexity, only forward the message body to the callback method
|
Reduce complexity, only forward the message body to the callback method. The others are not necessary for our
|
||||||
|
application example.
|
||||||
"""
|
"""
|
||||||
self.callback(body)
|
self.callback(body)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user