Calculate average from last elements (3.1.4)

This commit is contained in:
Tobias Eidelpes 2021-05-17 17:31:34 +02:00
parent 45054d8f57
commit 928902fe0b

View File

@ -1,8 +1,7 @@
package dst.ass3.messaging.impl; package dst.ass3.messaging.impl;
import com.rabbitmq.client.Channel; import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.*;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.http.client.Client; import com.rabbitmq.http.client.Client;
import dst.ass3.messaging.Constants; import dst.ass3.messaging.Constants;
import dst.ass3.messaging.IWorkloadMonitor; import dst.ass3.messaging.IWorkloadMonitor;
@ -10,17 +9,17 @@ import dst.ass3.messaging.Region;
import dst.ass3.messaging.WorkerResponse; import dst.ass3.messaging.WorkerResponse;
import java.io.IOException; import java.io.IOException;
import java.math.BigDecimal;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.HashMap; import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.*;
import java.util.Map;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
public class WorkloadMonitor implements IWorkloadMonitor { public class WorkloadMonitor implements IWorkloadMonitor {
Client client; Client client = null;
Connection connection; Connection connection = null;
Channel channel; Channel channel = null;
String queueName; String queueName;
Map<Region, List<WorkerResponse>> requests = new HashMap<>(); Map<Region, List<WorkerResponse>> requests = new HashMap<>();
@ -43,6 +42,36 @@ public class WorkloadMonitor implements IWorkloadMonitor {
channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, "topic"); channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, "topic");
queueName = channel.queueDeclare().getQueue(); queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, Constants.TOPIC_EXCHANGE, "requests.#"); channel.queueBind(queueName, Constants.TOPIC_EXCHANGE, "requests.#");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
ObjectMapper mapper = new ObjectMapper();
WorkerResponse workerResponse = mapper.readValue(message, WorkerResponse.class);
Region region = null;
if (envelope.getRoutingKey().equals(Constants.ROUTING_KEY_AT_VIENNA))
region = Region.AT_VIENNA;
else if (envelope.getRoutingKey().equals(Constants.ROUTING_KEY_AT_LINZ))
region = Region.AT_LINZ;
else if (envelope.getRoutingKey().equals(Constants.ROUTING_KEY_DE_BERLIN))
region = Region.DE_BERLIN;
List<WorkerResponse> responses = new ArrayList<>();
if (requests.containsKey(region)) {
responses = requests.get(region);
responses.add(workerResponse);
}
requests.put(region, responses);
}
};
channel.basicConsume(queueName, true, consumer);
} catch (IOException | TimeoutException e) { } catch (IOException | TimeoutException e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -76,6 +105,7 @@ public class WorkloadMonitor implements IWorkloadMonitor {
} }
private Double avgTimeLastTen(List<WorkerResponse> workerResponses) { private Double avgTimeLastTen(List<WorkerResponse> workerResponses) {
Collections.reverse(workerResponses);
return workerResponses.stream().limit(10).mapToDouble(WorkerResponse::getProcessingTime).average().orElse(0); return workerResponses.stream().limit(10).mapToDouble(WorkerResponse::getProcessingTime).average().orElse(0);
} }
@ -97,4 +127,4 @@ public class WorkloadMonitor implements IWorkloadMonitor {
if (client != null) if (client != null)
client.closeConnection(Constants.RMQ_API_URL); client.closeConnection(Constants.RMQ_API_URL);
} }
} }