diff --git a/src/main/java/dslab/monitoring/MonitoringListener.java b/src/main/java/dslab/monitoring/MonitoringListener.java new file mode 100644 index 0000000..06cb935 --- /dev/null +++ b/src/main/java/dslab/monitoring/MonitoringListener.java @@ -0,0 +1,64 @@ +package dslab.monitoring; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Logger; + +public class MonitoringListener extends Thread { + private static final Logger logger = Logger.getLogger(MonitoringListener.class.getName()); + private final DatagramSocket serverSocket; + private final ConcurrentHashMap addresses; + private final ConcurrentHashMap servers; + + public MonitoringListener(DatagramSocket serverSocket, ConcurrentHashMap addresses, ConcurrentHashMap servers) { + this.serverSocket = serverSocket; + this.addresses = addresses; + this.servers = servers; + } + + @Override + public void run() { + byte[] recvBuffer = new byte[1024]; + while (!Thread.currentThread().isInterrupted()) { + logger.finer("Waiting for request on UDP serverSocket " + serverSocket.toString()); + try { + DatagramPacket packet = new DatagramPacket(recvBuffer, recvBuffer.length); + serverSocket.receive(packet); + logger.fine("Processing incoming socket " + serverSocket.toString()); + String receivedData = new String(packet.getData(), StandardCharsets.UTF_8).trim(); + String server = receivedData.split("\\s+")[0]; + String address = receivedData.split("\\s+")[1]; + logger.info("Incoming packet contains server: " + server); + logger.info("Incoming packet contains address: " + address); + // Add/Update num of packets sent by server + if (!this.servers.containsKey(server)) + this.servers.put(server, 1); + else + this.servers.put(server, this.servers.get(server) + 1); + // Add/Update num of packets sent for address + if (!this.addresses.containsKey(address)) + this.addresses.put(address, 1); + else + this.addresses.put(address, this.addresses.get(address) + 1); + } catch (InterruptedIOException | SocketException e) { + logger.finer("Received interrupt. Exiting " + this.toString()); + this.shutdown(); + } catch (IOException e) { + logger.severe("Error starting serverSocket " + serverSocket.toString()); + e.printStackTrace(); + this.shutdown(); + } + } + } + + public void shutdown() { + logger.finer("Shutting down MonitoringListener " + this.toString()); + this.serverSocket.close(); + this.interrupt(); + } +} diff --git a/src/main/java/dslab/monitoring/MonitoringServer.java b/src/main/java/dslab/monitoring/MonitoringServer.java index 12bba90..b70a825 100644 --- a/src/main/java/dslab/monitoring/MonitoringServer.java +++ b/src/main/java/dslab/monitoring/MonitoringServer.java @@ -1,12 +1,31 @@ package dslab.monitoring; +import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Logger; +import at.ac.tuwien.dsg.orvell.Shell; +import at.ac.tuwien.dsg.orvell.StopShellException; +import at.ac.tuwien.dsg.orvell.annotation.Command; import dslab.ComponentFactory; import dslab.util.Config; public class MonitoringServer implements IMonitoringServer { + private static final Logger logger = Logger.getLogger(MonitoringServer.class.getName()); + private DatagramSocket serverSocket; + private final Shell shell; + private final Integer serverPort; + private MonitoringListener monitoringListener; + private final ConcurrentHashMap addresses = new ConcurrentHashMap<>(); + private final ConcurrentHashMap servers = new ConcurrentHashMap<>(); /** * Creates a new server instance. @@ -17,32 +36,54 @@ public class MonitoringServer implements IMonitoringServer { * @param out the output stream to write console output to */ public MonitoringServer(String componentId, Config config, InputStream in, PrintStream out) { - // TODO + this.shell = new Shell(in, out); + this.shell.register(this); + this.shell.setPrompt("MonitoringServer> "); + this.serverPort = config.getInt("udp.port"); } @Override public void run() { - // TODO + logger.info("Creating UDP serverSocket for " + this.toString()); + try { + this.serverSocket = new DatagramSocket(serverPort); + } catch (IOException e) { + logger.severe("Error creating serverSocket " + serverSocket.toString()); + e.printStackTrace(); + shutdown(); + } + this.monitoringListener = new MonitoringListener(serverSocket, addresses, servers); + this.monitoringListener.start(); + this.shell.run(); } + @Command @Override public void addresses() { - // TODO + for (String address : addresses.keySet()) { + this.shell.out().println(address + " " + addresses.get(address)); + } } + @Command @Override public void servers() { - // TODO + for (String server : servers.keySet()) { + this.shell.out().println(server + " " + servers.get(server)); + } } + @Command @Override public void shutdown() { - // TODO + if (serverSocket != null) + serverSocket.close(); + this.monitoringListener.interrupt(); + throw new StopShellException(); } public static void main(String[] args) throws Exception { IMonitoringServer server = ComponentFactory.createMonitoringServer(args[0], System.in, System.out); server.run(); } - }