Implement UDP Listener for MonitoringServer

This commit is contained in:
Tobias Eidelpes 2020-11-16 15:23:42 +01:00
parent b946b47c8c
commit 439bea7cb1
2 changed files with 111 additions and 6 deletions

View File

@ -0,0 +1,64 @@
package dslab.monitoring;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;
public class MonitoringListener extends Thread {
private static final Logger logger = Logger.getLogger(MonitoringListener.class.getName());
private final DatagramSocket serverSocket;
private final ConcurrentHashMap<String, Integer> addresses;
private final ConcurrentHashMap<String, Integer> servers;
public MonitoringListener(DatagramSocket serverSocket, ConcurrentHashMap<String, Integer> addresses, ConcurrentHashMap<String, Integer> servers) {
this.serverSocket = serverSocket;
this.addresses = addresses;
this.servers = servers;
}
@Override
public void run() {
byte[] recvBuffer = new byte[1024];
while (!Thread.currentThread().isInterrupted()) {
logger.finer("Waiting for request on UDP serverSocket " + serverSocket.toString());
try {
DatagramPacket packet = new DatagramPacket(recvBuffer, recvBuffer.length);
serverSocket.receive(packet);
logger.fine("Processing incoming socket " + serverSocket.toString());
String receivedData = new String(packet.getData(), StandardCharsets.UTF_8).trim();
String server = receivedData.split("\\s+")[0];
String address = receivedData.split("\\s+")[1];
logger.info("Incoming packet contains server: " + server);
logger.info("Incoming packet contains address: " + address);
// Add/Update num of packets sent by server
if (!this.servers.containsKey(server))
this.servers.put(server, 1);
else
this.servers.put(server, this.servers.get(server) + 1);
// Add/Update num of packets sent for address
if (!this.addresses.containsKey(address))
this.addresses.put(address, 1);
else
this.addresses.put(address, this.addresses.get(address) + 1);
} catch (InterruptedIOException | SocketException e) {
logger.finer("Received interrupt. Exiting " + this.toString());
this.shutdown();
} catch (IOException e) {
logger.severe("Error starting serverSocket " + serverSocket.toString());
e.printStackTrace();
this.shutdown();
}
}
}
public void shutdown() {
logger.finer("Shutting down MonitoringListener " + this.toString());
this.serverSocket.close();
this.interrupt();
}
}

View File

@ -1,12 +1,31 @@
package dslab.monitoring;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;
import at.ac.tuwien.dsg.orvell.Shell;
import at.ac.tuwien.dsg.orvell.StopShellException;
import at.ac.tuwien.dsg.orvell.annotation.Command;
import dslab.ComponentFactory;
import dslab.util.Config;
public class MonitoringServer implements IMonitoringServer {
private static final Logger logger = Logger.getLogger(MonitoringServer.class.getName());
private DatagramSocket serverSocket;
private final Shell shell;
private final Integer serverPort;
private MonitoringListener monitoringListener;
private final ConcurrentHashMap<String, Integer> addresses = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Integer> servers = new ConcurrentHashMap<>();
/**
* Creates a new server instance.
@ -17,32 +36,54 @@ public class MonitoringServer implements IMonitoringServer {
* @param out the output stream to write console output to
*/
public MonitoringServer(String componentId, Config config, InputStream in, PrintStream out) {
// TODO
this.shell = new Shell(in, out);
this.shell.register(this);
this.shell.setPrompt("MonitoringServer> ");
this.serverPort = config.getInt("udp.port");
}
@Override
public void run() {
// TODO
logger.info("Creating UDP serverSocket for " + this.toString());
try {
this.serverSocket = new DatagramSocket(serverPort);
} catch (IOException e) {
logger.severe("Error creating serverSocket " + serverSocket.toString());
e.printStackTrace();
shutdown();
}
this.monitoringListener = new MonitoringListener(serverSocket, addresses, servers);
this.monitoringListener.start();
this.shell.run();
}
@Command
@Override
public void addresses() {
// TODO
for (String address : addresses.keySet()) {
this.shell.out().println(address + " " + addresses.get(address));
}
}
@Command
@Override
public void servers() {
// TODO
for (String server : servers.keySet()) {
this.shell.out().println(server + " " + servers.get(server));
}
}
@Command
@Override
public void shutdown() {
// TODO
if (serverSocket != null)
serverSocket.close();
this.monitoringListener.interrupt();
throw new StopShellException();
}
public static void main(String[] args) throws Exception {
IMonitoringServer server = ComponentFactory.createMonitoringServer(args[0], System.in, System.out);
server.run();
}
}