Pull logic out of consume() method and correctly implement locking mechanism

This commit is contained in:
Tobias Eidelpes 2020-11-12 18:09:23 +01:00
parent 2723ee4555
commit cbf1d040ef
4 changed files with 148 additions and 50 deletions

View File

@ -0,0 +1,7 @@
package dslab.exception;
public class UnknownDomain extends Exception {
public UnknownDomain(String errorMessage) {
super(errorMessage);
}
}

View File

@ -21,11 +21,8 @@ public class ClientConnection implements Runnable {
private Message msg = new Message();
private BlockingQueue<Message> blockingQueue;
public ClientConnection(Socket connection, BlockingQueue<Message> blockingQueue) {
public ClientConnection(Socket connection) {
this.socket = connection;
this.blockingQueue = blockingQueue;
}
@Override
@ -128,7 +125,7 @@ public class ClientConnection implements Runnable {
public void sendMessage() throws MissingInputException {
this.msg.allFieldsSet();
TransferServer.Producer producer = new TransferServer.Producer(this.blockingQueue, this.msg);
TransferServer.Producer producer = new TransferServer.Producer(this.msg);
new Thread(producer).start();
this.msg = new Message();
}

View File

@ -19,11 +19,9 @@ public class ClientListener extends Thread {
private final Logger logger = Logger.getLogger(ClientListener.class.getName());
private final ArrayList<ClientConnection> clients = new ArrayList<>();
private final ExecutorService executorService = Executors.newCachedThreadPool();
private final BlockingQueue<Message> blockingQueue;
public ClientListener(ServerSocket serverSocket, BlockingQueue<Message> blockingQueue) {
public ClientListener(ServerSocket serverSocket) {
this.serverSocket = serverSocket;
this.blockingQueue = blockingQueue;
}
@Override
@ -33,7 +31,7 @@ public class ClientListener extends Thread {
try {
Socket s = serverSocket.accept();
logger.fine("Processing incoming socket " + s.toString());
ClientConnection clientConnection = new ClientConnection(s, this.blockingQueue);
ClientConnection clientConnection = new ClientConnection(s);
clients.add(clientConnection);
executorService.submit(clientConnection);
} catch (InterruptedIOException | SocketException e) {

View File

@ -1,11 +1,11 @@
package dslab.transfer;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.io.*;
import java.net.*;
import java.rmi.ConnectIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger;
@ -14,7 +14,10 @@ import at.ac.tuwien.dsg.orvell.Shell;
import at.ac.tuwien.dsg.orvell.StopShellException;
import at.ac.tuwien.dsg.orvell.annotation.Command;
import dslab.ComponentFactory;
import dslab.Email;
import dslab.Message;
import dslab.exception.MalformedInputException;
import dslab.exception.UnknownDomain;
import dslab.util.Config;
public class TransferServer implements ITransferServer, Runnable {
@ -22,8 +25,10 @@ public class TransferServer implements ITransferServer, Runnable {
private ServerSocket serverSocket;
private final Shell shell;
private final Integer serverPort;
private ArrayList<Config> mailboxServers;
private final BlockingQueue<Message> blockingQueue = new LinkedBlockingQueue<>(10);
public static final HashMap<String, Integer> mailboxServers = new HashMap<>();
public static volatile BlockingQueue<Message> blockingQueue = new LinkedBlockingQueue<>(10);
private final Consumer consumer;
public static final Object lock = new Object();
/**
@ -35,7 +40,11 @@ public class TransferServer implements ITransferServer, Runnable {
* @param out the output stream to write console output to
*/
public TransferServer(String componentId, Config config, InputStream in, PrintStream out) {
// TODO parse domains for mailbox servers
Config univerze = new Config("mailbox-univer-ze.properties");
Config earthplanet = new Config("mailbox-earth-planet.properties");
mailboxServers.put(univerze.getString("domain"), univerze.getInt("dmtp.tcp.port"));
mailboxServers.put(earthplanet.getString("domain"), earthplanet.getInt("dmtp.tcp.port"));
this.consumer = new Consumer(mailboxServers);
this.shell = new Shell(in, out);
this.shell.register(this);
this.shell.setPrompt("Transferserver> ");
@ -52,8 +61,8 @@ public class TransferServer implements ITransferServer, Runnable {
e.printStackTrace();
shutdown();
}
new ClientListener(serverSocket, blockingQueue).start();
// TODO start consumer thread
new ClientListener(serverSocket).start();
this.consumer.start();
this.shell.run();
}
@ -67,7 +76,7 @@ public class TransferServer implements ITransferServer, Runnable {
logger.severe("Error closing serverSocket " + serverSocket.toString());
e.printStackTrace();
}
// TODO stop consumer thread(s)
this.consumer.interrupt();
throw new StopShellException();
}
@ -76,12 +85,10 @@ public class TransferServer implements ITransferServer, Runnable {
server.run();
}
static class Producer implements Runnable {
private final BlockingQueue<Message> blockingQueue;
static class Producer extends Thread {
private final Message msg;
Producer(BlockingQueue<Message> blockingQueue, Message msg) {
this.blockingQueue = blockingQueue;
Producer(Message msg) {
this.msg = msg;
}
@ -96,12 +103,13 @@ public class TransferServer implements ITransferServer, Runnable {
}
private void produce() throws InterruptedException {
synchronized (this) {
while (blockingQueue.size() == 10)
wait();
synchronized (lock) {
while (blockingQueue.size() == 10) {
lock.wait();
}
blockingQueue.put(msg);
logger.info("Added message " + msg.toString() + " to queue");
notify();
lock.notify();
}
}
@ -110,12 +118,12 @@ public class TransferServer implements ITransferServer, Runnable {
}
}
class Consumer implements Runnable {
private final BlockingQueue<Message> blockingQueue;
private Socket socket;
static class Consumer extends Thread {
private final HashMap<String, Integer> mailboxServers;
Consumer(BlockingQueue<Message> blockingQueue) {
this.blockingQueue = blockingQueue;
Consumer(HashMap<String, Integer> mailboxServers) {
this.mailboxServers = mailboxServers;
logger.info("MailboxServers: " + mailboxServers.toString());
}
@Override
@ -125,32 +133,120 @@ public class TransferServer implements ITransferServer, Runnable {
} catch (InterruptedException e) {
logger.info("Consumer thread has been interrupted. Exiting..." + this.toString());
shutdown();
} // TODO catch exception where recipient not known to server and send message to "from" user
}
}
private void consume() throws InterruptedException {
while (true) {
synchronized (this) {
while (blockingQueue.size() == 0)
wait();
while (!Thread.currentThread().isInterrupted()) {
synchronized (lock) {
while (blockingQueue.isEmpty()) {
logger.info("Queue currently empty. Waiting for messages to appear...");
lock.wait();
}
logger.info("Queue not empty. Processing message...");
Message msg = blockingQueue.take();
logger.info("Took message " + msg.toString() + " from queue");
// TODO open socket to all MailboxServers specified in "to" field
// TODO replay DMTP message
// TODO wait for reply and throw "recipient unknown" if unknown email in "to" field.
notify();
for (Email recipient : msg.getTo()) {
int port;
try {
port = domainLookup(recipient);
logger.info("Domain lookup successful. Port is: " + port);
} catch (UnknownDomain e) {
sendErrorMail(msg, e.getMessage());
// TODO exit consumer?
shutdown();
continue;
}
replayMessage(msg, port);
}
lock.notify();
}
}
}
private void replayMessage(Message msg, int port) {
logger.info("Replaying message for message: " + msg.toString() + " on port " + port);
try {
Socket socket = new Socket("127.0.0.1", port);
PrintWriter socketOut = new PrintWriter(socket.getOutputStream(), true);
BufferedReader socketIn = new BufferedReader(new InputStreamReader(socket.getInputStream()));
socketIn.readLine();
socketOut.println("begin");
socketIn.readLine();
socketOut.println("subject " + msg.getSubject());
socketIn.readLine();
socketOut.println("data " + msg.getData());
socketIn.readLine();
socketOut.println("to " + msg.printTo());
socketIn.readLine();
socketOut.println("from " + msg.getFrom().toString());
socketIn.readLine();
socketOut.println("send");
String result = socketIn.readLine();
if (result.startsWith("error"))
sendErrorMail(msg, result);
socketIn.close();
socketOut.close();
socket.close();
} catch (IOException e) {
sendErrorMail(msg, "error failed to connect to server");
}
}
private int domainLookup(Email email) throws UnknownDomain {
logger.info("Performing domain lookup for address " + email.toString());
if (this.mailboxServers.containsKey(email.getDomain()))
return this.mailboxServers.get(email.getDomain());
throw new UnknownDomain("error domain not found: " + email.getDomain());
}
private void sendErrorMail(Message msg, String error) {
logger.info("Trying to send error mail to address " + msg.getFrom());
int port;
try {
port = domainLookup(msg.getFrom());
} catch (UnknownDomain e) {
logger.severe("Sending error mail failed because sender domain is unknown");
return;
}
ArrayList<Email> newTo = new ArrayList<>();
newTo.add(msg.getFrom());
msg.setTo(newTo);
try {
msg.setFrom(new Email("mailer@127.0.0.1"));
} catch (MalformedInputException e) {
logger.severe("The server's E-Mail address is wrong. This should not be happening!");
e.printStackTrace();
return;
}
try {
Socket socket = new Socket("127.0.0.1", port);
PrintWriter socketOut = new PrintWriter(socket.getOutputStream(), true);
BufferedReader socketIn = new BufferedReader(new InputStreamReader(socket.getInputStream()));
socketIn.readLine();
socketOut.println("begin");
socketIn.readLine();
socketOut.println("subject " + msg.getSubject());
socketIn.readLine();
socketOut.println("data " + error);
socketIn.readLine();
socketOut.println("to " + msg.printTo());
socketIn.readLine();
socketOut.println("from " + msg.getFrom().toString());
socketIn.readLine();
socketOut.println("send");
String result = socketIn.readLine();
if (result.startsWith("error"))
logger.severe("Sending error mail failed: " + result);
socketIn.close();
socketOut.close();
socket.close();
} catch (IOException e) {
logger.severe("Sending error mail failed because socket communication failed");
}
}
private void shutdown() {
try {
if (socket != null)
socket.close();
} catch (IOException e) {
logger.severe("Failed to close socket during shutdown" + socket.toString());
e.printStackTrace();
}
Thread.currentThread().interrupt();
}
}