diff --git a/src/main/java/dslab/exception/UnknownDomain.java b/src/main/java/dslab/exception/UnknownDomain.java new file mode 100644 index 0000000..4f49b0d --- /dev/null +++ b/src/main/java/dslab/exception/UnknownDomain.java @@ -0,0 +1,7 @@ +package dslab.exception; + +public class UnknownDomain extends Exception { + public UnknownDomain(String errorMessage) { + super(errorMessage); + } +} diff --git a/src/main/java/dslab/transfer/ClientConnection.java b/src/main/java/dslab/transfer/ClientConnection.java index 2ee3478..79daba4 100644 --- a/src/main/java/dslab/transfer/ClientConnection.java +++ b/src/main/java/dslab/transfer/ClientConnection.java @@ -21,11 +21,8 @@ public class ClientConnection implements Runnable { private Message msg = new Message(); - private BlockingQueue blockingQueue; - - public ClientConnection(Socket connection, BlockingQueue blockingQueue) { + public ClientConnection(Socket connection) { this.socket = connection; - this.blockingQueue = blockingQueue; } @Override @@ -128,7 +125,7 @@ public class ClientConnection implements Runnable { public void sendMessage() throws MissingInputException { this.msg.allFieldsSet(); - TransferServer.Producer producer = new TransferServer.Producer(this.blockingQueue, this.msg); + TransferServer.Producer producer = new TransferServer.Producer(this.msg); new Thread(producer).start(); this.msg = new Message(); } diff --git a/src/main/java/dslab/transfer/ClientListener.java b/src/main/java/dslab/transfer/ClientListener.java index 0c5bd66..fdbc770 100644 --- a/src/main/java/dslab/transfer/ClientListener.java +++ b/src/main/java/dslab/transfer/ClientListener.java @@ -19,11 +19,9 @@ public class ClientListener extends Thread { private final Logger logger = Logger.getLogger(ClientListener.class.getName()); private final ArrayList clients = new ArrayList<>(); private final ExecutorService executorService = Executors.newCachedThreadPool(); - private final BlockingQueue blockingQueue; - public ClientListener(ServerSocket serverSocket, BlockingQueue blockingQueue) { + public ClientListener(ServerSocket serverSocket) { this.serverSocket = serverSocket; - this.blockingQueue = blockingQueue; } @Override @@ -33,7 +31,7 @@ public class ClientListener extends Thread { try { Socket s = serverSocket.accept(); logger.fine("Processing incoming socket " + s.toString()); - ClientConnection clientConnection = new ClientConnection(s, this.blockingQueue); + ClientConnection clientConnection = new ClientConnection(s); clients.add(clientConnection); executorService.submit(clientConnection); } catch (InterruptedIOException | SocketException e) { diff --git a/src/main/java/dslab/transfer/TransferServer.java b/src/main/java/dslab/transfer/TransferServer.java index 24316cc..2695ef8 100644 --- a/src/main/java/dslab/transfer/TransferServer.java +++ b/src/main/java/dslab/transfer/TransferServer.java @@ -1,11 +1,11 @@ package dslab.transfer; -import java.io.IOException; -import java.io.InputStream; -import java.io.PrintStream; -import java.net.ServerSocket; -import java.net.Socket; +import java.io.*; +import java.net.*; +import java.rmi.ConnectIOException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Logger; @@ -14,7 +14,10 @@ import at.ac.tuwien.dsg.orvell.Shell; import at.ac.tuwien.dsg.orvell.StopShellException; import at.ac.tuwien.dsg.orvell.annotation.Command; import dslab.ComponentFactory; +import dslab.Email; import dslab.Message; +import dslab.exception.MalformedInputException; +import dslab.exception.UnknownDomain; import dslab.util.Config; public class TransferServer implements ITransferServer, Runnable { @@ -22,20 +25,26 @@ public class TransferServer implements ITransferServer, Runnable { private ServerSocket serverSocket; private final Shell shell; private final Integer serverPort; - private ArrayList mailboxServers; - private final BlockingQueue blockingQueue = new LinkedBlockingQueue<>(10); + public static final HashMap mailboxServers = new HashMap<>(); + public static volatile BlockingQueue blockingQueue = new LinkedBlockingQueue<>(10); + private final Consumer consumer; + public static final Object lock = new Object(); /** * Creates a new server instance. * * @param componentId the id of the component that corresponds to the Config resource - * @param config the component config - * @param in the input stream to read console input from - * @param out the output stream to write console output to + * @param config the component config + * @param in the input stream to read console input from + * @param out the output stream to write console output to */ public TransferServer(String componentId, Config config, InputStream in, PrintStream out) { - // TODO parse domains for mailbox servers + Config univerze = new Config("mailbox-univer-ze.properties"); + Config earthplanet = new Config("mailbox-earth-planet.properties"); + mailboxServers.put(univerze.getString("domain"), univerze.getInt("dmtp.tcp.port")); + mailboxServers.put(earthplanet.getString("domain"), earthplanet.getInt("dmtp.tcp.port")); + this.consumer = new Consumer(mailboxServers); this.shell = new Shell(in, out); this.shell.register(this); this.shell.setPrompt("Transferserver> "); @@ -52,8 +61,8 @@ public class TransferServer implements ITransferServer, Runnable { e.printStackTrace(); shutdown(); } - new ClientListener(serverSocket, blockingQueue).start(); - // TODO start consumer thread + new ClientListener(serverSocket).start(); + this.consumer.start(); this.shell.run(); } @@ -67,7 +76,7 @@ public class TransferServer implements ITransferServer, Runnable { logger.severe("Error closing serverSocket " + serverSocket.toString()); e.printStackTrace(); } - // TODO stop consumer thread(s) + this.consumer.interrupt(); throw new StopShellException(); } @@ -76,12 +85,10 @@ public class TransferServer implements ITransferServer, Runnable { server.run(); } - static class Producer implements Runnable { - private final BlockingQueue blockingQueue; + static class Producer extends Thread { private final Message msg; - Producer(BlockingQueue blockingQueue, Message msg) { - this.blockingQueue = blockingQueue; + Producer(Message msg) { this.msg = msg; } @@ -96,12 +103,13 @@ public class TransferServer implements ITransferServer, Runnable { } private void produce() throws InterruptedException { - synchronized (this) { - while (blockingQueue.size() == 10) - wait(); + synchronized (lock) { + while (blockingQueue.size() == 10) { + lock.wait(); + } blockingQueue.put(msg); logger.info("Added message " + msg.toString() + " to queue"); - notify(); + lock.notify(); } } @@ -110,12 +118,12 @@ public class TransferServer implements ITransferServer, Runnable { } } - class Consumer implements Runnable { - private final BlockingQueue blockingQueue; - private Socket socket; + static class Consumer extends Thread { + private final HashMap mailboxServers; - Consumer(BlockingQueue blockingQueue) { - this.blockingQueue = blockingQueue; + Consumer(HashMap mailboxServers) { + this.mailboxServers = mailboxServers; + logger.info("MailboxServers: " + mailboxServers.toString()); } @Override @@ -125,32 +133,120 @@ public class TransferServer implements ITransferServer, Runnable { } catch (InterruptedException e) { logger.info("Consumer thread has been interrupted. Exiting..." + this.toString()); shutdown(); - } // TODO catch exception where recipient not known to server and send message to "from" user + } } private void consume() throws InterruptedException { - while (true) { - synchronized (this) { - while (blockingQueue.size() == 0) - wait(); + while (!Thread.currentThread().isInterrupted()) { + synchronized (lock) { + while (blockingQueue.isEmpty()) { + logger.info("Queue currently empty. Waiting for messages to appear..."); + lock.wait(); + } + logger.info("Queue not empty. Processing message..."); Message msg = blockingQueue.take(); logger.info("Took message " + msg.toString() + " from queue"); - // TODO open socket to all MailboxServers specified in "to" field - // TODO replay DMTP message - // TODO wait for reply and throw "recipient unknown" if unknown email in "to" field. - notify(); + for (Email recipient : msg.getTo()) { + int port; + try { + port = domainLookup(recipient); + logger.info("Domain lookup successful. Port is: " + port); + } catch (UnknownDomain e) { + sendErrorMail(msg, e.getMessage()); + // TODO exit consumer? + shutdown(); + continue; + } + replayMessage(msg, port); + } + lock.notify(); } } } - private void shutdown() { + private void replayMessage(Message msg, int port) { + logger.info("Replaying message for message: " + msg.toString() + " on port " + port); try { - if (socket != null) - socket.close(); + Socket socket = new Socket("127.0.0.1", port); + PrintWriter socketOut = new PrintWriter(socket.getOutputStream(), true); + BufferedReader socketIn = new BufferedReader(new InputStreamReader(socket.getInputStream())); + socketIn.readLine(); + socketOut.println("begin"); + socketIn.readLine(); + socketOut.println("subject " + msg.getSubject()); + socketIn.readLine(); + socketOut.println("data " + msg.getData()); + socketIn.readLine(); + socketOut.println("to " + msg.printTo()); + socketIn.readLine(); + socketOut.println("from " + msg.getFrom().toString()); + socketIn.readLine(); + socketOut.println("send"); + String result = socketIn.readLine(); + if (result.startsWith("error")) + sendErrorMail(msg, result); + socketIn.close(); + socketOut.close(); + socket.close(); } catch (IOException e) { - logger.severe("Failed to close socket during shutdown" + socket.toString()); - e.printStackTrace(); + sendErrorMail(msg, "error failed to connect to server"); } + } + + private int domainLookup(Email email) throws UnknownDomain { + logger.info("Performing domain lookup for address " + email.toString()); + if (this.mailboxServers.containsKey(email.getDomain())) + return this.mailboxServers.get(email.getDomain()); + throw new UnknownDomain("error domain not found: " + email.getDomain()); + } + + private void sendErrorMail(Message msg, String error) { + logger.info("Trying to send error mail to address " + msg.getFrom()); + int port; + try { + port = domainLookup(msg.getFrom()); + } catch (UnknownDomain e) { + logger.severe("Sending error mail failed because sender domain is unknown"); + return; + } + ArrayList newTo = new ArrayList<>(); + newTo.add(msg.getFrom()); + msg.setTo(newTo); + try { + msg.setFrom(new Email("mailer@127.0.0.1")); + } catch (MalformedInputException e) { + logger.severe("The server's E-Mail address is wrong. This should not be happening!"); + e.printStackTrace(); + return; + } + try { + Socket socket = new Socket("127.0.0.1", port); + PrintWriter socketOut = new PrintWriter(socket.getOutputStream(), true); + BufferedReader socketIn = new BufferedReader(new InputStreamReader(socket.getInputStream())); + socketIn.readLine(); + socketOut.println("begin"); + socketIn.readLine(); + socketOut.println("subject " + msg.getSubject()); + socketIn.readLine(); + socketOut.println("data " + error); + socketIn.readLine(); + socketOut.println("to " + msg.printTo()); + socketIn.readLine(); + socketOut.println("from " + msg.getFrom().toString()); + socketIn.readLine(); + socketOut.println("send"); + String result = socketIn.readLine(); + if (result.startsWith("error")) + logger.severe("Sending error mail failed: " + result); + socketIn.close(); + socketOut.close(); + socket.close(); + } catch (IOException e) { + logger.severe("Sending error mail failed because socket communication failed"); + } + } + + private void shutdown() { Thread.currentThread().interrupt(); } }