Implement EventSourceFunction (3.3.1)

This commit is contained in:
Tobias Eidelpes 2021-05-19 11:40:11 +02:00
parent 33d061cbef
commit d4b7ccbb72
2 changed files with 63 additions and 4 deletions

View File

@ -1,16 +1,17 @@
package dst.ass3.event; package dst.ass3.event;
import dst.ass3.event.impl.EventProcessingEnvironment;
import dst.ass3.event.impl.EventSourceFunction;
/** /**
* Creates your {@link IEventProcessingEnvironment} and {@link IEventSourceFunction} implementation instances. * Creates your {@link IEventProcessingEnvironment} and {@link IEventSourceFunction} implementation instances.
*/ */
public class EventProcessingFactory { public class EventProcessingFactory {
public static IEventProcessingEnvironment createEventProcessingEnvironment() { public static IEventProcessingEnvironment createEventProcessingEnvironment() {
// TODO return new EventProcessingEnvironment();
return null;
} }
public static IEventSourceFunction createEventSourceFunction() { public static IEventSourceFunction createEventSourceFunction() {
// TODO return new EventSourceFunction();
return null;
} }
} }

View File

@ -0,0 +1,58 @@
package dst.ass3.event.impl;
import dst.ass3.event.Constants;
import dst.ass3.event.EventSubscriber;
import dst.ass3.event.IEventSourceFunction;
import dst.ass3.event.model.domain.ITripEventInfo;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
public class EventSourceFunction implements IEventSourceFunction {
EventSubscriber eventSubscriber;
@Override
public void open(Configuration parameters) throws Exception {
SocketAddress socketAddress = new InetSocketAddress(Constants.EVENT_PUBLISHER_PORT);
eventSubscriber = EventSubscriber.subscribe(socketAddress);
}
@Override
public void close() throws Exception {
eventSubscriber.close();
}
@Override
public RuntimeContext getRuntimeContext() {
return null;
}
@Override
public IterationRuntimeContext getIterationRuntimeContext() {
return null;
}
@Override
public void setRuntimeContext(RuntimeContext runtimeContext) {
}
@Override
public void run(SourceContext<ITripEventInfo> ctx) throws Exception {
ITripEventInfo info = eventSubscriber.receive();
ctx.collect(info);
while (info != null) {
info = eventSubscriber.receive();
if (info != null)
ctx.collect(info);
}
}
@Override
public void cancel() {
}
}