diff --git a/src/main/java/dslab/transfer/ClientConnection.java b/src/main/java/dslab/transfer/ClientConnection.java index e9a99bf..c1e575f 100644 --- a/src/main/java/dslab/transfer/ClientConnection.java +++ b/src/main/java/dslab/transfer/ClientConnection.java @@ -10,6 +10,7 @@ import java.io.*; import java.net.Socket; import java.net.SocketException; import java.util.ArrayList; +import java.util.concurrent.BlockingQueue; import java.util.logging.Logger; public class ClientConnection implements Runnable { @@ -20,8 +21,11 @@ public class ClientConnection implements Runnable { private Message msg = new Message(); - public ClientConnection(Socket connection) { + private BlockingQueue blockingQueue; + + public ClientConnection(Socket connection, BlockingQueue blockingQueue) { this.socket = connection; + this.blockingQueue = blockingQueue; } @Override @@ -120,6 +124,7 @@ public class ClientConnection implements Runnable { public void sendMessage() throws MissingInputException { msg.allFieldsSet(); - // TODO send message to asynchronous queue managed by TransferServer + TransferServer.Producer producer = new TransferServer.Producer(this.blockingQueue, this.msg); + new Thread(producer).start(); } } diff --git a/src/main/java/dslab/transfer/ClientListener.java b/src/main/java/dslab/transfer/ClientListener.java index 7a1e068..0c5bd66 100644 --- a/src/main/java/dslab/transfer/ClientListener.java +++ b/src/main/java/dslab/transfer/ClientListener.java @@ -1,11 +1,14 @@ package dslab.transfer; +import dslab.Message; + import java.io.IOException; import java.io.InterruptedIOException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; import java.util.ArrayList; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -16,9 +19,11 @@ public class ClientListener extends Thread { private final Logger logger = Logger.getLogger(ClientListener.class.getName()); private final ArrayList clients = new ArrayList<>(); private final ExecutorService executorService = Executors.newCachedThreadPool(); + private final BlockingQueue blockingQueue; - public ClientListener(ServerSocket serverSocket) { + public ClientListener(ServerSocket serverSocket, BlockingQueue blockingQueue) { this.serverSocket = serverSocket; + this.blockingQueue = blockingQueue; } @Override @@ -28,7 +33,7 @@ public class ClientListener extends Thread { try { Socket s = serverSocket.accept(); logger.fine("Processing incoming socket " + s.toString()); - ClientConnection clientConnection = new ClientConnection(s); + ClientConnection clientConnection = new ClientConnection(s, this.blockingQueue); clients.add(clientConnection); executorService.submit(clientConnection); } catch (InterruptedIOException | SocketException e) { diff --git a/src/main/java/dslab/transfer/TransferServer.java b/src/main/java/dslab/transfer/TransferServer.java index 5296bdd..28a9c87 100644 --- a/src/main/java/dslab/transfer/TransferServer.java +++ b/src/main/java/dslab/transfer/TransferServer.java @@ -4,21 +4,26 @@ import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; import java.net.ServerSocket; +import java.net.Socket; import java.util.ArrayList; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Logger; import at.ac.tuwien.dsg.orvell.Shell; import at.ac.tuwien.dsg.orvell.StopShellException; import at.ac.tuwien.dsg.orvell.annotation.Command; import dslab.ComponentFactory; +import dslab.Message; import dslab.util.Config; public class TransferServer implements ITransferServer, Runnable { - Logger logger = Logger.getLogger(TransferServer.class.getName()); + private static final Logger logger = Logger.getLogger(TransferServer.class.getName()); private ServerSocket serverSocket; private final Shell shell; private final Integer serverPort; private ArrayList mailboxServers; + private final BlockingQueue blockingQueue = new LinkedBlockingQueue<>(10); /** @@ -32,6 +37,8 @@ public class TransferServer implements ITransferServer, Runnable { public TransferServer(String componentId, Config config, InputStream in, PrintStream out) { Config earthPlanet = new Config("mailbox-earth-planet"); Config univerZe = new Config("mailbox-univer-ze"); + mailboxServers.add(earthPlanet); + mailboxServers.add(univerZe); // TODO parse domains for mailbox servers this.shell = new Shell(in, out); this.shell.register(this); @@ -49,7 +56,7 @@ public class TransferServer implements ITransferServer, Runnable { e.printStackTrace(); shutdown(); } - new ClientListener(serverSocket).start(); + new ClientListener(serverSocket, blockingQueue).start(); this.shell.run(); } @@ -70,4 +77,83 @@ public class TransferServer implements ITransferServer, Runnable { ITransferServer server = ComponentFactory.createTransferServer(args[0], System.in, System.out); server.run(); } + + static class Producer implements Runnable { + private final BlockingQueue blockingQueue; + private final Message msg; + + Producer(BlockingQueue blockingQueue, Message msg) { + this.blockingQueue = blockingQueue; + this.msg = msg; + } + + @Override + public void run() { + try { + produce(); + } catch (InterruptedException e) { + logger.info("Producer thread has been interrupted. Exiting..." + this.toString()); + shutdown(); + } + } + + private void produce() throws InterruptedException { + synchronized (this) { + while (blockingQueue.size() == 10) + wait(); + blockingQueue.put(msg); + logger.info("Added message " + msg.toString() + " to queue"); + notify(); + } + } + + private void shutdown() { + Thread.currentThread().interrupt(); + } + } + + class Consumer implements Runnable { + private final BlockingQueue blockingQueue; + private Socket socket; + + Consumer(BlockingQueue blockingQueue) { + this.blockingQueue = blockingQueue; + } + + @Override + public void run() { + try { + consume(); + } catch (InterruptedException e) { + logger.info("Consumer thread has been interrupted. Exiting..." + this.toString()); + shutdown(); + } // TODO catch exception where recipient not known to server and send message to "from" user + } + + private void consume() throws InterruptedException { + while (true) { + synchronized (this) { + while (blockingQueue.size() == 0) + wait(); + Message msg = blockingQueue.take(); + logger.info("Took message " + msg.toString() + " from queue"); + // TODO open socket to all MailboxServers specified in "to" field + // TODO replay DMTP message + // TODO wait for reply and throw "recipient unknown" if unknown email in "to" field. + notify(); + } + } + } + + private void shutdown() { + try { + if (socket != null) + socket.close(); + } catch (IOException e) { + logger.severe("Failed to close socket during shutdown" + socket.toString()); + e.printStackTrace(); + } + Thread.currentThread().interrupt(); + } + } }