序

本文主要研究一下flink的consecutive windowed operations

实例

DataStream<Integer> input = …;DataStream<Integer> resultsPerKey = input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .reduce(new Summer());DataStream<Integer> globalResults = resultsPerKey .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .process(new TopKWindowFunction());

本实例首先根据key进行partition,然后再按指定的window对每个key的数据进行汇总(这里用reduce求和),之后对该dataStream进行windowAll操作,其WindowAssigner(时间窗口)与前面的相同。由于第一个窗口操作输出的结果以该窗口的maxTimestamp作为timestamp,第二个窗口操作会把它们分配到同一个时间窗口中,这样就可以达到在同样的时间窗口内先按key汇总、再全局汇总的效果(可以解决类似top-k elements的问题)。

TimestampsAndPeriodicWatermarksOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java

public class TimestampsAndPeriodicWatermarksOperator<T> extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>> implements OneInputStreamOperator<T, T>, ProcessingTimeCallback { private static final long serialVersionUID = 1L; private transient long watermarkInterval; private transient long currentWatermark; public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) { super(assigner); this.chainingStrategy = ChainingStrategy.ALWAYS; } @Override public void open() throws Exception { super.open(); currentWatermark = Long.MIN_VALUE; watermarkInterval = getExecutionConfig().getAutoWatermarkInterval(); if (watermarkInterval > 0) { long now = getProcessingTimeService().getCurrentProcessingTime(); getProcessingTimeService().registerTimer(now + watermarkInterval, this); } } @Override public void processElement(StreamRecord<T> element) throws Exception { final long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.hasTimestamp() ? 
element.getTimestamp() : Long.MIN_VALUE); output.collect(element.replace(element.getValue(), newTimestamp)); } @Override public void onProcessingTime(long timestamp) throws Exception { // register next timer Watermark newWatermark = userFunction.getCurrentWatermark(); if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) { currentWatermark = newWatermark.getTimestamp(); // emit watermark output.emitWatermark(newWatermark); } long now = getProcessingTimeService().getCurrentProcessingTime(); getProcessingTimeService().registerTimer(now + watermarkInterval, this); } /** * Override the base implementation to completely ignore watermarks propagated from * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit * watermarks from here). / @Override public void processWatermark(Watermark mark) throws Exception { // if we receive a Long.MAX_VALUE watermark we forward it since it is used // to signal the end of input and to not block watermark progress downstream if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) { currentWatermark = Long.MAX_VALUE; output.emitWatermark(mark); } } @Override public void close() throws Exception { super.close(); // emit a final watermark Watermark newWatermark = userFunction.getCurrentWatermark(); if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) { currentWatermark = newWatermark.getTimestamp(); // emit watermark output.emitWatermark(newWatermark); } 
}}

假设assignTimestampsAndWatermarks使用的是AssignerWithPeriodicWatermarks类型的参数,那么创建的是TimestampsAndPeriodicWatermarksOperator。它在open的时候,如果watermarkInterval大于0,就会根据该watermarkInterval注册一个延时任务,该延时任务会回调onProcessingTime方法。onProcessingTime在这里会调用AssignerWithPeriodicWatermarks的getCurrentWatermark方法获取watermark,然后重新注册新的延时任务,其触发时间为getProcessingTimeService().getCurrentProcessingTime()+watermarkInterval(也就是大约每隔watermarkInterval毫秒触发一次);这里的watermarkInterval即为env.getConfig().setAutoWatermarkInterval设置的值。

TimestampsAndPeriodicWatermarksOperator的onProcessingTime方法除了重新注册延时任务以实现周期性触发的效果外,还会在新的watermark值大于currentWatermark的条件下发射watermark。

SystemProcessingTimeService

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

public class SystemProcessingTimeService extends ProcessingTimeService { private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class); private static final int STATUS_ALIVE = 0; private static final int STATUS_QUIESCED = 1; private static final int STATUS_SHUTDOWN = 2; // ———————————————————————— /* The containing task that owns this time service provider. / private final AsyncExceptionHandler task; /* The lock that timers acquire upon triggering. / private final Object checkpointLock; /* The executor service that schedules and calls the triggers of this task. 
/ private final ScheduledThreadPoolExecutor timerService; private final AtomicInteger status; public SystemProcessingTimeService(AsyncExceptionHandler failureHandler, Object checkpointLock) { this(failureHandler, checkpointLock, null); } public SystemProcessingTimeService( AsyncExceptionHandler task, Object checkpointLock, ThreadFactory threadFactory) { this.task = checkNotNull(task); this.checkpointLock = checkNotNull(checkpointLock); this.status = new AtomicInteger(STATUS_ALIVE); if (threadFactory == null) { this.timerService = new ScheduledThreadPoolExecutor(1); } else { this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory); } // tasks should be removed if the future is canceled this.timerService.setRemoveOnCancelPolicy(true); // make sure shutdown removes all pending tasks this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); } @Override public long getCurrentProcessingTime() { return System.currentTimeMillis(); } @Override public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) { // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark // T says we won’t see elements in the future with a timestamp smaller or equal to T. // With processing time, we therefore need to delay firing the timer by one ms. 
long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1; // we directly try to register the timer and only react to the status on exception // that way we save unnecessary volatile accesses for each timer try { return timerService.schedule( new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS); } catch (RejectedExecutionException e) { final int status = this.status.get(); if (status == STATUS_QUIESCED) { return new NeverCompleteFuture(delay); } else if (status == STATUS_SHUTDOWN) { throw new IllegalStateException(“Timer service is shut down”); } else { // something else happened, so propagate the exception throw e; } } } //……}

