共计 6293 个字符,预计需要花费 16 分钟才能阅读完成。
序
本文主要研究一下 flink 的 window 操作
window
DataStream
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return windowAll(TumblingProcessingTimeWindows.of(size));
} else {
return windowAll(TumblingEventTimeWindows.of(size));
}
}
public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return windowAll(SlidingProcessingTimeWindows.of(size, slide));
} else {
return windowAll(SlidingEventTimeWindows.of(size, slide));
}
}
public AllWindowedStream<T, GlobalWindow> countWindowAll(long size) {
return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
return windowAll(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
}
@PublicEvolving
public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
return new AllWindowedStream<>(this, assigner);
}
对于非 KeyedStream,有 timeWindowAll、countWindowAll、windowAll 操作,其中最主要的是 windowAll 操作,它的 parallelism 为 1,它需要一个 WindowAssigner 参数,返回的是 AllWindowedStream
KeyedStream
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(TumblingProcessingTimeWindows.of(size));
} else {
return window(TumblingEventTimeWindows.of(size));
}
}
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(SlidingProcessingTimeWindows.of(size, slide));
} else {
return window(SlidingEventTimeWindows.of(size, slide));
}
}
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
}
@PublicEvolving
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
return new WindowedStream<>(this, assigner);
}
对于 KeyedStream 除了继承了 DataStream 的 window 相关操作,它主要用的是 timeWindow、countWindow、window 操作,其中最主要的是 window 操作,它也需要一个 WindowAssigner 参数,返回的是 WindowedStream
WindowedStream
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/WindowedStream.java
@Public
public class WindowedStream<T, K, W extends Window> {
/** The keyed data stream that is windowed by this stream. */
private final KeyedStream<T, K> input;
/** The window assigner. */
private final WindowAssigner<? super T, W> windowAssigner;
/** The trigger that is used for window evaluation/emission. */
private Trigger<? super T, ? super W> trigger;
/** The evictor that is used for evicting elements before window evaluation. */
private Evictor<? super T, ? super W> evictor;
/** The user-specified allowed lateness. */
private long allowedLateness = 0L;
/**
* Side output {@code OutputTag} for late data. If no tag is set late data will simply be
* dropped.
*/
private OutputTag<T> lateDataOutputTag;
@PublicEvolving
public WindowedStream(KeyedStream<T, K> input,
WindowAssigner<? super T, W> windowAssigner) {
this.input = input;
this.windowAssigner = windowAssigner;
this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
}
@PublicEvolving
public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
if (windowAssigner instanceof MergingWindowAssigner && !trigger.canMerge()) {
throw new UnsupportedOperationException(“A merging window assigner cannot be used with a trigger that does not support merging.”);
}
if (windowAssigner instanceof BaseAlignedWindowAssigner) {
throw new UnsupportedOperationException(“Cannot use a ” + windowAssigner.getClass().getSimpleName() + ” with a custom trigger.”);
}
this.trigger = trigger;
return this;
}
@PublicEvolving
public WindowedStream<T, K, W> allowedLateness(Time lateness) {
final long millis = lateness.toMilliseconds();
checkArgument(millis >= 0, “The allowed lateness cannot be negative.”);
this.allowedLateness = millis;
return this;
}
@PublicEvolving
public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) {
Preconditions.checkNotNull(outputTag, “Side output tag must not be null.”);
this.lateDataOutputTag = input.getExecutionEnvironment().clean(outputTag);
return this;
}
@PublicEvolving
public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
if (windowAssigner instanceof BaseAlignedWindowAssigner) {
throw new UnsupportedOperationException(“Cannot use a ” + windowAssigner.getClass().getSimpleName() + ” with an Evictor.”);
}
this.evictor = evictor;
return this;
}
// ————————————————————————
// Operations on the keyed windows
// ————————————————————————
//……
}
WindowedStream 有几个参数,其中构造器要求的是 input 及 windowAssigner 参数,然后还有 Trigger、Evictor、allowedLateness、OutputTag 这几个可选参数;另外还必须设置 operation function,主要有 ReduceFunction、AggregateFunction、FoldFunction(废弃)、ProcessWindowFunction 这几个
windowAssigner 主要用来决定元素如何划分到 window 中,这里主要有 TumblingEventTimeWindows/TumblingProcessingTimeWindows、SlidingEventTimeWindows/SlidingProcessingTimeWindows、EventTimeSessionWindows/ProcessingTimeSessionWindows、GlobalWindows 这几个
Trigger 用来触发 window 的发射,Evictor 用来在发射 window 的时候剔除元素,allowedLateness 用于指定允许元素落后于 watermark 的最大时间,超出则被丢弃 (仅仅对于 event-time window 有效),OutputTag 用于将 late 数据输出到 side output,可以通过 SingleOutputStreamOperator.getSideOutput(OutputTag) 方法来获取
AllWindowedStream 的属性 / 操作基本跟 WindowedStream 类似,这里就不详细展开
小结
window 操作是处理无限数据流的核心,它将数据流分割为有限大小的 buckets,然后就可以在这些有限数据上进行相关的操作。flink 的 window 操作主要分为两大类,一类是针对 KeyedStream 的 window 操作,一个是针对 non-key stream 的 windowAll 操作
window 操作主要有几个参数,WindowAssigner 是必不可少的参数,主要有 TumblingEventTimeWindows/TumblingProcessingTimeWindows、SlidingEventTimeWindows/SlidingProcessingTimeWindows、EventTimeSessionWindows/ProcessingTimeSessionWindows、GlobalWindows 这几个;另外还必须设置 operation function,主要有 ReduceFunction、AggregateFunction、FoldFunction(废弃)、ProcessWindowFunction 这几个
Trigger、Evictor、allowedLateness、OutputTag 这几个为可选参数,Trigger 用来触发 window 的发射,Evictor 用来在发射 window 的时候剔除元素,allowedLateness 用于指定允许元素落后于 watermark 的最大时间,超出则被丢弃 (仅仅对于 event-time window 有效),OutputTag 用于将 late 数据输出到 side output,可以通过 SingleOutputStreamOperator.getSideOutput(OutputTag) 方法来获取
doc
Windows