乐趣区

聊聊flink的consecutive windowed operations


本文主要研究一下 flink 的 consecutive windowed operations
实例
DataStream<Integer> input = …;

DataStream<Integer> resultsPerKey = input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new Summer());

DataStream<Integer> globalResults = resultsPerKey
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new TopKWindowFunction());
本实例首先根据 key 进行 partition,然后再按指定的 window 对这些 key 进行计数,之后对该 dataStream 进行 windowAll 操作,其时间 WindowAssigner 与前面的相同,这样可以达到在同样的时间窗口内先 partition 汇总,再全局汇总的效果(可以解决类似 top-k elements 的问题)
TimestampsAndPeriodicWatermarksOperator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
public class TimestampsAndPeriodicWatermarksOperator<T>
extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {

private static final long serialVersionUID = 1L;

private transient long watermarkInterval;

private transient long currentWatermark;

public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
super(assigner);
this.chainingStrategy = ChainingStrategy.ALWAYS;
}

@Override
public void open() throws Exception {
super.open();

currentWatermark = Long.MIN_VALUE;
watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();

if (watermarkInterval > 0) {
long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
}

@Override
public void processElement(StreamRecord<T> element) throws Exception {
final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

output.collect(element.replace(element.getValue(), newTimestamp));
}

@Override
public void onProcessingTime(long timestamp) throws Exception {
// register next timer
Watermark newWatermark = userFunction.getCurrentWatermark();
if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
currentWatermark = newWatermark.getTimestamp();
// emit watermark
output.emitWatermark(newWatermark);
}

long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}

/**
* Override the base implementation to completely ignore watermarks propagated from
* upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
* watermarks from here).
*/
@Override
public void processWatermark(Watermark mark) throws Exception {
// if we receive a Long.MAX_VALUE watermark we forward it since it is used
// to signal the end of input and to not block watermark progress downstream
if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
currentWatermark = Long.MAX_VALUE;
output.emitWatermark(mark);
}
}

@Override
public void close() throws Exception {
super.close();

// emit a final watermark
Watermark newWatermark = userFunction.getCurrentWatermark();
if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
currentWatermark = newWatermark.getTimestamp();
// emit watermark
output.emitWatermark(newWatermark);
}
}
}

假设 assignTimestampsAndWatermarks 使用的是 AssignerWithPeriodicWatermarks 类型的参数,那么创建的是 TimestampsAndPeriodicWatermarksOperator;它在 open 的时候根据指定的 watermarkInterval 注册了一个延时任务
该延时任务会回调 onProcessingTime 方法,而 onProcessingTime 在这里则会调用 AssignerWithPeriodicWatermarks 的 getCurrentWatermark 方法获取 watermark,然后重新注册新的延时任务,延时时间为 getProcessingTimeService().getCurrentProcessingTime()+watermarkInterval;这里的 watermarkInterval 即为 env.getConfig().setAutoWatermarkInterval 设置的值
AssignerWithPeriodicWatermarks 的 getCurrentWatermark 方法除了注册延时任务实现不断定时的效果外,还会在新的 watermark 值大于 currentWatermark 的条件下发射 watermark

SystemProcessingTimeService
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
public class SystemProcessingTimeService extends ProcessingTimeService {

private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class);

private static final int STATUS_ALIVE = 0;
private static final int STATUS_QUIESCED = 1;
private static final int STATUS_SHUTDOWN = 2;

// ————————————————————————

/** The containing task that owns this time service provider. */
private final AsyncExceptionHandler task;

/** The lock that timers acquire upon triggering. */
private final Object checkpointLock;

/** The executor service that schedules and calls the triggers of this task. */
private final ScheduledThreadPoolExecutor timerService;

private final AtomicInteger status;

public SystemProcessingTimeService(AsyncExceptionHandler failureHandler, Object checkpointLock) {
this(failureHandler, checkpointLock, null);
}

public SystemProcessingTimeService(
AsyncExceptionHandler task,
Object checkpointLock,
ThreadFactory threadFactory) {

this.task = checkNotNull(task);
this.checkpointLock = checkNotNull(checkpointLock);

this.status = new AtomicInteger(STATUS_ALIVE);

if (threadFactory == null) {
this.timerService = new ScheduledThreadPoolExecutor(1);
} else {
this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory);
}

// tasks should be removed if the future is canceled
this.timerService.setRemoveOnCancelPolicy(true);

// make sure shutdown removes all pending tasks
this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
}

@Override
public long getCurrentProcessingTime() {
return System.currentTimeMillis();
}

