Implement WorkloadMonitor (3.1.3)

This commit is contained in:
Tobias Eidelpes 2021-05-17 16:29:46 +02:00
parent 6aab14cb71
commit 221c680af4

View File

@ -0,0 +1,100 @@
package dst.ass3.messaging.impl;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.http.client.Client;
import dst.ass3.messaging.Constants;
import dst.ass3.messaging.IWorkloadMonitor;
import dst.ass3.messaging.Region;
import dst.ass3.messaging.WorkerResponse;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class WorkloadMonitor implements IWorkloadMonitor {
Client client;
Connection connection;
Channel channel;
String queueName;
Map<Region, List<WorkerResponse>> requests = new HashMap<>();
public WorkloadMonitor() {
try {
client = new Client(Constants.RMQ_API_URL, Constants.RMQ_USER, Constants.RMQ_PASSWORD);
} catch (MalformedURLException | URISyntaxException e) {
e.printStackTrace();
}
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.RMQ_HOST);
factory.setUsername(Constants.RMQ_USER);
factory.setPassword(Constants.RMQ_PASSWORD);
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, "topic");
queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, Constants.TOPIC_EXCHANGE, "requests.#");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
@Override
public Map<Region, Long> getRequestCount() {
Map<Region, Long> result = new HashMap<>();
result.put(Region.AT_VIENNA, client.getQueue("/", Constants.QUEUE_AT_VIENNA).getTotalMessages());
result.put(Region.AT_LINZ, client.getQueue("/", Constants.QUEUE_AT_LINZ).getTotalMessages());
result.put(Region.DE_BERLIN, client.getQueue("/", Constants.QUEUE_DE_BERLIN).getTotalMessages());
return result;
}
@Override
public Map<Region, Long> getWorkerCount() {
Map<Region, Long> result = new HashMap<>();
result.put(Region.AT_VIENNA, client.getQueue("/", Constants.QUEUE_AT_VIENNA).getConsumerCount());
result.put(Region.AT_LINZ, client.getQueue("/", Constants.QUEUE_AT_LINZ).getConsumerCount());
result.put(Region.DE_BERLIN, client.getQueue("/", Constants.QUEUE_DE_BERLIN).getConsumerCount());
return result;
}
@Override
public Map<Region, Double> getAverageProcessingTime() {
Map<Region, Double> result = new HashMap<>();
result.put(Region.AT_VIENNA, avgTimeLastTen(requests.get(Region.AT_VIENNA)));
result.put(Region.AT_LINZ, avgTimeLastTen(requests.get(Region.AT_LINZ)));
result.put(Region.DE_BERLIN, avgTimeLastTen(requests.get(Region.DE_BERLIN)));
return result;
}
private Double avgTimeLastTen(List<WorkerResponse> workerResponses) {
return workerResponses.stream().limit(10).mapToDouble(WorkerResponse::getProcessingTime).average().orElse(0);
}
@Override
public void close() throws IOException {
if (queueName != null && channel != null) {
client.getQueues().forEach(System.out::println);
channel.queueDelete(queueName);
channel.exchangeDelete(Constants.TOPIC_EXCHANGE);
try {
channel.close();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null)
connection.close();
if (client != null)
client.closeConnection(Constants.RMQ_API_URL);
}
}