From 9d14725c37524b9bae80a26df4512a1367baf3b5 Mon Sep 17 00:00:00 2001 From: Tobias Eidelpes Date: Tue, 27 Oct 2020 17:17:51 +0100 Subject: [PATCH] Implement correct behaviour on shutdown --- src/main/java/dslab/Message.java | 2 +- .../java/dslab/transfer/ClientConnection.java | 70 ++++++++++++------- .../java/dslab/transfer/ClientListener.java | 64 +++++++++++++++++ .../java/dslab/transfer/TransferServer.java | 60 +++++----------- 4 files changed, 128 insertions(+), 68 deletions(-) create mode 100644 src/main/java/dslab/transfer/ClientListener.java diff --git a/src/main/java/dslab/Message.java b/src/main/java/dslab/Message.java index 99cb40c..a623009 100644 --- a/src/main/java/dslab/Message.java +++ b/src/main/java/dslab/Message.java @@ -5,7 +5,7 @@ import dslab.exception.MissingInputException; import java.util.ArrayList; public class Message { - private ArrayList to; + private ArrayList to = new ArrayList<>(); private Email from; private String subject; private String data; diff --git a/src/main/java/dslab/transfer/ClientConnection.java b/src/main/java/dslab/transfer/ClientConnection.java index 746bee2..e9a99bf 100644 --- a/src/main/java/dslab/transfer/ClientConnection.java +++ b/src/main/java/dslab/transfer/ClientConnection.java @@ -6,18 +6,19 @@ import dslab.exception.MalformedInputException; import dslab.exception.MissingInputException; import dslab.exception.UnknownRecipientException; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.PrintWriter; +import java.io.*; import java.net.Socket; +import java.net.SocketException; +import java.util.ArrayList; import java.util.logging.Logger; public class ClientConnection implements Runnable { Logger logger = Logger.getLogger(ClientConnection.class.getName()); - private Socket socket; + private final Socket socket; + private PrintWriter out; + private BufferedReader in; - private Message msg; + private Message msg = new Message(); public ClientConnection(Socket connection) { this.socket = connection; @@ -25,19 +26,28 @@ public class ClientConnection implements Runnable { @Override public void run() { + logger.finer("Preparing for DMTP communication in " + this.toString()); try { - PrintWriter out = new PrintWriter(socket.getOutputStream(), true); - BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + this.out = new PrintWriter(socket.getOutputStream(), true); + this.in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out.println("ok DMTP"); String userInput; - if (!("begin".equals(in.readLine()))) { - out.println("error protocol error"); + try { + if (!("begin".equals(in.readLine()))) { + out.println("error protocol error"); + shutdown(); + } + } catch (SocketException e) { + // Thrown if socket has already been closed by shutdown() method + // TODO maybe send message that connection has been closed by server? + logger.finer("Received interrupt. Exiting " + this.toString()); shutdown(); } + out.println("ok"); - while ((userInput = in.readLine()) != null) { + while (!Thread.currentThread().isInterrupted() && (userInput = in.readLine()) != null) { if ("quit".equals(userInput)) { out.println("ok bye"); shutdown(); @@ -49,31 +59,33 @@ public class ClientConnection implements Runnable { out.println(e.getMessage()); } } else if ("to".equals(userInput.split("\\s+")[0])) { + msg.setTo(new ArrayList<>()); String[] emailAddresses = userInput.split("\\s+")[1].split(","); int count = 0; - for (String emailAddress : emailAddresses) { - try { + try { + for (String emailAddress : emailAddresses) { msg.addTo(new Email(emailAddress)); count++; - } catch (MalformedInputException mie) { - out.println(mie.getMessage()); } - } - out.println("ok " + count); - } else if ("from".equals(userInput.split("\\s+")[0])) { - try { - msg.setFrom(new Email(userInput.split("\\s+")[1])); + out.println("ok " + count); + } catch (MalformedInputException mie) { + out.println(mie.getMessage()); + } + } else if ("from".equals(userInput.split("\\s+")[0])) { + try { + Email from = new Email(userInput.split("\\s+")[1]); + this.msg.setFrom(from); + out.println("ok"); } catch (MalformedInputException mie) { out.println(mie.getMessage()); } - out.println("ok"); } else if ("subject".equals(userInput.split("\\s+")[0])) { String subject = userInput.split("\\s+", 1)[1]; logger.info("Setting subject to: " + subject); msg.setSubject(subject); out.println("ok"); } else if ("data".equals(userInput.split("\\s+")[0])) { - String data = userInput.split("\\s+", 1)[1]; + String data = userInput.split("\\s+", 2)[1]; logger.info("Setting data to: " + data); msg.setData(data); out.println("ok"); @@ -82,6 +94,9 @@ public class ClientConnection implements Runnable { shutdown(); } } + } catch (InterruptedIOException ioe) { + logger.info("Received interrupt from parent. Shutting down..."); + shutdown(); } catch (IOException e) { logger.severe("Failed to get IO-Stream"); e.printStackTrace(); @@ -90,16 +105,21 @@ public class ClientConnection implements Runnable { } public void shutdown() { + logger.info("Shutting down client connection " + this.toString()); try { - socket.close(); + if (socket != null) + socket.close(); + in.close(); + out.close(); } catch (IOException e) { - logger.severe("Error closing socket " + socket.toString()); + logger.severe("Error closing socket and/or IO-streams"); e.printStackTrace(); } + Thread.currentThread().interrupt(); } public void sendMessage() throws MissingInputException { msg.allFieldsSet(); - // TODO send message to mailbox servers or submit to asynchronous queue + // TODO send message to asynchronous queue managed by TransferServer } } diff --git a/src/main/java/dslab/transfer/ClientListener.java b/src/main/java/dslab/transfer/ClientListener.java new file mode 100644 index 0000000..7a1e068 --- /dev/null +++ b/src/main/java/dslab/transfer/ClientListener.java @@ -0,0 +1,64 @@ +package dslab.transfer; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +public class ClientListener extends Thread { + private final ServerSocket serverSocket; + private final Logger logger = Logger.getLogger(ClientListener.class.getName()); + private final ArrayList clients = new ArrayList<>(); + private final ExecutorService executorService = Executors.newCachedThreadPool(); + + public ClientListener(ServerSocket serverSocket) { + this.serverSocket = serverSocket; + } + + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + logger.finer("Waiting for request on serverSocket " + serverSocket.toString()); + try { + Socket s = serverSocket.accept(); + logger.fine("Processing incoming socket " + s.toString()); + ClientConnection clientConnection = new ClientConnection(s); + clients.add(clientConnection); + executorService.submit(clientConnection); + } catch (InterruptedIOException | SocketException e) { + logger.finer("Received interrupt. Exiting " + this.toString()); + this.shutdown(); + } catch (IOException e) { + logger.severe("Error starting serverSocket " + serverSocket.toString()); + e.printStackTrace(); + this.shutdown(); + } + } + } + + public void shutdown() { + logger.finer("Shutting down ClientListener " + this.toString()); + for (ClientConnection client : clients) { + if (client != null) + client.shutdown(); + } + executorService.shutdown(); + try { + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) + System.err.println("Pool did not terminate"); + } + } catch (InterruptedException ie) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + this.interrupt(); + } +} diff --git a/src/main/java/dslab/transfer/TransferServer.java b/src/main/java/dslab/transfer/TransferServer.java index fea3a21..5296bdd 100644 --- a/src/main/java/dslab/transfer/TransferServer.java +++ b/src/main/java/dslab/transfer/TransferServer.java @@ -4,27 +4,23 @@ import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; import java.net.ServerSocket; -import java.net.Socket; import java.util.ArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import at.ac.tuwien.dsg.orvell.Shell; +import at.ac.tuwien.dsg.orvell.StopShellException; +import at.ac.tuwien.dsg.orvell.annotation.Command; import dslab.ComponentFactory; import dslab.util.Config; public class TransferServer implements ITransferServer, Runnable { Logger logger = Logger.getLogger(TransferServer.class.getName()); - private ServerSocket serverSocket; - private Integer serverPort; - - private ExecutorService executorService = Executors.newFixedThreadPool(20); - + private final Shell shell; + private final Integer serverPort; private ArrayList mailboxServers; + /** * Creates a new server instance. * @@ -36,62 +32,42 @@ public class TransferServer implements ITransferServer, Runnable { public TransferServer(String componentId, Config config, InputStream in, PrintStream out) { Config earthPlanet = new Config("mailbox-earth-planet"); Config univerZe = new Config("mailbox-univer-ze"); - mailboxServers.add(earthPlanet); - mailboxServers.add(univerZe); - Shell shell = new Shell().register("shutdown", (input, context) -> shutdown()); + // TODO parse domains for mailbox servers + this.shell = new Shell(in, out); + this.shell.register(this); + this.shell.setPrompt("Transferserver> "); this.serverPort = config.getInt("tcp.port"); - - shell.run(); } @Override public void run() { + logger.info("Creating serverSocket for " + this.toString()); try { - logger.info("Creating serverSocket for " + this.toString()); - serverSocket = new ServerSocket(serverPort); - - // TODO stop server on quit command over shell - while (true) { - logger.info("Waiting for request on serverSocket " + serverSocket.toString()); - try { - Socket s = serverSocket.accept(); - logger.info("Processing incoming socket " + s.toString()); - executorService.submit(new ClientConnection(s)); - } catch (IOException ioe) { - logger.severe("Error starting serverSocket " + serverSocket.toString()); - ioe.printStackTrace(); - } - } + this.serverSocket = new ServerSocket(serverPort); } catch (IOException e) { logger.severe("Error creating serverSocket " + serverSocket.toString()); e.printStackTrace(); + shutdown(); } + new ClientListener(serverSocket).start(); + this.shell.run(); } + @Command @Override public void shutdown() { try { - serverSocket.close(); + if (serverSocket != null) + serverSocket.close(); } catch (IOException e) { logger.severe("Error closing serverSocket " + serverSocket.toString()); e.printStackTrace(); } - executorService.shutdown(); - try { - if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { - executorService.shutdownNow(); - if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); - } - } catch (InterruptedException ie) { - executorService.shutdownNow(); - Thread.currentThread().interrupt(); - } + throw new StopShellException(); } public static void main(String[] args) throws Exception { ITransferServer server = ComponentFactory.createTransferServer(args[0], System.in, System.out); server.run(); } - }