From 8cb4304abc9783f095a47346a8635c75833e7ed6 Mon Sep 17 00:00:00 2001 From: Tobias Eidelpes Date: Mon, 17 May 2021 15:55:23 +0200 Subject: [PATCH] Implement QueueManager (3.1.1) --- .../dst/ass3/messaging/impl/QueueManager.java | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java new file mode 100644 index 0000000..5b54c6f --- /dev/null +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java @@ -0,0 +1,67 @@ +package dst.ass3.messaging.impl; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import dst.ass3.messaging.Constants; +import dst.ass3.messaging.IQueueManager; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +public class QueueManager implements IQueueManager { + private Connection connection; + private Channel channel; + private String viennaQueue; + private String linzQueue; + private String berlinQueue; + + @Override + public void setUp() { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(Constants.RMQ_HOST); + factory.setUsername(Constants.RMQ_USER); + factory.setPassword(Constants.RMQ_PASSWORD); + + try{ + connection = factory.newConnection(); + } catch (IOException | TimeoutException e) { + e.printStackTrace(); + } + + try { + channel = connection.createChannel(); + channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, "topic"); + channel.exchangeDeclare(Constants.QUEUE_AT_VIENNA, "direct", true); + channel.exchangeDeclare(Constants.QUEUE_AT_LINZ, "direct", true); + channel.exchangeDeclare(Constants.QUEUE_DE_BERLIN, "direct", true); + + channel.queueBind(viennaQueue, Constants.QUEUE_AT_VIENNA, Constants.ROUTING_KEY_AT_VIENNA.toLowerCase()); + channel.queueBind(linzQueue, Constants.QUEUE_AT_LINZ, Constants.ROUTING_KEY_AT_LINZ.toLowerCase()); + channel.queueBind(berlinQueue, Constants.QUEUE_DE_BERLIN, Constants.ROUTING_KEY_DE_BERLIN.toLowerCase()); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void tearDown() { + try { + channel.queueDelete(berlinQueue); + channel.queueDelete(linzQueue); + channel.queueDelete(viennaQueue); + channel.exchangeDelete(Constants.QUEUE_DE_BERLIN); + channel.exchangeDelete(Constants.QUEUE_AT_LINZ); + channel.exchangeDelete(Constants.QUEUE_AT_VIENNA); + channel.exchangeDelete(Constants.TOPIC_EXCHANGE); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void close() throws IOException { + if (connection != null) + connection.close(); + } +}