dse-assignment/components/shared/message_broker_wrapper.py
2021-05-24 17:52:19 +02:00

77 lines
2.5 KiB
Python

from threading import Thread
import pika
class MBWrapper:
    """Small convenience wrapper around a pika ``BlockingConnection``.

    An instance is bound to one exchange and acts either as a *sender*
    (``setup_sender`` + ``send``) or as a *receiver* (``setup_receiver``,
    which consumes on a background thread and forwards every message body to
    ``callback``). One instance cannot take both roles.

    Attributes:
        host: Broker host name passed to ``pika.ConnectionParameters``.
        exchange_type: Exchange type used when declaring the exchange.
        exchange_name: Name of the exchange to publish to / bind against.
        callback: Callable invoked with the raw message body (receiver only).
        verbose: When true, ``print`` forwards its arguments to stdout.
    """

    host: str
    exchange_type: str
    exchange_name: str
    callback: callable
    verbose: bool
    # Role of this wrapper: None until set up, then 'sender' or 'receiver'.
    _type: str = None
    # Quoted so the annotations are not evaluated when the class is created.
    # Both stay None until _setup_channel() has run.
    _connection: "pika.BlockingConnection" = None
    _channel: "pika.adapters.blocking_connection.BlockingChannel" = None

    def __init__(self,
                 host: str = 'localhost',
                 exchange_type: str = 'fanout',
                 exchange_name: str = None,
                 callback: callable = None,
                 verbose: bool = False):
        """Store the configuration; no connection is opened here.

        Args:
            host: Broker host name.
            exchange_type: Exchange type to declare.
            exchange_name: Exchange name; required.
            callback: Handler for received message bodies (needed only for
                receivers).
            verbose: Enable status output through ``print``.

        Raises:
            AssertionError: If ``exchange_name`` is missing or empty.
        """
        # Raised explicitly instead of via `assert` so the check also runs
        # under `python -O`; the exception type is kept for existing callers.
        if not exchange_name:
            raise AssertionError('Please define an exchange name')
        self.host = host
        self.exchange_type = exchange_type
        self.exchange_name = exchange_name
        self.callback = callback
        self.verbose = verbose

    def setup_sender(self):
        """Mark this wrapper as a sender and open its connection/channel.

        Raises:
            AssertionError: If the wrapper was already set up as a receiver.
        """
        if self._type == 'receiver':
            raise AssertionError('MBWrapper is already a receiver. Use another MBWrapper.')
        self._type = 'sender'
        self._setup_channel()

    def setup_receiver(self):
        """Mark this wrapper as a receiver and start consuming on a new thread.

        Raises:
            AssertionError: If the wrapper was already set up as a sender, or
                if no ``callback`` was provided.
        """
        if self._type == 'sender':
            raise AssertionError('MBWrapper is already a sender. Use another MBWrapper.')
        if not self.callback:
            raise AssertionError(
                'Please setup MBWrapper with "on response" self.callback which can handle a byte string as input.')
        # Record the role so a later setup_sender() is rejected instead of
        # silently replacing the consumer's connection and channel.
        self._type = 'receiver'
        Thread(target=self._consume).start()

    def send(self, message: bytes = b"info: Hello World!"):
        """Publish ``message`` to the exchange with an empty routing key.

        Args:
            message: Payload. Anything that is not already bytes-like is
                converted with ``str(message).encode()``.
        """
        message = self._to_bytes(message)
        self._channel.basic_publish(exchange=self.exchange_name, routing_key='', body=message)
        self.print(f" [x] Sent {message!r}")

    def close(self):
        """Close the connection if one has been opened; otherwise do nothing.

        NOTE(review): for a receiver the connection is created and used on
        the consumer thread, and this method closes it from the caller's
        thread. Confirm against pika's threading guidance whether the close
        should be scheduled via ``add_callback_threadsafe`` instead.
        """
        if self._connection is not None:
            self._connection.close()

    def print(self, *msg):
        """Print ``msg`` to stdout when ``verbose`` is enabled."""
        if self.verbose:
            print(*msg)

    @staticmethod
    def _to_bytes(message) -> bytes:
        """Return ``message`` as ``bytes`` suitable for a message body."""
        if isinstance(message, bytes):
            return message
        if isinstance(message, (bytearray, memoryview)):
            # Copy the raw buffer; str() would yield the repr text instead.
            return bytes(message)
        return str(message).encode()

    def _consume(self):
        """Thread body for receivers: connect, bind a queue, consume forever."""
        self._setup_channel()
        # An empty queue name lets the broker pick one; it is read back below.
        declared = self._channel.queue_declare(queue='', exclusive=True)
        queue_name = declared.method.queue
        self._channel.queue_bind(exchange=self.exchange_name, queue=queue_name)
        self.print(' [*] Waiting for messages. To exit press CTRL+C')
        self._channel.basic_consume(
            queue=queue_name, on_message_callback=self._receive, auto_ack=True)
        # Blocks this thread until consuming stops.
        self._channel.start_consuming()

    def _setup_channel(self):
        """Open a connection to ``host``, create a channel, declare the exchange."""
        self._connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self.host))
        self._channel = self._connection.channel()
        self._channel.exchange_declare(exchange=self.exchange_name, exchange_type=self.exchange_type)

    def _receive(self, ch, method, properties, body):
        """pika consume callback: forward only the message body to ``callback``."""
        self.callback(body)