Add template for assignment 3

Tobias Eidelpes 2021-05-17 15:44:23 +02:00
parent 1363e83aa3
commit be629aa432
64 changed files with 3556 additions and 0 deletions

ass3-elastic/pom.xml Normal file
View File

@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>at.ac.tuwien.infosys.dst</groupId>
<artifactId>dst</artifactId>
<version>2021.1</version>
<relativePath>..</relativePath>
</parent>
<artifactId>ass3-elastic</artifactId>
<name>DST :: Assignment 3 :: Elasticity</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>at.ac.tuwien.infosys.dst</groupId>
<artifactId>ass3-messaging</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>at.ac.tuwien.infosys.dst</groupId>
<artifactId>ass3-messaging</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,22 @@
package dst.ass3.elastic;
/**
* Exception indicating that the ContainerService encountered an error when performing a task.
*/
public class ContainerException extends Exception {
public ContainerException() {
}
public ContainerException(String message) {
super(message);
}
public ContainerException(String message, Throwable cause) {
super(message, cause);
}
public ContainerException(Throwable cause) {
super(cause);
}
}

View File

@@ -0,0 +1,95 @@
package dst.ass3.elastic;
import dst.ass3.messaging.Region;
import java.util.Objects;
/**
* Value type that represents a container.
*/
public class ContainerInfo {
/**
* The name of the container image.
*/
private String image;
/**
* The container ID.
*/
private String containerId;
/**
* True if the container is running.
*/
private boolean running;
/**
* If the container is a worker (indicated by the image dst/ass3-worker), then this field should contain the worker
* region. Otherwise it can be null.
*/
private Region workerRegion;
public String getImage() {
return image;
}
public void setImage(String image) {
this.image = image;
}
public String getContainerId() {
return containerId;
}
public void setContainerId(String containerId) {
this.containerId = containerId;
}
public boolean isRunning() {
return running;
}
public void setRunning(boolean running) {
this.running = running;
}
public Region getWorkerRegion() {
return workerRegion;
}
public void setWorkerRegion(Region workerRegion) {
this.workerRegion = workerRegion;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ContainerInfo that = (ContainerInfo) o;
return isRunning() == that.isRunning() &&
Objects.equals(getImage(), that.getImage()) &&
Objects.equals(getContainerId(), that.getContainerId()) &&
getWorkerRegion() == that.getWorkerRegion();
}
@Override
public int hashCode() {
return Objects.hash(getImage(), getContainerId(), isRunning(), getWorkerRegion());
}
@Override
public String toString() {
return "ContainerInfo{" +
"image='" + image + '\'' +
", containerId='" + containerId + '\'' +
", running=" + running +
", workerRegion=" + workerRegion +
'}';
}
}

View File

@@ -0,0 +1,23 @@
package dst.ass3.elastic;
/**
* Indicates that a container could not be found.
*/
public class ContainerNotFoundException extends ContainerException {
public ContainerNotFoundException() {
}
public ContainerNotFoundException(String message) {
super(message);
}
public ContainerNotFoundException(String message, Throwable cause) {
super(message, cause);
}
public ContainerNotFoundException(Throwable cause) {
super(cause);
}
}

View File

@@ -0,0 +1,36 @@
package dst.ass3.elastic;
import dst.ass3.messaging.Region;
import java.util.List;
public interface IContainerService {
/**
* Returns a list of all running containers.
*
* @return a list of ContainerInfo objects
* @throws ContainerException if an error occurred when trying to fetch the running containers.
*/
List<ContainerInfo> listContainers() throws ContainerException;
/**
* Stops the container with the given container ID.
*
* @param containerId ID of the container to stop.
* @throws ContainerNotFoundException if the container to stop is not running
* @throws ContainerException if another error occurred when trying to stop the container
*/
void stopContainer(String containerId) throws ContainerException;
/**
* Starts a worker for the specific {@link dst.ass3.messaging.Region}.
*
* @param region {@link Region} of the worker to start
* @return ContainerInfo of the started container / worker
* @throws ImageNotFoundException if the worker docker image is not available
* @throws ContainerException if another error occurred when trying to start the worker
*/
ContainerInfo startWorker(Region region) throws ContainerException;
}
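
The intended call sequence, as exercised by ContainerServiceTest further down, looks roughly like the following sketch (error handling elided; the factory call assumes the ElasticityFactory from this template):

IContainerService service = new ElasticityFactory().createContainerService();
ContainerInfo worker = service.startWorker(Region.AT_VIENNA); // may throw ImageNotFoundException
for (ContainerInfo info : service.listContainers()) {
    System.out.println(info); // all running containers, including the new worker
}
service.stopContainer(worker.getContainerId()); // may throw ContainerNotFoundException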

View File

@@ -0,0 +1,7 @@
package dst.ass3.elastic;
public interface IElasticityController {
void adjustWorkers() throws ContainerException;
}

View File

@@ -0,0 +1,10 @@
package dst.ass3.elastic;
import dst.ass3.messaging.IWorkloadMonitor;
public interface IElasticityFactory {
IContainerService createContainerService();
IElasticityController createElasticityController(IContainerService containerService,
IWorkloadMonitor workloadMonitor);
}

View File

@@ -0,0 +1,22 @@
package dst.ass3.elastic;
/**
* Exception indicating that the image which should be used for a container start is not available.
*/
public class ImageNotFoundException extends ContainerException {
public ImageNotFoundException() {
}
public ImageNotFoundException(String message) {
super(message);
}
public ImageNotFoundException(String message, Throwable cause) {
super(message, cause);
}
public ImageNotFoundException(Throwable cause) {
super(cause);
}
}

View File

@@ -0,0 +1,24 @@
package dst.ass3.elastic.impl;
import dst.ass3.elastic.IContainerService;
import dst.ass3.elastic.IElasticityController;
import dst.ass3.elastic.IElasticityFactory;
import dst.ass3.messaging.IWorkloadMonitor;
import dst.ass3.messaging.impl.MessagingFactory;
public class ElasticityFactory implements IElasticityFactory {
@Override
public IContainerService createContainerService() {
// TODO
return null;
}
@Override
public IElasticityController createElasticityController(IContainerService containerService,
IWorkloadMonitor workloadMonitor) {
// TODO
return null;
}
}
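
The module's pom pulls in com.github.docker-java:docker-java, so the container service this factory is meant to return will most likely wrap a docker-java DockerClient. A minimal, hypothetical sketch of the listing side only (the class name is ours, not part of the template, and the region mapping is deliberately left open):

import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.model.Container;
import com.github.dockerjava.core.DockerClientBuilder;
import dst.ass3.elastic.ContainerInfo;
import java.util.ArrayList;
import java.util.List;

public class DockerContainerServiceSketch {
    private final DockerClient docker = DockerClientBuilder.getInstance().build();

    public List<ContainerInfo> listContainers() {
        List<ContainerInfo> result = new ArrayList<>();
        for (Container c : docker.listContainersCmd().exec()) { // running containers only
            ContainerInfo info = new ContainerInfo();
            info.setContainerId(c.getId());
            info.setImage(c.getImage());
            info.setRunning(true);
            // Deriving the worker Region (e.g. from the container command or a label)
            // depends on how startWorker() launches "dst/ass3-worker" and is omitted here.
            result.add(info);
        }
        return result;
    }
}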

View File

@@ -0,0 +1,16 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} - %highlight(%5p) [%12.12thread] %cyan(%-40.40logger{39}): %m%n</pattern>
</encoder>
</appender>
<logger name="org.apache.http" level="WARN"/>
<root level="${log.level:-INFO}">
<appender-ref ref="STDOUT"/>
</root>
</configuration>

View File

@@ -0,0 +1,107 @@
package dst.ass3.elastic;
import dst.ass3.elastic.impl.ElasticityFactory;
import dst.ass3.messaging.RabbitResource;
import dst.ass3.messaging.Region;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.assertThat;
public class ContainerServiceTest {
private static final Logger LOG = LoggerFactory.getLogger(ContainerServiceTest.class);
@Rule
public RabbitResource rabbit = new RabbitResource();
@Rule
public Timeout timeout = new Timeout(30, TimeUnit.SECONDS);
IContainerService containerService;
IElasticityFactory factory;
@Before
public void setUp() throws Exception {
factory = new ElasticityFactory();
containerService = factory.createContainerService();
rabbit.getAdmin().declareQueue(new Queue("dst.at_vienna"));
rabbit.getAdmin().declareQueue(new Queue("dst.at_linz"));
rabbit.getAdmin().declareQueue(new Queue("dst.de_berlin"));
}
@After
public void tearDown() throws Exception {
rabbit.getAdmin().deleteQueue("dst.at_vienna");
rabbit.getAdmin().deleteQueue("dst.at_linz");
rabbit.getAdmin().deleteQueue("dst.de_berlin");
}
@Test
public void spawnListStop_lifecycleWorks() throws Exception {
List<ContainerInfo> containers = containerService.listContainers();
assertThat("Please stop all containers before running the test", containers.size(), is(0));
ContainerInfo c1 = containerService.startWorker(Region.AT_VIENNA);
LOG.info("Started container {}", c1);
ContainerInfo c2 = containerService.startWorker(Region.AT_LINZ);
LOG.info("Started container {}", c2);
ContainerInfo c3 = containerService.startWorker(Region.DE_BERLIN);
LOG.info("Started container {}", c3);
LOG.info("Waiting for containers to boot...");
Thread.sleep(5000);
containers = containerService.listContainers();
assertThat(containers.size(), is(3));
LOG.info("Stopping containers...");
containerService.stopContainer(c1.getContainerId());
containerService.stopContainer(c2.getContainerId());
containerService.stopContainer(c3.getContainerId());
Thread.sleep(5000);
containers = containerService.listContainers();
assertThat(containers.size(), is(0));
}
@Test(expected = ContainerNotFoundException.class)
public void stopNonExistingContainer_throwsException() throws Exception {
containerService.stopContainer("Non-Existing-Id");
}
@Test
public void listContainers_containsCompleteInfo() throws Exception {
ContainerInfo c1 = containerService.startWorker(Region.AT_VIENNA);
LOG.info("Started container {}", c1);
LOG.info("Waiting for container to boot...");
Thread.sleep(5000);
List<ContainerInfo> containers = containerService.listContainers();
ContainerInfo containerInfo = containers.stream()
.filter(c -> c1.getContainerId().equals(c.getContainerId()))
.findFirst().get();
assertThat(containerInfo, notNullValue());
assertThat(containerInfo.getImage(), equalTo("dst/ass3-worker"));
assertThat(containerInfo.getWorkerRegion(), equalTo(Region.AT_VIENNA));
assertThat(containerInfo.isRunning(), is(true));
LOG.info("Stopping container...");
containerService.stopContainer(containerInfo.getContainerId());
}
}

View File

@@ -0,0 +1,136 @@
package dst.ass3.elastic;
import dst.ass3.elastic.impl.ElasticityFactory;
import dst.ass3.messaging.IWorkloadMonitor;
import dst.ass3.messaging.Region;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
@RunWith(MockitoJUnitRunner.class)
public class ElasticityControllerTest {
private static final String WORKER_IMAGE = "dst/ass3-worker";
IElasticityFactory factory;
@Mock
IContainerService containerService;
@Mock
IWorkloadMonitor workloadMonitor;
IElasticityController elasticityController;
Map<Region, Double> processingTimes = new HashMap<>();
Map<Region, Long> workerCount = new HashMap<>();
Map<Region, Long> requestCount = new HashMap<>();
List<ContainerInfo> containers = new ArrayList<>();
@Before
public void setUp() throws Exception {
factory = new ElasticityFactory();
elasticityController = factory.createElasticityController(containerService, workloadMonitor);
processingTimes.clear();
processingTimes.put(Region.AT_VIENNA, 5000.0);
processingTimes.put(Region.DE_BERLIN, 10000.0);
processingTimes.put(Region.AT_LINZ, 2000.0);
when(workloadMonitor.getAverageProcessingTime()).thenReturn(processingTimes);
workerCount.clear();
workerCount.put(Region.AT_VIENNA, 95L);
workerCount.put(Region.DE_BERLIN, 87L);
workerCount.put(Region.AT_LINZ, 61L);
when(workloadMonitor.getWorkerCount()).thenReturn(workerCount);
requestCount.clear();
requestCount.put(Region.AT_VIENNA, 600L);
requestCount.put(Region.DE_BERLIN, 1000L);
requestCount.put(Region.AT_LINZ, 1005L);
when(workloadMonitor.getRequestCount()).thenReturn(requestCount);
containers.clear();
for (int i = 0; i < 95; i++) {
containers.add(containerInfo("vienna" + i, WORKER_IMAGE, Region.AT_VIENNA, true));
}
for (int i = 0; i < 87; i++) {
containers.add(containerInfo("berlin" + i, WORKER_IMAGE, Region.DE_BERLIN, true));
}
for (int i = 0; i < 61; i++) {
containers.add(containerInfo("linz" + i, WORKER_IMAGE, Region.AT_LINZ, true));
}
when(containerService.listContainers()).thenReturn(containers);
}
@After
public void tearDown() {
verify(workloadMonitor, atLeast(1)).getWorkerCount();
verify(workloadMonitor, atLeast(1)).getRequestCount();
verify(workloadMonitor, atLeast(1)).getAverageProcessingTime();
}
@Test
public void notEnoughWorkers_scaleUp() throws Exception {
// remove 10 vienna workers and 10 linz workers
List<ContainerInfo> containersToRemove = containers.stream()
.filter(c -> c.getContainerId().startsWith("vienna7") || c.getContainerId().startsWith("linz1"))
.collect(Collectors.toList());
containers.removeAll(containersToRemove);
workerCount.put(Region.AT_VIENNA, 85L);
workerCount.put(Region.AT_LINZ, 51L);
elasticityController.adjustWorkers();
verify(containerService, never()).stopContainer((String) any(String.class));
verify(containerService, times(15)).startWorker(Region.AT_VIENNA);
verify(containerService, times(16)).startWorker(Region.AT_LINZ);
verify(containerService, never()).startWorker(Region.DE_BERLIN);
verify(containerService, never()).listContainers();
}
@Test
public void tooManyWorkers_scaleDown() throws Exception {
// add 20 more, some should be stopped
for (int i = 0; i < 20; i++) {
containers.add(containerInfo("linz1" + i, WORKER_IMAGE, Region.AT_LINZ, true));
}
workerCount.put(Region.AT_LINZ, 81L);
elasticityController.adjustWorkers();
verify(containerService, times(14)).stopContainer(contains("linz"));
verify(containerService, never()).stopContainer(contains("berlin"));
verify(containerService, never()).stopContainer(contains("vienna"));
verify(containerService, never()).startWorker((Region) any(Region.class));
verify(containerService, times(1)).listContainers();
}
@Test
public void justEnoughWorkers_doNotScale() throws Exception {
elasticityController.adjustWorkers();
verify(containerService, never()).startWorker((Region) any(Region.class));
verify(containerService, never()).stopContainer((String) any());
verify(containerService, never()).listContainers();
}
private ContainerInfo containerInfo(String id, String image, Region workerRegion, boolean running) {
ContainerInfo info = new ContainerInfo();
info.setContainerId(id);
info.setImage(image);
info.setWorkerRegion(workerRegion);
info.setRunning(running);
return info;
}
}
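
The mock numbers in this test pin down the scaling policy adjustWorkers() is expected to implement. They are consistent with a target worker count of requestCount * averageProcessingTime / maxWaitTime per region, with roughly a 10% tolerance band before any scaling happens, where maxWaitTime appears to be 30s for the AT regions and 120s for DE_BERLIN. This is an inference from the expected values, not something the template states:

AT_VIENNA: 600 * 5000ms / 30000ms = 100 expected; 85 running -> start 15 workers
AT_LINZ:   1005 * 2000ms / 30000ms = 67 expected; 51 running -> start 16; 81 running -> stop 14
DE_BERLIN: 1000 * 10000ms / 120000ms ~ 83 expected; 87 running is within the band -> no action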

ass3-event/pom.xml Normal file
View File

@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>at.ac.tuwien.infosys.dst</groupId>
<artifactId>dst</artifactId>
<version>2021.1</version>
<relativePath>..</relativePath>
</parent>
<artifactId>ass3-event</artifactId>
<packaging>jar</packaging>
<name>DST :: Assignment 3 :: Event Stream Processing</name>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.12</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,15 @@
package dst.ass3.event;
/**
* Constants.
*/
public final class Constants {
/**
* The TCP port of the {@link EventPublisher}.
*/
public static final int EVENT_PUBLISHER_PORT = 1338;
private Constants() {
}
}

View File

@@ -0,0 +1,16 @@
package dst.ass3.event;
/**
* Creates your {@link IEventProcessingEnvironment} and {@link IEventSourceFunction} implementation instances.
*/
public class EventProcessingFactory {
public static IEventProcessingEnvironment createEventProcessingEnvironment() {
// TODO
return null;
}
public static IEventSourceFunction createEventSourceFunction() {
// TODO
return null;
}
}

View File

@@ -0,0 +1,173 @@
package dst.ass3.event;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import dst.ass3.event.model.domain.ITripEventInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.group.ChannelGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.group.DefaultChannelGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ClassResolvers;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectDecoder;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectEncoder;
/**
* An EventPublisher accepts incoming TCP socket connections on a given port and is able to broadcast {@link ITripEventInfo}
* objects to these clients.
*/
public class EventPublisher {
private static final Logger LOG = LoggerFactory.getLogger(EventPublisher.class);
private final Object clientChannelMonitor = new Object();
private final int port;
private final AtomicBoolean closed;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ChannelGroup clientChannels;
public EventPublisher(int port) {
this.port = port;
this.closed = new AtomicBoolean(false);
}
public int getPort() {
return port;
}
/**
* Broadcast an event to all listening channels. Does nothing if no clients are connected.
*
* @param event the event to publish
* @throws IllegalStateException if the publisher hasn't been started yet or has been closed
*/
public void publish(ITripEventInfo event) {
if (clientChannels == null || closed.get()) {
throw new IllegalStateException();
}
clientChannels.writeAndFlush(event).syncUninterruptibly();
// wait a bit for event to propagate
try {
Thread.sleep(10);
} catch (InterruptedException e) {}
}
/**
* Like {@link #publish(ITripEventInfo)} but waits for a given number of milliseconds and then passes the current system
* time to a factory function.
*
* @param delay the delay in ms
* @param provider the provider
*/
public void publish(long delay, Function<Long, ITripEventInfo> provider) {
if (delay > 0) {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
}
}
publish(provider.apply(System.currentTimeMillis()));
}
/**
* Blocks while no clients are connected and returns as soon as the first client connects. If clients are already
* connected, the method returns immediately.
*/
public void waitForClients() {
if (clientChannels.isEmpty()) {
LOG.debug("Waiting for clients to connect...");
synchronized (clientChannelMonitor) {
try {
clientChannelMonitor.wait();
} catch (InterruptedException e) {
LOG.debug("Interrupted while waiting on client connections", e);
}
}
}
}
public int getConnectedClientCount() {
if (clientChannels == null || closed.get()) {
throw new IllegalStateException();
}
return clientChannels.size();
}
/**
* Closes all active client connections.
*/
public void dropClients() {
if (clientChannels == null || closed.get()) {
throw new IllegalStateException();
}
clientChannels.close().syncUninterruptibly().group().clear();
}
/**
* Starts the server and accepts incoming connections. Calls {@link #close()} if binding the socket fails.
*/
public void start() {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
clientChannels = new DefaultChannelGroup(workerGroup.next());
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ClientChannelInitializer());
// Bind and start to accept incoming connections
ChannelFuture f = b.bind(port).addListener(future -> {
if (!future.isSuccess()) {
LOG.error("Error while binding socket");
close();
}
}).syncUninterruptibly();
LOG.info("Accepting connections on {}", f.channel());
}
/**
* Closes all channels and resources.
*/
public void close() {
if (closed.compareAndSet(false, true)) {
LOG.info("Shutting down event loops");
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
}
private class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
LOG.info("Initializing client channel {}", ch);
clientChannels.add(ch);
ch.pipeline()
.addFirst(new ObjectEncoder())
.addFirst(new ObjectDecoder(ClassResolvers.cacheDisabled(ClassLoader.getSystemClassLoader())));
synchronized (clientChannelMonitor) {
clientChannelMonitor.notifyAll();
}
}
}
}
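
In isolation, driving the publisher looks roughly like this (Ass3EventTestBase below does exactly this; TripEventInfoDTO is the concrete event type from this module):

EventPublisher publisher = new EventPublisher(Constants.EVENT_PUBLISHER_PORT);
publisher.start();          // bind and accept connections
publisher.waitForClients(); // block until at least one subscriber connects
publisher.publish(new TripEventInfoDTO(1L, System.currentTimeMillis(),
        TripState.CREATED, Region.AT_VIENNA));
publisher.close();          // shut down the event loops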

View File

@@ -0,0 +1,178 @@
package dst.ass3.event;
import java.lang.reflect.Proxy;
import java.net.SocketAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import dst.ass3.event.model.domain.ITripEventInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ClassResolvers;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectDecoder;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectEncoder;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
/**
* An EventSubscriber receives ITripEventInfo objects through a netty SocketChannel. Create and connect an
* EventSubscriber using {@link #subscribe(SocketAddress)}. To receive events, call {@link #receive()}.
*/
public class EventSubscriber {
private static final Logger LOG = LoggerFactory.getLogger(EventSubscriber.class);
private static final ITripEventInfo POISON_PILL = (ITripEventInfo) Proxy.newProxyInstance(
ITripEventInfo.class.getClassLoader(), new Class[]{ITripEventInfo.class}, (p, m, a) -> null);
private final SocketAddress publisherAddress;
private final BlockingQueue<ITripEventInfo> queue;
private volatile boolean closed;
private Channel channel;
private EventLoopGroup loop;
private EventSubscriber(SocketAddress publisherAddress) {
this.publisherAddress = publisherAddress;
this.queue = new LinkedBlockingQueue<>();
}
/**
* Blocks to receive the next ITripEventInfo published into the channel. Returns {@code null} if the underlying
* channel has been closed or the thread was interrupted.
*
* @return the next ITripEventInfo object
* @throws IllegalStateException if the subscriber has been closed and all queued events have been consumed
*/
public ITripEventInfo receive() throws IllegalStateException {
synchronized (queue) {
if (closed && queue.isEmpty()) {
throw new IllegalStateException();
}
}
ITripEventInfo event;
try {
event = queue.take();
if (event == POISON_PILL) {
return null;
} else {
return event;
}
} catch (InterruptedException e) {
return null;
}
}
private Future<?> start() {
loop = new NioEventLoopGroup();
channel = new Bootstrap()
.group(loop)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new EventSubscriberHandler())
.connect(publisherAddress) // ChannelFuture
.addListener(future -> {
if (!future.isSuccess()) {
LOG.error("Error while connecting");
close();
}
})
.syncUninterruptibly()
.channel();
LOG.info("Connected to channel {}", channel);
return loop.submit(() -> {
try {
channel.closeFuture().sync();
} catch (InterruptedException e) {
// noop
} finally {
close();
}
});
}
/**
* Closes all resources and threads used by the EventSubscriber.
*/
public void close() {
try {
if (loop != null) {
synchronized (queue) {
if (!loop.isShutdown() && !loop.isTerminated() && !loop.isShuttingDown()) {
LOG.info("Shutting down event loop");
loop.shutdownGracefully();
}
}
}
} finally {
synchronized (queue) {
if (!closed) {
LOG.debug("Adding poison pill to queue");
closed = true;
queue.add(POISON_PILL);
}
}
}
}
/**
* Creates a new EventSubscriber that connects to given SocketAddress.
*
* @param address the socket address
* @return a new EventSubscriber
*/
public static EventSubscriber subscribe(SocketAddress address) {
EventSubscriber eventSubscriber = new EventSubscriber(address);
eventSubscriber.start();
return eventSubscriber;
}
private class EventSubscriberHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.read();
if (!(msg instanceof ITripEventInfo)) {
LOG.error("Unknown message type received {}", msg);
return;
}
synchronized (queue) {
if (!closed) {
queue.add((ITripEventInfo) msg);
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOG.error("EventSubscriberHandler caught an exception", cause);
ctx.close();
close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.pipeline()
.addFirst(new ObjectEncoder())
.addFirst(new ObjectDecoder(ClassResolvers.cacheDisabled(ClassLoader.getSystemClassLoader())));
}
}
}
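
A minimal receive loop, assuming a publisher is listening on the default port locally:

EventSubscriber subscriber = EventSubscriber.subscribe(
        new InetSocketAddress("localhost", Constants.EVENT_PUBLISHER_PORT));
ITripEventInfo event;
while ((event = subscriber.receive()) != null) { // null once the channel closes
    // handle the event
}
subscriber.close();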

View File

@@ -0,0 +1,39 @@
package dst.ass3.event;
import dst.ass3.event.model.events.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* This class should be used to implement the event processing steps as described in the assignment. The test classes
* will inject SinkFunctions, create a new StreamExecutionEnvironment and then call {@link
* #initialize(StreamExecutionEnvironment)}.
*/
public interface IEventProcessingEnvironment {
/**
* Initializes the event processing graph on the {@link StreamExecutionEnvironment}. This function is called
* after all sinks have been set.
*/
void initialize(StreamExecutionEnvironment env);
/**
* Sets the time limit within which a trip must reach the MATCHED state before it is considered timed out.
*
* @param time the timeout limit
*/
void setMatchingDurationTimeout(Time time);
void setLifecycleEventStreamSink(SinkFunction<LifecycleEvent> sink);
void setMatchingDurationStreamSink(SinkFunction<MatchingDuration> sink);
void setAverageMatchingDurationStreamSink(SinkFunction<AverageMatchingDuration> sink);
void setMatchingTimeoutWarningStreamSink(SinkFunction<MatchingTimeoutWarning> sink);
void setTripFailedWarningStreamSink(SinkFunction<TripFailedWarning> sink);
void setAlertStreamSink(SinkFunction<Alert> sink);
}
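
The tests further down (Ass3_3_2Test onwards) suggest the rough shape of an implementation: a source obtained from EventProcessingFactory, a filter dropping events without a region, a map to LifecycleEvent, and the injected sinks attached to the derived streams. A sketch of the first stage only, inside an IEventProcessingEnvironment implementation, assuming the sinks are stored in fields by the setters (the remaining streams are the actual assignment work):

@Override
public void initialize(StreamExecutionEnvironment env) {
    DataStream<LifecycleEvent> lifecycleEvents = env
            .addSource(EventProcessingFactory.createEventSourceFunction())
            .filter(e -> e.getRegion() != null) // Ass3_3_2Test expects region-less events to be dropped
            .map(LifecycleEvent::new);          // LifecycleEvent(ITripEventInfo) constructor
    lifecycleEvents.addSink(lifecycleEventStreamSink); // field set via setLifecycleEventStreamSink()
}

Depending on the Flink version, the lambda stages may additionally need a .returns(...) type hint because of type erasure.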

View File

@@ -0,0 +1,25 @@
package dst.ass3.event;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import dst.ass3.event.model.domain.ITripEventInfo;
/**
* A {@link RichFunction} and {@link SourceFunction} for ITripEventInfo objects.
*/
public interface IEventSourceFunction extends RichFunction, SourceFunction<ITripEventInfo> {
@Override
void open(Configuration parameters) throws Exception;
@Override
void close() throws Exception;
@Override
void run(SourceContext<ITripEventInfo> ctx) throws Exception;
@Override
void cancel();
}
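
Ass3_3_1Test below pins down the expected behaviour of an implementation: open() connects to the EventPublisher, run() forwards received events into the SourceContext until the subscriber is closed, cancel() unblocks the loop, and the object must be serializable. A compressed sketch under those assumptions (class and field names are ours):

import java.net.InetSocketAddress;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import dst.ass3.event.model.domain.ITripEventInfo;

public class EventSourceFunction extends AbstractRichFunction implements IEventSourceFunction {
    private transient EventSubscriber subscriber; // not serializable, hence transient
    private volatile boolean cancelled;

    @Override
    public void open(Configuration parameters) {
        subscriber = EventSubscriber.subscribe(
                new InetSocketAddress("localhost", Constants.EVENT_PUBLISHER_PORT));
    }

    @Override
    public void run(SourceContext<ITripEventInfo> ctx) {
        ITripEventInfo event;
        while (!cancelled && (event = subscriber.receive()) != null) {
            ctx.collect(event); // receive() returns null once the channel closes
        }
    }

    @Override
    public void cancel() {
        cancelled = true;
        if (subscriber != null) {
            subscriber.close(); // unblocks a pending receive()
        }
    }

    @Override
    public void close() {
        cancel();
    }
}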

View File

@@ -0,0 +1,54 @@
package dst.ass3.event.dto;
import java.io.Serializable;
import dst.ass3.event.model.domain.ITripEventInfo;
import dst.ass3.event.model.domain.Region;
import dst.ass3.event.model.domain.TripState;
public class TripEventInfoDTO implements Serializable, ITripEventInfo {
private static final long serialVersionUID = 4134104076758220138L;
private Long tripId;
private Long timestamp;
private TripState state;
private Region region;
public TripEventInfoDTO(Long tripId, Long timestamp, TripState state, Region region) {
this.tripId = tripId;
this.timestamp = timestamp;
this.state = state;
this.region = region;
}
@Override
public Long getTripId() {
return tripId;
}
@Override
public Long getTimestamp() {
return timestamp;
}
@Override
public TripState getState() {
return state;
}
@Override
public Region getRegion() {
return region;
}
@Override
public String toString() {
return "TripEventInfoDTO{" +
"tripId=" + tripId +
", timestamp=" + timestamp +
", state=" + state +
", region=" + region +
'}';
}
}

View File

@@ -0,0 +1,9 @@
package dst.ass3.event.model.domain;
public interface ITripEventInfo {
Long getTripId();
Long getTimestamp();
TripState getState();
Region getRegion();
}

View File

@@ -0,0 +1,7 @@
package dst.ass3.event.model.domain;
public enum Region {
AT_VIENNA,
AT_LINZ,
DE_BERLIN
}

View File

@@ -0,0 +1,5 @@
package dst.ass3.event.model.domain;
public enum TripState {
CREATED, QUEUED, MATCHED, APPROACHING, IN_PROGRESS, CANCELLED, COMPLETED
}

View File

@@ -0,0 +1,49 @@
package dst.ass3.event.model.events;
import java.io.Serializable;
import java.util.List;
import dst.ass3.event.model.domain.Region;
/**
* A system alert that aggregates several warnings.
*/
public class Alert implements Serializable {
private static final long serialVersionUID = -4561132671849230635L;
private Region region;
private List<Warning> warnings;
public Alert() {
}
public Alert(Region region, List<Warning> warnings) {
this.region = region;
this.warnings = warnings;
}
public Region getRegion() {
return region;
}
public void setRegion(Region region) {
this.region = region;
}
public List<Warning> getWarnings() {
return warnings;
}
public void setWarnings(List<Warning> warnings) {
this.warnings = warnings;
}
@Override
public String toString() {
return "Alert{" +
"region='" + region + '\'' +
", warnings=" + warnings +
'}';
}
}

View File

@@ -0,0 +1,48 @@
package dst.ass3.event.model.events;
import java.io.Serializable;
import dst.ass3.event.model.domain.Region;
/**
* The average of several {@link MatchingDuration} values.
*/
public class AverageMatchingDuration implements Serializable {
private static final long serialVersionUID = -3767582104941550250L;
private Region region;
private double duration;
public AverageMatchingDuration() {
}
public AverageMatchingDuration(Region region, double duration) {
this.region = region;
this.duration = duration;
}
public Region getRegion() {
return region;
}
public void setRegion(Region region) {
this.region = region;
}
public double getDuration() {
return duration;
}
public void setDuration(double duration) {
this.duration = duration;
}
@Override
public String toString() {
return "AverageMatchingDuration{" +
"region='" + region + '\'' +
", duration=" + duration +
'}';
}
}

View File

@@ -0,0 +1,84 @@
package dst.ass3.event.model.events;
import java.io.Serializable;
import dst.ass3.event.model.domain.ITripEventInfo;
import dst.ass3.event.model.domain.Region;
import dst.ass3.event.model.domain.TripState;
/**
* Indicates a change in the lifecycle state of an ITripEventInfo.
*/
public class LifecycleEvent implements Serializable {
private static final long serialVersionUID = 8665269919851487210L;
/**
* The id of the trip, as returned by {@link ITripEventInfo#getTripId()}.
*/
private long tripId;
private TripState state;
private Region region;
/**
* The instant the event was recorded (unix epoch in milliseconds)
*/
private long timestamp;
public LifecycleEvent() {
}
public LifecycleEvent(ITripEventInfo eventInfo) {
this(eventInfo.getTripId(), eventInfo.getState(), eventInfo.getRegion(), eventInfo.getTimestamp());
}
public LifecycleEvent(long tripId, TripState state, Region region, long timestamp) {
this.tripId = tripId;
this.state = state;
this.region = region;
this.timestamp = timestamp;
}
public long getTripId() {
return tripId;
}
public void setTripId(long tripId) {
this.tripId = tripId;
}
public TripState getState() {
return state;
}
public void setState(TripState state) {
this.state = state;
}
public Region getRegion() {
return region;
}
public void setRegion(Region region) {
this.region = region;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
@Override
public String toString() {
return "LifecycleEvent{" +
"tripId=" + tripId +
", state=" + state +
", region=" + region +
", timestamp=" + timestamp +
'}';
}
}

View File

@@ -0,0 +1,59 @@
package dst.ass3.event.model.events;
import java.io.Serializable;
import dst.ass3.event.model.domain.Region;
/**
* Indicates the amount of time an ITripEventInfo took to get from CREATED to MATCHED.
*/
public class MatchingDuration implements Serializable {
private static final long serialVersionUID = -6976972381929291369L;
private long eventId;
private Region region;
private long duration;
public MatchingDuration() {
}
public MatchingDuration(long eventId, Region region, long duration) {
this.eventId = eventId;
this.region = region;
this.duration = duration;
}
public long getEventId() {
return eventId;
}
public void setEventId(long eventId) {
this.eventId = eventId;
}
public Region getRegion() {
return region;
}
public void setRegion(Region region) {
this.region = region;
}
public long getDuration() {
return duration;
}
public void setDuration(long duration) {
this.duration = duration;
}
@Override
public String toString() {
return "MatchingDuration{" +
"eventId=" + eventId +
", region='" + region + '\'' +
", duration=" + duration +
'}';
}
}

View File

@@ -0,0 +1,37 @@
package dst.ass3.event.model.events;
import dst.ass3.event.model.domain.Region;
/**
* Warning indicating that a trip has not reached the lifecycle state MATCHED within the configured time frame.
*/
public class MatchingTimeoutWarning extends Warning {
private static final long serialVersionUID = 7955599732178947649L;
private long tripId;
public MatchingTimeoutWarning() {
super(null);
}
public MatchingTimeoutWarning(long tripId, Region region) {
super(region);
this.tripId = tripId;
}
public long getTripId() {
return tripId;
}
public void setTripId(long tripId) {
this.tripId = tripId;
}
@Override
public String toString() {
return "MatchingTimeoutWarning{" +
"tripId=" + tripId +
'}';
}
}

View File

@@ -0,0 +1,37 @@
package dst.ass3.event.model.events;
import dst.ass3.event.model.domain.Region;
/**
* Indicates that matching a trip has probably failed.
*/
public class TripFailedWarning extends Warning {
private static final long serialVersionUID = -9120187311385112769L;
private long eventId;
public TripFailedWarning() {
super(null);
}
public TripFailedWarning(long eventId, Region region) {
super(region);
this.eventId = eventId;
}
public long getEventId() {
return eventId;
}
public void setEventId(long eventId) {
this.eventId = eventId;
}
@Override
public String toString() {
return "TripFailedWarning{" +
"eventId=" + eventId +
'}';
}
}

View File

@@ -0,0 +1,37 @@
package dst.ass3.event.model.events;
import java.io.Serializable;
import dst.ass3.event.model.domain.Region;
/**
* Base class for region warnings.
*/
public abstract class Warning implements Serializable {
private static final long serialVersionUID = 273266717303711974L;
private Region region;
public Warning() {
}
public Warning(Region region) {
this.region = region;
}
public Region getRegion() {
return region;
}
public void setRegion(Region region) {
this.region = region;
}
@Override
public String toString() {
return "Warning{" +
"region='" + region + '\'' +
'}';
}
}

View File

@@ -0,0 +1 @@
// TODO: add the data from the execution plan export

View File

@@ -0,0 +1,16 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} - %highlight(%5p) [%12.12thread] %cyan(%-40.40logger{39}): %m%n</pattern>
</encoder>
</appender>
<logger name="org.apache.flink" level="WARN"/> <!-- change this to INFO if you want to see more flink logging data-->
<root level="${log.level:-INFO}">
<appender-ref ref="STDOUT"/>
</root>
</configuration>

View File

@@ -0,0 +1,69 @@
package dst.ass3.event;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.MoreExecutors;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Ass3EventTestBase.
*/
public abstract class Ass3EventTestBase {
private static final Logger LOG = LoggerFactory.getLogger(Ass3EventTestBase.class);
protected EventPublisher publisher;
protected StreamExecutionEnvironment flink;
protected ExecutorService executor;
private static EventPublisher previousPublisher;
@Before
public void setUpResources() throws Exception {
executor = Executors.newCachedThreadPool();
if (previousPublisher != null) {
previousPublisher.close();
}
publisher = createEventPublisher();
previousPublisher = publisher;
publisher.start();
flink = createStreamExecutionEnvironment();
}
@After
public void tearDownResources() throws Exception {
publisher.close();
MoreExecutors.shutdownAndAwaitTermination(executor, 5, TimeUnit.SECONDS);
previousPublisher = null;
}
protected EventPublisher createEventPublisher() {
return new EventPublisher(Constants.EVENT_PUBLISHER_PORT);
}
protected StreamExecutionEnvironment createStreamExecutionEnvironment() {
return StreamExecutionEnvironment.createLocalEnvironment(1);
}
protected static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
// ignore
}
}
protected static long now() {
return System.currentTimeMillis();
}
}

View File

@@ -0,0 +1,23 @@
package dst.ass3.event;
import dst.ass3.event.tests.Ass3_3_2Test;
import dst.ass3.event.tests.Ass3_3_3Test;
import dst.ass3.event.tests.Ass3_3_4Test;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;
import dst.ass3.event.tests.Ass3_3_1Test;
/**
* Ass3EventTestSuite.
*/
@RunWith(Suite.class)
@SuiteClasses({
Ass3_3_1Test.class,
Ass3_3_2Test.class,
Ass3_3_3Test.class,
Ass3_3_4Test.class
})
public class Ass3EventTestSuite {
}

View File

@@ -0,0 +1,98 @@
package dst.ass3.event;
import static org.junit.Assert.assertNotNull;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import dst.ass3.event.model.events.*;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* EventProcessingTestBase.
*/
public class EventProcessingTestBase extends Ass3EventTestBase {
private static final Logger LOG = LoggerFactory.getLogger(EventProcessingTestBase.class);
@Rule
public Timeout timeout = new Timeout(15, TimeUnit.SECONDS);
private IEventProcessingEnvironment epe;
protected StaticQueueSink<LifecycleEvent> lifecycleEvents;
protected StaticQueueSink<MatchingDuration> matchingDurations;
protected StaticQueueSink<AverageMatchingDuration> averageMatchingDurations;
protected StaticQueueSink<MatchingTimeoutWarning> matchingTimeoutWarnings;
protected StaticQueueSink<TripFailedWarning> tripFailedWarnings;
protected StaticQueueSink<Alert> alerts;
@Before
public void setUpEnvironment() throws Exception {
epe = EventProcessingFactory.createEventProcessingEnvironment();
assertNotNull("#createEventProcessingEnvironment() not implemented", epe);
lifecycleEvents = new StaticQueueSink<>("lifecycleEvents");
matchingDurations = new StaticQueueSink<>("matchingDurations");
averageMatchingDurations = new StaticQueueSink<>("averageMatchingDurations");
matchingTimeoutWarnings = new StaticQueueSink<>("matchingTimeoutWarnings");
tripFailedWarnings = new StaticQueueSink<>("tripFailedWarnings");
alerts = new StaticQueueSink<>("alerts");
epe.setLifecycleEventStreamSink(lifecycleEvents);
epe.setMatchingDurationStreamSink(matchingDurations);
epe.setAverageMatchingDurationStreamSink(averageMatchingDurations);
epe.setMatchingTimeoutWarningStreamSink(matchingTimeoutWarnings);
epe.setTripFailedWarningStreamSink(tripFailedWarnings);
epe.setAlertStreamSink(alerts);
epe.setMatchingDurationTimeout(Time.seconds(15));
}
public JobExecutionResult initAndExecute() throws Exception {
return initAndExecute(null);
}
public JobExecutionResult initAndExecute(Consumer<IEventProcessingEnvironment> initializer) throws Exception {
try {
if (initializer != null) {
initializer.accept(epe);
}
LOG.info("Initializing StreamExecutionEnvironment with {}", epe);
epe.initialize(flink);
} catch (Exception e) {
LOG.error("Error while initializing StreamExecutionEnvironment", e);
throw e;
}
try {
LOG.info("Executing flink {}", flink);
return flink.execute();
} catch (Exception e) {
LOG.error("Error while executing flink", e);
throw e;
}
}
public Future<JobExecutionResult> initAndExecuteAsync(Consumer<IEventProcessingEnvironment> initializer) {
return executor.submit(() -> initAndExecute(initializer));
}
public Future<JobExecutionResult> initAndExecuteAsync() {
return executor.submit(() -> initAndExecute());
}
@After
public void tearDownCollectors() throws Exception {
StaticQueueSink.clearAll();
}
}

View File

@@ -0,0 +1,83 @@
package dst.ass3.event;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
/**
* A SinkFunction that collects objects into a queue located in a shared global state. Each collector accesses a
* specific key in the shared state.
*
* @param <T> the sink input type
*/
public class StaticQueueSink<T> implements SinkFunction<T> {
private static final long serialVersionUID = -3965500756295835669L;
private static Map<String, BlockingQueue<?>> state = new ConcurrentHashMap<>();
private String key;
public StaticQueueSink(String key) {
this.key = key;
}
@Override
public void invoke(T value, Context context) throws Exception {
get().add(value);
}
public void clear() {
get().clear();
}
public List<T> take(int n) {
List<T> list = new ArrayList<>(n);
for (int i = 0; i < n; i++) {
try {
list.add(get().take());
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while accessing queue", e);
}
}
return list;
}
public T take() {
try {
return get().take();
} catch (InterruptedException e) {
return null;
}
}
public T poll(long ms) {
try {
return get().poll(ms, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
return null;
}
}
public synchronized BlockingQueue<T> get() {
return get(key);
}
@SuppressWarnings("unchecked")
private static <R> BlockingQueue<R> get(String key) {
return (BlockingQueue) state.computeIfAbsent(key, k -> new LinkedBlockingQueue<>());
}
public static void clearAll() {
state.clear();
}
}

View File

@@ -0,0 +1,152 @@
package dst.ass3.event.tests;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import dst.ass3.event.dto.TripEventInfoDTO;
import dst.ass3.event.model.domain.ITripEventInfo;
import dst.ass3.event.model.domain.Region;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import dst.ass3.event.Ass3EventTestBase;
import dst.ass3.event.EventProcessingFactory;
import dst.ass3.event.IEventSourceFunction;
import dst.ass3.event.model.domain.TripState;
public class Ass3_3_1Test extends Ass3EventTestBase {
private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_1Test.class);
@Rule
public Timeout timeout = new Timeout(15, TimeUnit.SECONDS);
private IEventSourceFunction sourceFunction;
@Before
public void setUp() throws Exception {
sourceFunction = EventProcessingFactory.createEventSourceFunction();
assertNotNull("EventProcessingFactory#createEventSourceFunction() not implemented", sourceFunction);
}
@Test
public void open_shouldConnectToSubscriber() throws Exception {
assertEquals(
"IEventSourceFunction should not be connected upon construction",
0, publisher.getConnectedClientCount()
);
sourceFunction.open(new Configuration());
publisher.waitForClients();
assertEquals(
"Expected IEventSourceFunction to connect to publisher after open is called",
1, publisher.getConnectedClientCount()
);
}
@Test
public void run_shouldCollectPublishedEvents() throws Exception {
sourceFunction.open(new Configuration());
publisher.waitForClients();
Future<List<ITripEventInfo>> result = executor.submit(() -> {
MockContext<ITripEventInfo> ctx = new MockContext<>();
LOG.info("Running IEventSourceFunction with MockContext");
sourceFunction.run(ctx);
LOG.info("Done running IEventSourceFunction, returning collected events");
return ctx.collected;
});
publisher.publish(new TripEventInfoDTO(1L, 0L, TripState.CREATED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(2L, 0L, TripState.CREATED, Region.DE_BERLIN));
sleep(1000);
LOG.info("Calling cancel on SourceFunction");
sourceFunction.cancel();
LOG.info("Dropping subscriber connections");
publisher.dropClients();
LOG.info("Calling close on SourceFunction");
sourceFunction.close();
List<ITripEventInfo> collected = result.get();
assertEquals(2, collected.size());
ITripEventInfo e0 = collected.get(0);
ITripEventInfo e1 = collected.get(1);
assertEquals(1L, e0.getTripId(), 0);
assertEquals(2L, e1.getTripId(), 0);
}
@Test
public void shouldBeSerializable() throws Exception {
try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
out.writeObject(sourceFunction);
out.flush();
} catch (NotSerializableException e) {
fail("Implementation of IEventSourceFunction is not serializable");
}
}
private static class MockContext<T> implements SourceFunction.SourceContext<T> {
private final Object checkpointLock = new Object();
private List<T> collected = new ArrayList<>();
public List<T> getCollected() {
return collected;
}
@Override
public void collect(T element) {
collected.add(element);
}
@Override
public void collectWithTimestamp(T element, long timestamp) {
collected.add(element);
}
@Override
public void emitWatermark(Watermark mark) {
}
@Override
public void markAsTemporarilyIdle() {
}
@Override
public Object getCheckpointLock() {
return checkpointLock;
}
@Override
public void close() {
}
}
}

View File

@@ -0,0 +1,89 @@
package dst.ass3.event.tests;
import dst.ass3.event.EventProcessingTestBase;
import dst.ass3.event.dto.TripEventInfoDTO;
import dst.ass3.event.model.domain.Region;
import dst.ass3.event.model.events.LifecycleEvent;
import org.apache.flink.api.common.JobExecutionResult;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Future;
import static dst.ass3.event.model.domain.TripState.CREATED;
import static dst.ass3.event.model.domain.TripState.MATCHED;
import static dst.ass3.event.model.domain.TripState.QUEUED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.hamcrest.MatcherAssert.assertThat;
public class Ass3_3_2Test extends EventProcessingTestBase {
private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_2Test.class);
@Test
public void lifecycleEventStream_worksCorrectly() throws Exception {
// run flink in new thread
Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();
// wait until a subscriber connects
LOG.info("Waiting for subscribers to connect");
publisher.waitForClients();
LifecycleEvent event;
long then = System.currentTimeMillis();
// causes LifecycleEvent 0
LOG.info("Publishing e0");
publisher.publish(new TripEventInfoDTO(0L, then, CREATED, Region.AT_VIENNA));
LOG.info("Collecting LifecycleEvent for e0");
event = lifecycleEvents.take();
LOG.info("Round-trip took {}ms", System.currentTimeMillis() - then);
assertEquals("Event ID not set correctly", 0L, event.getTripId());
assertEquals("State not set correctly", CREATED, event.getState());
assertEquals("Region not set correctly", Region.AT_VIENNA, event.getRegion());
assertThat("Timestamp not set correctly", event.getTimestamp(), Matchers.greaterThanOrEqualTo(then));
assertThat("Timestamp not set correctly", event.getTimestamp(),
Matchers.lessThanOrEqualTo(System.currentTimeMillis()));
// should be filtered
LOG.info("Publishing e1, should be filtered");
publisher.publish(new TripEventInfoDTO(1L, now(), CREATED, null));
assertNull("Events without a region should be filtered", lifecycleEvents.poll(500));
// causes LifecycleEvent 1
LOG.info("Publishing e2");
publisher.publish(new TripEventInfoDTO(2L, now(), QUEUED, Region.DE_BERLIN));
LOG.info("Collecting LifecycleEvent for e2");
event = lifecycleEvents.take();
assertEquals(2L, event.getTripId());
assertEquals(QUEUED, event.getState());
// should be filtered
LOG.info("Publishing e3, should be filtered");
publisher.publish(new TripEventInfoDTO(3L, now(), CREATED, null));
assertNull("Events without a region should be filtered", lifecycleEvents.poll(500));
// causes LifecycleEvent 2
LOG.info("Publishing e4");
publisher.publish(new TripEventInfoDTO(4L, now(), MATCHED, Region.AT_LINZ));
LOG.info("Collecting LifecycleEvent for e4");
event = lifecycleEvents.take();
assertEquals(4L, event.getTripId());
assertEquals(MATCHED, event.getState());
// disconnect subscribers
publisher.dropClients();
// wait for execution to end
flinkExecution.get();
}
}

View File

@@ -0,0 +1,211 @@
package dst.ass3.event.tests;
import dst.ass3.event.EventProcessingTestBase;
import dst.ass3.event.dto.TripEventInfoDTO;
import dst.ass3.event.model.domain.Region;
import dst.ass3.event.model.events.LifecycleEvent;
import dst.ass3.event.model.events.MatchingDuration;
import dst.ass3.event.model.events.MatchingTimeoutWarning;
import dst.ass3.event.model.events.TripFailedWarning;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import static dst.ass3.event.model.domain.TripState.*;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.*;
public class Ass3_3_3Test extends EventProcessingTestBase {
private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_3Test.class);
@Test
public void matchingDurations_areCalculatedCorrectly() throws Exception {
// tests whether duration calculation and stream keying works correctly
// expects LifecycleEvent stream to work correctly
Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();
LOG.info("Waiting for subscribers to connect");
publisher.waitForClients();
LifecycleEvent e1Start;
LifecycleEvent e2Start;
LifecycleEvent e1End;
LifecycleEvent e2End;
publisher.publish(5, t -> new TripEventInfoDTO(1L, t, CREATED, Region.AT_VIENNA)); // 1 starts before 2
publisher.publish(5, t -> new TripEventInfoDTO(2L, t, CREATED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(3L, t, CREATED, Region.AT_VIENNA)); // 3 never finishes
LOG.info("Waiting for LifecycleEvent for event 1 being CREATED");
e1Start = lifecycleEvents.take();
LOG.info("Waiting for LifecycleEvent for event 2 being CREATED");
e2Start = lifecycleEvents.take();
LOG.info("Waiting for LifecycleEvent for event 3 being CREATED");
lifecycleEvents.take();
sleep(500);
publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(2L, t, QUEUED, Region.AT_VIENNA));
LOG.info("Waiting for LifecycleEvent for event 1 being QUEUED");
lifecycleEvents.take();
LOG.info("Waiting for LifecycleEvent for event 2 being QUEUED");
lifecycleEvents.take();
sleep(500); // event 2 has now taken ~1000ms
publisher.publish(new TripEventInfoDTO(2L, now(), MATCHED, Region.AT_VIENNA)); // 2 finishes before 1
LOG.info("Waiting for LifecycleEvent for event 2 being MATCHED");
e2End = lifecycleEvents.take();
sleep(500); // event 1 has now taken ~1500ms
publisher.publish(new TripEventInfoDTO(1L, now(), MATCHED, Region.AT_VIENNA));
LOG.info("Waiting for LifecycleEvent for event 1 being MATCHED");
e1End = lifecycleEvents.take();
LOG.info("Collecting MatchingDuration event for event 2");
MatchingDuration d0 = matchingDurations.take();
LOG.info("Collecting MatchingDuration event for event 1");
MatchingDuration d1 = matchingDurations.take();
assertEquals("Expected event 2 to finish first", 2L, d0.getEventId()); // event 2 finished before 1
assertEquals("Expected event 1 to finish last", 1L, d1.getEventId());
assertThat("Expected MatchingDuration to be >= 0", d0.getDuration(), greaterThan(0L));
assertThat("Expected MatchingDuration to be >= 0", d1.getDuration(), greaterThan(0L));
assertEquals("MatchingDuration was not calculated from LifecycleEvents correctly",
e2End.getTimestamp() - e2Start.getTimestamp(), d0.getDuration(), 100);
assertEquals("MatchingDuration was not calculated from LifecycleEvents correctly",
e1End.getTimestamp() - e1Start.getTimestamp(), d1.getDuration(), 100);
publisher.dropClients();
flinkExecution.get();
}
@Test
public void durationsWithInterleavedEvents_areCalculatedCorrectly() throws Exception {
// tests whether CEP rule is tolerant towards multiple state changes between QUEUED and MATCHED
Future<JobExecutionResult> flinkExecution = initAndExecuteAsync(e ->
e.setMatchingDurationTimeout(Time.seconds(1))
);
LOG.info("Waiting for subscribers to connect");
publisher.waitForClients();
publisher.publish(5, t -> new TripEventInfoDTO(1L, t, CREATED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(2L, t, CREATED, Region.AT_VIENNA)); // never finishes
// change state several times (tests another aspect of the CEP rule)
publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA));
publisher.dropClients();
flinkExecution.get();
List<MatchingDuration> result = new ArrayList<>(matchingDurations.get());
assertEquals("Expected one event to have finished", 1, result.size());
MatchingDuration d0 = result.get(0);
assertEquals("Expected event 1 to have finished", 1L, d0.getEventId());
}
@Test
public void timeoutWarnings_areEmittedCorrectly() throws Exception {
Future<JobExecutionResult> flinkExecution = initAndExecuteAsync(e -> {
e.setMatchingDurationTimeout(Time.seconds(1));
});
LOG.info("Waiting for subscribers to connect");
publisher.waitForClients();
publisher.publish(new TripEventInfoDTO(1L, now(), CREATED, Region.AT_VIENNA)); // never finishes
sleep(100);
publisher.publish(new TripEventInfoDTO(2L, now(), CREATED, Region.AT_VIENNA)); // never finishes
// confounding event
sleep(100);
publisher.publish(new TripEventInfoDTO(1L, now(), QUEUED, Region.AT_VIENNA));
sleep(1500); // wait for event to time out
publisher.dropClients();
LOG.info("Waiting for Flink execution to end");
flinkExecution.get();
LOG.info("Collecting timeout warning for event 1");
MatchingTimeoutWarning w1 = matchingTimeoutWarnings.take();
LOG.info("Collecting timeout warning for event 2");
MatchingTimeoutWarning w2 = matchingTimeoutWarnings.take();
assertEquals("Expected event 1 to time out first", 1L, w1.getTripId());
assertEquals("Expected event 2 to time out second", 2L, w2.getTripId());
}
@Test
public void tripFailedWarnings_areEmittedCorrectly() throws Exception {
Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();
LOG.info("Waiting for subscribers to connect");
publisher.waitForClients();
publisher.publish(0, t -> new TripEventInfoDTO(1L, t, CREATED, Region.AT_VIENNA));
publisher.publish(0, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA));
publisher.publish(0, t -> new TripEventInfoDTO(2L, t, CREATED, Region.AT_VIENNA));
publisher.publish(0, t -> new TripEventInfoDTO(2L, t, QUEUED, Region.AT_VIENNA));
// event 1 fail #1
publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA));
// event 2 fail #1
publisher.publish(5, t -> new TripEventInfoDTO(2L, t, MATCHED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(2L, t, QUEUED, Region.AT_VIENNA));
// event 1 fail #2
publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA));
// event 2 fail #2 and then success
publisher.publish(5, t -> new TripEventInfoDTO(2L, t, MATCHED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(2L, t, QUEUED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(2L, t, MATCHED, Region.AT_VIENNA));
LOG.info("Checking that no TripFailedWarning was issued yet");
assertNull(tripFailedWarnings.poll(500));
LOG.info("Triggering third failure");
// event 1 fail #3
publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA));
LOG.info("Waiting for TripFailedWarning for event 1");
TripFailedWarning warning = tripFailedWarnings.take();
assertEquals(1L, warning.getEventId());
assertEquals(Region.AT_VIENNA, warning.getRegion());
publisher.dropClients();
flinkExecution.get();
}
}


@ -0,0 +1,308 @@
package dst.ass3.event.tests;
import dst.ass3.event.EventProcessingTestBase;
import dst.ass3.event.dto.TripEventInfoDTO;
import dst.ass3.event.model.domain.Region;
import dst.ass3.event.model.events.*;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static dst.ass3.event.model.domain.TripState.*;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.*;
public class Ass3_3_4Test extends EventProcessingTestBase {
private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_4Test.class);
@Test
public void multipleTripFailures_triggerAlert() throws Exception {
// checks that the window works correctly
// expects TripFailedWarning stream to work correctly
Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();
LOG.info("Waiting for subscribers to connect");
publisher.waitForClients();
Consumer<Long> causeWarning = (eventId) -> {
publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, MATCHED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, QUEUED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, MATCHED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, QUEUED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, MATCHED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, QUEUED, Region.AT_VIENNA));
};
// all of these events will fail
publisher.publish(new TripEventInfoDTO(1L, now(), CREATED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(1L, now(), QUEUED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(2L, now(), CREATED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(2L, now(), QUEUED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(3L, now(), CREATED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(3L, now(), QUEUED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(4L, now(), CREATED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(4L, now(), QUEUED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(5L, now(), CREATED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(5L, now(), QUEUED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(6L, now(), CREATED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(6L, now(), QUEUED, Region.AT_VIENNA));
// trigger 3 warnings
causeWarning.accept(1L);
causeWarning.accept(2L);
causeWarning.accept(3L);
LOG.info("Collecting alert for first three warnings");
assertNotNull(alerts.take());
LOG.info("Checking that the fourth warning does not trigger an alert");
causeWarning.accept(4L);
assertNull(alerts.poll(500));
LOG.info("Checking that the fifth warning does not trigger an alert");
causeWarning.accept(5L);
assertNull(alerts.poll(500));
LOG.info("Checking that the sixth warning triggered an alert");
causeWarning.accept(6L);
assertNotNull(alerts.take());
publisher.dropClients();
flinkExecution.get();
}
@Test
public void matchingFailuresAndTimeouts_triggerAlert() throws Exception {
// checks whether keying works correctly, and whether Warning streams are unioned correctly
// expects both warning streams to work correctly
Future<JobExecutionResult> flinkExecution = initAndExecuteAsync(e ->
e.setMatchingDurationTimeout(Time.seconds(3))
);
LOG.info("Waiting for subscribers to connect");
publisher.waitForClients();
BiConsumer<Long, Region> causeWarning = (eventId, region) -> {
publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, MATCHED, region));
publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, QUEUED, region));
publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, MATCHED, region));
publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, QUEUED, region));
publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, MATCHED, region));
publisher.publish(5, t -> new TripEventInfoDTO(eventId, t, QUEUED, region));
};
publisher.publish(new TripEventInfoDTO(1L, now(), CREATED, Region.AT_VIENNA)); // vienna e1 will fail
publisher.publish(new TripEventInfoDTO(1L, now(), QUEUED, Region.AT_VIENNA)); // vienna e1 will fail
publisher.publish(new TripEventInfoDTO(2L, now(), CREATED, Region.AT_VIENNA)); // vienna e2 will fail
publisher.publish(new TripEventInfoDTO(2L, now(), QUEUED, Region.AT_VIENNA)); // vienna e2 will fail
publisher.publish(new TripEventInfoDTO(3L, now(), CREATED, Region.AT_VIENNA)); // vienna e3 will time out
publisher.publish(new TripEventInfoDTO(3L, now(), QUEUED, Region.AT_VIENNA)); // vienna e3 will time out
publisher.publish(new TripEventInfoDTO(4L, now(), CREATED, Region.DE_BERLIN)); // berlin e4 will fail
publisher.publish(new TripEventInfoDTO(4L, now(), QUEUED, Region.DE_BERLIN)); // berlin e4 will fail
// s1 warning #1
causeWarning.accept(1L, Region.AT_VIENNA);
// s1 warning #2
causeWarning.accept(2L, Region.AT_VIENNA);
// s2 warning #1
causeWarning.accept(4L, Region.DE_BERLIN);
LOG.info("Checking that no alert has been issued yet");
assertNull(alerts.poll(500));
// make sure the other events don't time out
publisher.publish(new TripEventInfoDTO(1L, now(), MATCHED, Region.AT_VIENNA)); // vienna e1 will fail
publisher.publish(new TripEventInfoDTO(2L, now(), MATCHED, Region.AT_VIENNA)); // vienna e2 will fail
publisher.publish(new TripEventInfoDTO(4L, now(), MATCHED, Region.DE_BERLIN)); // berlin e4 will fail
sleep(4000); // waiting for e3 to time out
publisher.dropClients();
flinkExecution.get();
LOG.info("Collecting Alert event");
Alert alert = alerts.take();
assertEquals("Expected only a single alert", 0, alerts.get().size());
assertEquals(Region.AT_VIENNA, alert.getRegion());
assertEquals("An alert should comprise three warnings", 3, alert.getWarnings().size());
Warning w0 = alert.getWarnings().get(0);
Warning w1 = alert.getWarnings().get(1);
Warning w2 = alert.getWarnings().get(2);
assertThat(w0, instanceOf(TripFailedWarning.class));
assertThat(w1, instanceOf(TripFailedWarning.class));
assertThat(w2, instanceOf(MatchingTimeoutWarning.class));
assertEquals(Region.AT_VIENNA, w0.getRegion());
assertEquals(Region.AT_VIENNA, w1.getRegion());
assertEquals(Region.AT_VIENNA, w2.getRegion());
}
@Test
public void averageMatchingDurationWindow_worksCorrectly() throws Exception {
// makes sure the event is triggered at the correct instant
Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();
LOG.info("Waiting for subscribers to connect");
publisher.waitForClients();
sleep(250);
publisher.publish(new TripEventInfoDTO(1L, now(), CREATED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(2L, now(), CREATED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(3L, now(), CREATED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(4L, now(), CREATED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(5L, now(), CREATED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(6L, now(), CREATED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(7L, now(), CREATED, Region.DE_BERLIN));
sleep(100);
publisher.publish(new TripEventInfoDTO(1L, now(), QUEUED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(2L, now(), QUEUED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(3L, now(), QUEUED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(4L, now(), QUEUED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(5L, now(), QUEUED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(6L, now(), QUEUED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(7L, now(), QUEUED, Region.DE_BERLIN));
sleep(100);
publisher.publish(new TripEventInfoDTO(1L, now(), MATCHED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(2L, now(), MATCHED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(3L, now(), MATCHED, Region.AT_VIENNA));
publisher.publish(new TripEventInfoDTO(4L, now(), MATCHED, Region.AT_VIENNA));
// the first four events should not trigger anything
LOG.info("Checking AverageMatchingDuration events after the first four MATCHED events of AT_VIENNA");
assertNull(averageMatchingDurations.poll(500));
// this event is from a different region
publisher.publish(new TripEventInfoDTO(7L, now(), MATCHED, Region.DE_BERLIN));
LOG.info("Checking AverageMatchingDuration events after the first MATCHED event of DE_BERLIN");
assertNull(averageMatchingDurations.poll(500));
// fifth event in s1 triggers the window operation
publisher.publish(new TripEventInfoDTO(5L, now(), MATCHED, Region.AT_VIENNA));
LOG.info("Collecting AverageMatchingDuration event for AT_VIENNA");
AverageMatchingDuration event = averageMatchingDurations.take();
assertNotNull(event);
assertEquals(Region.AT_VIENNA, event.getRegion());
// should be in a new window and therefore not trigger
publisher.publish(new TripEventInfoDTO(6L, now(), MATCHED, Region.AT_VIENNA));
LOG.info("Checking AverageMatchingDuration events after the sixth MATCHED event of AT_VIENNA");
assertNull(averageMatchingDurations.poll(500));
publisher.dropClients();
flinkExecution.get();
}
@Test
public void averageMatchingDurations_areCalculatedCorrectly() throws Exception {
// makes sure the keying works properly and that the calculation is done from MatchingDuration events
// requires MatchingDuration events to be calculated correctly
Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();
LOG.info("Waiting for subscribers to connect");
publisher.waitForClients();
List<MatchingDuration> viennaDurations = new ArrayList<>(5);
List<MatchingDuration> berlinDurations = new ArrayList<>(5);
sleep(250);
publisher.publish(5, t -> new TripEventInfoDTO(1L, t, CREATED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(2L, t, CREATED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(3L, t, CREATED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(4L, t, CREATED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(5L, t, CREATED, Region.AT_VIENNA));
sleep(250);
publisher.publish(5, t -> new TripEventInfoDTO(6L, t, CREATED, Region.DE_BERLIN));
publisher.publish(5, t -> new TripEventInfoDTO(7L, t, CREATED, Region.DE_BERLIN));
publisher.publish(5, t -> new TripEventInfoDTO(8L, t, CREATED, Region.DE_BERLIN));
publisher.publish(5, t -> new TripEventInfoDTO(9L, t, CREATED, Region.DE_BERLIN));
publisher.publish(5, t -> new TripEventInfoDTO(10L, t, CREATED, Region.DE_BERLIN));
sleep(125);
publisher.publish(5, t -> new TripEventInfoDTO(6L, t, QUEUED, Region.DE_BERLIN));
publisher.publish(5, t -> new TripEventInfoDTO(7L, t, QUEUED, Region.DE_BERLIN));
publisher.publish(5, t -> new TripEventInfoDTO(8L, t, QUEUED, Region.DE_BERLIN));
publisher.publish(5, t -> new TripEventInfoDTO(9L, t, QUEUED, Region.DE_BERLIN));
publisher.publish(5, t -> new TripEventInfoDTO(10L, t, QUEUED, Region.DE_BERLIN));
sleep(125);
publisher.publish(5, t -> new TripEventInfoDTO(1L, t, QUEUED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(2L, t, QUEUED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(3L, t, QUEUED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(4L, t, QUEUED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(5L, t, QUEUED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(6L, t, MATCHED, Region.DE_BERLIN));
publisher.publish(5, t -> new TripEventInfoDTO(7L, t, MATCHED, Region.DE_BERLIN));
publisher.publish(5, t -> new TripEventInfoDTO(8L, t, MATCHED, Region.DE_BERLIN));
LOG.info("Collecting MatchingDuration events 1,2,3 for DE_BERLIN");
berlinDurations.addAll(matchingDurations.take(3));
sleep(500);
publisher.publish(5, t -> new TripEventInfoDTO(1L, t, MATCHED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(2L, t, MATCHED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(3L, t, MATCHED, Region.AT_VIENNA));
LOG.info("Collecting MatchingDuration events 1,2,3 for AT_VIENNA");
viennaDurations.addAll(matchingDurations.take(3));
publisher.publish(5, t -> new TripEventInfoDTO(9L, t, MATCHED, Region.DE_BERLIN));
publisher.publish(5, t -> new TripEventInfoDTO(10L, t, MATCHED, Region.DE_BERLIN));
LOG.info("Collecting MatchingDuration events 4,5 for DE_BERLIN");
berlinDurations.addAll(matchingDurations.take(2));
sleep(500);
publisher.publish(5, t -> new TripEventInfoDTO(4L, t, MATCHED, Region.AT_VIENNA));
publisher.publish(5, t -> new TripEventInfoDTO(5L, t, MATCHED, Region.AT_VIENNA));
LOG.info("Collecting MatchingDuration events 4,5 for AT_VIENNA");
viennaDurations.addAll(matchingDurations.take(2));
LOG.info("Collecting AverageMatchingDuration event for DE_BERLIN");
AverageMatchingDuration e0 = averageMatchingDurations.take(); // berlin
LOG.info("Collecting AverageMatchingDuration event for AT_VIENNA");
AverageMatchingDuration e1 = averageMatchingDurations.take(); // vienna
assertEquals("Expected calculation for berlin to have been triggered first", e0.getRegion(), Region.DE_BERLIN);
assertEquals("Expected calculation for vienna to have been triggered second", e1.getRegion(), Region.AT_VIENNA);
assertEquals("Wrong number of MatchingDuration events for AT_VIENNA", 5, viennaDurations.size());
assertEquals("Wrong number of MatchingDuration events for DE_BERLIN", 5, berlinDurations.size());
double viennaAvg = viennaDurations.stream().mapToLong(MatchingDuration::getDuration).average().orElse(-1);
double berlinAvg = berlinDurations.stream().mapToLong(MatchingDuration::getDuration).average().orElse(-1);
assertEquals("Average duration was not calculated from MatchingDuration events correctly", e0.getDuration(),
berlinAvg, 100);
assertEquals("Average duration was not calculated from MatchingDuration events correctly", e1.getDuration(),
viennaAvg, 100);
publisher.dropClients();
flinkExecution.get();
}
}


@ -0,0 +1,16 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} - %highlight(%5p) [%12.12thread] %cyan(%-40.40logger{39}): %m%n</pattern>
</encoder>
</appender>
<logger name="org.apache.flink.shaded" level="WARN"/>
<root level="${log.level:-INFO}">
<appender-ref ref="STDOUT"/>
</root>
</configuration>

ass3-messaging/pom.xml Normal file

@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>at.ac.tuwien.infosys.dst</groupId>
<artifactId>dst</artifactId>
<version>2021.1</version>
<relativePath>..</relativePath>
</parent>
<artifactId>ass3-messaging</artifactId>
<name>DST :: Assignment 3 :: Messaging</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>http-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>


@ -0,0 +1,38 @@
package dst.ass3.messaging;
/**
* Contains several constants related to the RabbitMQ infrastructure and expected names for queues, exchanges and
* routing keys.
*/
public final class Constants {
public static final String RMQ_HOST = "192.168.99.99";
public static final String RMQ_PORT = "5672";
public static final String RMQ_VHOST = "/";
public static final String RMQ_USER = "dst";
public static final String RMQ_PASSWORD = "dst";
public static final String RMQ_API_PORT = "15672";
public static final String RMQ_API_URL = "http://" + RMQ_HOST + ":" + RMQ_API_PORT + "/api/";
public static final String QUEUE_AT_VIENNA = "dst.at_vienna";
public static final String QUEUE_AT_LINZ = "dst.at_linz";
public static final String QUEUE_DE_BERLIN = "dst.de_berlin";
public static final String[] WORK_QUEUES = {
QUEUE_AT_VIENNA,
QUEUE_AT_LINZ,
QUEUE_DE_BERLIN
};
public static final String TOPIC_EXCHANGE = "dst.workers";
public static final String ROUTING_KEY_AT_VIENNA = "requests.at_vienna";
public static final String ROUTING_KEY_AT_LINZ = "requests.at_linz";
public static final String ROUTING_KEY_DE_BERLIN = "requests.de_berlin";
private Constants() {
// util class
}
}
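
The queue names and routing keys above all derive from the Region enum values. A small illustrative helper (not part of the template; the class name RegionNames is an assumption) makes the naming convention explicit:

package dst.ass3.messaging;

// Illustrative only, not part of the template: shows how the names in
// Constants derive from the Region enum values.
public final class RegionNames {

    public static String queueFor(Region region) {
        return "dst." + region.name().toLowerCase();        // e.g. dst.at_vienna
    }

    public static String routingKeyFor(Region region) {
        return "requests." + region.name().toLowerCase();   // e.g. requests.at_vienna
    }

    private RegionNames() {
        // util class
    }
}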


@ -0,0 +1,54 @@
package dst.ass3.messaging;
import java.util.Objects;
public class GeoPoint {
private Double longitude;
private Double latitude;
public GeoPoint() {
}
public GeoPoint(Double longitude, Double latitude) {
this.longitude = longitude;
this.latitude = latitude;
}
public Double getLongitude() {
return longitude;
}
public void setLongitude(Double longitude) {
this.longitude = longitude;
}
public Double getLatitude() {
return latitude;
}
public void setLatitude(Double latitude) {
this.latitude = latitude;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
GeoPoint geoPoint = (GeoPoint) o;
return Objects.equals(getLongitude(), geoPoint.getLongitude()) &&
Objects.equals(getLatitude(), geoPoint.getLatitude());
}
@Override
public int hashCode() {
return Objects.hash(getLongitude(), getLatitude());
}
}


@ -0,0 +1,21 @@
package dst.ass3.messaging;
import java.io.Closeable;
import java.io.IOException;
public interface IMessagingFactory extends Closeable {
IQueueManager createQueueManager();
IRequestGateway createRequestGateway();
IWorkloadMonitor createWorkloadMonitor();
/**
* Closes any resources the factory may have created.
*
* @throws IOException propagated exceptions
*/
@Override
void close() throws IOException;
}


@ -0,0 +1,28 @@
package dst.ass3.messaging;
import java.io.Closeable;
import java.io.IOException;
/**
* Responsible for creating and tearing down all RabbitMQ queues and exchanges necessary for running the system.
*/
public interface IQueueManager extends Closeable {
/**
* Initializes all queues or topic exchanges necessary for running the system.
*/
void setUp();
/**
* Removes all queues or topic exchanges associated with the system.
*/
void tearDown();
/**
* Closes underlying connections or resources, if any.
*
* @throws IOException propagated exceptions
*/
@Override
void close() throws IOException;
}
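
A minimal sketch of a possible implementation using Spring AMQP's RabbitAdmin; the class name QueueManager, the constructor, and the use of CachingConnectionFactory are assumptions, not prescribed by the template:

package dst.ass3.messaging.impl;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;

import dst.ass3.messaging.Constants;
import dst.ass3.messaging.IQueueManager;

// Hypothetical sketch, not the reference solution.
public class QueueManager implements IQueueManager {

    private final CachingConnectionFactory connectionFactory;
    private final RabbitAdmin admin;

    public QueueManager() {
        connectionFactory = new CachingConnectionFactory(Constants.RMQ_HOST);
        connectionFactory.setUsername(Constants.RMQ_USER);
        connectionFactory.setPassword(Constants.RMQ_PASSWORD);
        admin = new RabbitAdmin(connectionFactory);
    }

    @Override
    public void setUp() {
        admin.declareExchange(new TopicExchange(Constants.TOPIC_EXCHANGE));
        for (String queue : Constants.WORK_QUEUES) {
            admin.declareQueue(new Queue(queue));
        }
    }

    @Override
    public void tearDown() {
        for (String queue : Constants.WORK_QUEUES) {
            admin.deleteQueue(queue);
        }
        admin.deleteExchange(Constants.TOPIC_EXCHANGE);
    }

    @Override
    public void close() {
        connectionFactory.destroy();
    }
}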


@ -0,0 +1,21 @@
package dst.ass3.messaging;
import java.io.Closeable;
import java.io.IOException;
public interface IRequestGateway extends Closeable {
/**
* Serializes a request and routes it to the correct queue.
*
* @param request the request
*/
void submitRequest(TripRequest request);
/**
* Closes any resources that may have been initialized (connections, channels, etc.)
*
* @throws IOException propagated exceptions
*/
@Override
void close() throws IOException;
}
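
A hedged sketch of how submitRequest could serialize the request with Jackson and publish it to the default exchange, using the work queue name as routing key; the class name RequestGateway and the queue-name derivation are assumptions consistent with Constants:

package dst.ass3.messaging.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

import dst.ass3.messaging.Constants;
import dst.ass3.messaging.IRequestGateway;
import dst.ass3.messaging.TripRequest;

// Hypothetical sketch, not the reference solution.
public class RequestGateway implements IRequestGateway {

    private final ObjectMapper mapper = new ObjectMapper();
    private final CachingConnectionFactory connectionFactory;
    private final RabbitTemplate template;

    public RequestGateway() {
        connectionFactory = new CachingConnectionFactory(Constants.RMQ_HOST);
        connectionFactory.setUsername(Constants.RMQ_USER);
        connectionFactory.setPassword(Constants.RMQ_PASSWORD);
        template = new RabbitTemplate(connectionFactory);
    }

    @Override
    public void submitRequest(TripRequest request) {
        try {
            String json = mapper.writeValueAsString(request);
            // publish to the default exchange; the routing key selects the
            // work queue, e.g. Region.AT_VIENNA -> "dst.at_vienna"
            template.convertAndSend("", "dst." + request.getRegion().name().toLowerCase(), json);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Cannot serialize request", e);
        }
    }

    @Override
    public void close() {
        connectionFactory.destroy();
    }
}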


@ -0,0 +1,34 @@
package dst.ass3.messaging;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
public interface IWorkloadMonitor extends Closeable {
/**
* Returns for each region the number of waiting requests.
*
* @return a map from region to the number of waiting requests
*/
Map<Region, Long> getRequestCount();
/**
* Returns the number of workers for each region. This can be deduced from the number of consumers on each
* queue.
*
* @return a map from region to the number of workers
*/
Map<Region, Long> getWorkerCount();
/**
* Returns for each region the average processing time of the last 10 recorded requests. The data comes from
* subscriptions to the respective topics.
*
* @return a map from region to the average processing time in milliseconds
*/
Map<Region, Double> getAverageProcessingTime();
@Override
void close() throws IOException;
}
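
The queue statistics can come straight from the RabbitMQ HTTP API. A partial sketch, assuming the http-client's QueueInfo exposes getMessagesReady() and getConsumerCount() accessors; the average-processing-time part (a private queue bound to dst.workers that keeps the last 10 WorkerResponse messages per region) is only indicated by a comment:

package dst.ass3.messaging.impl;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.http.client.Client;

import dst.ass3.messaging.Constants;
import dst.ass3.messaging.IWorkloadMonitor;
import dst.ass3.messaging.Region;

// Hypothetical sketch, not the reference solution.
public class WorkloadMonitor implements IWorkloadMonitor {

    private final Client client;

    public WorkloadMonitor() throws MalformedURLException, URISyntaxException {
        client = new Client(Constants.RMQ_API_URL, Constants.RMQ_USER, Constants.RMQ_PASSWORD);
    }

    @Override
    public Map<Region, Long> getRequestCount() {
        Map<Region, Long> counts = new HashMap<>();
        for (Region region : Region.values()) {
            counts.put(region, client
                    .getQueue(Constants.RMQ_VHOST, "dst." + region.name().toLowerCase())
                    .getMessagesReady());
        }
        return counts;
    }

    @Override
    public Map<Region, Long> getWorkerCount() {
        Map<Region, Long> counts = new HashMap<>();
        for (Region region : Region.values()) {
            counts.put(region, client
                    .getQueue(Constants.RMQ_VHOST, "dst." + region.name().toLowerCase())
                    .getConsumerCount());
        }
        return counts;
    }

    @Override
    public Map<Region, Double> getAverageProcessingTime() {
        // would average the processingTime of the last 10 WorkerResponse
        // messages received per region via a subscription to dst.workers
        throw new UnsupportedOperationException("omitted in this sketch");
    }

    @Override
    public void close() throws IOException {
        // delete the monitor's private subscription queues here, if any
    }
}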


@ -0,0 +1,7 @@
package dst.ass3.messaging;
public enum Region {
AT_VIENNA,
AT_LINZ,
DE_BERLIN
}


@ -0,0 +1,71 @@
package dst.ass3.messaging;
import java.util.Objects;
public class TripRequest {
private String id;
private Region region;
private GeoPoint pickup;
public TripRequest() {
}
public TripRequest(String id, Region region, GeoPoint pickup) {
this.id = id;
this.region = region;
this.pickup = pickup;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Region getRegion() {
return region;
}
public void setRegion(Region region) {
this.region = region;
}
public GeoPoint getPickup() {
return pickup;
}
public void setPickup(GeoPoint pickup) {
this.pickup = pickup;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TripRequest that = (TripRequest) o;
return Objects.equals(getId(), that.getId()) &&
getRegion() == that.getRegion() &&
Objects.equals(getPickup(), that.getPickup());
}
@Override
public int hashCode() {
return Objects.hash(getId(), getRegion(), getPickup());
}
@Override
public String toString() {
return "TripRequest{" +
"id='" + id + '\'' +
", region=" + region +
", pickup=" + pickup +
'}';
}
}


@ -0,0 +1,76 @@
package dst.ass3.messaging;
import java.util.Objects;
/**
* Message sent by a worker after it is finished processing a request.
*/
public class WorkerResponse {
/**
* The ID of the original {@link TripRequest}.
*/
private String requestId;
/**
* The time it took to process the request (in milliseconds).
*/
private Long processingTime;
/**
* The proposed driver
*/
private Long driverId;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public Long getProcessingTime() {
return processingTime;
}
public void setProcessingTime(Long processingTime) {
this.processingTime = processingTime;
}
public Long getDriverId() {
return driverId;
}
public void setDriverId(Long driverId) {
this.driverId = driverId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WorkerResponse that = (WorkerResponse) o;
return Objects.equals(getRequestId(), that.getRequestId()) &&
Objects.equals(getProcessingTime(), that.getProcessingTime()) &&
Objects.equals(getDriverId(), that.getDriverId());
}
@Override
public int hashCode() {
return Objects.hash(getRequestId(), getProcessingTime(), getDriverId());
}
@Override
public String toString() {
return "WorkerResponse{" +
"requestId='" + requestId + '\'' +
", processingTime=" + processingTime +
", driverId=" + driverId +
'}';
}
}
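
For reference, a worker response as it appears on the wire (cf. the JSON bodies published in WorkloadMonitorTest) can be read back with Jackson. This demo class and its literal values are illustrative only:

package dst.ass3.messaging;

import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative only: shows the expected wire format of a WorkerResponse.
public class WorkerResponseWireFormatDemo {

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        String body = "{\"requestId\": \"id1\", \"processingTime\": \"4711\", \"driverId\": \"1\"}";
        WorkerResponse response = mapper.readValue(body, WorkerResponse.class);
        // Jackson coerces the quoted numbers into the Long fields
        System.out.println(response);
    }
}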


@ -0,0 +1,32 @@
package dst.ass3.messaging.impl;
import dst.ass3.messaging.IMessagingFactory;
import dst.ass3.messaging.IQueueManager;
import dst.ass3.messaging.IRequestGateway;
import dst.ass3.messaging.IWorkloadMonitor;
public class MessagingFactory implements IMessagingFactory {
@Override
public IQueueManager createQueueManager() {
// TODO
return null;
}
@Override
public IRequestGateway createRequestGateway() {
// TODO
return null;
}
@Override
public IWorkloadMonitor createWorkloadMonitor() {
// TODO
return null;
}
@Override
public void close() {
// implement if needed
}
}


@ -0,0 +1,16 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} - %highlight(%5p) [%12.12thread] %cyan(%-40.40logger{39}): %m%n</pattern>
</encoder>
</appender>
<logger name="org.apache.http" level="WARN"/>
<root level="${log.level:-INFO}">
<appender-ref ref="STDOUT"/>
</root>
</configuration>


@ -0,0 +1,51 @@
package dst.ass3.messaging;
import com.rabbitmq.http.client.Client;
import org.junit.rules.ExternalResource;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
public class RabbitResource extends ExternalResource {
private RabbitAdmin admin;
private Client manager;
private CachingConnectionFactory connectionFactory;
private RabbitTemplate client;
@Override
protected void before() throws Throwable {
manager = new Client(Constants.RMQ_API_URL, Constants.RMQ_USER, Constants.RMQ_PASSWORD);
connectionFactory = new CachingConnectionFactory(Constants.RMQ_HOST);
connectionFactory.setUsername(Constants.RMQ_USER);
connectionFactory.setPassword(Constants.RMQ_PASSWORD);
client = new RabbitTemplate(connectionFactory);
admin = new RabbitAdmin(connectionFactory);
}
@Override
protected void after() {
connectionFactory.destroy();
}
public Client getManager() {
return manager;
}
public RabbitTemplate getClient() {
return client;
}
public RabbitAdmin getAdmin() {
return admin;
}
public CachingConnectionFactory getConnectionFactory() {
return connectionFactory;
}
}


@ -0,0 +1,13 @@
package dst.ass3.messaging.impl;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@RunWith(Suite.class)
@Suite.SuiteClasses({
QueueManagerTest.class,
RequestGatewayTest.class,
WorkloadMonitorTest.class
})
public class Ass3_1_Suite {
}


@ -0,0 +1,97 @@
package dst.ass3.messaging.impl;
import com.rabbitmq.http.client.domain.ExchangeInfo;
import com.rabbitmq.http.client.domain.QueueInfo;
import dst.ass3.messaging.IMessagingFactory;
import dst.ass3.messaging.IQueueManager;
import dst.ass3.messaging.RabbitResource;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static dst.ass3.messaging.Constants.*;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.assertThat;
public class QueueManagerTest {
@Rule
public RabbitResource rabbit = new RabbitResource();
@Rule
public Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
private IMessagingFactory factory;
private IQueueManager queueManager;
@Before
public void setUp() throws Exception {
factory = new MessagingFactory();
queueManager = factory.createQueueManager();
}
@After
public void tearDown() throws Exception {
try {
queueManager.close();
} catch (IOException e) {
// ignore
}
}
@Test
public void setUp_createsQueues() throws Exception {
queueManager.setUp();
try {
List<QueueInfo> queues = rabbit.getManager().getQueues();
assertThat(queues.size(), not(0));
// make sure all work queues exist
List<String> queueNames = queues.stream().map(QueueInfo::getName).collect(Collectors.toList());
Arrays.stream(WORK_QUEUES)
.forEach(wq -> assertThat(queueNames, hasItem(wq)));
} finally {
queueManager.tearDown();
}
}
@Test
public void setUp_createsExchange() throws Exception {
queueManager.setUp();
try {
ExchangeInfo exchange = rabbit.getManager().getExchange(RMQ_VHOST, TOPIC_EXCHANGE);
assertThat(exchange, notNullValue());
} finally {
queueManager.tearDown();
}
}
@Test
public void tearDown_removesQueues() throws Exception {
queueManager.setUp();
queueManager.tearDown();
List<QueueInfo> queues = rabbit.getManager().getQueues();
List<String> queueNames = queues.stream().map(QueueInfo::getName).collect(Collectors.toList());
Arrays.stream(WORK_QUEUES)
.forEach(wq -> assertThat(queueNames, not(hasItem(wq))));
}
@Test
public void tearDown_removesExchange() throws Exception {
queueManager.setUp();
queueManager.tearDown();
ExchangeInfo exchange = rabbit.getManager().getExchange(RMQ_VHOST, TOPIC_EXCHANGE);
assertThat(exchange, nullValue());
}
}


@ -0,0 +1,112 @@
package dst.ass3.messaging.impl;
import dst.ass3.messaging.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.assertThat;
public class RequestGatewayTest {
private static final Logger LOG = LoggerFactory.getLogger(RequestGatewayTest.class);
@Rule
public RabbitResource rabbit = new RabbitResource();
@Rule
public Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
private IMessagingFactory factory;
private IQueueManager queueManager;
private IRequestGateway requestGateway;
@Before
public void setUp() throws Exception {
factory = new MessagingFactory();
queueManager = factory.createQueueManager();
requestGateway = factory.createRequestGateway();
queueManager.setUp();
}
@After
public void tearDown() throws Exception {
queueManager.tearDown();
requestGateway.close();
queueManager.close();
factory.close();
}
@Test
public void submitRequest_routesRequestsToCorrectQueues() throws Exception {
TripRequest r1 = new TripRequest("id1", Region.AT_VIENNA, new GeoPoint(34.22, -22.11));
TripRequest r2 = new TripRequest("id2", Region.AT_VIENNA, new GeoPoint(98.2, -1.11));
TripRequest r3 = new TripRequest("id3", Region.AT_LINZ, new GeoPoint(85.33, -25d));
LOG.info("Sending request {}", r1);
requestGateway.submitRequest(r1);
LOG.info("Sending request {}", r2);
requestGateway.submitRequest(r2);
LOG.info("Sending request {}", r3);
requestGateway.submitRequest(r3);
LOG.info("Taking requests from queue {}", Constants.QUEUE_AT_VIENNA);
Message m1 = rabbit.getClient().receive(Constants.QUEUE_AT_VIENNA, 1000);
assertThat(m1, notNullValue());
LOG.info("Taking requests from queue {}", Constants.QUEUE_AT_VIENNA);
Message m2 = rabbit.getClient().receive(Constants.QUEUE_AT_VIENNA, 1000);
assertThat(m2, notNullValue());
LOG.info("Taking requests from queue {}", Constants.QUEUE_AT_LINZ);
Message m3 = rabbit.getClient().receive(Constants.QUEUE_AT_LINZ, 1000);
assertThat(m3, notNullValue());
assertThat("Expected queue to be empty as no request for that region were issued",
rabbit.getClient().receive(Constants.QUEUE_DE_BERLIN, 1000), nullValue());
}
@Test
public void submitRequest_serializesIntoJsonFormat() throws Exception {
TripRequest r1 = new TripRequest("id1", Region.AT_VIENNA, new GeoPoint(1d, 2d));
TripRequest r2 = new TripRequest("id2", Region.AT_LINZ, new GeoPoint(3d, 4d));
TripRequest r3 = new TripRequest("id3", Region.DE_BERLIN, new GeoPoint(5d, 6d));
LOG.info("Sending request {}", r1);
requestGateway.submitRequest(r1);
LOG.info("Sending request {}", r2);
requestGateway.submitRequest(r2);
LOG.info("Sending request {}", r3);
requestGateway.submitRequest(r3);
LOG.info("Taking request from queue {}", Constants.QUEUE_AT_VIENNA);
Message m1 = rabbit.getClient().receive(Constants.QUEUE_AT_VIENNA, 1000);
assertThat(m1, notNullValue());
assertThat(new String(m1.getBody()),
equalTo("{\"id\":\"id1\",\"region\":\"AT_VIENNA\",\"pickup\":{\"longitude\":1.0,\"latitude\":2.0}}"));
LOG.info("Taking request from queue {}", Constants.QUEUE_AT_LINZ);
Message m2 = rabbit.getClient().receive(Constants.QUEUE_AT_LINZ, 1000);
assertThat(m2, notNullValue());
assertThat(new String(m2.getBody()),
equalTo("{\"id\":\"id2\",\"region\":\"AT_LINZ\",\"pickup\":{\"longitude\":3.0,\"latitude\":4.0}}"));
LOG.info("Taking request from queue {}", Constants.QUEUE_DE_BERLIN);
Message m3 = rabbit.getClient().receive(Constants.QUEUE_DE_BERLIN, 1000);
assertThat(m3, notNullValue());
assertThat(new String(m3.getBody()),
equalTo("{\"id\":\"id3\",\"region\":\"DE_BERLIN\",\"pickup\":{\"longitude\":5.0,\"latitude\":6.0}}"));
}
}


@ -0,0 +1,169 @@
package dst.ass3.messaging.impl;
import com.rabbitmq.http.client.domain.QueueInfo;
import dst.ass3.messaging.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.Lifecycle;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import static dst.ass3.messaging.Constants.TOPIC_EXCHANGE;
import static dst.ass3.messaging.Constants.WORK_QUEUES;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.MatcherAssert.assertThat;
public class WorkloadMonitorTest {
private static final Logger LOG = LoggerFactory.getLogger(WorkloadMonitorTest.class);
@Rule
public RabbitResource rabbit = new RabbitResource();
@Rule
public Timeout timeout = new Timeout(60, TimeUnit.SECONDS);
private IMessagingFactory factory;
private IQueueManager queueManager;
private IRequestGateway requestGateway;
private IWorkloadMonitor workloadMonitor;
@Before
public void setUp() throws Exception {
factory = new MessagingFactory();
queueManager = factory.createQueueManager();
requestGateway = factory.createRequestGateway();
queueManager.setUp();
workloadMonitor = factory.createWorkloadMonitor();
}
@After
public void tearDown() throws Exception {
queueManager.tearDown();
requestGateway.close();
queueManager.close();
factory.close();
}
@Test
public void getRequestCount_returnsCorrectCount() throws Exception {
try {
Map<Region, Long> countForRegion = new HashMap<>();
for (Region region : Region.values()) {
countForRegion.put(region, ThreadLocalRandom.current().nextLong(10, 20 + 1));
for (long i = 0; i < countForRegion.get(region); i++) {
GeoPoint pickup = new GeoPoint(48.11, 16.22);
TripRequest request = new TripRequest("id" + i, region, pickup);
LOG.info("Sending request {}", request);
requestGateway.submitRequest(request);
}
}
// wait for the messages to be processed by rabbit
Thread.sleep(2000);
assertThat(workloadMonitor.getRequestCount(), equalTo(countForRegion));
} finally {
workloadMonitor.close();
}
}
@Test
public void multipleWorkloadMonitors_uniqueQueueForEachMonitor() throws Exception {
try (IWorkloadMonitor workloadMonitor2 = factory.createWorkloadMonitor();
IWorkloadMonitor workloadMonitor3 = factory.createWorkloadMonitor()) {
long nonWorkQueues = rabbit.getManager().getQueues().stream().filter(q -> !Arrays.asList(WORK_QUEUES).contains(q.getName())).count();
assertThat(nonWorkQueues, greaterThanOrEqualTo(3L));
} finally {
workloadMonitor.close();
}
}
@Test
public void getAverageProcessingTime_correctAverageTime() throws Exception {
try {
Map<Region, Double> avgTimes = new HashMap<>();
for (Region region : Region.values()) {
long count = ThreadLocalRandom.current().nextLong(15, 25);
long regionTime = 0;
for (long i = 0; i < count; i++) {
long requestTime = ThreadLocalRandom.current().nextLong(1000, 20000 + 1);
if (count - i <= 10) {
regionTime += requestTime;
}
String body = String.format("{\"requestId\": \"%s\", \"processingTime\": \"%d\", \"driverId\": \"1\"}", UUID.randomUUID(), requestTime);
LOG.info("Sending request {}", body);
rabbit.getClient().convertAndSend(TOPIC_EXCHANGE, "requests." + region.toString().toLowerCase(), body);
}
avgTimes.put(region, ((double) regionTime / 10));
}
// wait for the messages to be processed by rabbit
Thread.sleep(2000);
assertThat(workloadMonitor.getAverageProcessingTime(), equalTo(avgTimes));
} finally {
workloadMonitor.close();
}
}
@Test
public void getWorkerCount_returnsCorrectCount() throws Exception {
try {
// spawn a random number of consumers
Map<Region, Collection<MessageListenerContainer>> consumersForRegion = new HashMap<>();
Map<Region, Long> consumerCountForRegion = new HashMap<>();
for (Region region : Region.values()) {
List<MessageListenerContainer> consumers = new ArrayList<>();
consumersForRegion.put(region, consumers);
consumerCountForRegion.put(region, ThreadLocalRandom.current().nextLong(10, 20 + 1));
for (long i = 0; i < consumerCountForRegion.get(region); i++) {
consumers.add(spawnConsumer("dst." + region.toString().toLowerCase()));
}
}
// wait for rabbit to get to know the new consumers
Thread.sleep(2000);
Map<Region, Long> workerCount = workloadMonitor.getWorkerCount();
// stop all consumers
consumersForRegion.entrySet().stream().map(Map.Entry::getValue).flatMap(Collection::stream).forEach(Lifecycle::stop);
assertThat(workerCount, equalTo(consumerCountForRegion));
} finally {
workloadMonitor.close();
}
}
private MessageListenerContainer spawnConsumer(String queue) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbit.getConnectionFactory());
container.addQueueNames(queue);
container.start();
return container;
}
@Test
public void close_removesQueues() throws Exception {
workloadMonitor.close();
List<QueueInfo> queues = rabbit.getManager().getQueues();
long nonWorkQueues = queues.stream().filter(q -> !Arrays.asList(WORK_QUEUES).contains(q.getName())).count();
assertThat(nonWorkQueues, is(0L));
}
}

ass3-worker/Dockerfile Normal file

@ -0,0 +1 @@
# TODO

ass3-worker/pom.xml Normal file

@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>at.ac.tuwien.infosys.dst</groupId>
<artifactId>dst</artifactId>
<version>2021.1</version>
<relativePath>..</relativePath>
</parent>
<artifactId>ass3-worker</artifactId>
<packaging>jar</packaging>
<name>DST :: Assignment 3 :: Worker</name>
<dependencies>
<dependency>
<groupId>at.ac.tuwien.infosys.dst</groupId>
<artifactId>ass3-messaging</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

ass3-worker/redis-data.sh Normal file

@ -0,0 +1,11 @@
#!/usr/bin/env bash
redis-cli hmset drivers:at_vienna "1234" "48.19819 16.37127" "5678" "50.19819 20.37127"
# Make sure that all required resources are created and correctly configured.
# To test your implementation:
# Publish the following message via the RabbitMQ interface to the queue 'dst.at_vienna':
# {"id":"request1","region":"AT_VIENNA","pickup":{"longitude": 16.371334 ,"latitude": 48.198122}}
# To see if your implementation removed the driver, execute the following command in the VM:
# redis-cli hgetall drivers:at_vienna
# The output should not include the driver with id 1234.


@ -0,0 +1,6 @@
/**
* This package allows you to write your own integration tests for the worker. For example, you could use the
* RabbitTemplate, Jedis, and DockerClient library to programmatically insert test data, and verify whether containers
* are spawned correctly.
*/
package dst.ass3.worker;
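
A hedged sketch of such an integration test, following the manual steps from redis-data.sh; the host, credentials, and queue name mirror the ass3-messaging constants, and the class name WorkerSmokeTest is illustrative only:

package dst.ass3.worker;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import redis.clients.jedis.Jedis;

// Hypothetical smoke test, not part of the template.
public class WorkerSmokeTest {

    public static void main(String[] args) throws InterruptedException {
        try (Jedis jedis = new Jedis("192.168.99.99")) {
            // seed a driver near the pickup location
            jedis.hset("drivers:at_vienna", "1234", "48.19819 16.37127");

            CachingConnectionFactory factory = new CachingConnectionFactory("192.168.99.99");
            factory.setUsername("dst");
            factory.setPassword("dst");
            RabbitTemplate rabbit = new RabbitTemplate(factory);

            // publish a trip request into the Vienna work queue (default exchange)
            rabbit.convertAndSend("", "dst.at_vienna",
                    "{\"id\":\"request1\",\"region\":\"AT_VIENNA\","
                            + "\"pickup\":{\"longitude\":16.371334,\"latitude\":48.198122}}");

            Thread.sleep(5000); // give the worker container time to process the request

            // the worker should have removed the matched driver from redis
            System.out.println("driver 1234 still present: "
                    + jedis.hexists("drivers:at_vienna", "1234"));
            factory.destroy();
        }
    }
}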

ass3-worker/worker.py Normal file

@ -0,0 +1 @@
# TODO

pom.xml

@ -387,6 +387,47 @@
<version>${httpclient.version}</version>
</dependency>
<!-- assignment 3 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java</artifactId>
<version>${docker-api-client.version}</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${rabbitmq-client.version}</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>http-client</artifactId>
<version>${rabbitmq-http.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${spring-boot.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
@ -405,6 +446,10 @@
<module>ass2-service/facade</module>
<module>ass2-aop</module>
<module>ass2-ioc</module>
<module>ass3-messaging</module>
<module>ass3-event</module>
<module>ass3-elastic</module>
<module>ass3-worker</module>
</modules>
</profile>
@ -488,6 +533,32 @@
</modules>
</profile>
<profile>
<id>ass3-event</id>
<modules>
<module>ass3-event</module>
</modules>
</profile>
<profile>
<id>ass3-messaging</id>
<modules>
<module>ass3-messaging</module>
</modules>
</profile>
<profile>
<id>ass3-elastic</id>
<modules>
<module>ass3-messaging</module>
<module>ass3-elastic</module>
</modules>
</profile>
<profile>
<id>ass3-worker</id>
<modules>
<module>ass3-messaging</module>
<module>ass3-worker</module>
</modules>
</profile>
</profiles>
@ -529,6 +600,12 @@
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
<os-maven-plugin.version>1.7.0</os-maven-plugin.version>
<httpclient.version>4.5.13</httpclient.version>
<!-- assignment 3 -->
<flink.version>1.13.0</flink.version>
<docker-api-client.version>3.2.7</docker-api-client.version>
<commons-cli.version>1.4</commons-cli.version>
<rabbitmq-http.version>3.9.0.RELEASE</rabbitmq-http.version>
<rabbitmq-client.version>5.11.0</rabbitmq-client.version>
</properties>