From be629aa4323415bd35e0ddb132043e3e3ff174f1 Mon Sep 17 00:00:00 2001 From: Tobias Eidelpes Date: Mon, 17 May 2021 15:44:23 +0200 Subject: [PATCH] Add template for assignment 3 --- ass3-elastic/pom.xml | 53 +++ .../dst/ass3/elastic/ContainerException.java | 22 ++ .../java/dst/ass3/elastic/ContainerInfo.java | 95 ++++++ .../elastic/ContainerNotFoundException.java | 23 ++ .../dst/ass3/elastic/IContainerService.java | 36 ++ .../ass3/elastic/IElasticityController.java | 7 + .../dst/ass3/elastic/IElasticityFactory.java | 10 + .../ass3/elastic/ImageNotFoundException.java | 22 ++ .../ass3/elastic/impl/ElasticityFactory.java | 24 ++ ass3-elastic/src/main/resources/logback.xml | 16 + .../ass3/elastic/ContainerServiceTest.java | 107 ++++++ .../elastic/ElasticityControllerTest.java | 136 ++++++++ ass3-event/pom.xml | 36 ++ .../main/java/dst/ass3/event/Constants.java | 15 + .../ass3/event/EventProcessingFactory.java | 16 + .../java/dst/ass3/event/EventPublisher.java | 173 ++++++++++ .../java/dst/ass3/event/EventSubscriber.java | 178 ++++++++++ .../event/IEventProcessingEnvironment.java | 39 +++ .../dst/ass3/event/IEventSourceFunction.java | 25 ++ .../dst/ass3/event/dto/TripEventInfoDTO.java | 54 +++ .../event/model/domain/ITripEventInfo.java | 9 + .../dst/ass3/event/model/domain/Region.java | 7 + .../ass3/event/model/domain/TripState.java | 5 + .../dst/ass3/event/model/events/Alert.java | 49 +++ .../model/events/AverageMatchingDuration.java | 48 +++ .../event/model/events/LifecycleEvent.java | 84 +++++ .../event/model/events/MatchingDuration.java | 59 ++++ .../model/events/MatchingTimeoutWarning.java | 37 +++ .../event/model/events/TripFailedWarning.java | 37 +++ .../dst/ass3/event/model/events/Warning.java | 37 +++ .../src/main/resources/executionPlan.json | 1 + ass3-event/src/main/resources/logback.xml | 16 + .../dst/ass3/event/Ass3EventTestBase.java | 69 ++++ .../dst/ass3/event/Ass3EventTestSuite.java | 23 ++ .../ass3/event/EventProcessingTestBase.java | 98 ++++++ .../java/dst/ass3/event/StaticQueueSink.java | 83 +++++ .../dst/ass3/event/tests/Ass3_3_1Test.java | 152 +++++++++ .../dst/ass3/event/tests/Ass3_3_2Test.java | 89 +++++ .../dst/ass3/event/tests/Ass3_3_3Test.java | 211 ++++++++++++ .../dst/ass3/event/tests/Ass3_3_4Test.java | 308 ++++++++++++++++++ ass3-event/src/test/resources/logback.xml | 16 + ass3-messaging/pom.xml | 49 +++ .../java/dst/ass3/messaging/Constants.java | 38 +++ .../java/dst/ass3/messaging/GeoPoint.java | 54 +++ .../dst/ass3/messaging/IMessagingFactory.java | 21 ++ .../dst/ass3/messaging/IQueueManager.java | 28 ++ .../dst/ass3/messaging/IRequestGateway.java | 21 ++ .../dst/ass3/messaging/IWorkloadMonitor.java | 34 ++ .../main/java/dst/ass3/messaging/Region.java | 7 + .../java/dst/ass3/messaging/TripRequest.java | 71 ++++ .../dst/ass3/messaging/WorkerResponse.java | 76 +++++ .../ass3/messaging/impl/MessagingFactory.java | 32 ++ ass3-messaging/src/main/resources/logback.xml | 16 + .../dst/ass3/messaging/RabbitResource.java | 51 +++ .../dst/ass3/messaging/impl/Ass3_1_Suite.java | 13 + .../ass3/messaging/impl/QueueManagerTest.java | 97 ++++++ .../messaging/impl/RequestGatewayTest.java | 112 +++++++ .../messaging/impl/WorkloadMonitorTest.java | 169 ++++++++++ ass3-worker/Dockerfile | 1 + ass3-worker/pom.xml | 46 +++ ass3-worker/redis-data.sh | 11 + .../java/dst/ass3/worker/package-info.java | 6 + ass3-worker/worker.py | 1 + pom.xml | 77 +++++ 64 files changed, 3556 insertions(+) create mode 100644 ass3-elastic/pom.xml create mode 100644 
ass3-elastic/src/main/java/dst/ass3/elastic/ContainerException.java create mode 100644 ass3-elastic/src/main/java/dst/ass3/elastic/ContainerInfo.java create mode 100644 ass3-elastic/src/main/java/dst/ass3/elastic/ContainerNotFoundException.java create mode 100644 ass3-elastic/src/main/java/dst/ass3/elastic/IContainerService.java create mode 100644 ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityController.java create mode 100644 ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityFactory.java create mode 100644 ass3-elastic/src/main/java/dst/ass3/elastic/ImageNotFoundException.java create mode 100644 ass3-elastic/src/main/java/dst/ass3/elastic/impl/ElasticityFactory.java create mode 100644 ass3-elastic/src/main/resources/logback.xml create mode 100644 ass3-elastic/src/test/java/dst/ass3/elastic/ContainerServiceTest.java create mode 100644 ass3-elastic/src/test/java/dst/ass3/elastic/ElasticityControllerTest.java create mode 100644 ass3-event/pom.xml create mode 100644 ass3-event/src/main/java/dst/ass3/event/Constants.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/EventPublisher.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/EventSubscriber.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/IEventProcessingEnvironment.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/IEventSourceFunction.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/dto/TripEventInfoDTO.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/domain/ITripEventInfo.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/domain/Region.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/domain/TripState.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/events/Alert.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/events/AverageMatchingDuration.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/events/LifecycleEvent.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/events/MatchingDuration.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/events/MatchingTimeoutWarning.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/events/TripFailedWarning.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/events/Warning.java create mode 100644 ass3-event/src/main/resources/executionPlan.json create mode 100644 ass3-event/src/main/resources/logback.xml create mode 100644 ass3-event/src/test/java/dst/ass3/event/Ass3EventTestBase.java create mode 100644 ass3-event/src/test/java/dst/ass3/event/Ass3EventTestSuite.java create mode 100644 ass3-event/src/test/java/dst/ass3/event/EventProcessingTestBase.java create mode 100644 ass3-event/src/test/java/dst/ass3/event/StaticQueueSink.java create mode 100644 ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_1Test.java create mode 100644 ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_2Test.java create mode 100644 ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_3Test.java create mode 100644 ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_4Test.java create mode 100644 ass3-event/src/test/resources/logback.xml create mode 100644 ass3-messaging/pom.xml create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/Constants.java create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/GeoPoint.java create mode 100644 
ass3-messaging/src/main/java/dst/ass3/messaging/IMessagingFactory.java create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/IQueueManager.java create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/IRequestGateway.java create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/IWorkloadMonitor.java create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/Region.java create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/TripRequest.java create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/WorkerResponse.java create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java create mode 100644 ass3-messaging/src/main/resources/logback.xml create mode 100644 ass3-messaging/src/test/java/dst/ass3/messaging/RabbitResource.java create mode 100644 ass3-messaging/src/test/java/dst/ass3/messaging/impl/Ass3_1_Suite.java create mode 100644 ass3-messaging/src/test/java/dst/ass3/messaging/impl/QueueManagerTest.java create mode 100644 ass3-messaging/src/test/java/dst/ass3/messaging/impl/RequestGatewayTest.java create mode 100644 ass3-messaging/src/test/java/dst/ass3/messaging/impl/WorkloadMonitorTest.java create mode 100644 ass3-worker/Dockerfile create mode 100644 ass3-worker/pom.xml create mode 100644 ass3-worker/redis-data.sh create mode 100644 ass3-worker/src/test/java/dst/ass3/worker/package-info.java create mode 100644 ass3-worker/worker.py diff --git a/ass3-elastic/pom.xml b/ass3-elastic/pom.xml new file mode 100644 index 0000000..88daebf --- /dev/null +++ b/ass3-elastic/pom.xml @@ -0,0 +1,53 @@ + + + + 4.0.0 + + + at.ac.tuwien.infosys.dst + dst + 2021.1 + .. + + + ass3-elastic + + DST :: Assignment 3 :: Elasticity + + jar + + + + at.ac.tuwien.infosys.dst + ass3-messaging + ${project.version} + + + + at.ac.tuwien.infosys.dst + ass3-messaging + ${project.version} + test-jar + test + + + + com.github.docker-java + docker-java + + + + org.mockito + mockito-core + test + + + + org.springframework.boot + spring-boot-starter-amqp + test + + + + diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerException.java b/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerException.java new file mode 100644 index 0000000..0678fa4 --- /dev/null +++ b/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerException.java @@ -0,0 +1,22 @@ +package dst.ass3.elastic; + +/** + * Exception indicating that the ContainerService encountered an error when performing a task. + */ +public class ContainerException extends Exception { + + public ContainerException() { + } + + public ContainerException(String message) { + super(message); + } + + public ContainerException(String message, Throwable cause) { + super(message, cause); + } + + public ContainerException(Throwable cause) { + super(cause); + } +} diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerInfo.java b/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerInfo.java new file mode 100644 index 0000000..6cbcf76 --- /dev/null +++ b/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerInfo.java @@ -0,0 +1,95 @@ +package dst.ass3.elastic; + + +import dst.ass3.messaging.Region; + +import java.util.Objects; + +/** + * Value type that represents a container. + */ +public class ContainerInfo { + + /** + * The name of the container image. + */ + private String image; + + /** + * The container ID. + */ + private String containerId; + + /** + * True if the container is running. 
+ */ + private boolean running; + + /** + * If the container is a worker (indicated by the image dst/ass3-worker), then this field should contain the worker + * region. Otherwise it can be null. + */ + private Region workerRegion; + + public String getImage() { + return image; + } + + public void setImage(String image) { + this.image = image; + } + + public String getContainerId() { + return containerId; + } + + public void setContainerId(String containerId) { + this.containerId = containerId; + } + + public boolean isRunning() { + return running; + } + + public void setRunning(boolean running) { + this.running = running; + } + + public Region getWorkerRegion() { + return workerRegion; + } + + public void setWorkerRegion(Region workerRegion) { + this.workerRegion = workerRegion; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ContainerInfo that = (ContainerInfo) o; + return isRunning() == that.isRunning() && + Objects.equals(getImage(), that.getImage()) && + Objects.equals(getContainerId(), that.getContainerId()) && + getWorkerRegion() == that.getWorkerRegion(); + } + + @Override + public int hashCode() { + return Objects.hash(getImage(), getContainerId(), isRunning(), getWorkerRegion()); + } + + @Override + public String toString() { + return "ContainerInfo{" + + "image='" + image + '\'' + + ", containerId='" + containerId + '\'' + + ", running=" + running + + ", workerRegion=" + workerRegion + + '}'; + } +} diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerNotFoundException.java b/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerNotFoundException.java new file mode 100644 index 0000000..797cfd3 --- /dev/null +++ b/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerNotFoundException.java @@ -0,0 +1,23 @@ +package dst.ass3.elastic; + +/** + * Indicates that a container could not be found. + */ +public class ContainerNotFoundException extends ContainerException { + + public ContainerNotFoundException() { + } + + public ContainerNotFoundException(String message) { + super(message); + } + + public ContainerNotFoundException(String message, Throwable cause) { + super(message, cause); + } + + public ContainerNotFoundException(Throwable cause) { + super(cause); + } + +} diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/IContainerService.java b/ass3-elastic/src/main/java/dst/ass3/elastic/IContainerService.java new file mode 100644 index 0000000..e213897 --- /dev/null +++ b/ass3-elastic/src/main/java/dst/ass3/elastic/IContainerService.java @@ -0,0 +1,36 @@ +package dst.ass3.elastic; + +import dst.ass3.messaging.Region; + +import java.util.List; + +public interface IContainerService { + + /** + * Returns a list of all running containers. + * + * @return a list of ContainerInfo objects + * @throws ContainerException if an error occurred when trying to fetch the running containers. + */ + List listContainers() throws ContainerException; + + /** + * Stops the container with the given container ID. + * + * @param containerId ID of the container to stop. + * @throws ContainerNotFoundException if the container to stop is not running + * @throws ContainerException if another error occurred when trying to stop the container + */ + void stopContainer(String containerId) throws ContainerException; + + /** + * Starts a worker for the specific {@link dst.ass3.messaging.Region}. 
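+ * <p>Illustrative call sequence (a sketch, not part of the required contract; error handling omitted):
+ * <pre>{@code
+ * IContainerService containers = new ElasticityFactory().createContainerService();
+ * ContainerInfo worker = containers.startWorker(Region.AT_VIENNA);
+ * // ... scale down again later
+ * containers.stopContainer(worker.getContainerId());
+ * }</pre>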
+ * + * @param region {@link Region} of the worker to start + * @return ContainerInfo of the started container / worker + * @throws ImageNotFoundException if the worker docker image is not available + * @throws ContainerException if another error occurred when trying to start the worker + */ + ContainerInfo startWorker(Region region) throws ContainerException; + +} diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityController.java b/ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityController.java new file mode 100644 index 0000000..9e35c7e --- /dev/null +++ b/ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityController.java @@ -0,0 +1,7 @@ +package dst.ass3.elastic; + +public interface IElasticityController { + + void adjustWorkers() throws ContainerException; + +} diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityFactory.java b/ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityFactory.java new file mode 100644 index 0000000..b65674d --- /dev/null +++ b/ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityFactory.java @@ -0,0 +1,10 @@ +package dst.ass3.elastic; + +import dst.ass3.messaging.IWorkloadMonitor; + +public interface IElasticityFactory { + IContainerService createContainerService(); + + IElasticityController createElasticityController(IContainerService containerService, + IWorkloadMonitor workloadMonitor); +} diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/ImageNotFoundException.java b/ass3-elastic/src/main/java/dst/ass3/elastic/ImageNotFoundException.java new file mode 100644 index 0000000..bc659c5 --- /dev/null +++ b/ass3-elastic/src/main/java/dst/ass3/elastic/ImageNotFoundException.java @@ -0,0 +1,22 @@ +package dst.ass3.elastic; + +/** + * Exception indicating that the image which should be used for a container start is not available. 
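+ * <p>Typically raised by {@link IContainerService#startWorker} when the worker image (dst/ass3-worker in the tests)
+ * has not been built locally; this mapping is an assumption based on the test setup, not a prescribed behaviour.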
+ */ +public class ImageNotFoundException extends ContainerException { + + public ImageNotFoundException() { + } + + public ImageNotFoundException(String message) { + super(message); + } + + public ImageNotFoundException(String message, Throwable cause) { + super(message, cause); + } + + public ImageNotFoundException(Throwable cause) { + super(cause); + } +} diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/impl/ElasticityFactory.java b/ass3-elastic/src/main/java/dst/ass3/elastic/impl/ElasticityFactory.java new file mode 100644 index 0000000..64d2cd9 --- /dev/null +++ b/ass3-elastic/src/main/java/dst/ass3/elastic/impl/ElasticityFactory.java @@ -0,0 +1,24 @@ +package dst.ass3.elastic.impl; + +import dst.ass3.elastic.IContainerService; +import dst.ass3.elastic.IElasticityController; +import dst.ass3.elastic.IElasticityFactory; +import dst.ass3.messaging.IWorkloadMonitor; +import dst.ass3.messaging.impl.MessagingFactory; + +public class ElasticityFactory implements IElasticityFactory { + + @Override + public IContainerService createContainerService() { + // TODO + return null; + } + + @Override + public IElasticityController createElasticityController(IContainerService containerService, + IWorkloadMonitor workloadMonitor) { + // TODO + return null; + } + +} diff --git a/ass3-elastic/src/main/resources/logback.xml b/ass3-elastic/src/main/resources/logback.xml new file mode 100644 index 0000000..9a6b351 --- /dev/null +++ b/ass3-elastic/src/main/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} - %highlight(%5p) [%12.12thread] %cyan(%-40.40logger{39}): %m%n + + + + + + + + + + diff --git a/ass3-elastic/src/test/java/dst/ass3/elastic/ContainerServiceTest.java b/ass3-elastic/src/test/java/dst/ass3/elastic/ContainerServiceTest.java new file mode 100644 index 0000000..e4d5d97 --- /dev/null +++ b/ass3-elastic/src/test/java/dst/ass3/elastic/ContainerServiceTest.java @@ -0,0 +1,107 @@ +package dst.ass3.elastic; + +import dst.ass3.elastic.impl.ElasticityFactory; +import dst.ass3.messaging.RabbitResource; +import dst.ass3.messaging.Region; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Queue; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.MatcherAssert.assertThat; + +public class ContainerServiceTest { + + private static final Logger LOG = LoggerFactory.getLogger(ContainerServiceTest.class); + + @Rule + public RabbitResource rabbit = new RabbitResource(); + + @Rule + public Timeout timeout = new Timeout(30, TimeUnit.SECONDS); + + IContainerService containerService; + IElasticityFactory factory; + + @Before + public void setUp() throws Exception { + factory = new ElasticityFactory(); + + containerService = factory.createContainerService(); + + rabbit.getAdmin().declareQueue(new Queue("dst.at_vienna")); + rabbit.getAdmin().declareQueue(new Queue("dst.at_linz")); + rabbit.getAdmin().declareQueue(new Queue("dst.de_berlin")); + } + + @After + public void tearDown() throws Exception { + rabbit.getAdmin().deleteQueue("dst.at_vienna"); + rabbit.getAdmin().deleteQueue("dst.at_linz"); + rabbit.getAdmin().deleteQueue("dst.de_berlin"); + } + + @Test + public void spawnListStop_lifecycleWorks() throws Exception { + List containers = containerService.listContainers(); + assertThat("Please stop all containers 
before running the test", containers.size(), is(0)); + + ContainerInfo c1 = containerService.startWorker(Region.AT_VIENNA); + LOG.info("Started container {}", c1); + + ContainerInfo c2 = containerService.startWorker(Region.AT_LINZ); + LOG.info("Started container {}", c2); + + ContainerInfo c3 = containerService.startWorker(Region.DE_BERLIN); + LOG.info("Started container {}", c3); + + LOG.info("Waiting for containers to boot..."); + Thread.sleep(5000); + + containers = containerService.listContainers(); + + assertThat(containers.size(), is(3)); + + LOG.info("Stopping containers..."); + containerService.stopContainer(c1.getContainerId()); + containerService.stopContainer(c2.getContainerId()); + containerService.stopContainer(c3.getContainerId()); + + Thread.sleep(5000); + + containers = containerService.listContainers(); + assertThat(containers.size(), is(0)); + } + + @Test(expected = ContainerNotFoundException.class) + public void stopNonExistingContainer_throwsException() throws Exception { + containerService.stopContainer("Non-Existing-Id"); + } + + @Test + public void listContainers_containsCompleteInfo() throws Exception { + ContainerInfo c1 = containerService.startWorker(Region.AT_VIENNA); + LOG.info("Started container {}", c1); + LOG.info("Waiting for container to boot..."); + Thread.sleep(5000); + List containers = containerService.listContainers(); + ContainerInfo containerInfo = containers.stream() + .filter(c -> c1.getContainerId().equals(c.getContainerId())) + .findFirst().get(); + assertThat(containerInfo, notNullValue()); + assertThat(containerInfo.getImage(), equalTo("dst/ass3-worker")); + assertThat(containerInfo.getWorkerRegion(), equalTo(Region.AT_VIENNA)); + assertThat(containerInfo.isRunning(), is(true)); + LOG.info("Stopping container..."); + containerService.stopContainer(containerInfo.getContainerId()); + } + +} diff --git a/ass3-elastic/src/test/java/dst/ass3/elastic/ElasticityControllerTest.java b/ass3-elastic/src/test/java/dst/ass3/elastic/ElasticityControllerTest.java new file mode 100644 index 0000000..0248c97 --- /dev/null +++ b/ass3-elastic/src/test/java/dst/ass3/elastic/ElasticityControllerTest.java @@ -0,0 +1,136 @@ +package dst.ass3.elastic; + +import dst.ass3.elastic.impl.ElasticityFactory; +import dst.ass3.messaging.IWorkloadMonitor; +import dst.ass3.messaging.Region; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class ElasticityControllerTest { + private static final String WORKER_IMAGE = "dst/ass3-worker"; + + IElasticityFactory factory; + + @Mock + IContainerService containerService; + @Mock + IWorkloadMonitor workloadMonitor; + + IElasticityController elasticityController; + Map processingTimes = new HashMap<>(); + Map workerCount = new HashMap<>(); + Map requestCount = new HashMap<>(); + List containers = new ArrayList<>(); + + @Before + public void setUp() throws Exception { + factory = new ElasticityFactory(); + elasticityController = factory.createElasticityController(containerService, workloadMonitor); + + processingTimes.clear(); + processingTimes.put(Region.AT_VIENNA, 5000.0); + processingTimes.put(Region.DE_BERLIN, 10000.0); + 
processingTimes.put(Region.AT_LINZ, 2000.0); + + when(workloadMonitor.getAverageProcessingTime()).thenReturn(processingTimes); + + workerCount.clear(); + workerCount.put(Region.AT_VIENNA, 95L); + workerCount.put(Region.DE_BERLIN, 87L); + workerCount.put(Region.AT_LINZ, 61L); + when(workloadMonitor.getWorkerCount()).thenReturn(workerCount); + + requestCount.clear(); + requestCount.put(Region.AT_VIENNA, 600L); + requestCount.put(Region.DE_BERLIN, 1000L); + requestCount.put(Region.AT_LINZ, 1005L); + when(workloadMonitor.getRequestCount()).thenReturn(requestCount); + + containers.clear(); + for (int i = 0; i < 95; i++) { + containers.add(containerInfo("vienna" + i, WORKER_IMAGE, Region.AT_VIENNA, true)); + } + for (int i = 0; i < 87; i++) { + containers.add(containerInfo("berlin" + i, WORKER_IMAGE, Region.DE_BERLIN, true)); + } + for (int i = 0; i < 61; i++) { + containers.add(containerInfo("linz" + i, WORKER_IMAGE, Region.AT_LINZ, true)); + } + when(containerService.listContainers()).thenReturn(containers); + } + + @After + public void tearDown() { + verify(workloadMonitor, atLeast(1)).getWorkerCount(); + verify(workloadMonitor, atLeast(1)).getRequestCount(); + verify(workloadMonitor, atLeast(1)).getAverageProcessingTime(); + } + + @Test + public void notEnoughWorkers_scaleUp() throws Exception { + // remove 10 vienna workers and 10 linz workers + List containersToRemove = containers.stream() + .filter(c -> c.getContainerId().startsWith("vienna7") || c.getContainerId().startsWith("linz1")) + .collect(Collectors.toList()); + containers.removeAll(containersToRemove); + workerCount.put(Region.AT_VIENNA, 85L); + workerCount.put(Region.AT_LINZ, 51L); + + elasticityController.adjustWorkers(); + + verify(containerService, never()).stopContainer((String) any(String.class)); + verify(containerService, times(15)).startWorker(Region.AT_VIENNA); + verify(containerService, times(16)).startWorker(Region.AT_LINZ); + verify(containerService, never()).startWorker(Region.DE_BERLIN); + verify(containerService, never()).listContainers(); + } + + @Test + public void tooManyWorkers_scaleDown() throws Exception { + // add 20 more, some should be stopped + for (int i = 0; i < 20; i++) { + containers.add(containerInfo("linz1" + i, WORKER_IMAGE, Region.AT_LINZ, true)); + } + workerCount.put(Region.AT_LINZ, 81L); + + elasticityController.adjustWorkers(); + + verify(containerService, times(14)).stopContainer(contains("linz")); + verify(containerService, never()).stopContainer(contains("berlin")); + verify(containerService, never()).stopContainer(contains("vienna")); + verify(containerService, never()).startWorker((Region) any(Region.class)); + verify(containerService, times(1)).listContainers(); + } + + @Test + public void justEnoughWorkers_doNotScale() throws Exception { + elasticityController.adjustWorkers(); + verify(containerService, never()).startWorker((Region) any(Region.class)); + verify(containerService, never()).stopContainer((String) any()); + verify(containerService, never()).listContainers(); + } + + private ContainerInfo containerInfo(String id, String image, Region workerRegion, boolean running) { + ContainerInfo info = new ContainerInfo(); + info.setContainerId(id); + info.setImage(image); + info.setWorkerRegion(workerRegion); + info.setRunning(running); + return info; + } + +} diff --git a/ass3-event/pom.xml b/ass3-event/pom.xml new file mode 100644 index 0000000..939dfb1 --- /dev/null +++ b/ass3-event/pom.xml @@ -0,0 +1,36 @@ + + + + 4.0.0 + + + at.ac.tuwien.infosys.dst + dst + 2021.1 + .. 
+ + + ass3-event + + jar + + DST :: Assignment 3 :: Event Stream Processing + + + + org.apache.flink + flink-streaming-java_2.12 + + + org.apache.flink + flink-cep_2.12 + + + org.apache.flink + flink-clients_2.12 + + + + diff --git a/ass3-event/src/main/java/dst/ass3/event/Constants.java b/ass3-event/src/main/java/dst/ass3/event/Constants.java new file mode 100644 index 0000000..d06492a --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/Constants.java @@ -0,0 +1,15 @@ +package dst.ass3.event; + +/** + * Constants. + */ +public final class Constants { + + /** + * The TCP port of the {@link EventPublisher}. + */ + public static final int EVENT_PUBLISHER_PORT = 1338; + + private Constants() { + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java b/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java new file mode 100644 index 0000000..6a8c668 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java @@ -0,0 +1,16 @@ +package dst.ass3.event; + +/** + * Creates your {@link IEventProcessingEnvironment} and {@link IEventSourceFunction} implementation instances. + */ +public class EventProcessingFactory { + public static IEventProcessingEnvironment createEventProcessingEnvironment() { + // TODO + return null; + } + + public static IEventSourceFunction createEventSourceFunction() { + // TODO + return null; + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/EventPublisher.java b/ass3-event/src/main/java/dst/ass3/event/EventPublisher.java new file mode 100644 index 0000000..3001d6a --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/EventPublisher.java @@ -0,0 +1,173 @@ +package dst.ass3.event; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +import dst.ass3.event.model.domain.ITripEventInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.group.ChannelGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.group.DefaultChannelGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ClassResolvers; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectDecoder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectEncoder; + +/** + * An EventPublisher accepts incoming TCP socket connections on a given port and is able to broadcast {@link ITripEventInfo} + * objects to these clients. 
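+ * <p>Minimal usage sketch (illustrative only; the DTO values are made up):
+ * <pre>{@code
+ * EventPublisher publisher = new EventPublisher(Constants.EVENT_PUBLISHER_PORT);
+ * publisher.start();
+ * publisher.waitForClients();
+ * publisher.publish(new TripEventInfoDTO(1L, System.currentTimeMillis(), TripState.CREATED, Region.AT_VIENNA));
+ * publisher.close();
+ * }</pre>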
+ */ +public class EventPublisher { + + private static final Logger LOG = LoggerFactory.getLogger(EventPublisher.class); + + private final Object clientChannelMonitor = new Object(); + + private final int port; + private final AtomicBoolean closed; + + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + private ChannelGroup clientChannels; + + public EventPublisher(int port) { + this.port = port; + this.closed = new AtomicBoolean(false); + } + + public int getPort() { + return port; + } + + /** + * Broadcast an event to all listening channels. Does nothing if no clients are connected. + * + * @param event the event to publish + * @throws IllegalStateException if the publisher hasn't been started yet or has been closed + */ + public void publish(ITripEventInfo event) { + if (clientChannels == null || closed.get()) { + throw new IllegalStateException(); + } + + clientChannels.writeAndFlush(event).syncUninterruptibly(); + + // wait a bit for event to propagate + try { + Thread.sleep(10); + } catch (InterruptedException e) {} + } + + /** + * Like {@link #publish(ITripEventInfo)} but waits for a given number of milliseconds and then passes the current system + * time to a factory function. + * + * @param delay the delay in ms + * @param provider the provider + */ + public void publish(long delay, Function provider) { + if (delay > 0) { + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + } + } + publish(provider.apply(System.currentTimeMillis())); + } + + /** + * This method blocks if no clients are connected, and is notified as soon as a client connects. If clients are + * connected, the method returns immediately. + */ + public void waitForClients() { + if (clientChannels.isEmpty()) { + LOG.debug("Waiting for clients to connect..."); + synchronized (clientChannelMonitor) { + try { + clientChannelMonitor.wait(); + } catch (InterruptedException e) { + LOG.debug("Interrupted while waiting on client connections", e); + } + } + } + } + + public int getConnectedClientCount() { + if (clientChannels == null || closed.get()) { + throw new IllegalStateException(); + } + return clientChannels.size(); + } + + /** + * Closes all active client connections. + */ + public void dropClients() { + if (clientChannels == null || closed.get()) { + throw new IllegalStateException(); + } + clientChannels.close().syncUninterruptibly().group().clear(); + } + + /** + * Start the server and accept incoming connections. Will call {@link #close()} if an error occurs during + * connection. + */ + public void start() { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + clientChannels = new DefaultChannelGroup(workerGroup.next()); + + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ClientChannelInitializer()); + + // Bind and start to accept incoming connections + ChannelFuture f = b.bind(port).addListener(future -> { + if (!future.isSuccess()) { + LOG.error("Error while binding socket"); + close(); + } + }).syncUninterruptibly(); + LOG.info("Accepting connections on {}", f.channel()); + } + + /** + * Closes all channels and resources. 
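+ * Safe to call more than once: the atomic {@code closed} flag ensures the event loop groups are only shut down on
+ * the first invocation.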
+ */ + public void close() { + if (closed.compareAndSet(false, true)) { + LOG.info("Shutting down event loops"); + if (bossGroup != null) { + bossGroup.shutdownGracefully(); + } + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + } + } + } + + private class ClientChannelInitializer extends ChannelInitializer { + @Override + public void initChannel(SocketChannel ch) throws Exception { + LOG.info("Initializing client channel {}", ch); + clientChannels.add(ch); + + ch.pipeline() + .addFirst(new ObjectEncoder()) + .addFirst(new ObjectDecoder(ClassResolvers.cacheDisabled(ClassLoader.getSystemClassLoader()))); + + synchronized (clientChannelMonitor) { + clientChannelMonitor.notifyAll(); + } + } + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/EventSubscriber.java b/ass3-event/src/main/java/dst/ass3/event/EventSubscriber.java new file mode 100644 index 0000000..1a43764 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/EventSubscriber.java @@ -0,0 +1,178 @@ +package dst.ass3.event; + +import java.lang.reflect.Proxy; +import java.net.SocketAddress; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import dst.ass3.event.model.domain.ITripEventInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; +import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ClassResolvers; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectDecoder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectEncoder; +import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future; + +/** + * An EventSubscriber receives ITripEventInfo objects through a netty SocketChannel. Create and connect an + * EventSubscriber using {@link #subscribe(SocketAddress)}. To receive events, call {@link #receive()}. + */ +public class EventSubscriber { + + private static final Logger LOG = LoggerFactory.getLogger(EventSubscriber.class); + + private static final ITripEventInfo POISON_PILL = (ITripEventInfo) Proxy.newProxyInstance( + ITripEventInfo.class.getClassLoader(), new Class[]{ITripEventInfo.class}, (p, m, a) -> null); + + private final SocketAddress publisherAddress; + + private final BlockingQueue queue; + + private volatile boolean closed; + + private Channel channel; + private EventLoopGroup loop; + + private EventSubscriber(SocketAddress publisherAddress) { + this.publisherAddress = publisherAddress; + this.queue = new LinkedBlockingQueue<>(); + } + + /** + * Blocks to receive the next ITripEventInfo published into the channel. Returns {@code null} if the underlying + * channel has been closed or the thread was interrupted. 
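+ * <p>Illustrative receive loop (a sketch; the address is assumed to be the local test publisher):
+ * <pre>{@code
+ * EventSubscriber subscriber = EventSubscriber.subscribe(
+ *         new InetSocketAddress("localhost", Constants.EVENT_PUBLISHER_PORT));
+ * ITripEventInfo event;
+ * while ((event = subscriber.receive()) != null) {
+ *     // process the event
+ * }
+ * subscriber.close();
+ * }</pre>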
+ * + * @return the next ITripEventInfo object + * @throws IllegalStateException thrown if the previous call returned null and the channel was closed + */ + public ITripEventInfo receive() throws IllegalStateException { + synchronized (queue) { + if (closed && queue.isEmpty()) { + throw new IllegalStateException(); + } + } + + ITripEventInfo event; + try { + event = queue.take(); + + if (event == POISON_PILL) { + return null; + } else { + return event; + } + } catch (InterruptedException e) { + return null; + } + } + + private Future start() { + loop = new NioEventLoopGroup(); + + channel = new Bootstrap() + .group(loop) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new EventSubscriberHandler()) + .connect(publisherAddress) // ChannelFuture + .addListener(future -> { + if (!future.isSuccess()) { + LOG.error("Error while connecting"); + close(); + } + }) + .syncUninterruptibly() + .channel(); + + LOG.info("Connected to channel {}", channel); + + return loop.submit(() -> { + try { + channel.closeFuture().sync(); + } catch (InterruptedException e) { + // noop + } finally { + close(); + } + }); + } + + /** + * Closes all resources and threads used by the EventSubscriber. + */ + public void close() { + try { + if (loop != null) { + synchronized (queue) { + if (!loop.isShutdown() && !loop.isTerminated() && !loop.isShuttingDown()) { + LOG.info("Shutting down event loop"); + loop.shutdownGracefully(); + } + } + } + } finally { + synchronized (queue) { + if (!closed) { + LOG.debug("Adding poison pill to queue"); + closed = true; + queue.add(POISON_PILL); + } + } + } + } + + /** + * Creates a new EventSubscriber that connects to given SocketAddress. + * + * @param address the socket address + * @return a new EventSubscriber + */ + public static EventSubscriber subscribe(SocketAddress address) { + EventSubscriber eventSubscriber = new EventSubscriber(address); + eventSubscriber.start(); + return eventSubscriber; + } + + private class EventSubscriberHandler extends ChannelInboundHandlerAdapter { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ctx.read(); + + if (!(msg instanceof ITripEventInfo)) { + LOG.error("Unknown message type received {}", msg); + return; + } + + synchronized (queue) { + if (!closed) { + queue.add((ITripEventInfo) msg); + } + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + LOG.error("EventSubscriberHandler caught an exception", cause); + ctx.close(); + close(); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + ctx.pipeline() + .addFirst(new ObjectEncoder()) + .addFirst(new ObjectDecoder(ClassResolvers.cacheDisabled(ClassLoader.getSystemClassLoader()))); + } + + } +} + diff --git a/ass3-event/src/main/java/dst/ass3/event/IEventProcessingEnvironment.java b/ass3-event/src/main/java/dst/ass3/event/IEventProcessingEnvironment.java new file mode 100644 index 0000000..ae4b727 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/IEventProcessingEnvironment.java @@ -0,0 +1,39 @@ +package dst.ass3.event; + +import dst.ass3.event.model.events.*; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.windowing.time.Time; + +/** + * This class should be used to implement the event processing steps as described in the assignment. 
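+ * A typical (but not prescribed) wiring reads events through an {@link IEventSourceFunction}, derives
+ * {@link dst.ass3.event.model.events.LifecycleEvent}s from them, and writes the results into the injected sinks.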
The test classes + * will inject SinkFunctions, create a new StreamExecutionEnvironment and then call {@link + * #initialize(StreamExecutionEnvironment)}. + */ +public interface IEventProcessingEnvironment { + + /** + * Initializes the event processing graph on the {@link StreamExecutionEnvironment}. This function is called + * after all sinks have been set. + */ + void initialize(StreamExecutionEnvironment env); + + /** + * Sets the timeout limit of a streaming event. + * + * @param time the timeout limit + */ + void setMatchingDurationTimeout(Time time); + + void setLifecycleEventStreamSink(SinkFunction sink); + + void setMatchingDurationStreamSink(SinkFunction sink); + + void setAverageMatchingDurationStreamSink(SinkFunction sink); + + void setMatchingTimeoutWarningStreamSink(SinkFunction sink); + + void setTripFailedWarningStreamSink(SinkFunction sink); + + void setAlertStreamSink(SinkFunction sink); +} diff --git a/ass3-event/src/main/java/dst/ass3/event/IEventSourceFunction.java b/ass3-event/src/main/java/dst/ass3/event/IEventSourceFunction.java new file mode 100644 index 0000000..49278e7 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/IEventSourceFunction.java @@ -0,0 +1,25 @@ +package dst.ass3.event; + +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import dst.ass3.event.model.domain.ITripEventInfo; + +/** + * A RichFunction & SourceFunction for ITripEventInfo objects. + */ +public interface IEventSourceFunction extends RichFunction, SourceFunction { + + @Override + void open(Configuration parameters) throws Exception; + + @Override + void close() throws Exception; + + @Override + void run(SourceContext ctx) throws Exception; + + @Override + void cancel(); +} diff --git a/ass3-event/src/main/java/dst/ass3/event/dto/TripEventInfoDTO.java b/ass3-event/src/main/java/dst/ass3/event/dto/TripEventInfoDTO.java new file mode 100644 index 0000000..2c604b5 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/dto/TripEventInfoDTO.java @@ -0,0 +1,54 @@ +package dst.ass3.event.dto; + +import java.io.Serializable; + +import dst.ass3.event.model.domain.ITripEventInfo; +import dst.ass3.event.model.domain.Region; +import dst.ass3.event.model.domain.TripState; + +public class TripEventInfoDTO implements Serializable, ITripEventInfo { + + private static final long serialVersionUID = 4134104076758220138L; + + private Long tripId; + private Long timestamp; + private TripState state; + private Region region; + + public TripEventInfoDTO(Long tripId, Long timestamp, TripState state, Region region) { + this.tripId = tripId; + this.timestamp = timestamp; + this.state = state; + this.region = region; + } + + @Override + public Long getTripId() { + return tripId; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public TripState getState() { + return state; + } + + @Override + public Region getRegion() { + return region; + } + + @Override + public String toString() { + return "TripEventInfoDTO{" + + "tripId=" + tripId + + ", timestamp=" + timestamp + + ", state=" + state + + ", region=" + region + + '}'; + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/model/domain/ITripEventInfo.java b/ass3-event/src/main/java/dst/ass3/event/model/domain/ITripEventInfo.java new file mode 100644 index 0000000..0fa9870 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/domain/ITripEventInfo.java @@ -0,0 
+1,9 @@ +package dst.ass3.event.model.domain; + +public interface ITripEventInfo { + + Long getTripId(); + Long getTimestamp(); + TripState getState(); + Region getRegion(); +} diff --git a/ass3-event/src/main/java/dst/ass3/event/model/domain/Region.java b/ass3-event/src/main/java/dst/ass3/event/model/domain/Region.java new file mode 100644 index 0000000..d61cce1 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/domain/Region.java @@ -0,0 +1,7 @@ +package dst.ass3.event.model.domain; + +public enum Region { + AT_VIENNA, + AT_LINZ, + DE_BERLIN +} diff --git a/ass3-event/src/main/java/dst/ass3/event/model/domain/TripState.java b/ass3-event/src/main/java/dst/ass3/event/model/domain/TripState.java new file mode 100644 index 0000000..9453273 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/domain/TripState.java @@ -0,0 +1,5 @@ +package dst.ass3.event.model.domain; + +public enum TripState { + CREATED, QUEUED, MATCHED, APPROACHING, IN_PROGRESS, CANCELLED, COMPLETED +} diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/Alert.java b/ass3-event/src/main/java/dst/ass3/event/model/events/Alert.java new file mode 100644 index 0000000..f4a7f1a --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/events/Alert.java @@ -0,0 +1,49 @@ +package dst.ass3.event.model.events; + +import java.io.Serializable; +import java.util.List; + +import dst.ass3.event.model.domain.Region; + +/** + * A system alert that aggregates several warnings. + */ +public class Alert implements Serializable { + + private static final long serialVersionUID = -4561132671849230635L; + + private Region region; + private List warnings; + + public Alert() { + } + + public Alert(Region region, List warnings) { + this.region = region; + this.warnings = warnings; + } + + public Region getRegion() { + return region; + } + + public void setRegion(Region region) { + this.region = region; + } + + public List getWarnings() { + return warnings; + } + + public void setWarnings(List warnings) { + this.warnings = warnings; + } + + @Override + public String toString() { + return "Alert{" + + "region='" + region + '\'' + + ", warnings=" + warnings + + '}'; + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/AverageMatchingDuration.java b/ass3-event/src/main/java/dst/ass3/event/model/events/AverageMatchingDuration.java new file mode 100644 index 0000000..9a3f8f7 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/events/AverageMatchingDuration.java @@ -0,0 +1,48 @@ +package dst.ass3.event.model.events; + +import java.io.Serializable; + +import dst.ass3.event.model.domain.Region; + +/** + * The average of several {@link MatchingDuration} values. 
+ */ +public class AverageMatchingDuration implements Serializable { + + private static final long serialVersionUID = -3767582104941550250L; + + private Region region; + private double duration; + + public AverageMatchingDuration() { + } + + public AverageMatchingDuration(Region region, double duration) { + this.region = region; + this.duration = duration; + } + + public Region getRegion() { + return region; + } + + public void setRegion(Region region) { + this.region = region; + } + + public double getDuration() { + return duration; + } + + public void setDuration(double duration) { + this.duration = duration; + } + + @Override + public String toString() { + return "AverageMatchingDuration{" + + "region='" + region + '\'' + + ", duration=" + duration + + '}'; + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/LifecycleEvent.java b/ass3-event/src/main/java/dst/ass3/event/model/events/LifecycleEvent.java new file mode 100644 index 0000000..8b2a8fa --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/events/LifecycleEvent.java @@ -0,0 +1,84 @@ +package dst.ass3.event.model.events; + +import java.io.Serializable; + +import dst.ass3.event.model.domain.ITripEventInfo; +import dst.ass3.event.model.domain.Region; +import dst.ass3.event.model.domain.TripState; + +/** + * Indicates a change in the lifecycle state of an ITripEventInfo. + */ +public class LifecycleEvent implements Serializable { + + private static final long serialVersionUID = 8665269919851487210L; + + /** + * The id of the trip, as returned by {@link ITripEventInfo#getTripId()}. + */ + private long tripId; + + private TripState state; + private Region region; + + /** + * The instant the event was recorded (unix epoch in milliseconds) + */ + private long timestamp; + + public LifecycleEvent() { + } + + public LifecycleEvent(ITripEventInfo eventInfo) { + this(eventInfo.getTripId(), eventInfo.getState(), eventInfo.getRegion(), eventInfo.getTimestamp()); + } + + public LifecycleEvent(long tripId, TripState state, Region region, long timestamp) { + this.tripId = tripId; + this.state = state; + this.region = region; + this.timestamp = timestamp; + } + + public long getTripId() { + return tripId; + } + + public void setTripId(long tripId) { + this.tripId = tripId; + } + + public TripState getState() { + return state; + } + + public void setState(TripState state) { + this.state = state; + } + + public Region getRegion() { + return region; + } + + public void setRegion(Region region) { + this.region = region; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + @Override + public String toString() { + return "LifecycleEvent{" + + "tripId=" + tripId + + ", state=" + state + + ", region=" + region + + ", timestamp=" + timestamp + + '}'; + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/MatchingDuration.java b/ass3-event/src/main/java/dst/ass3/event/model/events/MatchingDuration.java new file mode 100644 index 0000000..9bf82f2 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/events/MatchingDuration.java @@ -0,0 +1,59 @@ +package dst.ass3.event.model.events; + +import java.io.Serializable; + +import dst.ass3.event.model.domain.Region; + +/** + * Indicates the amount of time an ITripEventInfo took to get from CREATED to MATCHED. 
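+ * The duration is presumably expressed in milliseconds, matching the unix-epoch-millisecond timestamps of
+ * {@link LifecycleEvent} (an assumption; the assignment text defines the exact unit).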
+ */ +public class MatchingDuration implements Serializable { + + private static final long serialVersionUID = -6976972381929291369L; + + private long eventId; + private Region region; + private long duration; + + public MatchingDuration() { + } + + public MatchingDuration(long eventId, Region region, long duration) { + this.eventId = eventId; + this.region = region; + this.duration = duration; + } + + public long getEventId() { + return eventId; + } + + public void setEventId(long eventId) { + this.eventId = eventId; + } + + public Region getRegion() { + return region; + } + + public void setRegion(Region region) { + this.region = region; + } + + public long getDuration() { + return duration; + } + + public void setDuration(long duration) { + this.duration = duration; + } + + @Override + public String toString() { + return "MatchingDuration{" + + "eventId=" + eventId + + ", region='" + region + '\'' + + ", duration=" + duration + + '}'; + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/MatchingTimeoutWarning.java b/ass3-event/src/main/java/dst/ass3/event/model/events/MatchingTimeoutWarning.java new file mode 100644 index 0000000..66776d3 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/events/MatchingTimeoutWarning.java @@ -0,0 +1,37 @@ +package dst.ass3.event.model.events; + +import dst.ass3.event.model.domain.Region; + +/** + * Warning that indicates that a matching event has not reached the lifecycle state MATCHED within a given time frame. + */ +public class MatchingTimeoutWarning extends Warning { + + private static final long serialVersionUID = 7955599732178947649L; + + private long tripId; + + public MatchingTimeoutWarning() { + super(null); + } + + public MatchingTimeoutWarning(long tripId, Region region) { + super(region); + this.tripId = tripId; + } + + public long getTripId() { + return tripId; + } + + public void setTripId(long tripId) { + this.tripId = tripId; + } + + @Override + public String toString() { + return "MatchingTimeoutWarning{" + + "tripId=" + tripId + + '}'; + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/TripFailedWarning.java b/ass3-event/src/main/java/dst/ass3/event/model/events/TripFailedWarning.java new file mode 100644 index 0000000..1eaffde --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/events/TripFailedWarning.java @@ -0,0 +1,37 @@ +package dst.ass3.event.model.events; + +import dst.ass3.event.model.domain.Region; + +/** + * Indicates that matching a trip has probably failed. + */ +public class TripFailedWarning extends Warning { + + private static final long serialVersionUID = -9120187311385112769L; + + private long eventId; + + public TripFailedWarning() { + super(null); + } + + public TripFailedWarning(long eventId, Region region) { + super(region); + this.eventId = eventId; + } + + public long getEventId() { + return eventId; + } + + public void setEventId(long eventId) { + this.eventId = eventId; + } + + @Override + public String toString() { + return "TripFailedWarning{" + + "eventId=" + eventId + + '}'; + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/Warning.java b/ass3-event/src/main/java/dst/ass3/event/model/events/Warning.java new file mode 100644 index 0000000..09ffac2 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/events/Warning.java @@ -0,0 +1,37 @@ +package dst.ass3.event.model.events; + +import java.io.Serializable; + +import dst.ass3.event.model.domain.Region; + +/** + * Base class for region warnings. 
+ */ +public abstract class Warning implements Serializable { + + private static final long serialVersionUID = 273266717303711974L; + + private Region region; + + public Warning() { + } + + public Warning(Region region) { + this.region = region; + } + + public Region getRegion() { + return region; + } + + public void setRegion(Region region) { + this.region = region; + } + + @Override + public String toString() { + return "Warning{" + + "region='" + region + '\'' + + '}'; + } +} diff --git a/ass3-event/src/main/resources/executionPlan.json b/ass3-event/src/main/resources/executionPlan.json new file mode 100644 index 0000000..afd691b --- /dev/null +++ b/ass3-event/src/main/resources/executionPlan.json @@ -0,0 +1 @@ +// TODO: add the data from the execution plan export diff --git a/ass3-event/src/main/resources/logback.xml b/ass3-event/src/main/resources/logback.xml new file mode 100644 index 0000000..e423131 --- /dev/null +++ b/ass3-event/src/main/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} - %highlight(%5p) [%12.12thread] %cyan(%-40.40logger{39}): %m%n + + + + + + + + + + diff --git a/ass3-event/src/test/java/dst/ass3/event/Ass3EventTestBase.java b/ass3-event/src/test/java/dst/ass3/event/Ass3EventTestBase.java new file mode 100644 index 0000000..87722ab --- /dev/null +++ b/ass3-event/src/test/java/dst/ass3/event/Ass3EventTestBase.java @@ -0,0 +1,69 @@ +package dst.ass3.event; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.MoreExecutors; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.After; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Ass3EventTestBase. 
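+ * Shared test fixture: starts an {@link EventPublisher} on {@link Constants#EVENT_PUBLISHER_PORT}, creates a local
+ * single-parallelism Flink {@link StreamExecutionEnvironment}, and provides a cached thread pool for running jobs
+ * asynchronously.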
+ */ +public abstract class Ass3EventTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(Ass3EventTestBase.class); + + protected EventPublisher publisher; + protected StreamExecutionEnvironment flink; + protected ExecutorService executor; + + private static EventPublisher previousPublisher; + + @Before + public void setUpResources() throws Exception { + executor = Executors.newCachedThreadPool(); + + if (previousPublisher != null) { + previousPublisher.close(); + } + + publisher = createEventPublisher(); + previousPublisher = publisher; + publisher.start(); + + flink = createStreamExecutionEnvironment(); + } + + @After + public void tearDownResources() throws Exception { + publisher.close(); + MoreExecutors.shutdownAndAwaitTermination(executor, 5, TimeUnit.SECONDS); + previousPublisher = null; + } + + protected EventPublisher createEventPublisher() { + return new EventPublisher(Constants.EVENT_PUBLISHER_PORT); + } + + protected StreamExecutionEnvironment createStreamExecutionEnvironment() { + return StreamExecutionEnvironment.createLocalEnvironment(1); + } + + protected static void sleep(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + // ignore + } + } + + protected static long now() { + return System.currentTimeMillis(); + } + +} diff --git a/ass3-event/src/test/java/dst/ass3/event/Ass3EventTestSuite.java b/ass3-event/src/test/java/dst/ass3/event/Ass3EventTestSuite.java new file mode 100644 index 0000000..21d8fe7 --- /dev/null +++ b/ass3-event/src/test/java/dst/ass3/event/Ass3EventTestSuite.java @@ -0,0 +1,23 @@ +package dst.ass3.event; + +import dst.ass3.event.tests.Ass3_3_2Test; +import dst.ass3.event.tests.Ass3_3_3Test; +import dst.ass3.event.tests.Ass3_3_4Test; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; +import org.junit.runners.Suite.SuiteClasses; + +import dst.ass3.event.tests.Ass3_3_1Test; + +/** + * Ass3EventTestSuite. + */ +@RunWith(Suite.class) +@SuiteClasses({ + Ass3_3_1Test.class, + Ass3_3_2Test.class, + Ass3_3_3Test.class, + Ass3_3_4Test.class +}) +public class Ass3EventTestSuite { +} diff --git a/ass3-event/src/test/java/dst/ass3/event/EventProcessingTestBase.java b/ass3-event/src/test/java/dst/ass3/event/EventProcessingTestBase.java new file mode 100644 index 0000000..4873649 --- /dev/null +++ b/ass3-event/src/test/java/dst/ass3/event/EventProcessingTestBase.java @@ -0,0 +1,98 @@ +package dst.ass3.event; + +import static org.junit.Assert.assertNotNull; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import dst.ass3.event.model.events.*; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * EventProcessingTestBase. 
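+ * Extends {@link Ass3EventTestBase} by creating the {@link IEventProcessingEnvironment} under test, attaching a
+ * {@link StaticQueueSink} to each output stream, and offering helpers to run the Flink job synchronously or
+ * asynchronously.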
+ */ +public class EventProcessingTestBase extends Ass3EventTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(EventProcessingTestBase.class); + + @Rule + public Timeout timeout = new Timeout(15, TimeUnit.SECONDS); + + private IEventProcessingEnvironment epe; + + protected StaticQueueSink lifecycleEvents; + protected StaticQueueSink matchingDurations; + protected StaticQueueSink averageMatchingDurations; + protected StaticQueueSink matchingTimeoutWarnings; + protected StaticQueueSink tripFailedWarnings; + protected StaticQueueSink alerts; + + @Before + public void setUpEnvironment() throws Exception { + epe = EventProcessingFactory.createEventProcessingEnvironment(); + + assertNotNull("#createEventProcessingEnvironment() not implemented", epe); + + lifecycleEvents = new StaticQueueSink<>("lifecycleEvents"); + matchingDurations = new StaticQueueSink<>("matchingDurations"); + averageMatchingDurations = new StaticQueueSink<>("averageMatchingDurations"); + matchingTimeoutWarnings = new StaticQueueSink<>("matchingTimeoutWarnings"); + tripFailedWarnings = new StaticQueueSink<>("tripFailedWarnings"); + alerts = new StaticQueueSink<>("alerts"); + + epe.setLifecycleEventStreamSink(lifecycleEvents); + epe.setMatchingDurationStreamSink(matchingDurations); + epe.setAverageMatchingDurationStreamSink(averageMatchingDurations); + epe.setMatchingTimeoutWarningStreamSink(matchingTimeoutWarnings); + epe.setTripFailedWarningStreamSink(tripFailedWarnings); + epe.setAlertStreamSink(alerts); + epe.setMatchingDurationTimeout(Time.seconds(15)); + } + + public JobExecutionResult initAndExecute() throws Exception { + return initAndExecute(null); + } + + public JobExecutionResult initAndExecute(Consumer initializer) throws Exception { + try { + if (initializer != null) { + initializer.accept(epe); + } + LOG.info("Initializing StreamExecutionEnvironment with {}", epe); + epe.initialize(flink); + } catch (Exception e) { + LOG.error("Error while initializing StreamExecutionEnvironment", e); + throw e; + } + + try { + LOG.info("Executing flink {}", flink); + return flink.execute(); + } catch (Exception e) { + LOG.error("Error while executing flink", e); + throw e; + } + } + + public Future initAndExecuteAsync(Consumer initializer) { + return executor.submit(() -> initAndExecute(initializer)); + } + + public Future initAndExecuteAsync() { + return executor.submit(() -> initAndExecute()); + } + + @After + public void tearDownCollectors() throws Exception { + StaticQueueSink.clearAll(); + } + +} diff --git a/ass3-event/src/test/java/dst/ass3/event/StaticQueueSink.java b/ass3-event/src/test/java/dst/ass3/event/StaticQueueSink.java new file mode 100644 index 0000000..805cd8f --- /dev/null +++ b/ass3-event/src/test/java/dst/ass3/event/StaticQueueSink.java @@ -0,0 +1,83 @@ +package dst.ass3.event; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.flink.streaming.api.functions.sink.SinkFunction; + +/** + * A SinkFunction that collects objects into a queue located in a shared global state. Each collector accesses a + * specific key in the shared state. 
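+ * <p>
+ * Usage sketch (the stream variable is hypothetical): an {@code IEventProcessingEnvironment} implementation is
+ * expected to attach the sinks passed in through its setters, e.g.
+ * {@code lifecycleEventStream.addSink(lifecycleEventsSink)}.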
+ * + * @param the sink input type + */ +public class StaticQueueSink implements SinkFunction { + + private static final long serialVersionUID = -3965500756295835669L; + + private static Map> state = new ConcurrentHashMap<>(); + + private String key; + + public StaticQueueSink(String key) { + this.key = key; + } + + @Override + public void invoke(T value, Context context) throws Exception { + get().add(value); + } + + public void clear() { + get().clear(); + } + + public List take(int n) { + List list = new ArrayList<>(n); + + for (int i = 0; i < n; i++) { + try { + list.add(get().take()); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while accessing queue", e); + } + } + + return list; + } + + public T take() { + try { + return get().take(); + } catch (InterruptedException e) { + return null; + } + } + + public T poll(long ms) { + try { + return get().poll(ms, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + return null; + } + } + + public synchronized BlockingQueue get() { + return get(key); + } + + @SuppressWarnings("unchecked") + private static BlockingQueue get(String key) { + return (BlockingQueue) state.computeIfAbsent(key, k -> new LinkedBlockingQueue<>()); + } + + public static void clearAll() { + state.clear(); + } +} diff --git a/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_1Test.java b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_1Test.java new file mode 100644 index 0000000..4335481 --- /dev/null +++ b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_1Test.java @@ -0,0 +1,152 @@ +package dst.ass3.event.tests; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import java.io.ByteArrayOutputStream; +import java.io.NotSerializableException; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import dst.ass3.event.dto.TripEventInfoDTO; +import dst.ass3.event.model.domain.ITripEventInfo; +import dst.ass3.event.model.domain.Region; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import dst.ass3.event.Ass3EventTestBase; +import dst.ass3.event.EventProcessingFactory; +import dst.ass3.event.IEventSourceFunction; +import dst.ass3.event.model.domain.TripState; + +public class Ass3_3_1Test extends Ass3EventTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_1Test.class); + + @Rule + public Timeout timeout = new Timeout(15, TimeUnit.SECONDS); + + private IEventSourceFunction sourceFunction; + + @Before + public void setUp() throws Exception { + sourceFunction = EventProcessingFactory.createEventSourceFunction(); + assertNotNull("EventProcessingFactory#createEventSourceFunction() not implemented", sourceFunction); + } + + @Test + public void open_shouldConnectToSubscriber() throws Exception { + assertEquals( + "IEventSourceFunction should not be connected upon construction", + 0, publisher.getConnectedClientCount() + ); + + sourceFunction.open(new Configuration()); + publisher.waitForClients(); + + assertEquals( + "Expected IEventSourceFunction to connect to publisher after open is called", + 1, 
publisher.getConnectedClientCount() + ); + } + + @Test + public void run_shouldCollectPublishedEvents() throws Exception { + sourceFunction.open(new Configuration()); + publisher.waitForClients(); + + Future> result = executor.submit(() -> { + MockContext ctx = new MockContext<>(); + LOG.info("Running IEventSourceFunction with MockContext"); + sourceFunction.run(ctx); + LOG.info("Done running IEventSourceFunction, returning collected events"); + return ctx.collected; + }); + + publisher.publish(new TripEventInfoDTO(1L, 0L, TripState.CREATED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(2L, 0L, TripState.CREATED, Region.DE_BERLIN)); + + sleep(1000); + + LOG.info("Calling cancel on SourceFunction"); + sourceFunction.cancel(); + + LOG.info("Dropping subscriber connections"); + publisher.dropClients(); + + LOG.info("Calling close on SourceFunction"); + sourceFunction.close(); + + List collected = result.get(); + assertEquals(2, collected.size()); + + ITripEventInfo e0 = collected.get(0); + ITripEventInfo e1 = collected.get(1); + + assertEquals(1L, e0.getTripId(), 0); + assertEquals(2L, e1.getTripId(), 0); + } + + @Test + public void shouldBeSerializable() throws Exception { + try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) { + out.writeObject(sourceFunction); + out.flush(); + } catch (NotSerializableException e) { + fail("Implementation of IEventSourceFunction is not serializable"); + } + } + + private static class MockContext implements SourceFunction.SourceContext { + + private final Object checkpointLock = new Object(); + + private List collected = new ArrayList<>(); + + public List getCollected() { + return collected; + } + + @Override + public void collect(T element) { + collected.add(element); + } + + @Override + public void collectWithTimestamp(T element, long timestamp) { + collected.add(element); + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public void markAsTemporarilyIdle() { + + } + + @Override + public Object getCheckpointLock() { + return checkpointLock; + } + + @Override + public void close() { + + } + } + +} diff --git a/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_2Test.java b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_2Test.java new file mode 100644 index 0000000..faa0b44 --- /dev/null +++ b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_2Test.java @@ -0,0 +1,89 @@ +package dst.ass3.event.tests; + +import dst.ass3.event.EventProcessingTestBase; +import dst.ass3.event.dto.TripEventInfoDTO; +import dst.ass3.event.model.domain.Region; +import dst.ass3.event.model.events.LifecycleEvent; +import org.apache.flink.api.common.JobExecutionResult; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Future; + +import static dst.ass3.event.model.domain.TripState.CREATED; +import static dst.ass3.event.model.domain.TripState.MATCHED; +import static dst.ass3.event.model.domain.TripState.QUEUED; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.hamcrest.MatcherAssert.assertThat; + +public class Ass3_3_2Test extends EventProcessingTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_2Test.class); + + @Test + public void lifecycleEventStream_worksCorrectly() throws Exception { + // run flink in new thread + Future flinkExecution = initAndExecuteAsync(); + + // wait until a subscriber connects + 
LOG.info("Waiting for subscribers to connect"); + publisher.waitForClients(); + + LifecycleEvent event; + + long then = System.currentTimeMillis(); + + // causes LifecycleEvent 0 + LOG.info("Publishing e0"); + publisher.publish(new TripEventInfoDTO(0L, then, CREATED, Region.AT_VIENNA)); + + LOG.info("Collecting LifecycleEvent for e0"); + event = lifecycleEvents.take(); + LOG.info("Round-trip took {}ms", System.currentTimeMillis() - then); + + assertEquals("Event ID not set correctly", 0L, event.getTripId()); + assertEquals("State not set correctly", CREATED, event.getState()); + assertEquals("Region not set correctly", Region.AT_VIENNA, event.getRegion()); + assertThat("Timestamp not set correctly", event.getTimestamp(), Matchers.greaterThanOrEqualTo(then)); + assertThat("Timestamp not set correctly", event.getTimestamp(), + Matchers.lessThanOrEqualTo(System.currentTimeMillis())); + + // should be filtered + LOG.info("Publishing e1, should be filtered"); + publisher.publish(new TripEventInfoDTO(1L, now(), CREATED, null)); + + assertNull("Events without a region should be filtered", lifecycleEvents.poll(500)); + + // causes LifecycleEvent 1 + LOG.info("Publishing e2"); + publisher.publish(new TripEventInfoDTO(2L, now(), QUEUED, Region.DE_BERLIN)); + + LOG.info("Collecting LifecycleEvent for e2"); + event = lifecycleEvents.take(); + assertEquals(2L, event.getTripId()); + assertEquals(QUEUED, event.getState()); + + // should be filtered + LOG.info("Publishing e3, should be filtered"); + publisher.publish(new TripEventInfoDTO(3L, now(), CREATED, null)); + assertNull("Events without a region should be filtered", lifecycleEvents.poll(500)); + + // causes LifecycleEvent 2 + LOG.info("Publishing e4"); + publisher.publish(new TripEventInfoDTO(4L, now(), MATCHED, Region.AT_LINZ)); + + LOG.info("Collecting LifecycleEvent for e4"); + event = lifecycleEvents.take(); + assertEquals(4L, event.getTripId()); + assertEquals(MATCHED, event.getState()); + + // disconnect subscribers + publisher.dropClients(); + + // wait for execution to end + flinkExecution.get(); + } +} diff --git a/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_3Test.java b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_3Test.java new file mode 100644 index 0000000..cf1a17b --- /dev/null +++ b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_3Test.java @@ -0,0 +1,211 @@ +package dst.ass3.event.tests; + +import dst.ass3.event.EventProcessingTestBase; +import dst.ass3.event.dto.TripEventInfoDTO; +import dst.ass3.event.model.domain.Region; +import dst.ass3.event.model.events.LifecycleEvent; +import dst.ass3.event.model.events.MatchingDuration; +import dst.ass3.event.model.events.MatchingTimeoutWarning; +import dst.ass3.event.model.events.TripFailedWarning; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Future; + +import static dst.ass3.event.model.domain.TripState.*; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.*; + +public class Ass3_3_3Test extends EventProcessingTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_3Test.class); + + @Test + public void matchingDurations_areCalculatedCorrectly() throws Exception { + // tests whether duration calculation and stream keying works 
correctly + // expects LifecycleEvent stream to work correctly + + Future flinkExecution = initAndExecuteAsync(); + + LOG.info("Waiting for subscribers to connect"); + publisher.waitForClients(); + + LifecycleEvent e1Start; + LifecycleEvent e2Start; + LifecycleEvent e1End; + LifecycleEvent e2End; + + publisher.publish(5, t -> new TripEventInfoDTO(1L, t, CREATED, Region.AT_VIENNA)); // 1 starts before 2 + publisher.publish(5, t -> new TripEventInfoDTO(2L, t, CREATED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(3L, t, CREATED, Region.AT_VIENNA)); // 3 never finishes + + LOG.info("Waiting for LifecycleEvent for event 1 being CREATED"); + e1Start = lifecycleEvents.take(); + LOG.info("Waiting for LifecycleEvent for event 2 being CREATED"); + e2Start = lifecycleEvents.take(); + LOG.info("Waiting for LifecycleEvent for event 3 being CREATED"); + lifecycleEvents.take(); + + sleep(500); + publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(2L, t, QUEUED, Region.AT_VIENNA)); + + LOG.info("Waiting for LifecycleEvent for event 1 being QUEUED"); + lifecycleEvents.take(); + LOG.info("Waiting for LifecycleEvent for event 2 being QUEUED"); + lifecycleEvents.take(); + + sleep(500); // event 2 took about ~1000ms + publisher.publish(new TripEventInfoDTO(2L, now(), MATCHED, Region.AT_VIENNA)); // 2 finishes before 1 + + LOG.info("Waiting for LifecycleEvent for event 2 being MATCHED"); + e2End = lifecycleEvents.take(); + + sleep(500); // event 1 took about ~1500ms + publisher.publish(new TripEventInfoDTO(1L, now(), MATCHED, Region.AT_VIENNA)); + + LOG.info("Waiting for LifecycleEvent for event 1 being MATCHED"); + e1End = lifecycleEvents.take(); + + LOG.info("Collecting MatchingDuration event for event 2"); + MatchingDuration d0 = matchingDurations.take(); + + LOG.info("Collecting MatchingDuration event for event 1"); + MatchingDuration d1 = matchingDurations.take(); + + assertEquals("Expected event 2 to finish first", 2L, d0.getEventId()); // event 2 finished before 1 + assertEquals("Expected event 1 to finish last", 1L, d1.getEventId()); + + assertThat("Expected MatchingDuration to be >= 0", d0.getDuration(), greaterThan(0L)); + assertThat("Expected MatchingDuration to be >= 0", d1.getDuration(), greaterThan(0L)); + + assertEquals("MatchingDuration was not calculated from LifecycleEvents correctly", + e2End.getTimestamp() - e2Start.getTimestamp(), d0.getDuration(), 100); + + assertEquals("MatchingDuration was not calculated from LifecycleEvents correctly", + e1End.getTimestamp() - e1Start.getTimestamp(), d1.getDuration(), 100); + + publisher.dropClients(); + flinkExecution.get(); + } + + @Test + public void durationsWithInterleavedEvents_areCalculatedCorrectly() throws Exception { + // tests whether CEP rule is tolerant towards multiple state changes between QUEUED and MATCHED + + Future flinkExecution = initAndExecuteAsync(e -> + e.setMatchingDurationTimeout(Time.seconds(1)) + ); + + LOG.info("Waiting for subscribers to connect"); + publisher.waitForClients(); + + publisher.publish(5, t -> new TripEventInfoDTO(1L, t, CREATED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(2L, t, CREATED, Region.AT_VIENNA)); // never finishes + + // change state several times (tests another aspect of the CEP rule) + publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA)); + 
publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA)); + + publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA)); + + publisher.dropClients(); + flinkExecution.get(); + + List result = new ArrayList<>(matchingDurations.get()); + assertEquals("Expected one event to have finished", 1, result.size()); + + MatchingDuration d0 = result.get(0); + assertEquals("Expected event 1 to have finished", 1L, d0.getEventId()); + } + + @Test + public void timeoutWarnings_areEmittedCorrectly() throws Exception { + Future flinkExecution = initAndExecuteAsync(e -> { + e.setMatchingDurationTimeout(Time.seconds(1)); + }); + + LOG.info("Waiting for subscribers to connect"); + publisher.waitForClients(); + + publisher.publish(new TripEventInfoDTO(1L, now(), CREATED, Region.AT_VIENNA)); // never finishes + + sleep(100); + publisher.publish(new TripEventInfoDTO(2L, now(), CREATED, Region.AT_VIENNA)); // never finishes + + // confounding event + sleep(100); + publisher.publish(new TripEventInfoDTO(1L, now(), QUEUED, Region.AT_VIENNA)); + + sleep(1500); // wait for event to time out + + publisher.dropClients(); + + LOG.info("Waiting for Flink execution to end"); + flinkExecution.get(); + + LOG.info("Collecting timeout warning for event 1"); + MatchingTimeoutWarning w1 = matchingTimeoutWarnings.take(); + + LOG.info("Collecting timeout warning for event 2"); + MatchingTimeoutWarning w2 = matchingTimeoutWarnings.take(); + + assertEquals("Expected event 1 to time out first", 1L, w1.getTripId()); + assertEquals("Expected event 2 to time out second", 2L, w2.getTripId()); + } + + @Test + public void tripFailedWarnings_areEmittedCorrectly() throws Exception { + Future flinkExecution = initAndExecuteAsync(); + + LOG.info("Waiting for subscribers to connect"); + publisher.waitForClients(); + + publisher.publish(0, t -> new TripEventInfoDTO(1L, t, CREATED, Region.AT_VIENNA)); + publisher.publish(0, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA)); + publisher.publish(0, t -> new TripEventInfoDTO(2L, t, CREATED, Region.AT_VIENNA)); + publisher.publish(0, t -> new TripEventInfoDTO(2L, t, QUEUED, Region.AT_VIENNA)); + + // event 1 fail #1 + publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA)); + + // event 2 fail #1 + publisher.publish(5, t -> new TripEventInfoDTO(2L, t, MATCHED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(2L, t, QUEUED, Region.AT_VIENNA)); + + // event 1 fail #2 + publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA)); + + // event 2 fail #2 and then success + publisher.publish(5, t -> new TripEventInfoDTO(2L, t, MATCHED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(2L, t, QUEUED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(2L, t, MATCHED, Region.AT_VIENNA)); + + LOG.info("Checking that no TripFailedWarning was issued yet"); + assertNull(tripFailedWarnings.poll(500)); + + LOG.info("Triggering third failure"); + // event 1 fail #3 + publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, 
Region.AT_VIENNA)); + + LOG.info("Waiting for TripFailedWarning for event 1"); + TripFailedWarning warning = tripFailedWarnings.take(); + assertEquals(1L, warning.getEventId()); + assertEquals(Region.AT_VIENNA, warning.getRegion()); + + publisher.dropClients(); + flinkExecution.get(); + } +} diff --git a/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_4Test.java b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_4Test.java new file mode 100644 index 0000000..6a99d1e --- /dev/null +++ b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_4Test.java @@ -0,0 +1,308 @@ +package dst.ass3.event.tests; + +import dst.ass3.event.EventProcessingTestBase; +import dst.ass3.event.dto.TripEventInfoDTO; +import dst.ass3.event.model.domain.Region; +import dst.ass3.event.model.events.*; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import static dst.ass3.event.model.domain.TripState.*; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.*; + +public class Ass3_3_4Test extends EventProcessingTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_4Test.class); + + @Test + public void multipleTripFailures_triggerAlert() throws Exception { + // checks that the window works correctly + // expects TripFailedWarning stream to work correctly + + Future flinkExecution = initAndExecuteAsync(); + + LOG.info("Waiting for subscribers to connect"); + publisher.waitForClients(); + + Consumer causeWarning = (eventId) -> { + publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, MATCHED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, QUEUED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, MATCHED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, QUEUED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, MATCHED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, QUEUED, Region.AT_VIENNA)); + }; + + // all of these events will fail + publisher.publish(new TripEventInfoDTO(1L, now(), CREATED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(1L, now(), QUEUED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(2L, now(), CREATED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(2L, now(), QUEUED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(3L, now(), CREATED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(3L, now(), QUEUED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(4L, now(), CREATED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(4L, now(), QUEUED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(5L, now(), CREATED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(5L, now(), QUEUED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(6L, now(), CREATED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(6L, now(), QUEUED, Region.AT_VIENNA)); + + // warning 3 warnings + causeWarning.accept(1L); + causeWarning.accept(2L); + causeWarning.accept(3L); + + LOG.info("Collecting alert for first three 
warnings"); + assertNotNull(alerts.take()); + + LOG.info("Checking that the fourth warning does not trigger an alert"); + causeWarning.accept(4L); + assertNull(alerts.poll(500)); + + LOG.info("Checking that the fifth warning does not trigger an alert"); + causeWarning.accept(5L); + assertNull(alerts.poll(500)); + + LOG.info("Checking that the sixth warning triggered an alert"); + causeWarning.accept(6L); + assertNotNull(alerts.take()); + + publisher.dropClients(); + flinkExecution.get(); + } + + @Test + public void matchingFailuresAndTimeouts_triggerAlert() throws Exception { + // checks whether keying works correctly, and whether Warning streams are unioned correctly + // expects both warning streams to work correctly + + Future flinkExecution = initAndExecuteAsync(e -> + e.setMatchingDurationTimeout(Time.seconds(3)) + ); + + LOG.info("Waiting for subscribers to connect"); + publisher.waitForClients(); + + BiConsumer causeWarning = (eventId, region) -> { + publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, MATCHED, region)); + publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, QUEUED, region)); + publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, MATCHED, region)); + publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, QUEUED, region)); + publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, MATCHED, region)); + publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, QUEUED, region)); + }; + + publisher.publish(new TripEventInfoDTO(1L, now(), CREATED, Region.AT_VIENNA)); // vienna e1 will fail + publisher.publish(new TripEventInfoDTO(1L, now(), QUEUED, Region.AT_VIENNA)); // vienna e1 will fail + publisher.publish(new TripEventInfoDTO(2L, now(), CREATED, Region.AT_VIENNA)); // vienna e2 will fail + publisher.publish(new TripEventInfoDTO(2L, now(), QUEUED, Region.AT_VIENNA)); // vienna e2 will fail + publisher.publish(new TripEventInfoDTO(3L, now(), CREATED, Region.AT_VIENNA)); // vienna e3 will time out + publisher.publish(new TripEventInfoDTO(3L, now(), QUEUED, Region.AT_VIENNA)); // vienna e3 will time out + publisher.publish(new TripEventInfoDTO(4L, now(), CREATED, Region.DE_BERLIN)); // berlin e4 will fail + publisher.publish(new TripEventInfoDTO(4L, now(), QUEUED, Region.DE_BERLIN)); // berlin e4 will fail + + // s1 warning #1 + causeWarning.accept(1L, Region.AT_VIENNA); + + // s1 warning #2 + causeWarning.accept(2L, Region.AT_VIENNA); + + // s2 warning #1 + causeWarning.accept(4L, Region.DE_BERLIN); + + LOG.info("Checking that no alert has been issued yet"); + assertNull(alerts.poll(500)); + + // make sure the other events don't time out + publisher.publish(new TripEventInfoDTO(1L, now(), MATCHED, Region.AT_VIENNA)); // vienna e1 will fail + publisher.publish(new TripEventInfoDTO(2L, now(), MATCHED, Region.AT_VIENNA)); // vienna e2 will fail + publisher.publish(new TripEventInfoDTO(4L, now(), MATCHED, Region.DE_BERLIN)); // berlin e4 will fail + + sleep(4000); // waiting for e3 to time out + + publisher.dropClients(); + flinkExecution.get(); + + LOG.info("Collecting Alert event"); + Alert alert = alerts.take(); + assertEquals("Expected only a single alert", 0, alerts.get().size()); + + assertEquals(Region.AT_VIENNA, alert.getRegion()); + assertEquals("An alert should comprise three warnings", 3, alert.getWarnings().size()); + + Warning w0 = alert.getWarnings().get(0); + Warning w1 = alert.getWarnings().get(1); + Warning w2 = alert.getWarnings().get(2); + + assertThat(w0, instanceOf(TripFailedWarning.class)); + assertThat(w1, 
instanceOf(TripFailedWarning.class)); + assertThat(w2, instanceOf(MatchingTimeoutWarning.class)); + + assertEquals(Region.AT_VIENNA, w0.getRegion()); + assertEquals(Region.AT_VIENNA, w1.getRegion()); + assertEquals(Region.AT_VIENNA, w2.getRegion()); + } + + @Test + public void averageMatchingDurationWindow_worksCorrectly() throws Exception { + // makes sure the event is triggered at the correct instant + + Future flinkExecution = initAndExecuteAsync(); + + LOG.info("Waiting for subscribers to connect"); + publisher.waitForClients(); + + sleep(250); + publisher.publish(new TripEventInfoDTO(1L, now(), CREATED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(2L, now(), CREATED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(3L, now(), CREATED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(4L, now(), CREATED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(5L, now(), CREATED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(6L, now(), CREATED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(7L, now(), CREATED, Region.DE_BERLIN)); + + sleep(100); + publisher.publish(new TripEventInfoDTO(1L, now(), QUEUED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(2L, now(), QUEUED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(3L, now(), QUEUED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(4L, now(), QUEUED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(5L, now(), QUEUED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(6L, now(), QUEUED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(7L, now(), QUEUED, Region.DE_BERLIN)); + + sleep(100); + publisher.publish(new TripEventInfoDTO(1L, now(), MATCHED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(2L, now(), MATCHED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(3L, now(), MATCHED, Region.AT_VIENNA)); + publisher.publish(new TripEventInfoDTO(4L, now(), MATCHED, Region.AT_VIENNA)); + + // the first four events should not trigger anything + LOG.info("Checking AverageMatchingDuration events after the first four MATCHED events of AT_VIENNA"); + assertNull(averageMatchingDurations.poll(500)); + + // this event is from a different region + publisher.publish(new TripEventInfoDTO(7L, now(), MATCHED, Region.DE_BERLIN)); + LOG.info("Checking AverageMatchingDuration events after the first MATCHED event of DE_BERLIN"); + assertNull(averageMatchingDurations.poll(500)); + + // fifth event in s1 triggers the window operation + publisher.publish(new TripEventInfoDTO(5L, now(), MATCHED, Region.AT_VIENNA)); + LOG.info("Collecting AverageMatchingDuration event for AT_VIENNA"); + AverageMatchingDuration event = averageMatchingDurations.take(); + assertNotNull(event); + assertEquals(Region.AT_VIENNA, event.getRegion()); + + // should be in a new window and therefore not trigger + publisher.publish(new TripEventInfoDTO(6L, now(), MATCHED, Region.AT_VIENNA)); + LOG.info("Checking AverageMatchingDuration events after the sixth MATCHED event of AT_VIENNA"); + assertNull(averageMatchingDurations.poll(500)); + + publisher.dropClients(); + + flinkExecution.get(); + } + + @Test + public void averageMatchingDurations_areCalculatedCorrectly() throws Exception { + // makes sure the keying works properly and that the calculation is done from MatchingDuration events + // requires MatchingDuration events to be calculated correctly + + Future flinkExecution = initAndExecuteAsync(); + + LOG.info("Waiting 
for subscribers to connect"); + publisher.waitForClients(); + + List viennaDurations = new ArrayList<>(5); + List berlinDurations = new ArrayList<>(5); + + sleep(250); + publisher.publish(5, t -> new TripEventInfoDTO(1L, t, CREATED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(2L, t, CREATED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(3L, t, CREATED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(4L, t, CREATED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(5L, t, CREATED, Region.AT_VIENNA)); + + sleep(250); + publisher.publish(5, t -> new TripEventInfoDTO(6L, t, CREATED, Region.DE_BERLIN)); + publisher.publish(5, t -> new TripEventInfoDTO(7L, t, CREATED, Region.DE_BERLIN)); + publisher.publish(5, t -> new TripEventInfoDTO(8L, t, CREATED, Region.DE_BERLIN)); + publisher.publish(5, t -> new TripEventInfoDTO(9L, t, CREATED, Region.DE_BERLIN)); + publisher.publish(5, t -> new TripEventInfoDTO(10L, t, CREATED, Region.DE_BERLIN)); + + sleep(125); + publisher.publish(5, t -> new TripEventInfoDTO(6L, t, QUEUED, Region.DE_BERLIN)); + publisher.publish(5, t -> new TripEventInfoDTO(7L, t, QUEUED, Region.DE_BERLIN)); + publisher.publish(5, t -> new TripEventInfoDTO(8L, t, QUEUED, Region.DE_BERLIN)); + publisher.publish(5, t -> new TripEventInfoDTO(9L, t, QUEUED, Region.DE_BERLIN)); + publisher.publish(5, t -> new TripEventInfoDTO(10L, t, QUEUED, Region.DE_BERLIN)); + + sleep(125); + publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(2L, t, QUEUED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(3L, t, QUEUED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(4L, t, QUEUED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(5L, t, QUEUED, Region.AT_VIENNA)); + + publisher.publish(5, t -> new TripEventInfoDTO(6L, t, MATCHED, Region.DE_BERLIN)); + publisher.publish(5, t -> new TripEventInfoDTO(7L, t, MATCHED, Region.DE_BERLIN)); + publisher.publish(5, t -> new TripEventInfoDTO(8L, t, MATCHED, Region.DE_BERLIN)); + + LOG.info("Collecting MatchingDuration events 1,2,3 for DE_BERLIN"); + berlinDurations.addAll(matchingDurations.take(3)); + + sleep(500); + publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(2L, t, MATCHED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(3L, t, MATCHED, Region.AT_VIENNA)); + + LOG.info("Collecting MatchingDuration events 1,2,3 for AT_VIENNA"); + viennaDurations.addAll(matchingDurations.take(3)); + + publisher.publish(5, t -> new TripEventInfoDTO(9L, t, MATCHED, Region.DE_BERLIN)); + publisher.publish(5, t -> new TripEventInfoDTO(10L, t, MATCHED, Region.DE_BERLIN)); + + LOG.info("Collecting MatchingDuration events 4,5 for DE_BERLIN"); + berlinDurations.addAll(matchingDurations.take(2)); + + sleep(500); + publisher.publish(5, t -> new TripEventInfoDTO(4L, t, MATCHED, Region.AT_VIENNA)); + publisher.publish(5, t -> new TripEventInfoDTO(5L, t, MATCHED, Region.AT_VIENNA)); + + LOG.info("Collecting MatchingDuration events 4,5 for AT_VIENNA"); + viennaDurations.addAll(matchingDurations.take(2)); + + LOG.info("Collecting AverageMatchingDuration event for DE_BERLIN"); + AverageMatchingDuration e0 = averageMatchingDurations.take(); // berlin + + LOG.info("Collecting AverageMatchingDuration event for AT_VIENNA"); + AverageMatchingDuration e1 = 
averageMatchingDurations.take(); // vienna + + assertEquals("Expected calculation for berlin to have been triggered first", e0.getRegion(), Region.DE_BERLIN); + assertEquals("Expected calculation for vienna to have been triggered second", e1.getRegion(), Region.AT_VIENNA); + + assertEquals("Wrong number of MatchingDuration events for AT_VIENNA", 5, viennaDurations.size()); + assertEquals("Wrong number of MatchingDuration events for DE_BERLIN", 5, berlinDurations.size()); + + double viennaAvg = viennaDurations.stream().mapToLong(MatchingDuration::getDuration).average().orElse(-1); + double berlinAvg = berlinDurations.stream().mapToLong(MatchingDuration::getDuration).average().orElse(-1); + + assertEquals("Average duration was not calculated from MatchingDuration events correctly", e0.getDuration(), + berlinAvg, 100); + assertEquals("Average duration was not calculated from MatchingDuration events correctly", e1.getDuration(), + viennaAvg, 100); + + publisher.dropClients(); + flinkExecution.get(); + } +} diff --git a/ass3-event/src/test/resources/logback.xml b/ass3-event/src/test/resources/logback.xml new file mode 100644 index 0000000..aa08bf5 --- /dev/null +++ b/ass3-event/src/test/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} - %highlight(%5p) [%12.12thread] %cyan(%-40.40logger{39}): %m%n + + + + + + + + + + diff --git a/ass3-messaging/pom.xml b/ass3-messaging/pom.xml new file mode 100644 index 0000000..5da1aa9 --- /dev/null +++ b/ass3-messaging/pom.xml @@ -0,0 +1,49 @@ + + + + 4.0.0 + + + at.ac.tuwien.infosys.dst + dst + 2021.1 + .. + + + ass3-messaging + + DST :: Assignment 3 :: Messaging + + jar + + + + com.rabbitmq + amqp-client + + + com.rabbitmq + http-client + + + org.springframework + spring-web + + + org.apache.httpcomponents + httpclient + + + com.fasterxml.jackson.core + jackson-databind + + + + org.springframework.boot + spring-boot-starter-amqp + test + + + + diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/Constants.java b/ass3-messaging/src/main/java/dst/ass3/messaging/Constants.java new file mode 100644 index 0000000..0055997 --- /dev/null +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/Constants.java @@ -0,0 +1,38 @@ +package dst.ass3.messaging; + +/** + * Contains several constants related to the RabbitMQ infrastructure and expected names for queues, exchanges and + * routing keys. 
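+ * <p>
+ * As exercised by the provided tests: trip requests are placed into the per-region work queues (e.g.
+ * {@code dst.at_vienna}), while worker responses are published to the {@code dst.workers} topic exchange using
+ * the {@code requests.<region>} routing keys.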
+ */ +public final class Constants { + + public static final String RMQ_HOST = "192.168.99.99"; + public static final String RMQ_PORT = "5672"; + public static final String RMQ_VHOST = "/"; + public static final String RMQ_USER = "dst"; + public static final String RMQ_PASSWORD = "dst"; + + public static final String RMQ_API_PORT = "15672"; + public static final String RMQ_API_URL = "http://" + RMQ_HOST + ":" + RMQ_API_PORT + "/api/"; + + public static final String QUEUE_AT_VIENNA = "dst.at_vienna"; + public static final String QUEUE_AT_LINZ = "dst.at_linz"; + public static final String QUEUE_DE_BERLIN = "dst.de_berlin"; + + + public static final String[] WORK_QUEUES = { + QUEUE_AT_VIENNA, + QUEUE_AT_LINZ, + QUEUE_DE_BERLIN + }; + + public static final String TOPIC_EXCHANGE = "dst.workers"; + + public static final String ROUTING_KEY_AT_VIENNA = "requests.at_vienna"; + public static final String ROUTING_KEY_AT_LINZ = "requests.at_linz"; + public static final String ROUTING_KEY_DE_BERLIN = "requests.de_berlin"; + + private Constants() { + // util class + } +} diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/GeoPoint.java b/ass3-messaging/src/main/java/dst/ass3/messaging/GeoPoint.java new file mode 100644 index 0000000..11294bc --- /dev/null +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/GeoPoint.java @@ -0,0 +1,54 @@ +package dst.ass3.messaging; + +import java.util.Objects; + +public class GeoPoint { + + + private Double longitude; + private Double latitude; + + public GeoPoint() { + } + + public GeoPoint(Double longitude, Double latitude) { + this.longitude = longitude; + this.latitude = latitude; + } + + public Double getLongitude() { + return longitude; + } + + public void setLongitude(Double longitude) { + this.longitude = longitude; + } + + public Double getLatitude() { + return latitude; + } + + public void setLatitude(Double latitude) { + this.latitude = latitude; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GeoPoint geoPoint = (GeoPoint) o; + return Objects.equals(getLongitude(), geoPoint.getLongitude()) && + Objects.equals(getLatitude(), geoPoint.getLatitude()); + } + + @Override + public int hashCode() { + return Objects.hash(getLongitude(), getLatitude()); + } + + +} diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/IMessagingFactory.java b/ass3-messaging/src/main/java/dst/ass3/messaging/IMessagingFactory.java new file mode 100644 index 0000000..d0ee0e1 --- /dev/null +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/IMessagingFactory.java @@ -0,0 +1,21 @@ +package dst.ass3.messaging; + +import java.io.Closeable; +import java.io.IOException; + +public interface IMessagingFactory extends Closeable { + + IQueueManager createQueueManager(); + + IRequestGateway createRequestGateway(); + + IWorkloadMonitor createWorkloadMonitor(); + + /** + * Closes any resource the factory may create. 
+     *
+     * @throws IOException propagated exceptions
+     */
+    @Override
+    void close() throws IOException;
+}
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/IQueueManager.java b/ass3-messaging/src/main/java/dst/ass3/messaging/IQueueManager.java
new file mode 100644
index 0000000..542d0cb
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/IQueueManager.java
@@ -0,0 +1,28 @@
+package dst.ass3.messaging;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Responsible for creating and tearing down all RabbitMQ queues or exchanges necessary for running the system.
+ */
+public interface IQueueManager extends Closeable {
+
+    /**
+     * Initializes all queues or topic exchanges necessary for running the system.
+     */
+    void setUp();
+
+    /**
+     * Removes all queues or topic exchanges associated with the system.
+     */
+    void tearDown();
+
+    /**
+     * Closes underlying connections or resources, if any.
+     *
+     * @throws IOException propagated exceptions
+     */
+    @Override
+    void close() throws IOException;
+}
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/IRequestGateway.java b/ass3-messaging/src/main/java/dst/ass3/messaging/IRequestGateway.java
new file mode 100644
index 0000000..078fecc
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/IRequestGateway.java
@@ -0,0 +1,21 @@
+package dst.ass3.messaging;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface IRequestGateway extends Closeable {
+
+    /**
+     * Serializes and routes a request to the correct queue.
+     *
+     * @param request the request
+     */
+    void submitRequest(TripRequest request);
+
+    /**
+     * Closes any resources that may have been initialized (connections, channels, etc.).
+     *
+     * @throws IOException propagated exceptions
+     */
+    @Override
+    void close() throws IOException;
+}
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/IWorkloadMonitor.java b/ass3-messaging/src/main/java/dst/ass3/messaging/IWorkloadMonitor.java
new file mode 100644
index 0000000..b868632
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/IWorkloadMonitor.java
@@ -0,0 +1,34 @@
+package dst.ass3.messaging;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+public interface IWorkloadMonitor extends Closeable {
+
+    /**
+     * Returns for each region the number of waiting requests.
+     *
+     * @return a map
+     */
+    Map getRequestCount();
+
+    /**
+     * Returns the number of workers for each region. This can be deduced from the number of consumers to each
+     * queue.
+     *
+     * @return a map
+     */
+    Map getWorkerCount();
+
+    /**
+     * Returns for each region the average processing time of the last 10 recorded requests. The data comes from
+     * subscriptions to the respective topics.
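+     * <p>
+     * For illustration (derived from the provided tests, not part of the required API): worker responses arrive as
+     * JSON messages such as {@code {"requestId": "id1", "processingTime": "1500", "driverId": "1"}}, published to
+     * the {@code dst.workers} topic exchange with routing keys like {@code requests.at_vienna}; the average is
+     * taken over the {@code processingTime} values of the 10 most recent responses per region.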
+ * + * @return a map + */ + Map getAverageProcessingTime(); + + @Override + void close() throws IOException; +} diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/Region.java b/ass3-messaging/src/main/java/dst/ass3/messaging/Region.java new file mode 100644 index 0000000..ede8a28 --- /dev/null +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/Region.java @@ -0,0 +1,7 @@ +package dst.ass3.messaging; + +public enum Region { + AT_VIENNA, + AT_LINZ, + DE_BERLIN +} diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/TripRequest.java b/ass3-messaging/src/main/java/dst/ass3/messaging/TripRequest.java new file mode 100644 index 0000000..d7728ff --- /dev/null +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/TripRequest.java @@ -0,0 +1,71 @@ +package dst.ass3.messaging; + +import java.util.Objects; + +public class TripRequest { + + private String id; + private Region region; + private GeoPoint pickup; + + public TripRequest() { + } + + public TripRequest(String id, Region region, GeoPoint pickup) { + this.id = id; + this.region = region; + this.pickup = pickup; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public Region getRegion() { + return region; + } + + public void setRegion(Region region) { + this.region = region; + } + + public GeoPoint getPickup() { + return pickup; + } + + public void setPickup(GeoPoint pickup) { + this.pickup = pickup; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TripRequest that = (TripRequest) o; + return Objects.equals(getId(), that.getId()) && + getRegion() == that.getRegion() && + Objects.equals(getPickup(), that.getPickup()); + } + + @Override + public int hashCode() { + return Objects.hash(getId(), getRegion(), getPickup()); + } + + @Override + public String toString() { + return "TripRequest{" + + "id='" + id + '\'' + + ", region=" + region + + ", pickup=" + pickup + + '}'; + } +} diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/WorkerResponse.java b/ass3-messaging/src/main/java/dst/ass3/messaging/WorkerResponse.java new file mode 100644 index 0000000..725d013 --- /dev/null +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/WorkerResponse.java @@ -0,0 +1,76 @@ +package dst.ass3.messaging; + +import java.util.Objects; + +/** + * Message sent by a worker after it is finished processing a request. + */ +public class WorkerResponse { + + /** + * The ID of the original {@link TripRequest}. + */ + private String requestId; + + /** + * The time it took to process the request (in milliseconds). 
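+     * This is the value the workload monitor averages over the last 10 responses per region.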
+ */ + private Long processingTime; + + /** + * The proposed driver + */ + private Long driverId; + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public Long getProcessingTime() { + return processingTime; + } + + public void setProcessingTime(Long processingTime) { + this.processingTime = processingTime; + } + + public Long getDriverId() { + return driverId; + } + + public void setDriverId(Long driverId) { + this.driverId = driverId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WorkerResponse that = (WorkerResponse) o; + return Objects.equals(getRequestId(), that.getRequestId()) && + Objects.equals(getProcessingTime(), that.getProcessingTime()) && + Objects.equals(getDriverId(), that.getDriverId()); + } + + @Override + public int hashCode() { + return Objects.hash(getRequestId(), getProcessingTime(), getDriverId()); + } + + @Override + public String toString() { + return "WorkerResponse{" + + "requestId='" + requestId + '\'' + + ", processingTime=" + processingTime + + ", driverId=" + driverId + + '}'; + } +} diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java new file mode 100644 index 0000000..4ecc2c4 --- /dev/null +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java @@ -0,0 +1,32 @@ +package dst.ass3.messaging.impl; + +import dst.ass3.messaging.IMessagingFactory; +import dst.ass3.messaging.IQueueManager; +import dst.ass3.messaging.IRequestGateway; +import dst.ass3.messaging.IWorkloadMonitor; + +public class MessagingFactory implements IMessagingFactory { + + @Override + public IQueueManager createQueueManager() { + // TODO + return null; + } + + @Override + public IRequestGateway createRequestGateway() { + // TODO + return null; + } + + @Override + public IWorkloadMonitor createWorkloadMonitor() { + // TODO + return null; + } + + @Override + public void close() { + // implement if needed + } +} diff --git a/ass3-messaging/src/main/resources/logback.xml b/ass3-messaging/src/main/resources/logback.xml new file mode 100644 index 0000000..9a6b351 --- /dev/null +++ b/ass3-messaging/src/main/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} - %highlight(%5p) [%12.12thread] %cyan(%-40.40logger{39}): %m%n + + + + + + + + + + diff --git a/ass3-messaging/src/test/java/dst/ass3/messaging/RabbitResource.java b/ass3-messaging/src/test/java/dst/ass3/messaging/RabbitResource.java new file mode 100644 index 0000000..b8ca8a2 --- /dev/null +++ b/ass3-messaging/src/test/java/dst/ass3/messaging/RabbitResource.java @@ -0,0 +1,51 @@ +package dst.ass3.messaging; + +import com.rabbitmq.http.client.Client; +import org.junit.rules.ExternalResource; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +import org.apache.http.auth.Credentials; + +public class RabbitResource extends ExternalResource { + + private RabbitAdmin admin; + private Client manager; + private CachingConnectionFactory connectionFactory; + private RabbitTemplate client; + + @Override + protected void before() throws Throwable { + + manager = new Client(Constants.RMQ_API_URL, Constants.RMQ_USER, Constants.RMQ_PASSWORD); + 
+ connectionFactory = new CachingConnectionFactory(Constants.RMQ_HOST); + connectionFactory.setUsername(Constants.RMQ_USER); + connectionFactory.setPassword(Constants.RMQ_PASSWORD); + + client = new RabbitTemplate(connectionFactory); + admin = new RabbitAdmin(connectionFactory); + } + + @Override + protected void after() { + connectionFactory.destroy(); + } + + public Client getManager() { + return manager; + } + + public RabbitTemplate getClient() { + return client; + } + + public RabbitAdmin getAdmin() { + return admin; + } + + public CachingConnectionFactory getConnectionFactory() { + return connectionFactory; + } +} diff --git a/ass3-messaging/src/test/java/dst/ass3/messaging/impl/Ass3_1_Suite.java b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/Ass3_1_Suite.java new file mode 100644 index 0000000..03a6159 --- /dev/null +++ b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/Ass3_1_Suite.java @@ -0,0 +1,13 @@ +package dst.ass3.messaging.impl; + +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +@RunWith(Suite.class) +@Suite.SuiteClasses({ + QueueManagerTest.class, + RequestGatewayTest.class, + WorkloadMonitorTest.class +}) +public class Ass3_1_Suite { +} diff --git a/ass3-messaging/src/test/java/dst/ass3/messaging/impl/QueueManagerTest.java b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/QueueManagerTest.java new file mode 100644 index 0000000..2c70a08 --- /dev/null +++ b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/QueueManagerTest.java @@ -0,0 +1,97 @@ +package dst.ass3.messaging.impl; + +import com.rabbitmq.http.client.domain.ExchangeInfo; +import com.rabbitmq.http.client.domain.QueueInfo; +import dst.ass3.messaging.IMessagingFactory; +import dst.ass3.messaging.IQueueManager; +import dst.ass3.messaging.RabbitResource; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.springframework.amqp.core.Exchange; +import org.springframework.amqp.core.Queue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static dst.ass3.messaging.Constants.*; +import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.MatcherAssert.assertThat; + +public class QueueManagerTest { + + @Rule + public RabbitResource rabbit = new RabbitResource(); + + @Rule + public Timeout timeout = new Timeout(10, TimeUnit.SECONDS); + + private IMessagingFactory factory = new MessagingFactory(); + private IQueueManager queueManager; + + @Before + public void setUp() throws Exception { + factory = new MessagingFactory(); + queueManager = factory.createQueueManager(); + } + + @After + public void tearDown() throws Exception { + try { + queueManager.close(); + } catch (IOException e) { + // ignore + } + } + + @Test + public void setUp_createsQueues() throws Exception { + queueManager.setUp(); + + try { + List queues = rabbit.getManager().getQueues(); + assertThat(queues.size(), not(0)); + + // make sure all work queues exist + List queueNames = queues.stream().map(QueueInfo::getName).collect(Collectors.toList()); + Arrays.stream(WORK_QUEUES) + .forEach(wq -> assertThat(queueNames, hasItem(wq))); + } finally { + queueManager.tearDown(); + } + } + + @Test + public void setUp_createsExchange() throws Exception { + queueManager.setUp(); + try { + ExchangeInfo exchange = rabbit.getManager().getExchange(RMQ_VHOST, TOPIC_EXCHANGE); + assertThat(exchange, notNullValue()); + } 
finally { + queueManager.tearDown(); + } + } + + @Test + public void tearDown_removesQueues() throws Exception { + queueManager.setUp(); + queueManager.tearDown(); + List queues = rabbit.getManager().getQueues(); + List queueNames = queues.stream().map(QueueInfo::getName).collect(Collectors.toList()); + Arrays.stream(WORK_QUEUES) + .forEach(wq -> assertThat(queueNames, not(hasItem(wq)))); + } + + @Test + public void tearDown_removesExchange() throws Exception { + queueManager.setUp(); + queueManager.tearDown(); + ExchangeInfo exchange = rabbit.getManager().getExchange(RMQ_VHOST, TOPIC_EXCHANGE); + assertThat(exchange, nullValue()); + } +} diff --git a/ass3-messaging/src/test/java/dst/ass3/messaging/impl/RequestGatewayTest.java b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/RequestGatewayTest.java new file mode 100644 index 0000000..f86ee8e --- /dev/null +++ b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/RequestGatewayTest.java @@ -0,0 +1,112 @@ +package dst.ass3.messaging.impl; + +import dst.ass3.messaging.*; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; + +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.MatcherAssert.assertThat; + +public class RequestGatewayTest { + + private static final Logger LOG = LoggerFactory.getLogger(RequestGatewayTest.class); + + @Rule + public RabbitResource rabbit = new RabbitResource(); + + @Rule + public Timeout timeout = new Timeout(10, TimeUnit.SECONDS); + + private IMessagingFactory factory; + private IQueueManager queueManager; + private IRequestGateway requestGateway; + + @Before + public void setUp() throws Exception { + factory = new MessagingFactory(); + queueManager = factory.createQueueManager(); + requestGateway = factory.createRequestGateway(); + + queueManager.setUp(); + } + + @After + public void tearDown() throws Exception { + queueManager.tearDown(); + + requestGateway.close(); + queueManager.close(); + factory.close(); + } + + @Test + public void submitRequest_routesRequestsToCorrectQueues() throws Exception { + TripRequest r1 = new TripRequest("id1", Region.AT_VIENNA, new GeoPoint(34.22, -22.11)); + TripRequest r2 = new TripRequest("id2", Region.AT_VIENNA, new GeoPoint(98.2, -1.11)); + TripRequest r3 = new TripRequest("id3", Region.AT_LINZ, new GeoPoint(85.33, -25d)); + + LOG.info("Sending request {}", r1); + requestGateway.submitRequest(r1); + LOG.info("Sending request {}", r2); + requestGateway.submitRequest(r2); + LOG.info("Sending request {}", r3); + requestGateway.submitRequest(r3); + + LOG.info("Taking requests from queue {}", Constants.QUEUE_AT_VIENNA); + Message m1 = rabbit.getClient().receive(Constants.QUEUE_AT_VIENNA, 1000); + assertThat(m1, notNullValue()); + + LOG.info("Taking requests from queue {}", Constants.QUEUE_AT_VIENNA); + Message m2 = rabbit.getClient().receive(Constants.QUEUE_AT_VIENNA, 1000); + assertThat(m2, notNullValue()); + + LOG.info("Taking requests from queue {}", Constants.QUEUE_AT_LINZ); + Message m3 = rabbit.getClient().receive(Constants.QUEUE_AT_LINZ, 1000); + assertThat(m3, notNullValue()); + + assertThat("Expected queue to be empty as no request for that region were issued", + rabbit.getClient().receive(Constants.QUEUE_DE_BERLIN, 1000), nullValue()); + } + + + @Test + public void submitRequest_serializesIntoJsonFormat() throws Exception { 
+ TripRequest r1 = new TripRequest("id1", Region.AT_VIENNA, new GeoPoint(1d, 2d)); + TripRequest r2 = new TripRequest("id2", Region.AT_LINZ, new GeoPoint(3d, 4d)); + TripRequest r3 = new TripRequest("id3", Region.DE_BERLIN, new GeoPoint(5d, 6d)); + + LOG.info("Sending request {}", r1); + requestGateway.submitRequest(r1); + LOG.info("Sending request {}", r2); + requestGateway.submitRequest(r2); + LOG.info("Sending request {}", r3); + requestGateway.submitRequest(r3); + + LOG.info("Taking request from queue {}", Constants.QUEUE_AT_VIENNA); + Message m1 = rabbit.getClient().receive(Constants.QUEUE_AT_VIENNA, 1000); + assertThat(m1, notNullValue()); + assertThat(new String(m1.getBody()), + equalTo("{\"id\":\"id1\",\"region\":\"AT_VIENNA\",\"pickup\":{\"longitude\":1.0,\"latitude\":2.0}}")); + + LOG.info("Taking request from queue {}", Constants.QUEUE_AT_LINZ); + Message m2 = rabbit.getClient().receive(Constants.QUEUE_AT_LINZ, 1000); + assertThat(m2, notNullValue()); + assertThat(new String(m2.getBody()), + equalTo("{\"id\":\"id2\",\"region\":\"AT_LINZ\",\"pickup\":{\"longitude\":3.0,\"latitude\":4.0}}")); + + LOG.info("Taking request from queue {}", Constants.QUEUE_DE_BERLIN); + Message m3 = rabbit.getClient().receive(Constants.QUEUE_DE_BERLIN, 1000); + assertThat(m3, notNullValue()); + assertThat(new String(m3.getBody()), + equalTo("{\"id\":\"id3\",\"region\":\"DE_BERLIN\",\"pickup\":{\"longitude\":5.0,\"latitude\":6.0}}")); + + } +} diff --git a/ass3-messaging/src/test/java/dst/ass3/messaging/impl/WorkloadMonitorTest.java b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/WorkloadMonitorTest.java new file mode 100644 index 0000000..f96900c --- /dev/null +++ b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/WorkloadMonitorTest.java @@ -0,0 +1,169 @@ +package dst.ass3.messaging.impl; + +import com.rabbitmq.http.client.domain.QueueInfo; +import dst.ass3.messaging.*; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.listener.MessageListenerContainer; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.context.Lifecycle; + +import java.util.*; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import static dst.ass3.messaging.Constants.TOPIC_EXCHANGE; +import static dst.ass3.messaging.Constants.WORK_QUEUES; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.MatcherAssert.assertThat; + +public class WorkloadMonitorTest { + + private static final Logger LOG = LoggerFactory.getLogger(RequestGatewayTest.class); + + @Rule + public RabbitResource rabbit = new RabbitResource(); + + @Rule + public Timeout timeout = new Timeout(60, TimeUnit.SECONDS); + + private IMessagingFactory factory; + private IQueueManager queueManager; + private IRequestGateway requestGateway; + private IWorkloadMonitor workloadMonitor; + + @Before + public void setUp() throws Exception { + factory = new MessagingFactory(); + queueManager = factory.createQueueManager(); + requestGateway = factory.createRequestGateway(); + + queueManager.setUp(); + + workloadMonitor = factory.createWorkloadMonitor(); + } + + @After + public void tearDown() throws Exception { + queueManager.tearDown(); + + requestGateway.close(); + 
+        queueManager.close();
+        factory.close();
+    }
+
+    @Test
+    public void getRequestCount_returnsCorrectCount() throws Exception {
+        try {
+            Map<Region, Long> countForRegion = new HashMap<>();
+            for (Region region : Region.values()) {
+                countForRegion.put(region, ThreadLocalRandom.current().nextLong(10, 20 + 1));
+                for (long i = 0; i < countForRegion.get(region); i++) {
+                    GeoPoint pickup = new GeoPoint(48.11, 16.22);
+                    TripRequest request = new TripRequest("id" + i, region, pickup);
+                    LOG.info("Sending request {}", request);
+                    requestGateway.submitRequest(request);
+                }
+            }
+
+            // wait for the messages to be processed by rabbit
+            Thread.sleep(2000);
+
+            assertThat(workloadMonitor.getRequestCount(), equalTo(countForRegion));
+        } finally {
+            workloadMonitor.close();
+        }
+    }
+
+    @Test
+    public void multipleWorkloadMonitors_uniqueQueueForEachMonitor() throws Exception {
+        try (IWorkloadMonitor workloadMonitor2 = factory.createWorkloadMonitor();
+             IWorkloadMonitor workloadMonitor3 = factory.createWorkloadMonitor()) {
+            long nonWorkQueues = rabbit.getManager().getQueues().stream().filter(q -> !Arrays.asList(WORK_QUEUES).contains(q.getName())).count();
+            assertThat(nonWorkQueues, greaterThanOrEqualTo(3L));
+        } finally {
+            workloadMonitor.close();
+        }
+    }
+
+    @Test
+    public void getAverageProcessingTime_correctAverageTime() throws Exception {
+        try {
+            Map<Region, Double> avgTimes = new HashMap<>();
+            for (Region region : Region.values()) {
+                long count = ThreadLocalRandom.current().nextLong(15, 25);
+                long regionTime = 0;
+                for (long i = 0; i < count; i++) {
+                    long requestTime = ThreadLocalRandom.current().nextLong(1000, 20000 + 1);
+                    if (count - i <= 10) {
+                        regionTime += requestTime;
+                    }
+
+                    String body = String.format("{\"requestId\": \"%s\", \"processingTime\": \"%d\", \"driverId\": \"1\"}", UUID.randomUUID(), requestTime);
+                    LOG.info("Sending request {}", body);
+                    rabbit.getClient().convertAndSend(TOPIC_EXCHANGE, "requests." + region.toString().toLowerCase(), body);
+                }
+                avgTimes.put(region, ((double) regionTime / 10));
+            }
+
+            // wait for the messages to be processed by rabbit
+            Thread.sleep(2000);
+
+            assertThat(workloadMonitor.getAverageProcessingTime(), equalTo(avgTimes));
+        } finally {
+            workloadMonitor.close();
+        }
+    }
+
+    @Test
+    public void getWorkerCount_returnsCorrectCount() throws Exception {
+        try {
+            // spawn a random number of consumers
+            Map<Region, List<MessageListenerContainer>> consumersForRegion = new HashMap<>();
+            Map<Region, Long> consumerCountForRegion = new HashMap<>();
+            for (Region region : Region.values()) {
+                List<MessageListenerContainer> consumers = new ArrayList<>();
+                consumersForRegion.put(region, consumers);
+                consumerCountForRegion.put(region, ThreadLocalRandom.current().nextLong(10, 20 + 1));
+                for (long i = 0; i < consumerCountForRegion.get(region); i++) {
+                    consumers.add(spawnConsumer("dst."
+                            + region.toString().toLowerCase()));
+                }
+            }
+
+            // wait for rabbit to get to know the new consumers
+            Thread.sleep(2000);
+
+            Map<Region, Long> workerCount = workloadMonitor.getWorkerCount();
+
+            // stop all consumers
+            consumersForRegion.entrySet().stream().map(Map.Entry::getValue).flatMap(Collection::stream).forEach(Lifecycle::stop);
+
+            assertThat(workerCount, equalTo(consumerCountForRegion));
+        } finally {
+            workloadMonitor.close();
+        }
+    }
+
+    private MessageListenerContainer spawnConsumer(String queue) {
+        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbit.getConnectionFactory());
+        container.addQueueNames(queue);
+        container.start();
+        return container;
+    }
+
+    @Test
+    public void close_removesQueues() throws Exception {
+        workloadMonitor.close();
+
+        List<QueueInfo> queues = rabbit.getManager().getQueues();
+        long nonWorkQueues = queues.stream().filter(q -> !Arrays.asList(WORK_QUEUES).contains(q.getName())).count();
+        assertThat(nonWorkQueues, is(0L));
+    }
+}
diff --git a/ass3-worker/Dockerfile b/ass3-worker/Dockerfile
new file mode 100644
index 0000000..f87f5c1
--- /dev/null
+++ b/ass3-worker/Dockerfile
@@ -0,0 +1 @@
+# TODO
\ No newline at end of file
diff --git a/ass3-worker/pom.xml b/ass3-worker/pom.xml
new file mode 100644
index 0000000..0ab7e64
--- /dev/null
+++ b/ass3-worker/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>at.ac.tuwien.infosys.dst</groupId>
+        <artifactId>dst</artifactId>
+        <version>2021.1</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>ass3-worker</artifactId>
+
+    <packaging>jar</packaging>
+
+    <name>DST :: Assignment 3 :: Worker</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>at.ac.tuwien.infosys.dst</groupId>
+            <artifactId>ass3-messaging</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-amqp</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.github.docker-java</groupId>
+            <artifactId>docker-java</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/ass3-worker/redis-data.sh b/ass3-worker/redis-data.sh
new file mode 100644
index 0000000..3c94964
--- /dev/null
+++ b/ass3-worker/redis-data.sh
@@ -0,0 +1,11 @@
+#!/usr/bin/env bash
+redis-cli hmset drivers:at_vienna "1234" "48.19819 16.37127" "5678" "50.19819 20.37127"
+
+# Make sure that all required resources are created and correctly configured.
+# To test your implementation:
+# Publish the following message via the RabbitMQ interface to the queue 'dst.at_vienna':
+# {"id":"request1","region":"AT_VIENNA","pickup":{"longitude": 16.371334 ,"latitude": 48.198122}}
+# To check whether your implementation removed the driver, execute the following command in the VM:
+# redis-cli hgetall drivers:at_vienna
+# The output should not include the driver with id 1234.
+
diff --git a/ass3-worker/src/test/java/dst/ass3/worker/package-info.java b/ass3-worker/src/test/java/dst/ass3/worker/package-info.java
new file mode 100644
index 0000000..0922bba
--- /dev/null
+++ b/ass3-worker/src/test/java/dst/ass3/worker/package-info.java
@@ -0,0 +1,6 @@
+/**
+ * This package allows you to write your own integration tests for the worker. For example, you could use the
+ * RabbitTemplate, Jedis, and DockerClient libraries to programmatically insert test data, and verify whether containers
+ * are spawned correctly.
+ */
+package dst.ass3.worker;
\ No newline at end of file
diff --git a/ass3-worker/worker.py b/ass3-worker/worker.py
new file mode 100644
index 0000000..f87f5c1
--- /dev/null
+++ b/ass3-worker/worker.py
@@ -0,0 +1 @@
+# TODO
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 137e20a..cc5abba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -387,6 +387,47 @@
                 <version>${httpclient.version}</version>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-streaming-java_2.12</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-cep_2.12</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-clients_2.12</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.github.docker-java</groupId>
+                <artifactId>docker-java</artifactId>
+                <version>${docker-api-client.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.rabbitmq</groupId>
+                <artifactId>amqp-client</artifactId>
+                <version>${rabbitmq-client.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.rabbitmq</groupId>
+                <artifactId>http-client</artifactId>
+                <version>${rabbitmq-http.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.springframework</groupId>
+                <artifactId>spring-web</artifactId>
+                <version>${spring.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-starter-amqp</artifactId>
+                <version>${spring-boot.version}</version>
+            </dependency>
@@ -405,6 +446,10 @@
         <module>ass2-service/facade</module>
         <module>ass2-aop</module>
         <module>ass2-ioc</module>
+        <module>ass3-messaging</module>
+        <module>ass3-event</module>
+        <module>ass3-elastic</module>
+        <module>ass3-worker</module>
@@ -488,6 +533,32 @@
+        <profile>
+            <id>ass3-event</id>
+            <modules>
+                <module>ass3-event</module>
+            </modules>
+        </profile>
+        <profile>
+            <id>ass3-messaging</id>
+            <modules>
+                <module>ass3-messaging</module>
+            </modules>
+        </profile>
+        <profile>
+            <id>ass3-elastic</id>
+            <modules>
+                <module>ass3-messaging</module>
+                <module>ass3-elastic</module>
+            </modules>
+        </profile>
+        <profile>
+            <id>ass3-worker</id>
+            <modules>
+                <module>ass3-messaging</module>
+                <module>ass3-worker</module>
+            </modules>
+        </profile>
@@ -529,6 +600,12 @@
         0.6.1
         1.7.0
         4.5.13
+
+        1.13.0
+        3.2.7
+        1.4
+        3.9.0.RELEASE
+        5.11.0
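The javadoc in ass3-worker/src/test/java/dst/ass3/worker/package-info.java and the manual test procedure in redis-data.sh can also be automated as an integration test. The sketch below is illustrative only and not part of the patch: the class name WorkerIntegrationTest is hypothetical, it assumes RabbitMQ and Redis run on localhost with their default ports, that a worker for region AT_VIENNA is already running, and that the fixed sleep gives the worker enough time to process the request; the queue name, Redis key, and message body are taken from redis-data.sh.

package dst.ass3.worker;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

import redis.clients.jedis.Jedis;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

public class WorkerIntegrationTest {

    private Jedis jedis;
    private RabbitTemplate rabbitTemplate;

    @Before
    public void setUp() {
        // assumed local endpoints, matching the setup used by redis-data.sh
        jedis = new Jedis("localhost", 6379);
        rabbitTemplate = new RabbitTemplate(new CachingConnectionFactory("localhost"));
        // seed a driver, as redis-data.sh does
        jedis.hset("drivers:at_vienna", "1234", "48.19819 16.37127");
    }

    @After
    public void tearDown() {
        jedis.close();
    }

    @Test
    public void tripRequest_removesAssignedDriver() throws Exception {
        // publish a trip request to the work queue the worker listens on
        String request = "{\"id\":\"request1\",\"region\":\"AT_VIENNA\","
                + "\"pickup\":{\"longitude\":16.371334,\"latitude\":48.198122}}";
        rabbitTemplate.convertAndSend("dst.at_vienna", request);

        // give the (already running) worker some time to process the request
        Thread.sleep(5000);

        // the worker should have taken driver 1234 out of the available set
        assertThat(jedis.hexists("drivers:at_vienna", "1234"), is(false));
    }
}

In the same style, a DockerClient from the docker-java dependency could be used to check that worker containers are created and removed, for example by inspecting the result of dockerClient.listContainersCmd().exec().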