diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java index fe4a09d..088add2 100644 --- a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java @@ -1,8 +1,7 @@ package dst.ass3.messaging.impl; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.rabbitmq.client.*; import com.rabbitmq.http.client.Client; import dst.ass3.messaging.Constants; import dst.ass3.messaging.IWorkloadMonitor; @@ -10,17 +9,17 @@ import dst.ass3.messaging.Region; import dst.ass3.messaging.WorkerResponse; import java.io.IOException; +import java.math.BigDecimal; import java.net.MalformedURLException; import java.net.URISyntaxException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.nio.charset.StandardCharsets; +import java.util.*; import java.util.concurrent.TimeoutException; public class WorkloadMonitor implements IWorkloadMonitor { - Client client; - Connection connection; - Channel channel; + Client client = null; + Connection connection = null; + Channel channel = null; String queueName; Map> requests = new HashMap<>(); @@ -43,6 +42,36 @@ public class WorkloadMonitor implements IWorkloadMonitor { channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, "topic"); queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, Constants.TOPIC_EXCHANGE, "requests.#"); + + Consumer consumer = new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, + Envelope envelope, + AMQP.BasicProperties properties, + byte[] body) throws IOException { + String message = new String(body, StandardCharsets.UTF_8); + ObjectMapper mapper = new ObjectMapper(); + WorkerResponse workerResponse = mapper.readValue(message, WorkerResponse.class); + Region region = null; + + if (envelope.getRoutingKey().equals(Constants.ROUTING_KEY_AT_VIENNA)) + region = Region.AT_VIENNA; + else if (envelope.getRoutingKey().equals(Constants.ROUTING_KEY_AT_LINZ)) + region = Region.AT_LINZ; + else if (envelope.getRoutingKey().equals(Constants.ROUTING_KEY_DE_BERLIN)) + region = Region.DE_BERLIN; + + List responses = new ArrayList<>(); + if (requests.containsKey(region)) { + responses = requests.get(region); + responses.add(workerResponse); + } + + requests.put(region, responses); + } + }; + + channel.basicConsume(queueName, true, consumer); } catch (IOException | TimeoutException e) { e.printStackTrace(); } @@ -76,6 +105,7 @@ public class WorkloadMonitor implements IWorkloadMonitor { } private Double avgTimeLastTen(List workerResponses) { + Collections.reverse(workerResponses); return workerResponses.stream().limit(10).mapToDouble(WorkerResponse::getProcessingTime).average().orElse(0); } @@ -97,4 +127,4 @@ public class WorkloadMonitor implements IWorkloadMonitor { if (client != null) client.closeConnection(Constants.RMQ_API_URL); } -} +} \ No newline at end of file