@Override
public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {

// delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
// T says we won’t see elements in the future with a timestamp smaller or equal to T.
// With processing time, we therefore need to delay firing the timer by one ms.
long delay = Math.max(timestamp – getCurrentProcessingTime(), 0) + 1;

// we directly try to register the timer and only react to the status on exception
// that way we save unnecessary volatile accesses for each timer
try {
return timerService.schedule(
new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException e) {
final int status = this.status.get();
if (status == STATUS_QUIESCED) {
return new NeverCompleteFuture(delay);
}
else if (status == STATUS_SHUTDOWN) {
throw new IllegalStateException(“Timer service is shut down”);
}
else {
// something else happened, so propagate the exception
throw e;
}
}
}

//……
}
SystemProcessingTimeService 的 registerTimer 方法根据指定的 timestamp 注册了一个延时任务 TriggerTask;timerService 为 JDK 自带的 ScheduledThreadPoolExecutor;TriggerTask 的 run 方法会在 service 状态为 STATUS_LIVE 时,触发 ProcessingTimeCallback(这里为 TimestampsAndPeriodicWatermarksOperator)的 onProcessingTime 方法
WindowOperator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {

//……
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);

//if element is handled by none of assigned elementWindows
boolean isSkippedElement = true;

final K key = this.<K>getKeyedStateBackend().getCurrentKey();

if (windowAssigner instanceof MergingWindowAssigner) {

//……

} else {
for (W window: elementWindows) {

// drop if the window is already late
if (isWindowLate(window)) {
continue;
}
isSkippedElement = false;

windowState.setCurrentNamespace(window);
windowState.add(element.getValue());

triggerContext.key = key;
triggerContext.window = window;

TriggerResult triggerResult = triggerContext.onElement(element);

if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
emitWindowContents(window, contents);
}

if (triggerResult.isPurge()) {
windowState.clear();
}
registerCleanupTimer(window);
}
}

// side output input event if
// element not handled by any window
// late arriving tag has been set
// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
if (isSkippedElement && isElementLate(element)) {
if (lateDataOutputTag != null){
sideOutput(element);
} else {
this.numLateRecordsDropped.inc();
}
}
}

/**
* Emits the contents of the given window using the {@link InternalWindowFunction}.
*/
@SuppressWarnings(“unchecked”)
private void emitWindowContents(W window, ACC contents) throws Exception {
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
processContext.window = window;
userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
}

//……
}
WindowOperator 的 processElement 方法会把 element 添加到 windowState,这里为 HeapAggregatingState,即在内存中累积,之后调用 triggerContext.onElement 方法 (里头使用的是 trigger.onElement 方法,这里的 trigger 为 EventTimeTrigger) 获取 TriggerResult,如果需要 fire,则会触发 emitWindowContents,如果需要 purge 则会清空 windowState;emitWindowContents 则是调用 userFunction.process 执行用户定义的窗口操作
EventTimeTrigger
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;

private EventTimeTrigger() {}

@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}

@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}

@Override
public boolean canMerge() {
return true;
}

@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
// only register a timer if the watermark is not yet past the end of the merged window
// this is in line with the logic in onElement(). If the watermark is past the end of
// the window onElement() will fire and setting a timer here would fire the window twice.
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
ctx.registerEventTimeTimer(windowMaxTimestamp);
}
}

@Override
public String toString() {
return “EventTimeTrigger()”;
}

public static EventTimeTrigger create() {
return new EventTimeTrigger();
}
}
EventTimeTrigger 的 onElement 方法会判断,如果 window.maxTimestamp() <= ctx.getCurrentWatermark()则会返回 TriggerResult.FIRE,告知 WindowOperator 可以 emitWindowContents
小结

flink 支持 consecutive windowed operations,比如先根据 key 进行 partition,然后再按指定的 window 对这些 key 进行计数,之后对该 dataStream 进行 windowAll 操作,其时间 WindowAssigner 与前面的相同,这样可以达到在同样的时间窗口内先 partition 汇总,再全局汇总的效果(可以解决类似 top-k elements 的问题)
AssignerWithPeriodicWatermarks 或者 AssignerWithPunctuatedWatermarks 它们有两个功能,一个是从 element 提取 timestamp 作为 eventTime,一个就是发射 watermark;由于 element 实际上不一定是严格按 eventTime 时间到来的,可能存在乱序,因而 watermark 的作用就是限制迟到的数据进入窗口,不让窗口无限等待迟到的可能属于该窗口的 element,即告知窗口 eventTime 小于等于该 watermark 的元素可以认为都到达了(窗口可以根据自己设定的时间范围,借助 trigger 判断是否可以关闭窗口然后开始对该窗口数据执行相关操作);对于 consecutive windowed operations 来说,上游的 watermark 会 forward 给下游的 operations
Trigger 的作用就是告知 WindowOperator 什么时候可以对关闭该窗口开始对该窗口数据执行相关操作 (返回 TriggerResult.FIRE 的情况下),对于 EventTimeTrigger 来说,其 onElement 方法的判断逻辑跟 watermark 相关,如果 window.maxTimestamp() <= ctx.getCurrentWatermark() 则会返回 TriggerResult.FIRE

doc
Consecutive windowed operations

退出移动版