diff --git a/components/i_feed/message_broker_wrapper.py b/components/i_feed/message_broker_wrapper.py new file mode 100644 index 0000000..d6c04e7 --- /dev/null +++ b/components/i_feed/message_broker_wrapper.py @@ -0,0 +1,71 @@ +import time +from threading import Thread + +import pika + + +class MBWrapper: + host: str + exchange_type: str + exchange_name: str + callback: callable + + _type: str + _connection: pika.BlockingConnection + _channel: pika.BlockingConnection.channel + + def __init__(self, + host: str = 'localhost', + exchange_type: str = 'fanout', + exchange_name: str = None, + callback: callable = None): + assert exchange_name, 'Please define an exchange name' + + self.host = host + self.exchange_type = exchange_type + self.exchange_name = exchange_name + self.callback = callback + + def setup_sender(self): + assert self._type != 'receiver', 'MBWrapper is already a receiver. Use another MBWrapper.' + self._type = 'sender' + self._setup_channel() + + def setup_receiver(self): + assert self._type != 'sender', 'MBWrapper is already a sender. Use another MBWrapper.' + assert callback, 'Please setup MBWrapper with "on response" self.callback which can handle a byte string as input.' + + def consumer(): + self._setup_channel() + result = self._channel.queue_declare(queue='', exclusive=True) + queue_name = result.method.queue + + self._channel.queue_bind(exchange=self.exchange_name, queue=queue_name) + + print(' [*] Waiting for messages. To exit press CTRL+C') + + self._channel.basic_consume( + queue=queue_name, on_message_callback=self._receive, auto_ack=True) + + self._channel.start_consuming() + + Thread(target=consumer).start() + + def send(self, message: bytes = b"info: Hello World!"): + if type(message) is not bytes: + message = str(message).encode() + self._channel.basic_publish(exchange=self.exchange_name, routing_key='', body=message) + print(" [x] Sent %r" % message) + + def close(self): + self._connection.close() + + def _setup_channel(self): + self._connection = pika.BlockingConnection( + pika.ConnectionParameters(host=self.host)) + self._channel = self._connection.channel() + + self._channel.exchange_declare(exchange=self.exchange_name, exchange_type=self.exchange_type) + + def _receive(self, ch, method, properties, body): + self.callback(body)