Implement producer/consumer classes

This commit is contained in:
Tobias Eidelpes 2020-10-29 17:44:56 +01:00
parent 24efe6a3d4
commit d39a3151fa
3 changed files with 102 additions and 6 deletions

View File

@ -10,6 +10,7 @@ import java.io.*;
import java.net.Socket; import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Logger; import java.util.logging.Logger;
public class ClientConnection implements Runnable { public class ClientConnection implements Runnable {
@ -20,8 +21,11 @@ public class ClientConnection implements Runnable {
private Message msg = new Message(); private Message msg = new Message();
public ClientConnection(Socket connection) { private BlockingQueue<Message> blockingQueue;
public ClientConnection(Socket connection, BlockingQueue<Message> blockingQueue) {
this.socket = connection; this.socket = connection;
this.blockingQueue = blockingQueue;
} }
@Override @Override
@ -120,6 +124,7 @@ public class ClientConnection implements Runnable {
public void sendMessage() throws MissingInputException { public void sendMessage() throws MissingInputException {
msg.allFieldsSet(); msg.allFieldsSet();
// TODO send message to asynchronous queue managed by TransferServer TransferServer.Producer producer = new TransferServer.Producer(this.blockingQueue, this.msg);
new Thread(producer).start();
} }
} }

View File

@ -1,11 +1,14 @@
package dslab.transfer; package dslab.transfer;
import dslab.Message;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -16,9 +19,11 @@ public class ClientListener extends Thread {
private final Logger logger = Logger.getLogger(ClientListener.class.getName()); private final Logger logger = Logger.getLogger(ClientListener.class.getName());
private final ArrayList<ClientConnection> clients = new ArrayList<>(); private final ArrayList<ClientConnection> clients = new ArrayList<>();
private final ExecutorService executorService = Executors.newCachedThreadPool(); private final ExecutorService executorService = Executors.newCachedThreadPool();
private final BlockingQueue<Message> blockingQueue;
public ClientListener(ServerSocket serverSocket) { public ClientListener(ServerSocket serverSocket, BlockingQueue<Message> blockingQueue) {
this.serverSocket = serverSocket; this.serverSocket = serverSocket;
this.blockingQueue = blockingQueue;
} }
@Override @Override
@ -28,7 +33,7 @@ public class ClientListener extends Thread {
try { try {
Socket s = serverSocket.accept(); Socket s = serverSocket.accept();
logger.fine("Processing incoming socket " + s.toString()); logger.fine("Processing incoming socket " + s.toString());
ClientConnection clientConnection = new ClientConnection(s); ClientConnection clientConnection = new ClientConnection(s, this.blockingQueue);
clients.add(clientConnection); clients.add(clientConnection);
executorService.submit(clientConnection); executorService.submit(clientConnection);
} catch (InterruptedIOException | SocketException e) { } catch (InterruptedIOException | SocketException e) {

View File

@ -4,21 +4,26 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.PrintStream; import java.io.PrintStream;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger; import java.util.logging.Logger;
import at.ac.tuwien.dsg.orvell.Shell; import at.ac.tuwien.dsg.orvell.Shell;
import at.ac.tuwien.dsg.orvell.StopShellException; import at.ac.tuwien.dsg.orvell.StopShellException;
import at.ac.tuwien.dsg.orvell.annotation.Command; import at.ac.tuwien.dsg.orvell.annotation.Command;
import dslab.ComponentFactory; import dslab.ComponentFactory;
import dslab.Message;
import dslab.util.Config; import dslab.util.Config;
public class TransferServer implements ITransferServer, Runnable { public class TransferServer implements ITransferServer, Runnable {
Logger logger = Logger.getLogger(TransferServer.class.getName()); private static final Logger logger = Logger.getLogger(TransferServer.class.getName());
private ServerSocket serverSocket; private ServerSocket serverSocket;
private final Shell shell; private final Shell shell;
private final Integer serverPort; private final Integer serverPort;
private ArrayList<Config> mailboxServers; private ArrayList<Config> mailboxServers;
private final BlockingQueue<Message> blockingQueue = new LinkedBlockingQueue<>(10);
/** /**
@ -32,6 +37,8 @@ public class TransferServer implements ITransferServer, Runnable {
public TransferServer(String componentId, Config config, InputStream in, PrintStream out) { public TransferServer(String componentId, Config config, InputStream in, PrintStream out) {
Config earthPlanet = new Config("mailbox-earth-planet"); Config earthPlanet = new Config("mailbox-earth-planet");
Config univerZe = new Config("mailbox-univer-ze"); Config univerZe = new Config("mailbox-univer-ze");
mailboxServers.add(earthPlanet);
mailboxServers.add(univerZe);
// TODO parse domains for mailbox servers // TODO parse domains for mailbox servers
this.shell = new Shell(in, out); this.shell = new Shell(in, out);
this.shell.register(this); this.shell.register(this);
@ -49,7 +56,7 @@ public class TransferServer implements ITransferServer, Runnable {
e.printStackTrace(); e.printStackTrace();
shutdown(); shutdown();
} }
new ClientListener(serverSocket).start(); new ClientListener(serverSocket, blockingQueue).start();
this.shell.run(); this.shell.run();
} }
@ -70,4 +77,83 @@ public class TransferServer implements ITransferServer, Runnable {
ITransferServer server = ComponentFactory.createTransferServer(args[0], System.in, System.out); ITransferServer server = ComponentFactory.createTransferServer(args[0], System.in, System.out);
server.run(); server.run();
} }
static class Producer implements Runnable {
private final BlockingQueue<Message> blockingQueue;
private final Message msg;
Producer(BlockingQueue<Message> blockingQueue, Message msg) {
this.blockingQueue = blockingQueue;
this.msg = msg;
}
@Override
public void run() {
try {
produce();
} catch (InterruptedException e) {
logger.info("Producer thread has been interrupted. Exiting..." + this.toString());
shutdown();
}
}
private void produce() throws InterruptedException {
synchronized (this) {
while (blockingQueue.size() == 10)
wait();
blockingQueue.put(msg);
logger.info("Added message " + msg.toString() + " to queue");
notify();
}
}
private void shutdown() {
Thread.currentThread().interrupt();
}
}
class Consumer implements Runnable {
private final BlockingQueue<Message> blockingQueue;
private Socket socket;
Consumer(BlockingQueue<Message> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
try {
consume();
} catch (InterruptedException e) {
logger.info("Consumer thread has been interrupted. Exiting..." + this.toString());
shutdown();
} // TODO catch exception where recipient not known to server and send message to "from" user
}
private void consume() throws InterruptedException {
while (true) {
synchronized (this) {
while (blockingQueue.size() == 0)
wait();
Message msg = blockingQueue.take();
logger.info("Took message " + msg.toString() + " from queue");
// TODO open socket to all MailboxServers specified in "to" field
// TODO replay DMTP message
// TODO wait for reply and throw "recipient unknown" if unknown email in "to" field.
notify();
}
}
}
private void shutdown() {
try {
if (socket != null)
socket.close();
} catch (IOException e) {
logger.severe("Failed to close socket during shutdown" + socket.toString());
e.printStackTrace();
}
Thread.currentThread().interrupt();
}
}
} }