import time from threading import Thread import pika class MBWrapper: host: str exchange_type: str exchange_name: str callback: callable _type: str _connection: pika.BlockingConnection _channel: pika.BlockingConnection.channel def __init__(self, host: str = 'localhost', exchange_type: str = 'fanout', exchange_name: str = None, callback: callable = None): assert exchange_name, 'Please define an exchange name' self.host = host self.exchange_type = exchange_type self.exchange_name = exchange_name self.callback = callback def setup_sender(self): assert self._type != 'receiver', 'MBWrapper is already a receiver. Use another MBWrapper.' self._type = 'sender' self._setup_channel() def setup_receiver(self): assert self._type != 'sender', 'MBWrapper is already a sender. Use another MBWrapper.' assert callback, 'Please setup MBWrapper with "on response" self.callback which can handle a byte string as input.' def consumer(): self._setup_channel() result = self._channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue self._channel.queue_bind(exchange=self.exchange_name, queue=queue_name) print(' [*] Waiting for messages. To exit press CTRL+C') self._channel.basic_consume( queue=queue_name, on_message_callback=self._receive, auto_ack=True) self._channel.start_consuming() Thread(target=consumer).start() def send(self, message: bytes = b"info: Hello World!"): if type(message) is not bytes: message = str(message).encode() self._channel.basic_publish(exchange=self.exchange_name, routing_key='', body=message) print(" [x] Sent %r" % message) def close(self): self._connection.close() def _setup_channel(self): self._connection = pika.BlockingConnection( pika.ConnectionParameters(host=self.host)) self._channel = self._connection.channel() self._channel.exchange_declare(exchange=self.exchange_name, exchange_type=self.exchange_type) def _receive(self, ch, method, properties, body): self.callback(body)