SystemProcessingTimeService的registerTimer方法根据指定的timestamp注册了一个延时任务TriggerTask,延时为max(timestamp - 当前处理时间, 0) + 1毫秒(多延迟1ms是为了与watermark的语义对齐)。timerService为JDK自带的ScheduledThreadPoolExecutor。TriggerTask的run方法会在service状态为STATUS_ALIVE时,触发ProcessingTimeCallback(这里为TimestampsAndPeriodicWatermarksOperator)的onProcessingTime方法。

WindowOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java

@Internalpublic class WindowOperator<K, IN, ACC, OUT, W extends Window> extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>> implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> { //…… @Override public void processElement(StreamRecord<IN> element) throws Exception { final Collection<W> elementWindows = windowAssigner.assignWindows( element.getValue(), element.getTimestamp(), windowAssignerContext); //if element is handled by none of assigned elementWindows boolean isSkippedElement = true; final K key = this.<K>getKeyedStateBackend().getCurrentKey(); if (windowAssigner instanceof MergingWindowAssigner) { //…… } else { for (W window: elementWindows) { // drop if the window is already late if (isWindowLate(window)) { continue; } isSkippedElement = false; windowState.setCurrentNamespace(window); windowState.add(element.getValue()); triggerContext.key = key; triggerContext.window
= window; TriggerResult triggerResult = triggerContext.onElement(element); if (triggerResult.isFire()) { ACC contents = windowState.get(); if (contents == null) { continue; } emitWindowContents(window, contents); } if (triggerResult.isPurge()) { windowState.clear(); } registerCleanupTimer(window); } } // side output input event if // element not handled by any window // late arriving tag has been set // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp if (isSkippedElement && isElementLate(element)) { if (lateDataOutputTag != null){ sideOutput(element); } else { this.numLateRecordsDropped.inc(); } } } /* * Emits the contents of the given window using the {@link InternalWindowFunction}. */ @SuppressWarnings(“unchecked”) private void emitWindowContents(W window, ACC contents) throws Exception { timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp()); processContext.window = window; userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector); } //……}

WindowOperator的processElement方法会把element添加到windowState。使用heap state backend时,windowState在内存中累积:示例中的reduce对应HeapReducingState,aggregate则对应HeapAggregatingState。之后它调用triggerContext.onElement方法(里头使用的是trigger.onElement方法,这里的trigger为EventTimeTrigger)获取TriggerResult;如果需要fire,则会触发emitWindowContents,如果需要purge则会清空windowState。emitWindowContents会先通过timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp())把输出结果的timestamp设置为该窗口的maxTimestamp,然后调用userFunction.process执行用户定义的窗口操作。正因为输出结果带有窗口的maxTimestamp,下游使用相同WindowAssigner的windowAll才能把这些结果分配到同一个时间窗口中。

EventTimeTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java

@PublicEvolvingpublic class EventTimeTrigger extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private EventTimeTrigger() {} @Override public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { // if the watermark is already past the window fire immediately return TriggerResult.FIRE; } else { ctx.registerEventTimeTimer(window.maxTimestamp()); return
TriggerResult.CONTINUE; } } @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ctx.deleteEventTimeTimer(window.maxTimestamp()); } @Override public boolean canMerge() { return true; } @Override public void onMerge(TimeWindow window, OnMergeContext ctx) { // only register a timer if the watermark is not yet past the end of the merged window // this is in line with the logic in onElement(). If the watermark is past the end of // the window onElement() will fire and setting a timer here would fire the window twice. long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentWatermark()) { ctx.registerEventTimeTimer(windowMaxTimestamp); } } @Override public String toString() { return “EventTimeTrigger()”; } public static EventTimeTrigger create() { return new EventTimeTrigger(); }}

EventTimeTrigger的onElement方法会先做判断:如果window.maxTimestamp() <= ctx.getCurrentWatermark(),即watermark已经越过了窗口末尾,则直接返回TriggerResult.FIRE,告知WindowOperator可以emitWindowContents。否则它会以window.maxTimestamp()注册一个event time timer并返回TriggerResult.CONTINUE;等到watermark推进到该时间时,onEventTime方法会因为time == window.maxTimestamp()而返回TriggerResult.FIRE。

小结

flink支持consecutive windowed operations,比如先根据key进行partition,然后再按指定的window对每个key的数据进行汇总,之后对该dataStream进行windowAll操作,其WindowAssigner与前面的相同,这样可以达到在同样的时间窗口内先按key汇总、再全局汇总的效果(可以解决类似top-k elements的问题)。

AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks都有两个功能:一个是从element提取timestamp作为eventTime,另一个是发射watermark。由于element实际上不一定严格按eventTime的顺序到来,可能存在乱序,因而watermark的作用就是限制迟到的数据进入窗口,不让窗口无限等待可能属于该窗口的迟到element。也就是说,watermark告知窗口:eventTime小于等于该watermark的元素可以认为都已到达(窗口可以根据自己设定的时间范围,借助trigger判断是否可以关闭窗口,然后开始对该窗口数据执行相关操作)。对于consecutive windowed operations来说,上游的watermark会forward给下游的operations。

Trigger的作用就是告知WindowOperator什么时候可以关闭该窗口,并开始对该窗口数据执行相关操作(在返回TriggerResult.FIRE的情况下)。对于EventTimeTrigger来说,其onElement方法的判断逻辑跟watermark相关,如果window.maxTimestamp() <= ctx.getCurrentWatermark()则会返回TriggerResult.FIRE。

doc

Consecutive windowed operations