From d4b7ccbb727d9da1361cdb48d32647df284d67b4 Mon Sep 17 00:00:00 2001 From: Tobias Eidelpes Date: Wed, 19 May 2021 11:40:11 +0200 Subject: [PATCH] Implement EventSourceFunction (3.3.1) --- .../ass3/event/EventProcessingFactory.java | 9 +-- .../ass3/event/impl/EventSourceFunction.java | 58 +++++++++++++++++++ 2 files changed, 63 insertions(+), 4 deletions(-) create mode 100644 ass3-event/src/main/java/dst/ass3/event/impl/EventSourceFunction.java diff --git a/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java b/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java index 6a8c668..2bddf92 100644 --- a/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java +++ b/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java @@ -1,16 +1,17 @@ package dst.ass3.event; +import dst.ass3.event.impl.EventProcessingEnvironment; +import dst.ass3.event.impl.EventSourceFunction; + /** * Creates your {@link IEventProcessingEnvironment} and {@link IEventSourceFunction} implementation instances. */ public class EventProcessingFactory { public static IEventProcessingEnvironment createEventProcessingEnvironment() { - // TODO - return null; + return new EventProcessingEnvironment(); } public static IEventSourceFunction createEventSourceFunction() { - // TODO - return null; + return new EventSourceFunction(); } } diff --git a/ass3-event/src/main/java/dst/ass3/event/impl/EventSourceFunction.java b/ass3-event/src/main/java/dst/ass3/event/impl/EventSourceFunction.java new file mode 100644 index 0000000..750d792 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/impl/EventSourceFunction.java @@ -0,0 +1,58 @@ +package dst.ass3.event.impl; + +import dst.ass3.event.Constants; +import dst.ass3.event.EventSubscriber; +import dst.ass3.event.IEventSourceFunction; +import dst.ass3.event.model.domain.ITripEventInfo; +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +public class EventSourceFunction implements IEventSourceFunction { + EventSubscriber eventSubscriber; + + @Override + public void open(Configuration parameters) throws Exception { + SocketAddress socketAddress = new InetSocketAddress(Constants.EVENT_PUBLISHER_PORT); + eventSubscriber = EventSubscriber.subscribe(socketAddress); + } + + @Override + public void close() throws Exception { + eventSubscriber.close(); + } + + @Override + public RuntimeContext getRuntimeContext() { + return null; + } + + @Override + public IterationRuntimeContext getIterationRuntimeContext() { + return null; + } + + @Override + public void setRuntimeContext(RuntimeContext runtimeContext) { + + } + + @Override + public void run(SourceContext ctx) throws Exception { + ITripEventInfo info = eventSubscriber.receive(); + ctx.collect(info); + while (info != null) { + info = eventSubscriber.receive(); + if (info != null) + ctx.collect(info); + } + } + + @Override + public void cancel() { + + } +}