from threading import Thread import pika class MBWrapper: host: str exchange_type: str exchange_name: str callback: callable _type: str = None _connection: pika.BlockingConnection _channel: pika.BlockingConnection.channel def __init__(self, host: str = 'localhost', exchange_type: str = 'fanout', exchange_name: str = None, callback: callable = None, verbose: bool = False): assert exchange_name, 'Please define an exchange name' self.host = host self.exchange_type = exchange_type self.exchange_name = exchange_name self.callback = callback self.verbose = verbose def print(self, *msg): if self.verbose: print(*msg) def setup_sender(self): assert self._type != 'receiver', 'MBWrapper is already a receiver. Use another MBWrapper.' self._type = 'sender' self._setup_channel() def setup_receiver(self): assert self._type != 'sender', 'MBWrapper is already a sender. Use another MBWrapper.' assert self.callback, 'Please setup MBWrapper with "on response" self.callback which can handle a byte string as input.' def consumer(): self._setup_channel() result = self._channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue self._channel.queue_bind(exchange=self.exchange_name, queue=queue_name) self.print('[{}] Waiting for messages.'.format(self.exchange_name)) self._channel.basic_consume( queue=queue_name, on_message_callback=self._receive, auto_ack=True) self._channel.start_consuming() Thread(target=consumer).start() def send(self, message: bytes): if type(message) is not bytes: message = str(message).encode() self._channel.basic_publish(exchange=self.exchange_name, routing_key='', body=message) self.print("[{}] Sent {}".format(self.exchange_name, message)) def close(self): self._connection.close() def _setup_channel(self): self._connection = pika.BlockingConnection( pika.ConnectionParameters(host=self.host)) self._channel = self._connection.channel() self._channel.exchange_declare(exchange=self.exchange_name, exchange_type=self.exchange_type) def _receive(self, ch, method, properties, body): """ Reduce complexity, only forward the message body to the callback method """ self.callback(body)