diff --git a/ass3-elastic/pom.xml b/ass3-elastic/pom.xml
new file mode 100644
index 0000000..88daebf
--- /dev/null
+++ b/ass3-elastic/pom.xml
@@ -0,0 +1,53 @@
+
+
+
+ 4.0.0
+
+
+ at.ac.tuwien.infosys.dst
+ dst
+ 2021.1
+ ..
+
+
+ ass3-elastic
+
+ DST :: Assignment 3 :: Elasticity
+
+ jar
+
+
+
+ at.ac.tuwien.infosys.dst
+ ass3-messaging
+ ${project.version}
+
+
+
+ at.ac.tuwien.infosys.dst
+ ass3-messaging
+ ${project.version}
+ test-jar
+ test
+
+
+
+ com.github.docker-java
+ docker-java
+
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+
+ org.springframework.boot
+ spring-boot-starter-amqp
+ test
+
+
+
+
diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerException.java b/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerException.java
new file mode 100644
index 0000000..0678fa4
--- /dev/null
+++ b/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerException.java
@@ -0,0 +1,22 @@
+package dst.ass3.elastic;
+
+/**
+ * Exception indicating that the ContainerService encountered an error when performing a task.
+ */
+public class ContainerException extends Exception {
+
+ public ContainerException() {
+ }
+
+ public ContainerException(String message) {
+ super(message);
+ }
+
+ public ContainerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ContainerException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerInfo.java b/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerInfo.java
new file mode 100644
index 0000000..6cbcf76
--- /dev/null
+++ b/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerInfo.java
@@ -0,0 +1,95 @@
+package dst.ass3.elastic;
+
+
+import dst.ass3.messaging.Region;
+
+import java.util.Objects;
+
+/**
+ * Value type that represents a container.
+ */
+public class ContainerInfo {
+
+ /**
+ * The name of the container image.
+ */
+ private String image;
+
+ /**
+ * The container ID.
+ */
+ private String containerId;
+
+ /**
+ * True if the container is running.
+ */
+ private boolean running;
+
+ /**
+ * If the container is a worker (indicated by the image dst/ass3-worker), then this field should contain the worker
+ * region. Otherwise it can be null.
+ */
+ private Region workerRegion;
+
+ public String getImage() {
+ return image;
+ }
+
+ public void setImage(String image) {
+ this.image = image;
+ }
+
+ public String getContainerId() {
+ return containerId;
+ }
+
+ public void setContainerId(String containerId) {
+ this.containerId = containerId;
+ }
+
+ public boolean isRunning() {
+ return running;
+ }
+
+ public void setRunning(boolean running) {
+ this.running = running;
+ }
+
+ public Region getWorkerRegion() {
+ return workerRegion;
+ }
+
+ public void setWorkerRegion(Region workerRegion) {
+ this.workerRegion = workerRegion;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ContainerInfo that = (ContainerInfo) o;
+ return isRunning() == that.isRunning() &&
+ Objects.equals(getImage(), that.getImage()) &&
+ Objects.equals(getContainerId(), that.getContainerId()) &&
+ getWorkerRegion() == that.getWorkerRegion();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getImage(), getContainerId(), isRunning(), getWorkerRegion());
+ }
+
+ @Override
+ public String toString() {
+ return "ContainerInfo{" +
+ "image='" + image + '\'' +
+ ", containerId='" + containerId + '\'' +
+ ", running=" + running +
+ ", workerRegion=" + workerRegion +
+ '}';
+ }
+}
diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerNotFoundException.java b/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerNotFoundException.java
new file mode 100644
index 0000000..797cfd3
--- /dev/null
+++ b/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerNotFoundException.java
@@ -0,0 +1,23 @@
+package dst.ass3.elastic;
+
+/**
+ * Indicates that a container could not be found.
+ */
+public class ContainerNotFoundException extends ContainerException {
+
+ public ContainerNotFoundException() {
+ }
+
+ public ContainerNotFoundException(String message) {
+ super(message);
+ }
+
+ public ContainerNotFoundException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ContainerNotFoundException(Throwable cause) {
+ super(cause);
+ }
+
+}
diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/IContainerService.java b/ass3-elastic/src/main/java/dst/ass3/elastic/IContainerService.java
new file mode 100644
index 0000000..e213897
--- /dev/null
+++ b/ass3-elastic/src/main/java/dst/ass3/elastic/IContainerService.java
@@ -0,0 +1,36 @@
+package dst.ass3.elastic;
+
+import dst.ass3.messaging.Region;
+
+import java.util.List;
+
+public interface IContainerService {
+
+ /**
+ * Returns a list of all running containers.
+ *
+ * @return a list of ContainerInfo objects
+ * @throws ContainerException if an error occurred when trying to fetch the running containers.
+ */
+ List listContainers() throws ContainerException;
+
+ /**
+ * Stops the container with the given container ID.
+ *
+ * @param containerId ID of the container to stop.
+ * @throws ContainerNotFoundException if the container to stop is not running
+ * @throws ContainerException if another error occurred when trying to stop the container
+ */
+ void stopContainer(String containerId) throws ContainerException;
+
+ /**
+ * Starts a worker for the specific {@link dst.ass3.messaging.Region}.
+ *
+ * @param region {@link Region} of the worker to start
+ * @return ContainerInfo of the started container / worker
+ * @throws ImageNotFoundException if the worker docker image is not available
+ * @throws ContainerException if another error occurred when trying to start the worker
+ */
+ ContainerInfo startWorker(Region region) throws ContainerException;
+
+}
diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityController.java b/ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityController.java
new file mode 100644
index 0000000..9e35c7e
--- /dev/null
+++ b/ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityController.java
@@ -0,0 +1,7 @@
+package dst.ass3.elastic;
+
+public interface IElasticityController {
+
+ void adjustWorkers() throws ContainerException;
+
+}
diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityFactory.java b/ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityFactory.java
new file mode 100644
index 0000000..b65674d
--- /dev/null
+++ b/ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityFactory.java
@@ -0,0 +1,10 @@
+package dst.ass3.elastic;
+
+import dst.ass3.messaging.IWorkloadMonitor;
+
+public interface IElasticityFactory {
+ IContainerService createContainerService();
+
+ IElasticityController createElasticityController(IContainerService containerService,
+ IWorkloadMonitor workloadMonitor);
+}
diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/ImageNotFoundException.java b/ass3-elastic/src/main/java/dst/ass3/elastic/ImageNotFoundException.java
new file mode 100644
index 0000000..bc659c5
--- /dev/null
+++ b/ass3-elastic/src/main/java/dst/ass3/elastic/ImageNotFoundException.java
@@ -0,0 +1,22 @@
+package dst.ass3.elastic;
+
+/**
+ * Exception indicating that the image which should be used for a container start is not available.
+ */
+public class ImageNotFoundException extends ContainerException {
+
+ public ImageNotFoundException() {
+ }
+
+ public ImageNotFoundException(String message) {
+ super(message);
+ }
+
+ public ImageNotFoundException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ImageNotFoundException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/impl/ElasticityFactory.java b/ass3-elastic/src/main/java/dst/ass3/elastic/impl/ElasticityFactory.java
new file mode 100644
index 0000000..64d2cd9
--- /dev/null
+++ b/ass3-elastic/src/main/java/dst/ass3/elastic/impl/ElasticityFactory.java
@@ -0,0 +1,24 @@
+package dst.ass3.elastic.impl;
+
+import dst.ass3.elastic.IContainerService;
+import dst.ass3.elastic.IElasticityController;
+import dst.ass3.elastic.IElasticityFactory;
+import dst.ass3.messaging.IWorkloadMonitor;
+import dst.ass3.messaging.impl.MessagingFactory;
+
+public class ElasticityFactory implements IElasticityFactory {
+
+ @Override
+ public IContainerService createContainerService() {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public IElasticityController createElasticityController(IContainerService containerService,
+ IWorkloadMonitor workloadMonitor) {
+ // TODO
+ return null;
+ }
+
+}
diff --git a/ass3-elastic/src/main/resources/logback.xml b/ass3-elastic/src/main/resources/logback.xml
new file mode 100644
index 0000000..9a6b351
--- /dev/null
+++ b/ass3-elastic/src/main/resources/logback.xml
@@ -0,0 +1,16 @@
+
+
+
+
+
+ %d{yyyy-MM-dd HH:mm:ss.SSS} - %highlight(%5p) [%12.12thread] %cyan(%-40.40logger{39}): %m%n
+
+
+
+
+
+
+
+
+
+
diff --git a/ass3-elastic/src/test/java/dst/ass3/elastic/ContainerServiceTest.java b/ass3-elastic/src/test/java/dst/ass3/elastic/ContainerServiceTest.java
new file mode 100644
index 0000000..e4d5d97
--- /dev/null
+++ b/ass3-elastic/src/test/java/dst/ass3/elastic/ContainerServiceTest.java
@@ -0,0 +1,107 @@
+package dst.ass3.elastic;
+
+import dst.ass3.elastic.impl.ElasticityFactory;
+import dst.ass3.messaging.RabbitResource;
+import dst.ass3.messaging.Region;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.amqp.core.Queue;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class ContainerServiceTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ContainerServiceTest.class);
+
+ @Rule
+ public RabbitResource rabbit = new RabbitResource();
+
+ @Rule
+ public Timeout timeout = new Timeout(30, TimeUnit.SECONDS);
+
+ IContainerService containerService;
+ IElasticityFactory factory;
+
+ @Before
+ public void setUp() throws Exception {
+ factory = new ElasticityFactory();
+
+ containerService = factory.createContainerService();
+
+ rabbit.getAdmin().declareQueue(new Queue("dst.at_vienna"));
+ rabbit.getAdmin().declareQueue(new Queue("dst.at_linz"));
+ rabbit.getAdmin().declareQueue(new Queue("dst.de_berlin"));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ rabbit.getAdmin().deleteQueue("dst.at_vienna");
+ rabbit.getAdmin().deleteQueue("dst.at_linz");
+ rabbit.getAdmin().deleteQueue("dst.de_berlin");
+ }
+
+ @Test
+ public void spawnListStop_lifecycleWorks() throws Exception {
+ List containers = containerService.listContainers();
+ assertThat("Please stop all containers before running the test", containers.size(), is(0));
+
+ ContainerInfo c1 = containerService.startWorker(Region.AT_VIENNA);
+ LOG.info("Started container {}", c1);
+
+ ContainerInfo c2 = containerService.startWorker(Region.AT_LINZ);
+ LOG.info("Started container {}", c2);
+
+ ContainerInfo c3 = containerService.startWorker(Region.DE_BERLIN);
+ LOG.info("Started container {}", c3);
+
+ LOG.info("Waiting for containers to boot...");
+ Thread.sleep(5000);
+
+ containers = containerService.listContainers();
+
+ assertThat(containers.size(), is(3));
+
+ LOG.info("Stopping containers...");
+ containerService.stopContainer(c1.getContainerId());
+ containerService.stopContainer(c2.getContainerId());
+ containerService.stopContainer(c3.getContainerId());
+
+ Thread.sleep(5000);
+
+ containers = containerService.listContainers();
+ assertThat(containers.size(), is(0));
+ }
+
+ @Test(expected = ContainerNotFoundException.class)
+ public void stopNonExistingContainer_throwsException() throws Exception {
+ containerService.stopContainer("Non-Existing-Id");
+ }
+
+ @Test
+ public void listContainers_containsCompleteInfo() throws Exception {
+ ContainerInfo c1 = containerService.startWorker(Region.AT_VIENNA);
+ LOG.info("Started container {}", c1);
+ LOG.info("Waiting for container to boot...");
+ Thread.sleep(5000);
+ List containers = containerService.listContainers();
+ ContainerInfo containerInfo = containers.stream()
+ .filter(c -> c1.getContainerId().equals(c.getContainerId()))
+ .findFirst().get();
+ assertThat(containerInfo, notNullValue());
+ assertThat(containerInfo.getImage(), equalTo("dst/ass3-worker"));
+ assertThat(containerInfo.getWorkerRegion(), equalTo(Region.AT_VIENNA));
+ assertThat(containerInfo.isRunning(), is(true));
+ LOG.info("Stopping container...");
+ containerService.stopContainer(containerInfo.getContainerId());
+ }
+
+}
diff --git a/ass3-elastic/src/test/java/dst/ass3/elastic/ElasticityControllerTest.java b/ass3-elastic/src/test/java/dst/ass3/elastic/ElasticityControllerTest.java
new file mode 100644
index 0000000..0248c97
--- /dev/null
+++ b/ass3-elastic/src/test/java/dst/ass3/elastic/ElasticityControllerTest.java
@@ -0,0 +1,136 @@
+package dst.ass3.elastic;
+
+import dst.ass3.elastic.impl.ElasticityFactory;
+import dst.ass3.messaging.IWorkloadMonitor;
+import dst.ass3.messaging.Region;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ElasticityControllerTest {
+ private static final String WORKER_IMAGE = "dst/ass3-worker";
+
+ IElasticityFactory factory;
+
+ @Mock
+ IContainerService containerService;
+ @Mock
+ IWorkloadMonitor workloadMonitor;
+
+ IElasticityController elasticityController;
+ Map processingTimes = new HashMap<>();
+ Map workerCount = new HashMap<>();
+ Map requestCount = new HashMap<>();
+ List containers = new ArrayList<>();
+
+ @Before
+ public void setUp() throws Exception {
+ factory = new ElasticityFactory();
+ elasticityController = factory.createElasticityController(containerService, workloadMonitor);
+
+ processingTimes.clear();
+ processingTimes.put(Region.AT_VIENNA, 5000.0);
+ processingTimes.put(Region.DE_BERLIN, 10000.0);
+ processingTimes.put(Region.AT_LINZ, 2000.0);
+
+ when(workloadMonitor.getAverageProcessingTime()).thenReturn(processingTimes);
+
+ workerCount.clear();
+ workerCount.put(Region.AT_VIENNA, 95L);
+ workerCount.put(Region.DE_BERLIN, 87L);
+ workerCount.put(Region.AT_LINZ, 61L);
+ when(workloadMonitor.getWorkerCount()).thenReturn(workerCount);
+
+ requestCount.clear();
+ requestCount.put(Region.AT_VIENNA, 600L);
+ requestCount.put(Region.DE_BERLIN, 1000L);
+ requestCount.put(Region.AT_LINZ, 1005L);
+ when(workloadMonitor.getRequestCount()).thenReturn(requestCount);
+
+ containers.clear();
+ for (int i = 0; i < 95; i++) {
+ containers.add(containerInfo("vienna" + i, WORKER_IMAGE, Region.AT_VIENNA, true));
+ }
+ for (int i = 0; i < 87; i++) {
+ containers.add(containerInfo("berlin" + i, WORKER_IMAGE, Region.DE_BERLIN, true));
+ }
+ for (int i = 0; i < 61; i++) {
+ containers.add(containerInfo("linz" + i, WORKER_IMAGE, Region.AT_LINZ, true));
+ }
+ when(containerService.listContainers()).thenReturn(containers);
+ }
+
+ @After
+ public void tearDown() {
+ verify(workloadMonitor, atLeast(1)).getWorkerCount();
+ verify(workloadMonitor, atLeast(1)).getRequestCount();
+ verify(workloadMonitor, atLeast(1)).getAverageProcessingTime();
+ }
+
+ @Test
+ public void notEnoughWorkers_scaleUp() throws Exception {
+ // remove 10 vienna workers and 10 linz workers
+ List containersToRemove = containers.stream()
+ .filter(c -> c.getContainerId().startsWith("vienna7") || c.getContainerId().startsWith("linz1"))
+ .collect(Collectors.toList());
+ containers.removeAll(containersToRemove);
+ workerCount.put(Region.AT_VIENNA, 85L);
+ workerCount.put(Region.AT_LINZ, 51L);
+
+ elasticityController.adjustWorkers();
+
+ verify(containerService, never()).stopContainer((String) any(String.class));
+ verify(containerService, times(15)).startWorker(Region.AT_VIENNA);
+ verify(containerService, times(16)).startWorker(Region.AT_LINZ);
+ verify(containerService, never()).startWorker(Region.DE_BERLIN);
+ verify(containerService, never()).listContainers();
+ }
+
+ @Test
+ public void tooManyWorkers_scaleDown() throws Exception {
+ // add 20 more, some should be stopped
+ for (int i = 0; i < 20; i++) {
+ containers.add(containerInfo("linz1" + i, WORKER_IMAGE, Region.AT_LINZ, true));
+ }
+ workerCount.put(Region.AT_LINZ, 81L);
+
+ elasticityController.adjustWorkers();
+
+ verify(containerService, times(14)).stopContainer(contains("linz"));
+ verify(containerService, never()).stopContainer(contains("berlin"));
+ verify(containerService, never()).stopContainer(contains("vienna"));
+ verify(containerService, never()).startWorker((Region) any(Region.class));
+ verify(containerService, times(1)).listContainers();
+ }
+
+ @Test
+ public void justEnoughWorkers_doNotScale() throws Exception {
+ elasticityController.adjustWorkers();
+ verify(containerService, never()).startWorker((Region) any(Region.class));
+ verify(containerService, never()).stopContainer((String) any());
+ verify(containerService, never()).listContainers();
+ }
+
+ private ContainerInfo containerInfo(String id, String image, Region workerRegion, boolean running) {
+ ContainerInfo info = new ContainerInfo();
+ info.setContainerId(id);
+ info.setImage(image);
+ info.setWorkerRegion(workerRegion);
+ info.setRunning(running);
+ return info;
+ }
+
+}
diff --git a/ass3-event/pom.xml b/ass3-event/pom.xml
new file mode 100644
index 0000000..939dfb1
--- /dev/null
+++ b/ass3-event/pom.xml
@@ -0,0 +1,36 @@
+
+
+
+ 4.0.0
+
+
+ at.ac.tuwien.infosys.dst
+ dst
+ 2021.1
+ ..
+
+
+ ass3-event
+
+ jar
+
+ DST :: Assignment 3 :: Event Stream Processing
+
+
+
+ org.apache.flink
+ flink-streaming-java_2.12
+
+
+ org.apache.flink
+ flink-cep_2.12
+
+
+ org.apache.flink
+ flink-clients_2.12
+
+
+
+
diff --git a/ass3-event/src/main/java/dst/ass3/event/Constants.java b/ass3-event/src/main/java/dst/ass3/event/Constants.java
new file mode 100644
index 0000000..d06492a
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/Constants.java
@@ -0,0 +1,15 @@
+package dst.ass3.event;
+
+/**
+ * Constants.
+ */
+public final class Constants {
+
+ /**
+ * The TCP port of the {@link EventPublisher}.
+ */
+ public static final int EVENT_PUBLISHER_PORT = 1338;
+
+ private Constants() {
+ }
+}
diff --git a/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java b/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java
new file mode 100644
index 0000000..6a8c668
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java
@@ -0,0 +1,16 @@
+package dst.ass3.event;
+
+/**
+ * Creates your {@link IEventProcessingEnvironment} and {@link IEventSourceFunction} implementation instances.
+ */
+public class EventProcessingFactory {
+ public static IEventProcessingEnvironment createEventProcessingEnvironment() {
+ // TODO
+ return null;
+ }
+
+ public static IEventSourceFunction createEventSourceFunction() {
+ // TODO
+ return null;
+ }
+}
diff --git a/ass3-event/src/main/java/dst/ass3/event/EventPublisher.java b/ass3-event/src/main/java/dst/ass3/event/EventPublisher.java
new file mode 100644
index 0000000..3001d6a
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/EventPublisher.java
@@ -0,0 +1,173 @@
+package dst.ass3.event;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+import dst.ass3.event.model.domain.ITripEventInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.group.ChannelGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.group.DefaultChannelGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ClassResolvers;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectDecoder;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectEncoder;
+
+/**
+ * An EventPublisher accepts incoming TCP socket connections on a given port and is able to broadcast {@link ITripEventInfo}
+ * objects to these clients.
+ */
+public class EventPublisher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EventPublisher.class);
+
+ private final Object clientChannelMonitor = new Object();
+
+ private final int port;
+ private final AtomicBoolean closed;
+
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+ private ChannelGroup clientChannels;
+
+ public EventPublisher(int port) {
+ this.port = port;
+ this.closed = new AtomicBoolean(false);
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * Broadcast an event to all listening channels. Does nothing if no clients are connected.
+ *
+ * @param event the event to publish
+ * @throws IllegalStateException if the publisher hasn't been started yet or has been closed
+ */
+ public void publish(ITripEventInfo event) {
+ if (clientChannels == null || closed.get()) {
+ throw new IllegalStateException();
+ }
+
+ clientChannels.writeAndFlush(event).syncUninterruptibly();
+
+ // wait a bit for event to propagate
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {}
+ }
+
+ /**
+ * Like {@link #publish(ITripEventInfo)} but waits for a given number of milliseconds and then passes the current system
+ * time to a factory function.
+ *
+ * @param delay the delay in ms
+ * @param provider the provider
+ */
+ public void publish(long delay, Function provider) {
+ if (delay > 0) {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ }
+ }
+ publish(provider.apply(System.currentTimeMillis()));
+ }
+
+ /**
+ * This method blocks if no clients are connected, and is notified as soon as a client connects. If clients are
+ * connected, the method returns immediately.
+ */
+ public void waitForClients() {
+ if (clientChannels.isEmpty()) {
+ LOG.debug("Waiting for clients to connect...");
+ synchronized (clientChannelMonitor) {
+ try {
+ clientChannelMonitor.wait();
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted while waiting on client connections", e);
+ }
+ }
+ }
+ }
+
+ public int getConnectedClientCount() {
+ if (clientChannels == null || closed.get()) {
+ throw new IllegalStateException();
+ }
+ return clientChannels.size();
+ }
+
+ /**
+ * Closes all active client connections.
+ */
+ public void dropClients() {
+ if (clientChannels == null || closed.get()) {
+ throw new IllegalStateException();
+ }
+ clientChannels.close().syncUninterruptibly().group().clear();
+ }
+
+ /**
+ * Start the server and accept incoming connections. Will call {@link #close()} if an error occurs during
+ * connection.
+ */
+ public void start() {
+ bossGroup = new NioEventLoopGroup();
+ workerGroup = new NioEventLoopGroup();
+ clientChannels = new DefaultChannelGroup(workerGroup.next());
+
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(new ClientChannelInitializer());
+
+ // Bind and start to accept incoming connections
+ ChannelFuture f = b.bind(port).addListener(future -> {
+ if (!future.isSuccess()) {
+ LOG.error("Error while binding socket");
+ close();
+ }
+ }).syncUninterruptibly();
+ LOG.info("Accepting connections on {}", f.channel());
+ }
+
+ /**
+ * Closes all channels and resources.
+ */
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ LOG.info("Shutting down event loops");
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully();
+ }
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
+ }
+ }
+ }
+
+ private class ClientChannelInitializer extends ChannelInitializer {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ LOG.info("Initializing client channel {}", ch);
+ clientChannels.add(ch);
+
+ ch.pipeline()
+ .addFirst(new ObjectEncoder())
+ .addFirst(new ObjectDecoder(ClassResolvers.cacheDisabled(ClassLoader.getSystemClassLoader())));
+
+ synchronized (clientChannelMonitor) {
+ clientChannelMonitor.notifyAll();
+ }
+ }
+ }
+}
diff --git a/ass3-event/src/main/java/dst/ass3/event/EventSubscriber.java b/ass3-event/src/main/java/dst/ass3/event/EventSubscriber.java
new file mode 100644
index 0000000..1a43764
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/EventSubscriber.java
@@ -0,0 +1,178 @@
+package dst.ass3.event;
+
+import java.lang.reflect.Proxy;
+import java.net.SocketAddress;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import dst.ass3.event.model.domain.ITripEventInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ClassResolvers;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectDecoder;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectEncoder;
+import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
+
+/**
+ * An EventSubscriber receives ITripEventInfo objects through a netty SocketChannel. Create and connect an
+ * EventSubscriber using {@link #subscribe(SocketAddress)}. To receive events, call {@link #receive()}.
+ */
+public class EventSubscriber {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EventSubscriber.class);
+
+ private static final ITripEventInfo POISON_PILL = (ITripEventInfo) Proxy.newProxyInstance(
+ ITripEventInfo.class.getClassLoader(), new Class[]{ITripEventInfo.class}, (p, m, a) -> null);
+
+ private final SocketAddress publisherAddress;
+
+ private final BlockingQueue queue;
+
+ private volatile boolean closed;
+
+ private Channel channel;
+ private EventLoopGroup loop;
+
+ private EventSubscriber(SocketAddress publisherAddress) {
+ this.publisherAddress = publisherAddress;
+ this.queue = new LinkedBlockingQueue<>();
+ }
+
+ /**
+ * Blocks to receive the next ITripEventInfo published into the channel. Returns {@code null} if the underlying
+ * channel has been closed or the thread was interrupted.
+ *
+ * @return the next ITripEventInfo object
+ * @throws IllegalStateException thrown if the previous call returned null and the channel was closed
+ */
+ public ITripEventInfo receive() throws IllegalStateException {
+ synchronized (queue) {
+ if (closed && queue.isEmpty()) {
+ throw new IllegalStateException();
+ }
+ }
+
+ ITripEventInfo event;
+ try {
+ event = queue.take();
+
+ if (event == POISON_PILL) {
+ return null;
+ } else {
+ return event;
+ }
+ } catch (InterruptedException e) {
+ return null;
+ }
+ }
+
+ private Future> start() {
+ loop = new NioEventLoopGroup();
+
+ channel = new Bootstrap()
+ .group(loop)
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .handler(new EventSubscriberHandler())
+ .connect(publisherAddress) // ChannelFuture
+ .addListener(future -> {
+ if (!future.isSuccess()) {
+ LOG.error("Error while connecting");
+ close();
+ }
+ })
+ .syncUninterruptibly()
+ .channel();
+
+ LOG.info("Connected to channel {}", channel);
+
+ return loop.submit(() -> {
+ try {
+ channel.closeFuture().sync();
+ } catch (InterruptedException e) {
+ // noop
+ } finally {
+ close();
+ }
+ });
+ }
+
+ /**
+ * Closes all resources and threads used by the EventSubscriber.
+ */
+ public void close() {
+ try {
+ if (loop != null) {
+ synchronized (queue) {
+ if (!loop.isShutdown() && !loop.isTerminated() && !loop.isShuttingDown()) {
+ LOG.info("Shutting down event loop");
+ loop.shutdownGracefully();
+ }
+ }
+ }
+ } finally {
+ synchronized (queue) {
+ if (!closed) {
+ LOG.debug("Adding poison pill to queue");
+ closed = true;
+ queue.add(POISON_PILL);
+ }
+ }
+ }
+ }
+
+ /**
+ * Creates a new EventSubscriber that connects to given SocketAddress.
+ *
+ * @param address the socket address
+ * @return a new EventSubscriber
+ */
+ public static EventSubscriber subscribe(SocketAddress address) {
+ EventSubscriber eventSubscriber = new EventSubscriber(address);
+ eventSubscriber.start();
+ return eventSubscriber;
+ }
+
+ private class EventSubscriberHandler extends ChannelInboundHandlerAdapter {
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ ctx.read();
+
+ if (!(msg instanceof ITripEventInfo)) {
+ LOG.error("Unknown message type received {}", msg);
+ return;
+ }
+
+ synchronized (queue) {
+ if (!closed) {
+ queue.add((ITripEventInfo) msg);
+ }
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ LOG.error("EventSubscriberHandler caught an exception", cause);
+ ctx.close();
+ close();
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) {
+ ctx.pipeline()
+ .addFirst(new ObjectEncoder())
+ .addFirst(new ObjectDecoder(ClassResolvers.cacheDisabled(ClassLoader.getSystemClassLoader())));
+ }
+
+ }
+}
+
diff --git a/ass3-event/src/main/java/dst/ass3/event/IEventProcessingEnvironment.java b/ass3-event/src/main/java/dst/ass3/event/IEventProcessingEnvironment.java
new file mode 100644
index 0000000..ae4b727
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/IEventProcessingEnvironment.java
@@ -0,0 +1,39 @@
+package dst.ass3.event;
+
+import dst.ass3.event.model.events.*;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+/**
+ * This class should be used to implement the event processing steps as described in the assignment. The test classes
+ * will inject SinkFunctions, create a new StreamExecutionEnvironment and then call {@link
+ * #initialize(StreamExecutionEnvironment)}.
+ */
+public interface IEventProcessingEnvironment {
+
+ /**
+ * Initializes the event processing graph on the {@link StreamExecutionEnvironment}. This function is called
+ * after all sinks have been set.
+ */
+ void initialize(StreamExecutionEnvironment env);
+
+ /**
+ * Sets the timeout limit of a streaming event.
+ *
+ * @param time the timeout limit
+ */
+ void setMatchingDurationTimeout(Time time);
+
+ void setLifecycleEventStreamSink(SinkFunction sink);
+
+ void setMatchingDurationStreamSink(SinkFunction sink);
+
+ void setAverageMatchingDurationStreamSink(SinkFunction sink);
+
+ void setMatchingTimeoutWarningStreamSink(SinkFunction sink);
+
+ void setTripFailedWarningStreamSink(SinkFunction sink);
+
+ void setAlertStreamSink(SinkFunction sink);
+}
diff --git a/ass3-event/src/main/java/dst/ass3/event/IEventSourceFunction.java b/ass3-event/src/main/java/dst/ass3/event/IEventSourceFunction.java
new file mode 100644
index 0000000..49278e7
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/IEventSourceFunction.java
@@ -0,0 +1,25 @@
+package dst.ass3.event;
+
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import dst.ass3.event.model.domain.ITripEventInfo;
+
+/**
+ * A RichFunction & SourceFunction for ITripEventInfo objects.
+ */
+public interface IEventSourceFunction extends RichFunction, SourceFunction {
+
+ @Override
+ void open(Configuration parameters) throws Exception;
+
+ @Override
+ void close() throws Exception;
+
+ @Override
+ void run(SourceContext ctx) throws Exception;
+
+ @Override
+ void cancel();
+}
diff --git a/ass3-event/src/main/java/dst/ass3/event/dto/TripEventInfoDTO.java b/ass3-event/src/main/java/dst/ass3/event/dto/TripEventInfoDTO.java
new file mode 100644
index 0000000..2c604b5
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/dto/TripEventInfoDTO.java
@@ -0,0 +1,54 @@
+package dst.ass3.event.dto;
+
+import java.io.Serializable;
+
+import dst.ass3.event.model.domain.ITripEventInfo;
+import dst.ass3.event.model.domain.Region;
+import dst.ass3.event.model.domain.TripState;
+
+public class TripEventInfoDTO implements Serializable, ITripEventInfo {
+
+ private static final long serialVersionUID = 4134104076758220138L;
+
+ private Long tripId;
+ private Long timestamp;
+ private TripState state;
+ private Region region;
+
+ public TripEventInfoDTO(Long tripId, Long timestamp, TripState state, Region region) {
+ this.tripId = tripId;
+ this.timestamp = timestamp;
+ this.state = state;
+ this.region = region;
+ }
+
+ @Override
+ public Long getTripId() {
+ return tripId;
+ }
+
+ @Override
+ public Long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public TripState getState() {
+ return state;
+ }
+
+ @Override
+ public Region getRegion() {
+ return region;
+ }
+
+ @Override
+ public String toString() {
+ return "TripEventInfoDTO{" +
+ "tripId=" + tripId +
+ ", timestamp=" + timestamp +
+ ", state=" + state +
+ ", region=" + region +
+ '}';
+ }
+}
diff --git a/ass3-event/src/main/java/dst/ass3/event/model/domain/ITripEventInfo.java b/ass3-event/src/main/java/dst/ass3/event/model/domain/ITripEventInfo.java
new file mode 100644
index 0000000..0fa9870
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/model/domain/ITripEventInfo.java
@@ -0,0 +1,9 @@
+package dst.ass3.event.model.domain;
+
+public interface ITripEventInfo {
+
+ Long getTripId();
+ Long getTimestamp();
+ TripState getState();
+ Region getRegion();
+}
diff --git a/ass3-event/src/main/java/dst/ass3/event/model/domain/Region.java b/ass3-event/src/main/java/dst/ass3/event/model/domain/Region.java
new file mode 100644
index 0000000..d61cce1
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/model/domain/Region.java
@@ -0,0 +1,7 @@
+package dst.ass3.event.model.domain;
+
+public enum Region {
+ AT_VIENNA,
+ AT_LINZ,
+ DE_BERLIN
+}
diff --git a/ass3-event/src/main/java/dst/ass3/event/model/domain/TripState.java b/ass3-event/src/main/java/dst/ass3/event/model/domain/TripState.java
new file mode 100644
index 0000000..9453273
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/model/domain/TripState.java
@@ -0,0 +1,5 @@
+package dst.ass3.event.model.domain;
+
+public enum TripState {
+ CREATED, QUEUED, MATCHED, APPROACHING, IN_PROGRESS, CANCELLED, COMPLETED
+}
diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/Alert.java b/ass3-event/src/main/java/dst/ass3/event/model/events/Alert.java
new file mode 100644
index 0000000..f4a7f1a
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/model/events/Alert.java
@@ -0,0 +1,49 @@
+package dst.ass3.event.model.events;
+
+import java.io.Serializable;
+import java.util.List;
+
+import dst.ass3.event.model.domain.Region;
+
+/**
+ * A system alert that aggregates several warnings.
+ */
+public class Alert implements Serializable {
+
+ private static final long serialVersionUID = -4561132671849230635L;
+
+ private Region region;
+ private List warnings;
+
+ public Alert() {
+ }
+
+ public Alert(Region region, List warnings) {
+ this.region = region;
+ this.warnings = warnings;
+ }
+
+ public Region getRegion() {
+ return region;
+ }
+
+ public void setRegion(Region region) {
+ this.region = region;
+ }
+
+ public List getWarnings() {
+ return warnings;
+ }
+
+ public void setWarnings(List warnings) {
+ this.warnings = warnings;
+ }
+
+ @Override
+ public String toString() {
+ return "Alert{" +
+ "region='" + region + '\'' +
+ ", warnings=" + warnings +
+ '}';
+ }
+}
diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/AverageMatchingDuration.java b/ass3-event/src/main/java/dst/ass3/event/model/events/AverageMatchingDuration.java
new file mode 100644
index 0000000..9a3f8f7
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/model/events/AverageMatchingDuration.java
@@ -0,0 +1,48 @@
+package dst.ass3.event.model.events;
+
+import java.io.Serializable;
+
+import dst.ass3.event.model.domain.Region;
+
+/**
+ * The average of several {@link MatchingDuration} values.
+ */
+public class AverageMatchingDuration implements Serializable {
+
+ private static final long serialVersionUID = -3767582104941550250L;
+
+ private Region region;
+ private double duration;
+
+ public AverageMatchingDuration() {
+ }
+
+ public AverageMatchingDuration(Region region, double duration) {
+ this.region = region;
+ this.duration = duration;
+ }
+
+ public Region getRegion() {
+ return region;
+ }
+
+ public void setRegion(Region region) {
+ this.region = region;
+ }
+
+ public double getDuration() {
+ return duration;
+ }
+
+ public void setDuration(double duration) {
+ this.duration = duration;
+ }
+
+ @Override
+ public String toString() {
+ return "AverageMatchingDuration{" +
+ "region='" + region + '\'' +
+ ", duration=" + duration +
+ '}';
+ }
+}
diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/LifecycleEvent.java b/ass3-event/src/main/java/dst/ass3/event/model/events/LifecycleEvent.java
new file mode 100644
index 0000000..8b2a8fa
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/model/events/LifecycleEvent.java
@@ -0,0 +1,84 @@
+package dst.ass3.event.model.events;
+
+import java.io.Serializable;
+
+import dst.ass3.event.model.domain.ITripEventInfo;
+import dst.ass3.event.model.domain.Region;
+import dst.ass3.event.model.domain.TripState;
+
+/**
+ * Indicates a change in the lifecycle state of an ITripEventInfo.
+ */
+public class LifecycleEvent implements Serializable {
+
+ private static final long serialVersionUID = 8665269919851487210L;
+
+ /**
+ * The id of the trip, as returned by {@link ITripEventInfo#getTripId()}.
+ */
+ private long tripId;
+
+ private TripState state;
+ private Region region;
+
+ /**
+ * The instant the event was recorded (unix epoch in milliseconds)
+ */
+ private long timestamp;
+
+ public LifecycleEvent() {
+ }
+
+ public LifecycleEvent(ITripEventInfo eventInfo) {
+ this(eventInfo.getTripId(), eventInfo.getState(), eventInfo.getRegion(), eventInfo.getTimestamp());
+ }
+
+ public LifecycleEvent(long tripId, TripState state, Region region, long timestamp) {
+ this.tripId = tripId;
+ this.state = state;
+ this.region = region;
+ this.timestamp = timestamp;
+ }
+
+ public long getTripId() {
+ return tripId;
+ }
+
+ public void setTripId(long tripId) {
+ this.tripId = tripId;
+ }
+
+ public TripState getState() {
+ return state;
+ }
+
+ public void setState(TripState state) {
+ this.state = state;
+ }
+
+ public Region getRegion() {
+ return region;
+ }
+
+ public void setRegion(Region region) {
+ this.region = region;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public String toString() {
+ return "LifecycleEvent{" +
+ "tripId=" + tripId +
+ ", state=" + state +
+ ", region=" + region +
+ ", timestamp=" + timestamp +
+ '}';
+ }
+}
diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/MatchingDuration.java b/ass3-event/src/main/java/dst/ass3/event/model/events/MatchingDuration.java
new file mode 100644
index 0000000..9bf82f2
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/model/events/MatchingDuration.java
@@ -0,0 +1,59 @@
+package dst.ass3.event.model.events;
+
+import java.io.Serializable;
+
+import dst.ass3.event.model.domain.Region;
+
+/**
+ * Indicates the amount of time an ITripEventInfo took to get from CREATED to MATCHED.
+ */
+public class MatchingDuration implements Serializable {
+
+ private static final long serialVersionUID = -6976972381929291369L;
+
+ private long eventId;
+ private Region region;
+ private long duration;
+
+ public MatchingDuration() {
+ }
+
+ public MatchingDuration(long eventId, Region region, long duration) {
+ this.eventId = eventId;
+ this.region = region;
+ this.duration = duration;
+ }
+
+ public long getEventId() {
+ return eventId;
+ }
+
+ public void setEventId(long eventId) {
+ this.eventId = eventId;
+ }
+
+ public Region getRegion() {
+ return region;
+ }
+
+ public void setRegion(Region region) {
+ this.region = region;
+ }
+
+ public long getDuration() {
+ return duration;
+ }
+
+ public void setDuration(long duration) {
+ this.duration = duration;
+ }
+
+ @Override
+ public String toString() {
+ return "MatchingDuration{" +
+ "eventId=" + eventId +
+ ", region='" + region + '\'' +
+ ", duration=" + duration +
+ '}';
+ }
+}
diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/MatchingTimeoutWarning.java b/ass3-event/src/main/java/dst/ass3/event/model/events/MatchingTimeoutWarning.java
new file mode 100644
index 0000000..66776d3
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/model/events/MatchingTimeoutWarning.java
@@ -0,0 +1,37 @@
+package dst.ass3.event.model.events;
+
+import dst.ass3.event.model.domain.Region;
+
+/**
+ * Warning that indicates that a matching event has not reached the lifecycle state MATCHED within a given time frame.
+ */
+public class MatchingTimeoutWarning extends Warning {
+
+ private static final long serialVersionUID = 7955599732178947649L;
+
+ private long tripId;
+
+ public MatchingTimeoutWarning() {
+ super(null);
+ }
+
+ public MatchingTimeoutWarning(long tripId, Region region) {
+ super(region);
+ this.tripId = tripId;
+ }
+
+ public long getTripId() {
+ return tripId;
+ }
+
+ public void setTripId(long tripId) {
+ this.tripId = tripId;
+ }
+
+ @Override
+ public String toString() {
+ return "MatchingTimeoutWarning{" +
+ "tripId=" + tripId +
+ '}';
+ }
+}
diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/TripFailedWarning.java b/ass3-event/src/main/java/dst/ass3/event/model/events/TripFailedWarning.java
new file mode 100644
index 0000000..1eaffde
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/model/events/TripFailedWarning.java
@@ -0,0 +1,37 @@
+package dst.ass3.event.model.events;
+
+import dst.ass3.event.model.domain.Region;
+
+/**
+ * Indicates that matching a trip has probably failed.
+ */
+public class TripFailedWarning extends Warning {
+
+ private static final long serialVersionUID = -9120187311385112769L;
+
+ private long eventId;
+
+ public TripFailedWarning() {
+ super(null);
+ }
+
+ public TripFailedWarning(long eventId, Region region) {
+ super(region);
+ this.eventId = eventId;
+ }
+
+ public long getEventId() {
+ return eventId;
+ }
+
+ public void setEventId(long eventId) {
+ this.eventId = eventId;
+ }
+
+ @Override
+ public String toString() {
+ return "TripFailedWarning{" +
+ "eventId=" + eventId +
+ '}';
+ }
+}
diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/Warning.java b/ass3-event/src/main/java/dst/ass3/event/model/events/Warning.java
new file mode 100644
index 0000000..09ffac2
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/model/events/Warning.java
@@ -0,0 +1,37 @@
+package dst.ass3.event.model.events;
+
+import java.io.Serializable;
+
+import dst.ass3.event.model.domain.Region;
+
+/**
+ * Base class for region warnings.
+ */
+public abstract class Warning implements Serializable {
+
+ private static final long serialVersionUID = 273266717303711974L;
+
+ private Region region;
+
+ public Warning() {
+ }
+
+ public Warning(Region region) {
+ this.region = region;
+ }
+
+ public Region getRegion() {
+ return region;
+ }
+
+ public void setRegion(Region region) {
+ this.region = region;
+ }
+
+ @Override
+ public String toString() {
+ return "Warning{" +
+ "region='" + region + '\'' +
+ '}';
+ }
+}
diff --git a/ass3-event/src/main/resources/executionPlan.json b/ass3-event/src/main/resources/executionPlan.json
new file mode 100644
index 0000000..afd691b
--- /dev/null
+++ b/ass3-event/src/main/resources/executionPlan.json
@@ -0,0 +1 @@
+// TODO: add the data from the execution plan export
diff --git a/ass3-event/src/main/resources/logback.xml b/ass3-event/src/main/resources/logback.xml
new file mode 100644
index 0000000..e423131
--- /dev/null
+++ b/ass3-event/src/main/resources/logback.xml
@@ -0,0 +1,16 @@
+
+
+
+
+
+ %d{yyyy-MM-dd HH:mm:ss.SSS} - %highlight(%5p) [%12.12thread] %cyan(%-40.40logger{39}): %m%n
+
+
+
+
+
+
+
+
+
+
diff --git a/ass3-event/src/test/java/dst/ass3/event/Ass3EventTestBase.java b/ass3-event/src/test/java/dst/ass3/event/Ass3EventTestBase.java
new file mode 100644
index 0000000..87722ab
--- /dev/null
+++ b/ass3-event/src/test/java/dst/ass3/event/Ass3EventTestBase.java
@@ -0,0 +1,69 @@
+package dst.ass3.event;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Ass3EventTestBase.
+ */
+public abstract class Ass3EventTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Ass3EventTestBase.class);
+
+ protected EventPublisher publisher;
+ protected StreamExecutionEnvironment flink;
+ protected ExecutorService executor;
+
+ private static EventPublisher previousPublisher;
+
+ @Before
+ public void setUpResources() throws Exception {
+ executor = Executors.newCachedThreadPool();
+
+ if (previousPublisher != null) {
+ previousPublisher.close();
+ }
+
+ publisher = createEventPublisher();
+ previousPublisher = publisher;
+ publisher.start();
+
+ flink = createStreamExecutionEnvironment();
+ }
+
+ @After
+ public void tearDownResources() throws Exception {
+ publisher.close();
+ MoreExecutors.shutdownAndAwaitTermination(executor, 5, TimeUnit.SECONDS);
+ previousPublisher = null;
+ }
+
+ protected EventPublisher createEventPublisher() {
+ return new EventPublisher(Constants.EVENT_PUBLISHER_PORT);
+ }
+
+ protected StreamExecutionEnvironment createStreamExecutionEnvironment() {
+ return StreamExecutionEnvironment.createLocalEnvironment(1);
+ }
+
+ protected static void sleep(long ms) {
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+
+ protected static long now() {
+ return System.currentTimeMillis();
+ }
+
+}
diff --git a/ass3-event/src/test/java/dst/ass3/event/Ass3EventTestSuite.java b/ass3-event/src/test/java/dst/ass3/event/Ass3EventTestSuite.java
new file mode 100644
index 0000000..21d8fe7
--- /dev/null
+++ b/ass3-event/src/test/java/dst/ass3/event/Ass3EventTestSuite.java
@@ -0,0 +1,23 @@
+package dst.ass3.event;
+
+import dst.ass3.event.tests.Ass3_3_2Test;
+import dst.ass3.event.tests.Ass3_3_3Test;
+import dst.ass3.event.tests.Ass3_3_4Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.junit.runners.Suite.SuiteClasses;
+
+import dst.ass3.event.tests.Ass3_3_1Test;
+
+/**
+ * Ass3EventTestSuite.
+ */
+@RunWith(Suite.class)
+@SuiteClasses({
+ Ass3_3_1Test.class,
+ Ass3_3_2Test.class,
+ Ass3_3_3Test.class,
+ Ass3_3_4Test.class
+})
+public class Ass3EventTestSuite {
+}
diff --git a/ass3-event/src/test/java/dst/ass3/event/EventProcessingTestBase.java b/ass3-event/src/test/java/dst/ass3/event/EventProcessingTestBase.java
new file mode 100644
index 0000000..4873649
--- /dev/null
+++ b/ass3-event/src/test/java/dst/ass3/event/EventProcessingTestBase.java
@@ -0,0 +1,98 @@
+package dst.ass3.event;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import dst.ass3.event.model.events.*;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * EventProcessingTestBase.
+ */
+public class EventProcessingTestBase extends Ass3EventTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EventProcessingTestBase.class);
+
+ @Rule
+ public Timeout timeout = new Timeout(15, TimeUnit.SECONDS);
+
+ private IEventProcessingEnvironment epe;
+
+ protected StaticQueueSink lifecycleEvents;
+ protected StaticQueueSink matchingDurations;
+ protected StaticQueueSink averageMatchingDurations;
+ protected StaticQueueSink matchingTimeoutWarnings;
+ protected StaticQueueSink tripFailedWarnings;
+ protected StaticQueueSink alerts;
+
+ @Before
+ public void setUpEnvironment() throws Exception {
+ epe = EventProcessingFactory.createEventProcessingEnvironment();
+
+ assertNotNull("#createEventProcessingEnvironment() not implemented", epe);
+
+ lifecycleEvents = new StaticQueueSink<>("lifecycleEvents");
+ matchingDurations = new StaticQueueSink<>("matchingDurations");
+ averageMatchingDurations = new StaticQueueSink<>("averageMatchingDurations");
+ matchingTimeoutWarnings = new StaticQueueSink<>("matchingTimeoutWarnings");
+ tripFailedWarnings = new StaticQueueSink<>("tripFailedWarnings");
+ alerts = new StaticQueueSink<>("alerts");
+
+ epe.setLifecycleEventStreamSink(lifecycleEvents);
+ epe.setMatchingDurationStreamSink(matchingDurations);
+ epe.setAverageMatchingDurationStreamSink(averageMatchingDurations);
+ epe.setMatchingTimeoutWarningStreamSink(matchingTimeoutWarnings);
+ epe.setTripFailedWarningStreamSink(tripFailedWarnings);
+ epe.setAlertStreamSink(alerts);
+ epe.setMatchingDurationTimeout(Time.seconds(15));
+ }
+
+ public JobExecutionResult initAndExecute() throws Exception {
+ return initAndExecute(null);
+ }
+
+ public JobExecutionResult initAndExecute(Consumer initializer) throws Exception {
+ try {
+ if (initializer != null) {
+ initializer.accept(epe);
+ }
+ LOG.info("Initializing StreamExecutionEnvironment with {}", epe);
+ epe.initialize(flink);
+ } catch (Exception e) {
+ LOG.error("Error while initializing StreamExecutionEnvironment", e);
+ throw e;
+ }
+
+ try {
+ LOG.info("Executing flink {}", flink);
+ return flink.execute();
+ } catch (Exception e) {
+ LOG.error("Error while executing flink", e);
+ throw e;
+ }
+ }
+
+ public Future initAndExecuteAsync(Consumer initializer) {
+ return executor.submit(() -> initAndExecute(initializer));
+ }
+
+ public Future initAndExecuteAsync() {
+ return executor.submit(() -> initAndExecute());
+ }
+
+ @After
+ public void tearDownCollectors() throws Exception {
+ StaticQueueSink.clearAll();
+ }
+
+}
diff --git a/ass3-event/src/test/java/dst/ass3/event/StaticQueueSink.java b/ass3-event/src/test/java/dst/ass3/event/StaticQueueSink.java
new file mode 100644
index 0000000..805cd8f
--- /dev/null
+++ b/ass3-event/src/test/java/dst/ass3/event/StaticQueueSink.java
@@ -0,0 +1,83 @@
+package dst.ass3.event;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+/**
+ * A SinkFunction that collects objects into a queue located in a shared global state. Each collector accesses a
+ * specific key in the shared state.
+ *
+ * @param the sink input type
+ */
+public class StaticQueueSink implements SinkFunction {
+
+ private static final long serialVersionUID = -3965500756295835669L;
+
+ private static Map> state = new ConcurrentHashMap<>();
+
+ private String key;
+
+ public StaticQueueSink(String key) {
+ this.key = key;
+ }
+
+ @Override
+ public void invoke(T value, Context context) throws Exception {
+ get().add(value);
+ }
+
+ public void clear() {
+ get().clear();
+ }
+
+ public List take(int n) {
+ List list = new ArrayList<>(n);
+
+ for (int i = 0; i < n; i++) {
+ try {
+ list.add(get().take());
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted while accessing queue", e);
+ }
+ }
+
+ return list;
+ }
+
+ public T take() {
+ try {
+ return get().take();
+ } catch (InterruptedException e) {
+ return null;
+ }
+ }
+
+ public T poll(long ms) {
+ try {
+ return get().poll(ms, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ return null;
+ }
+ }
+
+ public synchronized BlockingQueue get() {
+ return get(key);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static BlockingQueue get(String key) {
+ return (BlockingQueue) state.computeIfAbsent(key, k -> new LinkedBlockingQueue<>());
+ }
+
+ public static void clearAll() {
+ state.clear();
+ }
+}
diff --git a/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_1Test.java b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_1Test.java
new file mode 100644
index 0000000..4335481
--- /dev/null
+++ b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_1Test.java
@@ -0,0 +1,152 @@
+package dst.ass3.event.tests;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.NotSerializableException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import dst.ass3.event.dto.TripEventInfoDTO;
+import dst.ass3.event.model.domain.ITripEventInfo;
+import dst.ass3.event.model.domain.Region;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import dst.ass3.event.Ass3EventTestBase;
+import dst.ass3.event.EventProcessingFactory;
+import dst.ass3.event.IEventSourceFunction;
+import dst.ass3.event.model.domain.TripState;
+
+public class Ass3_3_1Test extends Ass3EventTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_1Test.class);
+
+ @Rule
+ public Timeout timeout = new Timeout(15, TimeUnit.SECONDS);
+
+ private IEventSourceFunction sourceFunction;
+
+ @Before
+ public void setUp() throws Exception {
+ sourceFunction = EventProcessingFactory.createEventSourceFunction();
+ assertNotNull("EventProcessingFactory#createEventSourceFunction() not implemented", sourceFunction);
+ }
+
+ @Test
+ public void open_shouldConnectToSubscriber() throws Exception {
+ assertEquals(
+ "IEventSourceFunction should not be connected upon construction",
+ 0, publisher.getConnectedClientCount()
+ );
+
+ sourceFunction.open(new Configuration());
+ publisher.waitForClients();
+
+ assertEquals(
+ "Expected IEventSourceFunction to connect to publisher after open is called",
+ 1, publisher.getConnectedClientCount()
+ );
+ }
+
+ @Test
+ public void run_shouldCollectPublishedEvents() throws Exception {
+ sourceFunction.open(new Configuration());
+ publisher.waitForClients();
+
+ Future> result = executor.submit(() -> {
+ MockContext ctx = new MockContext<>();
+ LOG.info("Running IEventSourceFunction with MockContext");
+ sourceFunction.run(ctx);
+ LOG.info("Done running IEventSourceFunction, returning collected events");
+ return ctx.collected;
+ });
+
+ publisher.publish(new TripEventInfoDTO(1L, 0L, TripState.CREATED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(2L, 0L, TripState.CREATED, Region.DE_BERLIN));
+
+ sleep(1000);
+
+ LOG.info("Calling cancel on SourceFunction");
+ sourceFunction.cancel();
+
+ LOG.info("Dropping subscriber connections");
+ publisher.dropClients();
+
+ LOG.info("Calling close on SourceFunction");
+ sourceFunction.close();
+
+ List collected = result.get();
+ assertEquals(2, collected.size());
+
+ ITripEventInfo e0 = collected.get(0);
+ ITripEventInfo e1 = collected.get(1);
+
+ assertEquals(1L, e0.getTripId(), 0);
+ assertEquals(2L, e1.getTripId(), 0);
+ }
+
+ @Test
+ public void shouldBeSerializable() throws Exception {
+ try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
+ out.writeObject(sourceFunction);
+ out.flush();
+ } catch (NotSerializableException e) {
+ fail("Implementation of IEventSourceFunction is not serializable");
+ }
+ }
+
+ private static class MockContext implements SourceFunction.SourceContext {
+
+ private final Object checkpointLock = new Object();
+
+ private List collected = new ArrayList<>();
+
+ public List getCollected() {
+ return collected;
+ }
+
+ @Override
+ public void collect(T element) {
+ collected.add(element);
+ }
+
+ @Override
+ public void collectWithTimestamp(T element, long timestamp) {
+ collected.add(element);
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+
+ }
+
+ @Override
+ public void markAsTemporarilyIdle() {
+
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return checkpointLock;
+ }
+
+ @Override
+ public void close() {
+
+ }
+ }
+
+}
diff --git a/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_2Test.java b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_2Test.java
new file mode 100644
index 0000000..faa0b44
--- /dev/null
+++ b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_2Test.java
@@ -0,0 +1,89 @@
+package dst.ass3.event.tests;
+
+import dst.ass3.event.EventProcessingTestBase;
+import dst.ass3.event.dto.TripEventInfoDTO;
+import dst.ass3.event.model.domain.Region;
+import dst.ass3.event.model.events.LifecycleEvent;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Future;
+
+import static dst.ass3.event.model.domain.TripState.CREATED;
+import static dst.ass3.event.model.domain.TripState.MATCHED;
+import static dst.ass3.event.model.domain.TripState.QUEUED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class Ass3_3_2Test extends EventProcessingTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_2Test.class);
+
+ @Test
+ public void lifecycleEventStream_worksCorrectly() throws Exception {
+ // run flink in new thread
+ Future flinkExecution = initAndExecuteAsync();
+
+ // wait until a subscriber connects
+ LOG.info("Waiting for subscribers to connect");
+ publisher.waitForClients();
+
+ LifecycleEvent event;
+
+ long then = System.currentTimeMillis();
+
+ // causes LifecycleEvent 0
+ LOG.info("Publishing e0");
+ publisher.publish(new TripEventInfoDTO(0L, then, CREATED, Region.AT_VIENNA));
+
+ LOG.info("Collecting LifecycleEvent for e0");
+ event = lifecycleEvents.take();
+ LOG.info("Round-trip took {}ms", System.currentTimeMillis() - then);
+
+ assertEquals("Event ID not set correctly", 0L, event.getTripId());
+ assertEquals("State not set correctly", CREATED, event.getState());
+ assertEquals("Region not set correctly", Region.AT_VIENNA, event.getRegion());
+ assertThat("Timestamp not set correctly", event.getTimestamp(), Matchers.greaterThanOrEqualTo(then));
+ assertThat("Timestamp not set correctly", event.getTimestamp(),
+ Matchers.lessThanOrEqualTo(System.currentTimeMillis()));
+
+ // should be filtered
+ LOG.info("Publishing e1, should be filtered");
+ publisher.publish(new TripEventInfoDTO(1L, now(), CREATED, null));
+
+ assertNull("Events without a region should be filtered", lifecycleEvents.poll(500));
+
+ // causes LifecycleEvent 1
+ LOG.info("Publishing e2");
+ publisher.publish(new TripEventInfoDTO(2L, now(), QUEUED, Region.DE_BERLIN));
+
+ LOG.info("Collecting LifecycleEvent for e2");
+ event = lifecycleEvents.take();
+ assertEquals(2L, event.getTripId());
+ assertEquals(QUEUED, event.getState());
+
+ // should be filtered
+ LOG.info("Publishing e3, should be filtered");
+ publisher.publish(new TripEventInfoDTO(3L, now(), CREATED, null));
+ assertNull("Events without a region should be filtered", lifecycleEvents.poll(500));
+
+ // causes LifecycleEvent 2
+ LOG.info("Publishing e4");
+ publisher.publish(new TripEventInfoDTO(4L, now(), MATCHED, Region.AT_LINZ));
+
+ LOG.info("Collecting LifecycleEvent for e4");
+ event = lifecycleEvents.take();
+ assertEquals(4L, event.getTripId());
+ assertEquals(MATCHED, event.getState());
+
+ // disconnect subscribers
+ publisher.dropClients();
+
+ // wait for execution to end
+ flinkExecution.get();
+ }
+}
diff --git a/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_3Test.java b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_3Test.java
new file mode 100644
index 0000000..cf1a17b
--- /dev/null
+++ b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_3Test.java
@@ -0,0 +1,211 @@
+package dst.ass3.event.tests;
+
+import dst.ass3.event.EventProcessingTestBase;
+import dst.ass3.event.dto.TripEventInfoDTO;
+import dst.ass3.event.model.domain.Region;
+import dst.ass3.event.model.events.LifecycleEvent;
+import dst.ass3.event.model.events.MatchingDuration;
+import dst.ass3.event.model.events.MatchingTimeoutWarning;
+import dst.ass3.event.model.events.TripFailedWarning;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import static dst.ass3.event.model.domain.TripState.*;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.*;
+
+public class Ass3_3_3Test extends EventProcessingTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_3Test.class);
+
+ @Test
+ public void matchingDurations_areCalculatedCorrectly() throws Exception {
+ // tests whether duration calculation and stream keying works correctly
+ // expects LifecycleEvent stream to work correctly
+
+ Future flinkExecution = initAndExecuteAsync();
+
+ LOG.info("Waiting for subscribers to connect");
+ publisher.waitForClients();
+
+ LifecycleEvent e1Start;
+ LifecycleEvent e2Start;
+ LifecycleEvent e1End;
+ LifecycleEvent e2End;
+
+ publisher.publish(5, t -> new TripEventInfoDTO(1L, t, CREATED, Region.AT_VIENNA)); // 1 starts before 2
+ publisher.publish(5, t -> new TripEventInfoDTO(2L, t, CREATED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(3L, t, CREATED, Region.AT_VIENNA)); // 3 never finishes
+
+ LOG.info("Waiting for LifecycleEvent for event 1 being CREATED");
+ e1Start = lifecycleEvents.take();
+ LOG.info("Waiting for LifecycleEvent for event 2 being CREATED");
+ e2Start = lifecycleEvents.take();
+ LOG.info("Waiting for LifecycleEvent for event 3 being CREATED");
+ lifecycleEvents.take();
+
+ sleep(500);
+ publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(2L, t, QUEUED, Region.AT_VIENNA));
+
+ LOG.info("Waiting for LifecycleEvent for event 1 being QUEUED");
+ lifecycleEvents.take();
+ LOG.info("Waiting for LifecycleEvent for event 2 being QUEUED");
+ lifecycleEvents.take();
+
+ sleep(500); // event 2 took about ~1000ms
+ publisher.publish(new TripEventInfoDTO(2L, now(), MATCHED, Region.AT_VIENNA)); // 2 finishes before 1
+
+ LOG.info("Waiting for LifecycleEvent for event 2 being MATCHED");
+ e2End = lifecycleEvents.take();
+
+ sleep(500); // event 1 took about ~1500ms
+ publisher.publish(new TripEventInfoDTO(1L, now(), MATCHED, Region.AT_VIENNA));
+
+ LOG.info("Waiting for LifecycleEvent for event 1 being MATCHED");
+ e1End = lifecycleEvents.take();
+
+ LOG.info("Collecting MatchingDuration event for event 2");
+ MatchingDuration d0 = matchingDurations.take();
+
+ LOG.info("Collecting MatchingDuration event for event 1");
+ MatchingDuration d1 = matchingDurations.take();
+
+ assertEquals("Expected event 2 to finish first", 2L, d0.getEventId()); // event 2 finished before 1
+ assertEquals("Expected event 1 to finish last", 1L, d1.getEventId());
+
+ assertThat("Expected MatchingDuration to be >= 0", d0.getDuration(), greaterThan(0L));
+ assertThat("Expected MatchingDuration to be >= 0", d1.getDuration(), greaterThan(0L));
+
+ assertEquals("MatchingDuration was not calculated from LifecycleEvents correctly",
+ e2End.getTimestamp() - e2Start.getTimestamp(), d0.getDuration(), 100);
+
+ assertEquals("MatchingDuration was not calculated from LifecycleEvents correctly",
+ e1End.getTimestamp() - e1Start.getTimestamp(), d1.getDuration(), 100);
+
+ publisher.dropClients();
+ flinkExecution.get();
+ }
+
+ @Test
+ public void durationsWithInterleavedEvents_areCalculatedCorrectly() throws Exception {
+ // tests whether CEP rule is tolerant towards multiple state changes between QUEUED and MATCHED
+
+ Future flinkExecution = initAndExecuteAsync(e ->
+ e.setMatchingDurationTimeout(Time.seconds(1))
+ );
+
+ LOG.info("Waiting for subscribers to connect");
+ publisher.waitForClients();
+
+ publisher.publish(5, t -> new TripEventInfoDTO(1L, t, CREATED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(2L, t, CREATED, Region.AT_VIENNA)); // never finishes
+
+ // change state several times (tests another aspect of the CEP rule)
+ publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA));
+
+ publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA));
+
+ publisher.dropClients();
+ flinkExecution.get();
+
+ List result = new ArrayList<>(matchingDurations.get());
+ assertEquals("Expected one event to have finished", 1, result.size());
+
+ MatchingDuration d0 = result.get(0);
+ assertEquals("Expected event 1 to have finished", 1L, d0.getEventId());
+ }
+
+ @Test
+ public void timeoutWarnings_areEmittedCorrectly() throws Exception {
+ Future flinkExecution = initAndExecuteAsync(e -> {
+ e.setMatchingDurationTimeout(Time.seconds(1));
+ });
+
+ LOG.info("Waiting for subscribers to connect");
+ publisher.waitForClients();
+
+ publisher.publish(new TripEventInfoDTO(1L, now(), CREATED, Region.AT_VIENNA)); // never finishes
+
+ sleep(100);
+ publisher.publish(new TripEventInfoDTO(2L, now(), CREATED, Region.AT_VIENNA)); // never finishes
+
+ // confounding event
+ sleep(100);
+ publisher.publish(new TripEventInfoDTO(1L, now(), QUEUED, Region.AT_VIENNA));
+
+ sleep(1500); // wait for event to time out
+
+ publisher.dropClients();
+
+ LOG.info("Waiting for Flink execution to end");
+ flinkExecution.get();
+
+ LOG.info("Collecting timeout warning for event 1");
+ MatchingTimeoutWarning w1 = matchingTimeoutWarnings.take();
+
+ LOG.info("Collecting timeout warning for event 2");
+ MatchingTimeoutWarning w2 = matchingTimeoutWarnings.take();
+
+ assertEquals("Expected event 1 to time out first", 1L, w1.getTripId());
+ assertEquals("Expected event 2 to time out second", 2L, w2.getTripId());
+ }
+
+ @Test
+ public void tripFailedWarnings_areEmittedCorrectly() throws Exception {
+ Future flinkExecution = initAndExecuteAsync();
+
+ LOG.info("Waiting for subscribers to connect");
+ publisher.waitForClients();
+
+ publisher.publish(0, t -> new TripEventInfoDTO(1L, t, CREATED, Region.AT_VIENNA));
+ publisher.publish(0, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA));
+ publisher.publish(0, t -> new TripEventInfoDTO(2L, t, CREATED, Region.AT_VIENNA));
+ publisher.publish(0, t -> new TripEventInfoDTO(2L, t, QUEUED, Region.AT_VIENNA));
+
+ // event 1 fail #1
+ publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA));
+
+ // event 2 fail #1
+ publisher.publish(5, t -> new TripEventInfoDTO(2L, t, MATCHED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(2L, t, QUEUED, Region.AT_VIENNA));
+
+ // event 1 fail #2
+ publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA));
+
+ // event 2 fail #2 and then success
+ publisher.publish(5, t -> new TripEventInfoDTO(2L, t, MATCHED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(2L, t, QUEUED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(2L, t, MATCHED, Region.AT_VIENNA));
+
+ LOG.info("Checking that no TripFailedWarning was issued yet");
+ assertNull(tripFailedWarnings.poll(500));
+
+ LOG.info("Triggering third failure");
+ // event 1 fail #3
+ publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA));
+
+ LOG.info("Waiting for TripFailedWarning for event 1");
+ TripFailedWarning warning = tripFailedWarnings.take();
+ assertEquals(1L, warning.getEventId());
+ assertEquals(Region.AT_VIENNA, warning.getRegion());
+
+ publisher.dropClients();
+ flinkExecution.get();
+ }
+}
diff --git a/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_4Test.java b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_4Test.java
new file mode 100644
index 0000000..6a99d1e
--- /dev/null
+++ b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_4Test.java
@@ -0,0 +1,308 @@
+package dst.ass3.event.tests;
+
+import dst.ass3.event.EventProcessingTestBase;
+import dst.ass3.event.dto.TripEventInfoDTO;
+import dst.ass3.event.model.domain.Region;
+import dst.ass3.event.model.events.*;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import static dst.ass3.event.model.domain.TripState.*;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.*;
+
+public class Ass3_3_4Test extends EventProcessingTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_4Test.class);
+
+ @Test
+ public void multipleTripFailures_triggerAlert() throws Exception {
+ // checks that the window works correctly
+ // expects TripFailedWarning stream to work correctly
+
+ Future flinkExecution = initAndExecuteAsync();
+
+ LOG.info("Waiting for subscribers to connect");
+ publisher.waitForClients();
+
+ Consumer causeWarning = (eventId) -> {
+ publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, MATCHED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, QUEUED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, MATCHED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, QUEUED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, MATCHED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, QUEUED, Region.AT_VIENNA));
+ };
+
+ // all of these events will fail
+ publisher.publish(new TripEventInfoDTO(1L, now(), CREATED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(1L, now(), QUEUED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(2L, now(), CREATED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(2L, now(), QUEUED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(3L, now(), CREATED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(3L, now(), QUEUED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(4L, now(), CREATED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(4L, now(), QUEUED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(5L, now(), CREATED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(5L, now(), QUEUED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(6L, now(), CREATED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(6L, now(), QUEUED, Region.AT_VIENNA));
+
+ // warning 3 warnings
+ causeWarning.accept(1L);
+ causeWarning.accept(2L);
+ causeWarning.accept(3L);
+
+ LOG.info("Collecting alert for first three warnings");
+ assertNotNull(alerts.take());
+
+ LOG.info("Checking that the fourth warning does not trigger an alert");
+ causeWarning.accept(4L);
+ assertNull(alerts.poll(500));
+
+ LOG.info("Checking that the fifth warning does not trigger an alert");
+ causeWarning.accept(5L);
+ assertNull(alerts.poll(500));
+
+ LOG.info("Checking that the sixth warning triggered an alert");
+ causeWarning.accept(6L);
+ assertNotNull(alerts.take());
+
+ publisher.dropClients();
+ flinkExecution.get();
+ }
+
+ @Test
+ public void matchingFailuresAndTimeouts_triggerAlert() throws Exception {
+ // checks whether keying works correctly, and whether Warning streams are unioned correctly
+ // expects both warning streams to work correctly
+
+ Future flinkExecution = initAndExecuteAsync(e ->
+ e.setMatchingDurationTimeout(Time.seconds(3))
+ );
+
+ LOG.info("Waiting for subscribers to connect");
+ publisher.waitForClients();
+
+ BiConsumer causeWarning = (eventId, region) -> {
+ publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, MATCHED, region));
+ publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, QUEUED, region));
+ publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, MATCHED, region));
+ publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, QUEUED, region));
+ publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, MATCHED, region));
+ publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, QUEUED, region));
+ };
+
+ publisher.publish(new TripEventInfoDTO(1L, now(), CREATED, Region.AT_VIENNA)); // vienna e1 will fail
+ publisher.publish(new TripEventInfoDTO(1L, now(), QUEUED, Region.AT_VIENNA)); // vienna e1 will fail
+ publisher.publish(new TripEventInfoDTO(2L, now(), CREATED, Region.AT_VIENNA)); // vienna e2 will fail
+ publisher.publish(new TripEventInfoDTO(2L, now(), QUEUED, Region.AT_VIENNA)); // vienna e2 will fail
+ publisher.publish(new TripEventInfoDTO(3L, now(), CREATED, Region.AT_VIENNA)); // vienna e3 will time out
+ publisher.publish(new TripEventInfoDTO(3L, now(), QUEUED, Region.AT_VIENNA)); // vienna e3 will time out
+ publisher.publish(new TripEventInfoDTO(4L, now(), CREATED, Region.DE_BERLIN)); // berlin e4 will fail
+ publisher.publish(new TripEventInfoDTO(4L, now(), QUEUED, Region.DE_BERLIN)); // berlin e4 will fail
+
+ // s1 warning #1
+ causeWarning.accept(1L, Region.AT_VIENNA);
+
+ // s1 warning #2
+ causeWarning.accept(2L, Region.AT_VIENNA);
+
+ // s2 warning #1
+ causeWarning.accept(4L, Region.DE_BERLIN);
+
+ LOG.info("Checking that no alert has been issued yet");
+ assertNull(alerts.poll(500));
+
+ // make sure the other events don't time out
+ publisher.publish(new TripEventInfoDTO(1L, now(), MATCHED, Region.AT_VIENNA)); // vienna e1 will fail
+ publisher.publish(new TripEventInfoDTO(2L, now(), MATCHED, Region.AT_VIENNA)); // vienna e2 will fail
+ publisher.publish(new TripEventInfoDTO(4L, now(), MATCHED, Region.DE_BERLIN)); // berlin e4 will fail
+
+ sleep(4000); // waiting for e3 to time out
+
+ publisher.dropClients();
+ flinkExecution.get();
+
+ LOG.info("Collecting Alert event");
+ Alert alert = alerts.take();
+ assertEquals("Expected only a single alert", 0, alerts.get().size());
+
+ assertEquals(Region.AT_VIENNA, alert.getRegion());
+ assertEquals("An alert should comprise three warnings", 3, alert.getWarnings().size());
+
+ Warning w0 = alert.getWarnings().get(0);
+ Warning w1 = alert.getWarnings().get(1);
+ Warning w2 = alert.getWarnings().get(2);
+
+ assertThat(w0, instanceOf(TripFailedWarning.class));
+ assertThat(w1, instanceOf(TripFailedWarning.class));
+ assertThat(w2, instanceOf(MatchingTimeoutWarning.class));
+
+ assertEquals(Region.AT_VIENNA, w0.getRegion());
+ assertEquals(Region.AT_VIENNA, w1.getRegion());
+ assertEquals(Region.AT_VIENNA, w2.getRegion());
+ }
+
+ @Test
+ public void averageMatchingDurationWindow_worksCorrectly() throws Exception {
+ // makes sure the event is triggered at the correct instant
+
+ Future flinkExecution = initAndExecuteAsync();
+
+ LOG.info("Waiting for subscribers to connect");
+ publisher.waitForClients();
+
+ sleep(250);
+ publisher.publish(new TripEventInfoDTO(1L, now(), CREATED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(2L, now(), CREATED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(3L, now(), CREATED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(4L, now(), CREATED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(5L, now(), CREATED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(6L, now(), CREATED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(7L, now(), CREATED, Region.DE_BERLIN));
+
+ sleep(100);
+ publisher.publish(new TripEventInfoDTO(1L, now(), QUEUED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(2L, now(), QUEUED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(3L, now(), QUEUED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(4L, now(), QUEUED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(5L, now(), QUEUED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(6L, now(), QUEUED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(7L, now(), QUEUED, Region.DE_BERLIN));
+
+ sleep(100);
+ publisher.publish(new TripEventInfoDTO(1L, now(), MATCHED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(2L, now(), MATCHED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(3L, now(), MATCHED, Region.AT_VIENNA));
+ publisher.publish(new TripEventInfoDTO(4L, now(), MATCHED, Region.AT_VIENNA));
+
+ // the first four events should not trigger anything
+ LOG.info("Checking AverageMatchingDuration events after the first four MATCHED events of AT_VIENNA");
+ assertNull(averageMatchingDurations.poll(500));
+
+ // this event is from a different region
+ publisher.publish(new TripEventInfoDTO(7L, now(), MATCHED, Region.DE_BERLIN));
+ LOG.info("Checking AverageMatchingDuration events after the first MATCHED event of DE_BERLIN");
+ assertNull(averageMatchingDurations.poll(500));
+
+ // fifth event in s1 triggers the window operation
+ publisher.publish(new TripEventInfoDTO(5L, now(), MATCHED, Region.AT_VIENNA));
+ LOG.info("Collecting AverageMatchingDuration event for AT_VIENNA");
+ AverageMatchingDuration event = averageMatchingDurations.take();
+ assertNotNull(event);
+ assertEquals(Region.AT_VIENNA, event.getRegion());
+
+ // should be in a new window and therefore not trigger
+ publisher.publish(new TripEventInfoDTO(6L, now(), MATCHED, Region.AT_VIENNA));
+ LOG.info("Checking AverageMatchingDuration events after the sixth MATCHED event of AT_VIENNA");
+ assertNull(averageMatchingDurations.poll(500));
+
+ publisher.dropClients();
+
+ flinkExecution.get();
+ }
+
+ @Test
+ public void averageMatchingDurations_areCalculatedCorrectly() throws Exception {
+ // makes sure the keying works properly and that the calculation is done from MatchingDuration events
+ // requires MatchingDuration events to be calculated correctly
+
+ Future flinkExecution = initAndExecuteAsync();
+
+ LOG.info("Waiting for subscribers to connect");
+ publisher.waitForClients();
+
+ List viennaDurations = new ArrayList<>(5);
+ List berlinDurations = new ArrayList<>(5);
+
+ sleep(250);
+ publisher.publish(5, t -> new TripEventInfoDTO(1L, t, CREATED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(2L, t, CREATED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(3L, t, CREATED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(4L, t, CREATED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(5L, t, CREATED, Region.AT_VIENNA));
+
+ sleep(250);
+ publisher.publish(5, t -> new TripEventInfoDTO(6L, t, CREATED, Region.DE_BERLIN));
+ publisher.publish(5, t -> new TripEventInfoDTO(7L, t, CREATED, Region.DE_BERLIN));
+ publisher.publish(5, t -> new TripEventInfoDTO(8L, t, CREATED, Region.DE_BERLIN));
+ publisher.publish(5, t -> new TripEventInfoDTO(9L, t, CREATED, Region.DE_BERLIN));
+ publisher.publish(5, t -> new TripEventInfoDTO(10L, t, CREATED, Region.DE_BERLIN));
+
+ sleep(125);
+ publisher.publish(5, t -> new TripEventInfoDTO(6L, t, QUEUED, Region.DE_BERLIN));
+ publisher.publish(5, t -> new TripEventInfoDTO(7L, t, QUEUED, Region.DE_BERLIN));
+ publisher.publish(5, t -> new TripEventInfoDTO(8L, t, QUEUED, Region.DE_BERLIN));
+ publisher.publish(5, t -> new TripEventInfoDTO(9L, t, QUEUED, Region.DE_BERLIN));
+ publisher.publish(5, t -> new TripEventInfoDTO(10L, t, QUEUED, Region.DE_BERLIN));
+
+ sleep(125);
+ publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(2L, t, QUEUED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(3L, t, QUEUED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(4L, t, QUEUED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(5L, t, QUEUED, Region.AT_VIENNA));
+
+ publisher.publish(5, t -> new TripEventInfoDTO(6L, t, MATCHED, Region.DE_BERLIN));
+ publisher.publish(5, t -> new TripEventInfoDTO(7L, t, MATCHED, Region.DE_BERLIN));
+ publisher.publish(5, t -> new TripEventInfoDTO(8L, t, MATCHED, Region.DE_BERLIN));
+
+ LOG.info("Collecting MatchingDuration events 1,2,3 for DE_BERLIN");
+ berlinDurations.addAll(matchingDurations.take(3));
+
+ sleep(500);
+ publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(2L, t, MATCHED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(3L, t, MATCHED, Region.AT_VIENNA));
+
+ LOG.info("Collecting MatchingDuration events 1,2,3 for AT_VIENNA");
+ viennaDurations.addAll(matchingDurations.take(3));
+
+ publisher.publish(5, t -> new TripEventInfoDTO(9L, t, MATCHED, Region.DE_BERLIN));
+ publisher.publish(5, t -> new TripEventInfoDTO(10L, t, MATCHED, Region.DE_BERLIN));
+
+ LOG.info("Collecting MatchingDuration events 4,5 for DE_BERLIN");
+ berlinDurations.addAll(matchingDurations.take(2));
+
+ sleep(500);
+ publisher.publish(5, t -> new TripEventInfoDTO(4L, t, MATCHED, Region.AT_VIENNA));
+ publisher.publish(5, t -> new TripEventInfoDTO(5L, t, MATCHED, Region.AT_VIENNA));
+
+ LOG.info("Collecting MatchingDuration events 4,5 for AT_VIENNA");
+ viennaDurations.addAll(matchingDurations.take(2));
+
+ LOG.info("Collecting AverageMatchingDuration event for DE_BERLIN");
+ AverageMatchingDuration e0 = averageMatchingDurations.take(); // berlin
+
+ LOG.info("Collecting AverageMatchingDuration event for AT_VIENNA");
+ AverageMatchingDuration e1 = averageMatchingDurations.take(); // vienna
+
+ assertEquals("Expected calculation for berlin to have been triggered first", e0.getRegion(), Region.DE_BERLIN);
+ assertEquals("Expected calculation for vienna to have been triggered second", e1.getRegion(), Region.AT_VIENNA);
+
+ assertEquals("Wrong number of MatchingDuration events for AT_VIENNA", 5, viennaDurations.size());
+ assertEquals("Wrong number of MatchingDuration events for DE_BERLIN", 5, berlinDurations.size());
+
+ double viennaAvg = viennaDurations.stream().mapToLong(MatchingDuration::getDuration).average().orElse(-1);
+ double berlinAvg = berlinDurations.stream().mapToLong(MatchingDuration::getDuration).average().orElse(-1);
+
+ assertEquals("Average duration was not calculated from MatchingDuration events correctly", e0.getDuration(),
+ berlinAvg, 100);
+ assertEquals("Average duration was not calculated from MatchingDuration events correctly", e1.getDuration(),
+ viennaAvg, 100);
+
+ publisher.dropClients();
+ flinkExecution.get();
+ }
+}
diff --git a/ass3-event/src/test/resources/logback.xml b/ass3-event/src/test/resources/logback.xml
new file mode 100644
index 0000000..aa08bf5
--- /dev/null
+++ b/ass3-event/src/test/resources/logback.xml
@@ -0,0 +1,16 @@
+
+
+
+
+
+ %d{yyyy-MM-dd HH:mm:ss.SSS} - %highlight(%5p) [%12.12thread] %cyan(%-40.40logger{39}): %m%n
+
+
+
+
+
+
+
+
+
+
diff --git a/ass3-messaging/pom.xml b/ass3-messaging/pom.xml
new file mode 100644
index 0000000..5da1aa9
--- /dev/null
+++ b/ass3-messaging/pom.xml
@@ -0,0 +1,49 @@
+
+
+
+ 4.0.0
+
+
+ at.ac.tuwien.infosys.dst
+ dst
+ 2021.1
+ ..
+
+
+ ass3-messaging
+
+ DST :: Assignment 3 :: Messaging
+
+ jar
+
+
+
+ com.rabbitmq
+ amqp-client
+
+
+ com.rabbitmq
+ http-client
+
+
+ org.springframework
+ spring-web
+
+
+ org.apache.httpcomponents
+ httpclient
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+
+ org.springframework.boot
+ spring-boot-starter-amqp
+ test
+
+
+
+
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/Constants.java b/ass3-messaging/src/main/java/dst/ass3/messaging/Constants.java
new file mode 100644
index 0000000..0055997
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/Constants.java
@@ -0,0 +1,38 @@
+package dst.ass3.messaging;
+
+/**
+ * Contains several constants related to the RabbitMQ infrastructure and expected names for queues, exchanges and
+ * routing keys.
+ */
+public final class Constants {
+
+ public static final String RMQ_HOST = "192.168.99.99";
+ public static final String RMQ_PORT = "5672";
+ public static final String RMQ_VHOST = "/";
+ public static final String RMQ_USER = "dst";
+ public static final String RMQ_PASSWORD = "dst";
+
+ public static final String RMQ_API_PORT = "15672";
+ public static final String RMQ_API_URL = "http://" + RMQ_HOST + ":" + RMQ_API_PORT + "/api/";
+
+ public static final String QUEUE_AT_VIENNA = "dst.at_vienna";
+ public static final String QUEUE_AT_LINZ = "dst.at_linz";
+ public static final String QUEUE_DE_BERLIN = "dst.de_berlin";
+
+
+ public static final String[] WORK_QUEUES = {
+ QUEUE_AT_VIENNA,
+ QUEUE_AT_LINZ,
+ QUEUE_DE_BERLIN
+ };
+
+ public static final String TOPIC_EXCHANGE = "dst.workers";
+
+ public static final String ROUTING_KEY_AT_VIENNA = "requests.at_vienna";
+ public static final String ROUTING_KEY_AT_LINZ = "requests.at_linz";
+ public static final String ROUTING_KEY_DE_BERLIN = "requests.de_berlin";
+
+ private Constants() {
+ // util class
+ }
+}
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/GeoPoint.java b/ass3-messaging/src/main/java/dst/ass3/messaging/GeoPoint.java
new file mode 100644
index 0000000..11294bc
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/GeoPoint.java
@@ -0,0 +1,54 @@
+package dst.ass3.messaging;
+
+import java.util.Objects;
+
+public class GeoPoint {
+
+
+ private Double longitude;
+ private Double latitude;
+
+ public GeoPoint() {
+ }
+
+ public GeoPoint(Double longitude, Double latitude) {
+ this.longitude = longitude;
+ this.latitude = latitude;
+ }
+
+ public Double getLongitude() {
+ return longitude;
+ }
+
+ public void setLongitude(Double longitude) {
+ this.longitude = longitude;
+ }
+
+ public Double getLatitude() {
+ return latitude;
+ }
+
+ public void setLatitude(Double latitude) {
+ this.latitude = latitude;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ GeoPoint geoPoint = (GeoPoint) o;
+ return Objects.equals(getLongitude(), geoPoint.getLongitude()) &&
+ Objects.equals(getLatitude(), geoPoint.getLatitude());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getLongitude(), getLatitude());
+ }
+
+
+}
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/IMessagingFactory.java b/ass3-messaging/src/main/java/dst/ass3/messaging/IMessagingFactory.java
new file mode 100644
index 0000000..d0ee0e1
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/IMessagingFactory.java
@@ -0,0 +1,21 @@
+package dst.ass3.messaging;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface IMessagingFactory extends Closeable {
+
+ IQueueManager createQueueManager();
+
+ IRequestGateway createRequestGateway();
+
+ IWorkloadMonitor createWorkloadMonitor();
+
+ /**
+ * Closes any resource the factory may create.
+ *
+ * @throws IOException propagated exceptions
+ */
+ @Override
+ void close() throws IOException;
+}
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/IQueueManager.java b/ass3-messaging/src/main/java/dst/ass3/messaging/IQueueManager.java
new file mode 100644
index 0000000..542d0cb
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/IQueueManager.java
@@ -0,0 +1,28 @@
+package dst.ass3.messaging;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Responsible for creating and tearing down all necessary RabbitMQ queues or exchanges necessary for running the system.
+ */
+public interface IQueueManager extends Closeable {
+
+ /**
+ * Initializes all queues or topic exchanges necessary for running the system.
+ */
+ void setUp();
+
+ /**
+ * Removes all queues or topic exchanged associated with the system.
+ */
+ void tearDown();
+
+ /**
+ * Closes underlying conection or resources, if any.
+ *
+ * @throws IOException propagated exceptions
+ */
+ @Override
+ void close() throws IOException;
+}
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/IRequestGateway.java b/ass3-messaging/src/main/java/dst/ass3/messaging/IRequestGateway.java
new file mode 100644
index 0000000..078fecc
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/IRequestGateway.java
@@ -0,0 +1,21 @@
+package dst.ass3.messaging;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface IRequestGateway extends Closeable {
+
+ /**
+ * Serializes and routes a request to the correct queue.
+ * @param request the request
+ */
+ void submitRequest(TripRequest request);
+
+ /**
+ * Closes any resources that may have been initialized (connections, channels, etc.)
+ *
+ * @throws IOException propagated exceptions
+ */
+ @Override
+ void close() throws IOException;
+}
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/IWorkloadMonitor.java b/ass3-messaging/src/main/java/dst/ass3/messaging/IWorkloadMonitor.java
new file mode 100644
index 0000000..b868632
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/IWorkloadMonitor.java
@@ -0,0 +1,34 @@
+package dst.ass3.messaging;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+public interface IWorkloadMonitor extends Closeable {
+
+ /**
+ * Returns for each region the amount of waiting requests.
+ *
+ * @return a map
+ */
+ Map getRequestCount();
+
+ /**
+ * Returns the amount of workers for each region. This can be deduced from the amount of consumers to each
+ * queue.
+ *
+ * @return a map
+ */
+ Map getWorkerCount();
+
+ /**
+ * Returns for each region the average processing time of the last 10 recorded requests. The data comes from
+ * subscriptions to the respective topics.
+ *
+ * @return a map
+ */
+ Map getAverageProcessingTime();
+
+ @Override
+ void close() throws IOException;
+}
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/Region.java b/ass3-messaging/src/main/java/dst/ass3/messaging/Region.java
new file mode 100644
index 0000000..ede8a28
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/Region.java
@@ -0,0 +1,7 @@
+package dst.ass3.messaging;
+
+public enum Region {
+ AT_VIENNA,
+ AT_LINZ,
+ DE_BERLIN
+}
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/TripRequest.java b/ass3-messaging/src/main/java/dst/ass3/messaging/TripRequest.java
new file mode 100644
index 0000000..d7728ff
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/TripRequest.java
@@ -0,0 +1,71 @@
+package dst.ass3.messaging;
+
+import java.util.Objects;
+
+public class TripRequest {
+
+ private String id;
+ private Region region;
+ private GeoPoint pickup;
+
+ public TripRequest() {
+ }
+
+ public TripRequest(String id, Region region, GeoPoint pickup) {
+ this.id = id;
+ this.region = region;
+ this.pickup = pickup;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public Region getRegion() {
+ return region;
+ }
+
+ public void setRegion(Region region) {
+ this.region = region;
+ }
+
+ public GeoPoint getPickup() {
+ return pickup;
+ }
+
+ public void setPickup(GeoPoint pickup) {
+ this.pickup = pickup;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TripRequest that = (TripRequest) o;
+ return Objects.equals(getId(), that.getId()) &&
+ getRegion() == that.getRegion() &&
+ Objects.equals(getPickup(), that.getPickup());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getId(), getRegion(), getPickup());
+ }
+
+ @Override
+ public String toString() {
+ return "TripRequest{" +
+ "id='" + id + '\'' +
+ ", region=" + region +
+ ", pickup=" + pickup +
+ '}';
+ }
+}
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/WorkerResponse.java b/ass3-messaging/src/main/java/dst/ass3/messaging/WorkerResponse.java
new file mode 100644
index 0000000..725d013
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/WorkerResponse.java
@@ -0,0 +1,76 @@
+package dst.ass3.messaging;
+
+import java.util.Objects;
+
+/**
+ * Message sent by a worker after it is finished processing a request.
+ */
+public class WorkerResponse {
+
+ /**
+ * The ID of the original {@link TripRequest}.
+ */
+ private String requestId;
+
+ /**
+ * The time it took to process the request (in milliseconds).
+ */
+ private Long processingTime;
+
+ /**
+ * The proposed driver
+ */
+ private Long driverId;
+
+ public String getRequestId() {
+ return requestId;
+ }
+
+ public void setRequestId(String requestId) {
+ this.requestId = requestId;
+ }
+
+ public Long getProcessingTime() {
+ return processingTime;
+ }
+
+ public void setProcessingTime(Long processingTime) {
+ this.processingTime = processingTime;
+ }
+
+ public Long getDriverId() {
+ return driverId;
+ }
+
+ public void setDriverId(Long driverId) {
+ this.driverId = driverId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ WorkerResponse that = (WorkerResponse) o;
+ return Objects.equals(getRequestId(), that.getRequestId()) &&
+ Objects.equals(getProcessingTime(), that.getProcessingTime()) &&
+ Objects.equals(getDriverId(), that.getDriverId());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getRequestId(), getProcessingTime(), getDriverId());
+ }
+
+ @Override
+ public String toString() {
+ return "WorkerResponse{" +
+ "requestId='" + requestId + '\'' +
+ ", processingTime=" + processingTime +
+ ", driverId=" + driverId +
+ '}';
+ }
+}
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java
new file mode 100644
index 0000000..4ecc2c4
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java
@@ -0,0 +1,32 @@
+package dst.ass3.messaging.impl;
+
+import dst.ass3.messaging.IMessagingFactory;
+import dst.ass3.messaging.IQueueManager;
+import dst.ass3.messaging.IRequestGateway;
+import dst.ass3.messaging.IWorkloadMonitor;
+
+public class MessagingFactory implements IMessagingFactory {
+
+ @Override
+ public IQueueManager createQueueManager() {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public IRequestGateway createRequestGateway() {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public IWorkloadMonitor createWorkloadMonitor() {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public void close() {
+ // implement if needed
+ }
+}
diff --git a/ass3-messaging/src/main/resources/logback.xml b/ass3-messaging/src/main/resources/logback.xml
new file mode 100644
index 0000000..9a6b351
--- /dev/null
+++ b/ass3-messaging/src/main/resources/logback.xml
@@ -0,0 +1,16 @@
+
+
+
+
+
+ %d{yyyy-MM-dd HH:mm:ss.SSS} - %highlight(%5p) [%12.12thread] %cyan(%-40.40logger{39}): %m%n
+
+
+
+
+
+
+
+
+
+
diff --git a/ass3-messaging/src/test/java/dst/ass3/messaging/RabbitResource.java b/ass3-messaging/src/test/java/dst/ass3/messaging/RabbitResource.java
new file mode 100644
index 0000000..b8ca8a2
--- /dev/null
+++ b/ass3-messaging/src/test/java/dst/ass3/messaging/RabbitResource.java
@@ -0,0 +1,51 @@
+package dst.ass3.messaging;
+
+import com.rabbitmq.http.client.Client;
+import org.junit.rules.ExternalResource;
+import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+
+import org.apache.http.auth.Credentials;
+
+public class RabbitResource extends ExternalResource {
+
+ private RabbitAdmin admin;
+ private Client manager;
+ private CachingConnectionFactory connectionFactory;
+ private RabbitTemplate client;
+
+ @Override
+ protected void before() throws Throwable {
+
+ manager = new Client(Constants.RMQ_API_URL, Constants.RMQ_USER, Constants.RMQ_PASSWORD);
+
+ connectionFactory = new CachingConnectionFactory(Constants.RMQ_HOST);
+ connectionFactory.setUsername(Constants.RMQ_USER);
+ connectionFactory.setPassword(Constants.RMQ_PASSWORD);
+
+ client = new RabbitTemplate(connectionFactory);
+ admin = new RabbitAdmin(connectionFactory);
+ }
+
+ @Override
+ protected void after() {
+ connectionFactory.destroy();
+ }
+
+ public Client getManager() {
+ return manager;
+ }
+
+ public RabbitTemplate getClient() {
+ return client;
+ }
+
+ public RabbitAdmin getAdmin() {
+ return admin;
+ }
+
+ public CachingConnectionFactory getConnectionFactory() {
+ return connectionFactory;
+ }
+}
diff --git a/ass3-messaging/src/test/java/dst/ass3/messaging/impl/Ass3_1_Suite.java b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/Ass3_1_Suite.java
new file mode 100644
index 0000000..03a6159
--- /dev/null
+++ b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/Ass3_1_Suite.java
@@ -0,0 +1,13 @@
+package dst.ass3.messaging.impl;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ QueueManagerTest.class,
+ RequestGatewayTest.class,
+ WorkloadMonitorTest.class
+})
+public class Ass3_1_Suite {
+}
diff --git a/ass3-messaging/src/test/java/dst/ass3/messaging/impl/QueueManagerTest.java b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/QueueManagerTest.java
new file mode 100644
index 0000000..2c70a08
--- /dev/null
+++ b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/QueueManagerTest.java
@@ -0,0 +1,97 @@
+package dst.ass3.messaging.impl;
+
+import com.rabbitmq.http.client.domain.ExchangeInfo;
+import com.rabbitmq.http.client.domain.QueueInfo;
+import dst.ass3.messaging.IMessagingFactory;
+import dst.ass3.messaging.IQueueManager;
+import dst.ass3.messaging.RabbitResource;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.springframework.amqp.core.Exchange;
+import org.springframework.amqp.core.Queue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static dst.ass3.messaging.Constants.*;
+import static org.hamcrest.CoreMatchers.*;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class QueueManagerTest {
+
+ @Rule
+ public RabbitResource rabbit = new RabbitResource();
+
+ @Rule
+ public Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
+
+ private IMessagingFactory factory = new MessagingFactory();
+ private IQueueManager queueManager;
+
+ @Before
+ public void setUp() throws Exception {
+ factory = new MessagingFactory();
+ queueManager = factory.createQueueManager();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ queueManager.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+
+ @Test
+ public void setUp_createsQueues() throws Exception {
+ queueManager.setUp();
+
+ try {
+ List queues = rabbit.getManager().getQueues();
+ assertThat(queues.size(), not(0));
+
+ // make sure all work queues exist
+ List queueNames = queues.stream().map(QueueInfo::getName).collect(Collectors.toList());
+ Arrays.stream(WORK_QUEUES)
+ .forEach(wq -> assertThat(queueNames, hasItem(wq)));
+ } finally {
+ queueManager.tearDown();
+ }
+ }
+
+ @Test
+ public void setUp_createsExchange() throws Exception {
+ queueManager.setUp();
+ try {
+ ExchangeInfo exchange = rabbit.getManager().getExchange(RMQ_VHOST, TOPIC_EXCHANGE);
+ assertThat(exchange, notNullValue());
+ } finally {
+ queueManager.tearDown();
+ }
+ }
+
+ @Test
+ public void tearDown_removesQueues() throws Exception {
+ queueManager.setUp();
+ queueManager.tearDown();
+ List queues = rabbit.getManager().getQueues();
+ List queueNames = queues.stream().map(QueueInfo::getName).collect(Collectors.toList());
+ Arrays.stream(WORK_QUEUES)
+ .forEach(wq -> assertThat(queueNames, not(hasItem(wq))));
+ }
+
+ @Test
+ public void tearDown_removesExchange() throws Exception {
+ queueManager.setUp();
+ queueManager.tearDown();
+ ExchangeInfo exchange = rabbit.getManager().getExchange(RMQ_VHOST, TOPIC_EXCHANGE);
+ assertThat(exchange, nullValue());
+ }
+}
diff --git a/ass3-messaging/src/test/java/dst/ass3/messaging/impl/RequestGatewayTest.java b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/RequestGatewayTest.java
new file mode 100644
index 0000000..f86ee8e
--- /dev/null
+++ b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/RequestGatewayTest.java
@@ -0,0 +1,112 @@
+package dst.ass3.messaging.impl;
+
+import dst.ass3.messaging.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.amqp.core.Message;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class RequestGatewayTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RequestGatewayTest.class);
+
+ @Rule
+ public RabbitResource rabbit = new RabbitResource();
+
+ @Rule
+ public Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
+
+ private IMessagingFactory factory;
+ private IQueueManager queueManager;
+ private IRequestGateway requestGateway;
+
+ @Before
+ public void setUp() throws Exception {
+ factory = new MessagingFactory();
+ queueManager = factory.createQueueManager();
+ requestGateway = factory.createRequestGateway();
+
+ queueManager.setUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ queueManager.tearDown();
+
+ requestGateway.close();
+ queueManager.close();
+ factory.close();
+ }
+
+ @Test
+ public void submitRequest_routesRequestsToCorrectQueues() throws Exception {
+ TripRequest r1 = new TripRequest("id1", Region.AT_VIENNA, new GeoPoint(34.22, -22.11));
+ TripRequest r2 = new TripRequest("id2", Region.AT_VIENNA, new GeoPoint(98.2, -1.11));
+ TripRequest r3 = new TripRequest("id3", Region.AT_LINZ, new GeoPoint(85.33, -25d));
+
+ LOG.info("Sending request {}", r1);
+ requestGateway.submitRequest(r1);
+ LOG.info("Sending request {}", r2);
+ requestGateway.submitRequest(r2);
+ LOG.info("Sending request {}", r3);
+ requestGateway.submitRequest(r3);
+
+ LOG.info("Taking requests from queue {}", Constants.QUEUE_AT_VIENNA);
+ Message m1 = rabbit.getClient().receive(Constants.QUEUE_AT_VIENNA, 1000);
+ assertThat(m1, notNullValue());
+
+ LOG.info("Taking requests from queue {}", Constants.QUEUE_AT_VIENNA);
+ Message m2 = rabbit.getClient().receive(Constants.QUEUE_AT_VIENNA, 1000);
+ assertThat(m2, notNullValue());
+
+ LOG.info("Taking requests from queue {}", Constants.QUEUE_AT_LINZ);
+ Message m3 = rabbit.getClient().receive(Constants.QUEUE_AT_LINZ, 1000);
+ assertThat(m3, notNullValue());
+
+ assertThat("Expected queue to be empty as no request for that region were issued",
+ rabbit.getClient().receive(Constants.QUEUE_DE_BERLIN, 1000), nullValue());
+ }
+
+
+ @Test
+ public void submitRequest_serializesIntoJsonFormat() throws Exception {
+ TripRequest r1 = new TripRequest("id1", Region.AT_VIENNA, new GeoPoint(1d, 2d));
+ TripRequest r2 = new TripRequest("id2", Region.AT_LINZ, new GeoPoint(3d, 4d));
+ TripRequest r3 = new TripRequest("id3", Region.DE_BERLIN, new GeoPoint(5d, 6d));
+
+ LOG.info("Sending request {}", r1);
+ requestGateway.submitRequest(r1);
+ LOG.info("Sending request {}", r2);
+ requestGateway.submitRequest(r2);
+ LOG.info("Sending request {}", r3);
+ requestGateway.submitRequest(r3);
+
+ LOG.info("Taking request from queue {}", Constants.QUEUE_AT_VIENNA);
+ Message m1 = rabbit.getClient().receive(Constants.QUEUE_AT_VIENNA, 1000);
+ assertThat(m1, notNullValue());
+ assertThat(new String(m1.getBody()),
+ equalTo("{\"id\":\"id1\",\"region\":\"AT_VIENNA\",\"pickup\":{\"longitude\":1.0,\"latitude\":2.0}}"));
+
+ LOG.info("Taking request from queue {}", Constants.QUEUE_AT_LINZ);
+ Message m2 = rabbit.getClient().receive(Constants.QUEUE_AT_LINZ, 1000);
+ assertThat(m2, notNullValue());
+ assertThat(new String(m2.getBody()),
+ equalTo("{\"id\":\"id2\",\"region\":\"AT_LINZ\",\"pickup\":{\"longitude\":3.0,\"latitude\":4.0}}"));
+
+ LOG.info("Taking request from queue {}", Constants.QUEUE_DE_BERLIN);
+ Message m3 = rabbit.getClient().receive(Constants.QUEUE_DE_BERLIN, 1000);
+ assertThat(m3, notNullValue());
+ assertThat(new String(m3.getBody()),
+ equalTo("{\"id\":\"id3\",\"region\":\"DE_BERLIN\",\"pickup\":{\"longitude\":5.0,\"latitude\":6.0}}"));
+
+ }
+}
diff --git a/ass3-messaging/src/test/java/dst/ass3/messaging/impl/WorkloadMonitorTest.java b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/WorkloadMonitorTest.java
new file mode 100644
index 0000000..f96900c
--- /dev/null
+++ b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/WorkloadMonitorTest.java
@@ -0,0 +1,169 @@
+package dst.ass3.messaging.impl;
+
+import com.rabbitmq.http.client.domain.QueueInfo;
+import dst.ass3.messaging.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
+import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
+import org.springframework.context.Lifecycle;
+
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import static dst.ass3.messaging.Constants.TOPIC_EXCHANGE;
+import static dst.ass3.messaging.Constants.WORK_QUEUES;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class WorkloadMonitorTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RequestGatewayTest.class);
+
+ @Rule
+ public RabbitResource rabbit = new RabbitResource();
+
+ @Rule
+ public Timeout timeout = new Timeout(60, TimeUnit.SECONDS);
+
+ private IMessagingFactory factory;
+ private IQueueManager queueManager;
+ private IRequestGateway requestGateway;
+ private IWorkloadMonitor workloadMonitor;
+
+ @Before
+ public void setUp() throws Exception {
+ factory = new MessagingFactory();
+ queueManager = factory.createQueueManager();
+ requestGateway = factory.createRequestGateway();
+
+ queueManager.setUp();
+
+ workloadMonitor = factory.createWorkloadMonitor();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ queueManager.tearDown();
+
+ requestGateway.close();
+ queueManager.close();
+ factory.close();
+ }
+
+ @Test
+ public void getRequestCount_returnsCorrectCount() throws Exception {
+ try {
+ Map countForRegion = new HashMap<>();
+ for (Region region : Region.values()) {
+ countForRegion.put(region, ThreadLocalRandom.current().nextLong(10, 20 + 1));
+ for (long i = 0; i < countForRegion.get(region); i++) {
+ GeoPoint pickup = new GeoPoint(48.11, 16.22);
+ TripRequest request = new TripRequest("id" + i, region, pickup);
+ LOG.info("Sending request {}", request);
+ requestGateway.submitRequest(request);
+ }
+ }
+
+ // wait for the messages to be processed by rabbit
+ Thread.sleep(2000);
+
+ assertThat(workloadMonitor.getRequestCount(), equalTo(countForRegion));
+ } finally {
+ workloadMonitor.close();
+ }
+ }
+
+ @Test
+ public void multipleWorkloadMonitors_uniqueQueueForEachMonitor() throws Exception {
+ try (IWorkloadMonitor workloadMonitor2 = factory.createWorkloadMonitor();
+ IWorkloadMonitor workloadMonitor3 = factory.createWorkloadMonitor();) {
+ long nonWorkQueues = rabbit.getManager().getQueues().stream().filter(q -> !Arrays.asList(WORK_QUEUES).contains(q.getName())).count();
+ assertThat(nonWorkQueues, greaterThanOrEqualTo(3L));
+ } finally {
+ workloadMonitor.close();
+ }
+ }
+
+ @Test
+ public void getAverageProcessingTime_correctAverageTime() throws Exception {
+ try {
+ Map avgTimes = new HashMap<>();
+ for (Region region : Region.values()) {
+ long count = ThreadLocalRandom.current().nextLong(15, 25);
+ long regionTime = 0;
+ for (long i = 0; i < count; i++) {
+ long requestTime = ThreadLocalRandom.current().nextLong(1000, 20000 + 1);
+ if (count - i <= 10) {
+ regionTime += requestTime;
+ }
+
+ String body = String.format("{\"requestId\": \"%s\", \"processingTime\": \"%d\", \"driverId\": \"1\"}", UUID.randomUUID(), requestTime);
+ LOG.info("Sending request {}", body);
+ rabbit.getClient().convertAndSend(TOPIC_EXCHANGE, "requests." + region.toString().toLowerCase(), body);
+ }
+ avgTimes.put(region, ((double) regionTime / 10));
+ }
+
+ // wait for the messages to be processed by rabbit
+ Thread.sleep(2000);
+
+ assertThat(workloadMonitor.getAverageProcessingTime(), equalTo(avgTimes));
+ } finally {
+ workloadMonitor.close();
+ }
+ }
+
+ @Test
+ public void getWorkerCount_returnsCorrectCount() throws Exception {
+ try {
+ // spawn a random number of consumers
+ Map> consumersForRegion = new HashMap<>();
+ Map consumerCountForRegion = new HashMap<>();
+ for (Region region : Region.values()) {
+ List consumers = new ArrayList<>();
+ consumersForRegion.put(region, consumers);
+ consumerCountForRegion.put(region, ThreadLocalRandom.current().nextLong(10, 20 + 1));
+ for (long i = 0; i < consumerCountForRegion.get(region); i++) {
+ consumers.add(spawnConsumer("dst." + region.toString().toLowerCase()));
+ }
+ }
+
+ // wait for rabbit to get to know the new consumers
+ Thread.sleep(2000);
+
+ Map workerCount = workloadMonitor.getWorkerCount();
+
+ // stop all consumers
+ consumersForRegion.entrySet().stream().map(Map.Entry::getValue).flatMap(Collection::stream).forEach(Lifecycle::stop);
+
+ assertThat(workerCount, equalTo(consumerCountForRegion));
+ } finally {
+ workloadMonitor.close();
+ }
+ }
+
+ private MessageListenerContainer spawnConsumer(String queue) {
+ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbit.getConnectionFactory());
+ container.addQueueNames(queue);
+ container.start();
+ return container;
+ }
+
+ @Test
+ public void close_removesQueues() throws Exception {
+ workloadMonitor.close();
+
+ List queues = rabbit.getManager().getQueues();
+ long nonWorkQueues = rabbit.getManager().getQueues().stream().filter(q -> !Arrays.asList(WORK_QUEUES).contains(q.getName())).count();
+ assertThat(nonWorkQueues, is(0L));
+ }
+}
diff --git a/ass3-worker/Dockerfile b/ass3-worker/Dockerfile
new file mode 100644
index 0000000..f87f5c1
--- /dev/null
+++ b/ass3-worker/Dockerfile
@@ -0,0 +1 @@
+# TODO
\ No newline at end of file
diff --git a/ass3-worker/pom.xml b/ass3-worker/pom.xml
new file mode 100644
index 0000000..0ab7e64
--- /dev/null
+++ b/ass3-worker/pom.xml
@@ -0,0 +1,46 @@
+
+
+
+ 4.0.0
+
+
+ at.ac.tuwien.infosys.dst
+ dst
+ 2021.1
+ ..
+
+
+ ass3-worker
+
+ jar
+
+ DST :: Assignment 3 :: Worker
+
+
+
+ at.ac.tuwien.infosys.dst
+ ass3-messaging
+ ${project.version}
+ test-jar
+ test
+
+
+ redis.clients
+ jedis
+ test
+
+
+ org.springframework.boot
+ spring-boot-starter-amqp
+ test
+
+
+ com.github.docker-java
+ docker-java
+ test
+
+
+
+
diff --git a/ass3-worker/redis-data.sh b/ass3-worker/redis-data.sh
new file mode 100644
index 0000000..3c94964
--- /dev/null
+++ b/ass3-worker/redis-data.sh
@@ -0,0 +1,11 @@
+#!/usr/bin/env bash
+redis-cli hmset drivers:at_vienna "1234" "48.19819 16.37127" "5678" "50.19819 20.37127"
+
+# Make sure that all required resources are created and correctly configured
+# To test your implementation:
+# Publish the following message via the RabbitMQ interface in the queue 'dst.at_vienna'
+# {"id":"request1","region":"AT_VIENNA","pickup":{"longitude": 16.371334 ,"latitude": 48.198122}}
+# To see if your implementation did remove the driver execute the following command in the VM
+# redis-cli hgetall drivers:at_vienna
+# the output should not include driver with id 1234
+
diff --git a/ass3-worker/src/test/java/dst/ass3/worker/package-info.java b/ass3-worker/src/test/java/dst/ass3/worker/package-info.java
new file mode 100644
index 0000000..0922bba
--- /dev/null
+++ b/ass3-worker/src/test/java/dst/ass3/worker/package-info.java
@@ -0,0 +1,6 @@
+/**
+ * This package allows you to write your own integration tests for the worker. For example, you could use the
+ * RabbitTemplate, Jedis, and DockerClient library to programmatically insert test data, and verify whether containers
+ * are spawned correctly.
+ */
+package dst.ass3.worker;
\ No newline at end of file
diff --git a/ass3-worker/worker.py b/ass3-worker/worker.py
new file mode 100644
index 0000000..f87f5c1
--- /dev/null
+++ b/ass3-worker/worker.py
@@ -0,0 +1 @@
+# TODO
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 137e20a..cc5abba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -387,6 +387,47 @@
${httpclient.version}
+
+
+ org.apache.flink
+ flink-streaming-java_2.12
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-cep_2.12
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-clients_2.12
+ ${flink.version}
+
+
+ com.github.docker-java
+ docker-java
+ ${docker-api-client.version}
+
+
+ com.rabbitmq
+ amqp-client
+ ${rabbitmq-client.version}
+
+
+ com.rabbitmq
+ http-client
+ ${rabbitmq-http.version}
+
+
+ org.springframework
+ spring-web
+ ${spring.version}
+
+
+ org.springframework.boot
+ spring-boot-starter-amqp
+ ${spring-boot.version}
+
@@ -405,6 +446,10 @@
ass2-service/facade
ass2-aop
ass2-ioc
+ ass3-messaging
+ ass3-event
+ ass3-elastic
+ ass3-worker
@@ -488,6 +533,32 @@
+
+ ass3-event
+
+ ass3-event
+
+
+
+ ass3-messaging
+
+ ass3-messaging
+
+
+
+ ass3-elastic
+
+ ass3-messaging
+ ass3-elastic
+
+
+
+ ass3-worker
+
+ ass3-messaging
+ ass3-worker
+
+
@@ -529,6 +600,12 @@
0.6.1
1.7.0
4.5.13
+
+ 1.13.0
+ 3.2.7
+ 1.4
+ 3.9.0.RELEASE
+ 5.11.0