Implement correct behaviour on shutdown

This commit is contained in:
Tobias Eidelpes 2020-10-27 17:17:51 +01:00
parent 0d8ea0fb86
commit 9d14725c37
4 changed files with 128 additions and 68 deletions

View File

@ -5,7 +5,7 @@ import dslab.exception.MissingInputException;
import java.util.ArrayList; import java.util.ArrayList;
public class Message { public class Message {
private ArrayList<Email> to; private ArrayList<Email> to = new ArrayList<>();
private Email from; private Email from;
private String subject; private String subject;
private String data; private String data;

View File

@ -6,18 +6,19 @@ import dslab.exception.MalformedInputException;
import dslab.exception.MissingInputException; import dslab.exception.MissingInputException;
import dslab.exception.UnknownRecipientException; import dslab.exception.UnknownRecipientException;
import java.io.BufferedReader; import java.io.*;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket; import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.logging.Logger; import java.util.logging.Logger;
public class ClientConnection implements Runnable { public class ClientConnection implements Runnable {
Logger logger = Logger.getLogger(ClientConnection.class.getName()); Logger logger = Logger.getLogger(ClientConnection.class.getName());
private Socket socket; private final Socket socket;
private PrintWriter out;
private BufferedReader in;
private Message msg; private Message msg = new Message();
public ClientConnection(Socket connection) { public ClientConnection(Socket connection) {
this.socket = connection; this.socket = connection;
@ -25,19 +26,28 @@ public class ClientConnection implements Runnable {
@Override @Override
public void run() { public void run() {
logger.finer("Preparing for DMTP communication in " + this.toString());
try { try {
PrintWriter out = new PrintWriter(socket.getOutputStream(), true); this.out = new PrintWriter(socket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out.println("ok DMTP"); out.println("ok DMTP");
String userInput; String userInput;
if (!("begin".equals(in.readLine()))) { try {
out.println("error protocol error"); if (!("begin".equals(in.readLine()))) {
out.println("error protocol error");
shutdown();
}
} catch (SocketException e) {
// Thrown if socket has already been closed by shutdown() method
// TODO maybe send message that connection has been closed by server?
logger.finer("Received interrupt. Exiting " + this.toString());
shutdown(); shutdown();
} }
out.println("ok");
while ((userInput = in.readLine()) != null) { while (!Thread.currentThread().isInterrupted() && (userInput = in.readLine()) != null) {
if ("quit".equals(userInput)) { if ("quit".equals(userInput)) {
out.println("ok bye"); out.println("ok bye");
shutdown(); shutdown();
@ -49,31 +59,33 @@ public class ClientConnection implements Runnable {
out.println(e.getMessage()); out.println(e.getMessage());
} }
} else if ("to".equals(userInput.split("\\s+")[0])) { } else if ("to".equals(userInput.split("\\s+")[0])) {
msg.setTo(new ArrayList<>());
String[] emailAddresses = userInput.split("\\s+")[1].split(","); String[] emailAddresses = userInput.split("\\s+")[1].split(",");
int count = 0; int count = 0;
for (String emailAddress : emailAddresses) { try {
try { for (String emailAddress : emailAddresses) {
msg.addTo(new Email(emailAddress)); msg.addTo(new Email(emailAddress));
count++; count++;
} catch (MalformedInputException mie) {
out.println(mie.getMessage());
} }
} out.println("ok " + count);
out.println("ok " + count); } catch (MalformedInputException mie) {
} else if ("from".equals(userInput.split("\\s+")[0])) { out.println(mie.getMessage());
try { }
msg.setFrom(new Email(userInput.split("\\s+")[1])); } else if ("from".equals(userInput.split("\\s+")[0])) {
try {
Email from = new Email(userInput.split("\\s+")[1]);
this.msg.setFrom(from);
out.println("ok");
} catch (MalformedInputException mie) { } catch (MalformedInputException mie) {
out.println(mie.getMessage()); out.println(mie.getMessage());
} }
out.println("ok");
} else if ("subject".equals(userInput.split("\\s+")[0])) { } else if ("subject".equals(userInput.split("\\s+")[0])) {
String subject = userInput.split("\\s+", 1)[1]; String subject = userInput.split("\\s+", 1)[1];
logger.info("Setting subject to: " + subject); logger.info("Setting subject to: " + subject);
msg.setSubject(subject); msg.setSubject(subject);
out.println("ok"); out.println("ok");
} else if ("data".equals(userInput.split("\\s+")[0])) { } else if ("data".equals(userInput.split("\\s+")[0])) {
String data = userInput.split("\\s+", 1)[1]; String data = userInput.split("\\s+", 2)[1];
logger.info("Setting data to: " + data); logger.info("Setting data to: " + data);
msg.setData(data); msg.setData(data);
out.println("ok"); out.println("ok");
@ -82,6 +94,9 @@ public class ClientConnection implements Runnable {
shutdown(); shutdown();
} }
} }
} catch (InterruptedIOException ioe) {
logger.info("Received interrupt from parent. Shutting down...");
shutdown();
} catch (IOException e) { } catch (IOException e) {
logger.severe("Failed to get IO-Stream"); logger.severe("Failed to get IO-Stream");
e.printStackTrace(); e.printStackTrace();
@ -90,16 +105,21 @@ public class ClientConnection implements Runnable {
} }
public void shutdown() { public void shutdown() {
logger.info("Shutting down client connection " + this.toString());
try { try {
socket.close(); if (socket != null)
socket.close();
in.close();
out.close();
} catch (IOException e) { } catch (IOException e) {
logger.severe("Error closing socket " + socket.toString()); logger.severe("Error closing socket and/or IO-streams");
e.printStackTrace(); e.printStackTrace();
} }
Thread.currentThread().interrupt();
} }
public void sendMessage() throws MissingInputException { public void sendMessage() throws MissingInputException {
msg.allFieldsSet(); msg.allFieldsSet();
// TODO send message to mailbox servers or submit to asynchronous queue // TODO send message to asynchronous queue managed by TransferServer
} }
} }

View File

@ -0,0 +1,64 @@
package dslab.transfer;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
public class ClientListener extends Thread {
private final ServerSocket serverSocket;
private final Logger logger = Logger.getLogger(ClientListener.class.getName());
private final ArrayList<ClientConnection> clients = new ArrayList<>();
private final ExecutorService executorService = Executors.newCachedThreadPool();
public ClientListener(ServerSocket serverSocket) {
this.serverSocket = serverSocket;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
logger.finer("Waiting for request on serverSocket " + serverSocket.toString());
try {
Socket s = serverSocket.accept();
logger.fine("Processing incoming socket " + s.toString());
ClientConnection clientConnection = new ClientConnection(s);
clients.add(clientConnection);
executorService.submit(clientConnection);
} catch (InterruptedIOException | SocketException e) {
logger.finer("Received interrupt. Exiting " + this.toString());
this.shutdown();
} catch (IOException e) {
logger.severe("Error starting serverSocket " + serverSocket.toString());
e.printStackTrace();
this.shutdown();
}
}
}
public void shutdown() {
logger.finer("Shutting down ClientListener " + this.toString());
for (ClientConnection client : clients) {
if (client != null)
client.shutdown();
}
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
this.interrupt();
}
}

View File

@ -4,27 +4,23 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.PrintStream; import java.io.PrintStream;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger; import java.util.logging.Logger;
import at.ac.tuwien.dsg.orvell.Shell; import at.ac.tuwien.dsg.orvell.Shell;
import at.ac.tuwien.dsg.orvell.StopShellException;
import at.ac.tuwien.dsg.orvell.annotation.Command;
import dslab.ComponentFactory; import dslab.ComponentFactory;
import dslab.util.Config; import dslab.util.Config;
public class TransferServer implements ITransferServer, Runnable { public class TransferServer implements ITransferServer, Runnable {
Logger logger = Logger.getLogger(TransferServer.class.getName()); Logger logger = Logger.getLogger(TransferServer.class.getName());
private ServerSocket serverSocket; private ServerSocket serverSocket;
private Integer serverPort; private final Shell shell;
private final Integer serverPort;
private ExecutorService executorService = Executors.newFixedThreadPool(20);
private ArrayList<Config> mailboxServers; private ArrayList<Config> mailboxServers;
/** /**
* Creates a new server instance. * Creates a new server instance.
* *
@ -36,62 +32,42 @@ public class TransferServer implements ITransferServer, Runnable {
public TransferServer(String componentId, Config config, InputStream in, PrintStream out) { public TransferServer(String componentId, Config config, InputStream in, PrintStream out) {
Config earthPlanet = new Config("mailbox-earth-planet"); Config earthPlanet = new Config("mailbox-earth-planet");
Config univerZe = new Config("mailbox-univer-ze"); Config univerZe = new Config("mailbox-univer-ze");
mailboxServers.add(earthPlanet); // TODO parse domains for mailbox servers
mailboxServers.add(univerZe); this.shell = new Shell(in, out);
Shell shell = new Shell().register("shutdown", (input, context) -> shutdown()); this.shell.register(this);
this.shell.setPrompt("Transferserver> ");
this.serverPort = config.getInt("tcp.port"); this.serverPort = config.getInt("tcp.port");
shell.run();
} }
@Override @Override
public void run() { public void run() {
logger.info("Creating serverSocket for " + this.toString());
try { try {
logger.info("Creating serverSocket for " + this.toString()); this.serverSocket = new ServerSocket(serverPort);
serverSocket = new ServerSocket(serverPort);
// TODO stop server on quit command over shell
while (true) {
logger.info("Waiting for request on serverSocket " + serverSocket.toString());
try {
Socket s = serverSocket.accept();
logger.info("Processing incoming socket " + s.toString());
executorService.submit(new ClientConnection(s));
} catch (IOException ioe) {
logger.severe("Error starting serverSocket " + serverSocket.toString());
ioe.printStackTrace();
}
}
} catch (IOException e) { } catch (IOException e) {
logger.severe("Error creating serverSocket " + serverSocket.toString()); logger.severe("Error creating serverSocket " + serverSocket.toString());
e.printStackTrace(); e.printStackTrace();
shutdown();
} }
new ClientListener(serverSocket).start();
this.shell.run();
} }
@Command
@Override @Override
public void shutdown() { public void shutdown() {
try { try {
serverSocket.close(); if (serverSocket != null)
serverSocket.close();
} catch (IOException e) { } catch (IOException e) {
logger.severe("Error closing serverSocket " + serverSocket.toString()); logger.severe("Error closing serverSocket " + serverSocket.toString());
e.printStackTrace(); e.printStackTrace();
} }
executorService.shutdown(); throw new StopShellException();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
ITransferServer server = ComponentFactory.createTransferServer(args[0], System.in, System.out); ITransferServer server = ComponentFactory.createTransferServer(args[0], System.in, System.out);
server.run(); server.run();
} }
} }