diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java new file mode 100644 index 0000000..fe4a09d --- /dev/null +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java @@ -0,0 +1,100 @@ +package dst.ass3.messaging.impl; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.http.client.Client; +import dst.ass3.messaging.Constants; +import dst.ass3.messaging.IWorkloadMonitor; +import dst.ass3.messaging.Region; +import dst.ass3.messaging.WorkerResponse; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +public class WorkloadMonitor implements IWorkloadMonitor { + Client client; + Connection connection; + Channel channel; + String queueName; + Map> requests = new HashMap<>(); + + public WorkloadMonitor() { + try { + client = new Client(Constants.RMQ_API_URL, Constants.RMQ_USER, Constants.RMQ_PASSWORD); + } catch (MalformedURLException | URISyntaxException e) { + e.printStackTrace(); + } + + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(Constants.RMQ_HOST); + factory.setUsername(Constants.RMQ_USER); + factory.setPassword(Constants.RMQ_PASSWORD); + + try { + connection = factory.newConnection(); + channel = connection.createChannel(); + + channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, "topic"); + queueName = channel.queueDeclare().getQueue(); + channel.queueBind(queueName, Constants.TOPIC_EXCHANGE, "requests.#"); + } catch (IOException | TimeoutException e) { + e.printStackTrace(); + } + } + + @Override + public Map getRequestCount() { + Map result = new HashMap<>(); + result.put(Region.AT_VIENNA, client.getQueue("/", Constants.QUEUE_AT_VIENNA).getTotalMessages()); + result.put(Region.AT_LINZ, client.getQueue("/", Constants.QUEUE_AT_LINZ).getTotalMessages()); + result.put(Region.DE_BERLIN, client.getQueue("/", Constants.QUEUE_DE_BERLIN).getTotalMessages()); + return result; + } + + @Override + public Map getWorkerCount() { + Map result = new HashMap<>(); + result.put(Region.AT_VIENNA, client.getQueue("/", Constants.QUEUE_AT_VIENNA).getConsumerCount()); + result.put(Region.AT_LINZ, client.getQueue("/", Constants.QUEUE_AT_LINZ).getConsumerCount()); + result.put(Region.DE_BERLIN, client.getQueue("/", Constants.QUEUE_DE_BERLIN).getConsumerCount()); + return result; + } + + @Override + public Map getAverageProcessingTime() { + Map result = new HashMap<>(); + result.put(Region.AT_VIENNA, avgTimeLastTen(requests.get(Region.AT_VIENNA))); + result.put(Region.AT_LINZ, avgTimeLastTen(requests.get(Region.AT_LINZ))); + result.put(Region.DE_BERLIN, avgTimeLastTen(requests.get(Region.DE_BERLIN))); + return result; + } + + private Double avgTimeLastTen(List workerResponses) { + return workerResponses.stream().limit(10).mapToDouble(WorkerResponse::getProcessingTime).average().orElse(0); + } + + @Override + public void close() throws IOException { + if (queueName != null && channel != null) { + client.getQueues().forEach(System.out::println); + channel.queueDelete(queueName); + channel.exchangeDelete(Constants.TOPIC_EXCHANGE); + try { + channel.close(); + } catch (TimeoutException e) { + e.printStackTrace(); + } + } + + if (connection != null) + connection.close(); + if (client != null) + client.closeConnection(Constants.RMQ_API_URL); + } +}