A Look at Flink's Tumbling Window

This article takes a look at Flink's Tumbling Window.
WindowAssigner
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Returns a {@code Collection} of windows that should be assigned to the element.
     *
     * @param element The element to which windows should be assigned.
     * @param timestamp The timestamp of the element.
     * @param context The {@link WindowAssignerContext} in which the assigner operates.
     */
    public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);

    /**
     * Returns the default trigger associated with this {@code WindowAssigner}.
     */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    /**
     * Returns a {@link TypeSerializer} for serializing windows that are assigned by
     * this {@code WindowAssigner}.
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

    /**
     * Returns {@code true} if elements are assigned to windows based on event time,
     * {@code false} otherwise.
     */
    public abstract boolean isEventTime();

    /**
     * A context provided to the {@link WindowAssigner} that allows it to query the
     * current processing time.
     *
     * <p>This is provided to the assigner by its containing
     * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
     * which, in turn, gets it from the containing
     * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
     */
    public abstract static class WindowAssignerContext {

        /**
         * Returns the current processing time.
         */
        public abstract long getCurrentProcessingTime();

    }
}
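WindowAssigner declares four abstract methods, assignWindows, getDefaultTrigger, getWindowSerializer and isEventTime, and also defines the abstract static class WindowAssignerContext, which lets the assigner query the current processing time. It has two type parameters: T is the element type and W is the window type (W must extend Window).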
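The following is a minimal, dependency-free sketch (plain Java, no Flink on the classpath) that mirrors the shape of this contract to show how T and W are used. MiniWindowAssigner, MiniContext and TenMillisBucketAssigner are hypothetical names made up for illustration, not Flink classes, and only assignWindows and isEventTime are mirrored.

```java
import java.util.Collection;
import java.util.Collections;

class WindowAssignerDemo {
    public static void main(String[] args) {
        MiniWindowAssigner<String, Long> assigner = new TenMillisBucketAssigner();
        MiniWindowAssigner.MiniContext context = new MiniWindowAssigner.MiniContext() {
            @Override
            long getCurrentProcessingTime() {
                return System.currentTimeMillis();
            }
        };
        System.out.println(assigner.assignWindows("a", 37L, context)); // prints [30]
    }
}

// Hypothetical, simplified mirror of the WindowAssigner contract (NOT Flink's API).
// T is the element type, W is the window type.
abstract class MiniWindowAssigner<T, W> {

    abstract Collection<W> assignWindows(T element, long timestamp, MiniContext context);

    abstract boolean isEventTime();

    // Plays the role of WindowAssignerContext: it only exposes processing time.
    abstract static class MiniContext {
        abstract long getCurrentProcessingTime();
    }
}

// Toy assigner with T = String and W = Long: the "window" is just the start of a 10 ms bucket.
class TenMillisBucketAssigner extends MiniWindowAssigner<String, Long> {

    @Override
    Collection<Long> assignWindows(String element, long timestamp, MiniContext context) {
        // Event-time style: only the element timestamp is used, the context is ignored.
        return Collections.singletonList(timestamp - Math.floorMod(timestamp, 10L));
    }

    @Override
    boolean isEventTime() {
        return true;
    }
}
```

Because W is unconstrained in this toy, a plain Long serves as the window; Flink's real WindowAssigner instead bounds W by Window.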
Window
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/Window.java
@PublicEvolving
public abstract class Window {

    /**
     * Gets the largest timestamp that still belongs to this window.
     *
     * @return The largest timestamp that still belongs to this window.
     */
    public abstract long maxTimestamp();
}
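A Window represents one of the finite buckets into which an unbounded stream is divided. Its maxTimestamp is the largest timestamp that still belongs to the window. Window has two subclasses: GlobalWindow and TimeWindow.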
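A small sketch of this hierarchy, again using hypothetical stand-in classes rather than Flink's own. It assumes, as in Flink 1.7, that the global window is a singleton whose maxTimestamp is Long.MAX_VALUE, while a time window's maxTimestamp is end - 1.

```java
class WindowDemo {
    public static void main(String[] args) {
        System.out.println(new MiniTimeWindow(0L, 5_000L).maxTimestamp());           // prints 4999
        System.out.println(MiniGlobalWindow.get().maxTimestamp() == Long.MAX_VALUE); // prints true
    }
}

// Hypothetical, simplified mirror of Flink's Window base class (NOT the real class).
abstract class MiniWindow {
    // The largest timestamp that still belongs to this window.
    abstract long maxTimestamp();
}

// Time-based bucket [start, end): start is inclusive, end is exclusive.
class MiniTimeWindow extends MiniWindow {
    final long start;
    final long end;

    MiniTimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    long maxTimestamp() {
        return end - 1;
    }
}

// A single window that holds every element; it never ends in time.
class MiniGlobalWindow extends MiniWindow {
    private static final MiniGlobalWindow INSTANCE = new MiniGlobalWindow();

    private MiniGlobalWindow() {}

    static MiniGlobalWindow get() {
        return INSTANCE;
    }

    @Override
    long maxTimestamp() {
        return Long.MAX_VALUE;
    }
}
```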
TimeWindow
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@PublicEvolving
public class TimeWindow extends Window {

    private final long start;
    private final long end;

    public TimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * Gets the starting timestamp of the window. This is the first timestamp that belongs
     * to this window.
     *
     * @return The starting timestamp of this window.
     */
    public long getStart() {
        return start;
    }

    /**
     * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it
     * is the first timestamp that does not belong to this window any more.
     *
     * @return The exclusive end timestamp of this window.
     */
    public long getEnd() {
        return end;
    }

    /**
     * Gets the largest timestamp that still belongs to this window.
     *
     * <p>This timestamp is identical to {@code getEnd() - 1}.
     *
     * @return The largest timestamp that still belongs to this window.
     *
     * @see #getEnd()
     */
    @Override
    public long maxTimestamp() {
        return end - 1;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        TimeWindow window = (TimeWindow) o;

        return end == window.end && start == window.start;
    }

    @Override
    public int hashCode() {
        return MathUtils.longToIntWithBitMixing(start + end);
    }

    @Override
    public String toString() {
        return "TimeWindow{" +
                "start=" + start +
                ", end=" + end +
                '}';
    }

    /**
     * Returns {@code true} if this window intersects the given window.
     */
    public boolean intersects(TimeWindow other) {
        return this.start <= other.end && this.end >= other.start;
    }

    /**
     * Returns the minimal window covers both this window and the given window.
     */
    public TimeWindow cover(TimeWindow other) {
        return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end));
    }

    // ------------------------------------------------------------------------
    // Serializer
    // ------------------------------------------------------------------------

    //……

    // ------------------------------------------------------------------------
    // Utilities
    // ------------------------------------------------------------------------

    /**
     * Merge overlapping {@link TimeWindow}s. For use by merging
     * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}.
     */
    public static void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {

        // sort the windows by the start time and then merge overlapping windows

        List<TimeWindow> sortedWindows = new ArrayList<>(windows);

        Collections.sort(sortedWindows, new Comparator<TimeWindow>() {
            @Override
            public int compare(TimeWindow o1, TimeWindow o2) {
                return Long.compare(o1.getStart(), o2.getStart());
            }
        });

        List<Tuple2<TimeWindow, Set<TimeWindow>>> merged = new ArrayList<>();
        Tuple2<TimeWindow, Set<TimeWindow>> currentMerge = null;

        for (TimeWindow candidate: sortedWindows) {
            if (currentMerge == null) {
                currentMerge = new Tuple2<>();
                currentMerge.f0 = candidate;
                currentMerge.f1 = new HashSet<>();
                currentMerge.f1.add(candidate);
            } else if (currentMerge.f0.intersects(candidate)) {
                currentMerge.f0 = currentMerge.f0.cover(candidate);
                currentMerge.f1.add(candidate);
            } else {
                merged.add(currentMerge);
                currentMerge = new Tuple2<>();
                currentMerge.f0 = candidate;
                currentMerge.f1 = new HashSet<>();
                currentMerge.f1.add(candidate);
            }
        }

        if (currentMerge != null) {
            merged.add(currentMerge);
        }

        for (Tuple2<TimeWindow, Set<TimeWindow>> m: merged) {
            if (m.f1.size() > 1) {
                c.merge(m.f1, m.f0);
            }
        }
    }

    /**
     * Method to get the window start for a timestamp.
     *
     * @param timestamp epoch millisecond to get the window start.
     * @param offset The offset which window start would be shifted by.
     * @param windowSize The size of the generated windows.
     * @return window start
     */
    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }
}

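TimeWindow has start and end fields: start is inclusive and end is exclusive, so maxTimestamp returns end - 1. It overrides equals and hashCode so that two windows with the same start and end are equal.
TimeWindow provides intersects, which tells whether this window intersects the given one (because the comparisons use <= and >=, adjacent windows such as [0, 5) and [5, 10) count as intersecting), and cover, which returns the smallest window that covers both this window and the given one.
TimeWindow also provides two static methods, mergeWindows and getWindowStartWithOffset. The former sorts the windows by start time, merges overlapping ones, and invokes the MergeCallback only for groups that contain more than one window; the latter computes the window start for a given timestamp, offset and windowSize.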
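The sketch below re-implements the TimeWindow logic discussed above in a standalone class so it can run without Flink. SimpleTimeWindow is a hypothetical name; the MergeCallback is replaced by a java.util.function.BiConsumer and the serializer is left out. It shows that adjacent windows intersect, what cover returns, and which groups reach the merge callback.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;

class TimeWindowSemanticsDemo {
    public static void main(String[] args) {
        SimpleTimeWindow a = new SimpleTimeWindow(0, 5);
        SimpleTimeWindow b = new SimpleTimeWindow(5, 10);
        SimpleTimeWindow c = new SimpleTimeWindow(12, 15);

        System.out.println(a.maxTimestamp());                     // 4, because end is exclusive
        System.out.println(a.equals(new SimpleTimeWindow(0, 5))); // true
        System.out.println(a.intersects(b));                      // true, adjacent windows intersect
        System.out.println(a.intersects(c));                      // false
        System.out.println(a.cover(b));                           // TimeWindow{start=0, end=10}

        // Only the group {a, b} has more than one member, so the callback fires once.
        SimpleTimeWindow.mergeWindows(Arrays.asList(c, b, a),
                (group, result) -> System.out.println(group.size() + " windows -> " + result));
        // prints: 2 windows -> TimeWindow{start=0, end=10}
    }
}

// Hypothetical stand-in for TimeWindow (NOT Flink's class): [start, end), end exclusive.
final class SimpleTimeWindow {
    final long start;
    final long end;

    SimpleTimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    long maxTimestamp() {
        return end - 1;
    }

    boolean intersects(SimpleTimeWindow other) {
        // Same comparison as TimeWindow#intersects: <= and >= make touching windows intersect.
        return this.start <= other.end && this.end >= other.start;
    }

    SimpleTimeWindow cover(SimpleTimeWindow other) {
        return new SimpleTimeWindow(Math.min(start, other.start), Math.max(end, other.end));
    }

    // Same idea as TimeWindow#mergeWindows, with a BiConsumer in place of MergeCallback.
    static void mergeWindows(Collection<SimpleTimeWindow> windows,
                             BiConsumer<Set<SimpleTimeWindow>, SimpleTimeWindow> callback) {
        List<SimpleTimeWindow> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((SimpleTimeWindow w) -> w.start));

        SimpleTimeWindow current = null;
        Set<SimpleTimeWindow> members = new HashSet<>();
        for (SimpleTimeWindow candidate : sorted) {
            if (current != null && current.intersects(candidate)) {
                current = current.cover(candidate);
                members.add(candidate);
            } else {
                // Close the previous group; only groups of 2+ windows are reported.
                if (members.size() > 1) {
                    callback.accept(members, current);
                }
                current = candidate;
                members = new HashSet<>();
                members.add(candidate);
            }
        }
        if (members.size() > 1) {
            callback.accept(members, current);
        }
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SimpleTimeWindow w = (SimpleTimeWindow) o;
        return start == w.start && end == w.end;
    }

    // Any hash consistent with equals works; Flink uses MathUtils.longToIntWithBitMixing(start + end).
    @Override
    public int hashCode() {
        return Objects.hash(start, end);
    }

    @Override
    public String toString() {
        return "TimeWindow{start=" + start + ", end=" + end + "}";
    }
}
```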

TumblingEventTimeWindows
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
@PublicEvolving
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private final long size;

    private final long offset;

    protected TumblingEventTimeWindows(long size, long offset) {
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
        }

        this.size = size;
        this.offset = offset;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            // Long.MIN_VALUE is currently assigned when no timestamp is present
            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                    "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "TumblingEventTimeWindows(" + size + ")";
    }

    public static TumblingEventTimeWindows of(Time size) {
        return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
    }

    public static TumblingEventTimeWindows of(Time size, Time offset) {
        return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}

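TumblingEventTimeWindows extends WindowAssigner, with Object as the element type and TimeWindow as the window type. It takes two parameters, size and offset; the constructor requires offset to be at least 0 and less than size.
assignWindows returns a single window [start, start + size), where start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size); if the timestamp is Long.MIN_VALUE (the "no timestamp" marker) it throws a RuntimeException instead. getDefaultTrigger returns an EventTimeTrigger, getWindowSerializer returns a new TimeWindow.Serializer, and isEventTime returns true.
TumblingEventTimeWindows provides static factory methods named of, which take the size and, optionally, the offset as Time values.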
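To see the start arithmetic in action, here is a standalone sketch of the event-time assignment. SimpleTumblingEventTimeAssigner is a hypothetical class that copies the constructor check, the Long.MIN_VALUE guard and the getWindowStartWithOffset formula, but returns a {start, end} pair instead of a TimeWindow. The example uses one-hour windows with and without a 15-minute offset.

```java
import java.util.concurrent.TimeUnit;

class EventTimeAssignDemo {
    public static void main(String[] args) {
        long minute = TimeUnit.MINUTES.toMillis(1);
        SimpleTumblingEventTimeAssigner hourly = new SimpleTumblingEventTimeAssigner(60 * minute, 0);
        SimpleTumblingEventTimeAssigner shifted = new SimpleTumblingEventTimeAssigner(60 * minute, 15 * minute);

        // 620 minutes after the epoch, i.e. 10:20 UTC on 1970-01-01.
        long tenTwenty = 620 * minute;
        print(hourly.assignWindow(tenTwenty), minute);  // prints [600, 660) min, i.e. 10:00 to 11:00
        print(shifted.assignWindow(tenTwenty), minute); // prints [615, 675) min, i.e. 10:15 to 11:15
    }

    private static void print(long[] window, long unit) {
        System.out.println("[" + window[0] / unit + ", " + window[1] / unit + ") min");
    }
}

// Hypothetical stand-in for TumblingEventTimeWindows (NOT Flink's class).
final class SimpleTumblingEventTimeAssigner {
    private final long size;
    private final long offset;

    SimpleTumblingEventTimeAssigner(long size, long offset) {
        // Same constraint as TumblingEventTimeWindows: 0 <= offset < size.
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("parameters must satisfy 0 <= offset < size");
        }
        this.size = size;
        this.offset = offset;
    }

    // Same arithmetic as TimeWindow.getWindowStartWithOffset.
    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Returns {start, end} of the single window for this timestamp; end is exclusive.
    long[] assignWindow(long timestamp) {
        if (timestamp > Long.MIN_VALUE) {
            long start = getWindowStartWithOffset(timestamp, offset, size);
            return new long[] {start, start + size};
        }
        // Long.MIN_VALUE is the "no timestamp" marker, as in the original assigner.
        throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker).");
    }
}
```

Note that Java's % keeps the sign of the dividend, so the formula only yields the correct start while timestamp - offset + windowSize is non-negative; that always holds for ordinary non-negative epoch-millisecond timestamps, since offset is smaller than windowSize.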

TumblingProcessingTimeWindows
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private final long size;

    private final long offset;

    private TumblingProcessingTimeWindows(long size, long offset) {
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy 0 <= offset < size");
        }

        this.size = size;
        this.offset = offset;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        final long now = context.getCurrentProcessingTime();
        long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    }

    public long getSize() {
        return size;
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "TumblingProcessingTimeWindows(" + size + ")";
    }

    public static TumblingProcessingTimeWindows of(Time size) {
        return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0);
    }

    public static TumblingProcessingTimeWindows of(Time size, Time offset) {
        return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return false;
    }
}

TumblingProcessingTimeWindows also extends WindowAssigner, with Object as the element type and TimeWindow as the window type, and takes the same size and offset parameters under the same constraint (offset at least 0 and less than size).
assignWindows also returns [start, start + size), but here start = TimeWindow.getWindowStartWithOffset(now, offset, size) with now = context.getCurrentProcessingTime(). This is the key difference from TumblingEventTimeWindows: TumblingProcessingTimeWindows ignores the timestamp argument and uses the current processing time instead. getDefaultTrigger returns a ProcessingTimeTrigger, and isEventTime returns false.
TumblingProcessingTimeWindows likewise provides static factory methods named of for specifying size and offset.
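A companion sketch for the processing-time case: SimpleTumblingProcessingTimeAssigner is a hypothetical class whose clock is a java.util.function.LongSupplier standing in for WindowAssignerContext.getCurrentProcessingTime(). Injecting a fake clock makes it easy to check that the element timestamp plays no part in the result.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

class ProcessingTimeAssignDemo {
    public static void main(String[] args) {
        AtomicLong fakeClock = new AtomicLong(12_345L);
        SimpleTumblingProcessingTimeAssigner assigner =
                new SimpleTumblingProcessingTimeAssigner(1_000L, 0L, fakeClock::get);

        // The element timestamp (here 0) is ignored; only the clock matters.
        long[] w = assigner.assignWindow(0L);
        System.out.println("[" + w[0] + ", " + w[1] + ")"); // prints [12000, 13000)

        fakeClock.set(13_000L);
        w = assigner.assignWindow(0L);
        System.out.println("[" + w[0] + ", " + w[1] + ")"); // prints [13000, 14000)
    }
}

// Hypothetical stand-in for TumblingProcessingTimeWindows (NOT Flink's class).
final class SimpleTumblingProcessingTimeAssigner {
    private final long size;
    private final long offset;
    private final LongSupplier processingTime; // plays the role of WindowAssignerContext

    SimpleTumblingProcessingTimeAssigner(long size, long offset, LongSupplier processingTime) {
        // Same constraint as TumblingProcessingTimeWindows: 0 <= offset < size.
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("parameters must satisfy 0 <= offset < size");
        }
        this.size = size;
        this.offset = offset;
        this.processingTime = processingTime;
    }

    // Returns {start, end}; the timestamp argument is deliberately unused, as in the original.
    long[] assignWindow(long ignoredTimestamp) {
        final long now = processingTime.getAsLong();
        long start = now - (now - offset + size) % size; // getWindowStartWithOffset(now, offset, size)
        return new long[] {start, start + size};
    }
}
```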

Summary

Flink's tumbling windows come in two flavors, TumblingEventTimeWindows and TumblingProcessingTimeWindows. Both extend WindowAssigner with Object as the element type and TimeWindow as the window type, and both take size and offset parameters, where offset must be at least 0 and less than size.
WindowAssigner declares the abstract methods assignWindows, getDefaultTrigger, getWindowSerializer and isEventTime, and defines the abstract static class WindowAssignerContext; its type parameters are T (the element type) and W (the window type). The window type of both tumbling assigners is TimeWindow, whose start is inclusive and end is exclusive, so maxTimestamp returns end - 1. TimeWindow also provides the static methods mergeWindows, which merges overlapping time windows, and getWindowStartWithOffset, which computes the window start for a given timestamp, offset and windowSize.
TumblingEventTimeWindows and TumblingProcessingTimeWindows differ in assignWindows, getDefaultTrigger and isEventTime: the former computes the window from the timestamp argument, while the latter uses the current processing time (now); the former's default trigger is EventTimeTrigger, while the latter's is ProcessingTimeTrigger; and isEventTime returns true for the former and false for the latter.

doc
Tumbling Windows
