
A Look at Flink's Triggers


This article takes a look at Flink's Triggers.
Trigger
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {

    private static final long serialVersionUID = -4104633972991191369L;

    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;

    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;

    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

    public boolean canMerge() {
        return false;
    }

    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }

    public abstract void clear(W window, TriggerContext ctx) throws Exception;

    // ------------------------------------------------------------------------

    public interface TriggerContext {

        long getCurrentProcessingTime();

        MetricGroup getMetricGroup();

        long getCurrentWatermark();

        void registerProcessingTimeTimer(long time);

        void registerEventTimeTimer(long time);

        void deleteProcessingTimeTimer(long time);

        void deleteEventTimeTimer(long time);

        <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);

        @Deprecated
        <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);

        @Deprecated
        <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
    }

    public interface OnMergeContext extends TriggerContext {
        <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
    }
}

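Trigger takes two type parameters: the element type and the window type. It defines the methods onElement, onProcessingTime, onEventTime, canMerge, onMerge, and clear. Of these, onElement, onProcessingTime, and onEventTime must each return a TriggerResult.
onElement is called for every element added to a window. onProcessingTime is called when a registered processing-time timer fires. onEventTime is called when a registered event-time timer fires.
canMerge indicates whether the trigger supports merging of trigger state; it returns false by default, and the default onMerge throws UnsupportedOperationException. onMerge is called when several windows are merged. clear removes the state the trigger keeps through the TriggerContext for a window, and the built-in triggers below also delete their timers there.
Trigger also defines the TriggerContext and OnMergeContext interfaces. TriggerContext provides methods to register and delete event-time and processing-time timers. It also provides getCurrentProcessingTime, getMetricGroup, getCurrentWatermark, getPartitionedState, and two deprecated getKeyValueState overloads.
OnMergeContext extends TriggerContext and adds the mergePartitionedState method.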
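The routing of callbacks described above can be illustrated with a small, Flink-free simulation. Everything in it is hypothetical: Result, SimpleTrigger, Dispatcher and EndOfWindowTrigger are illustrative stand-ins, not Flink classes. The real WindowOperator also handles per-key state, window assignment and merging. The sketch only shows one rule. Event-time timers are checked when the watermark advances and lead to onEventTime. Processing-time timers are checked against the wall clock and lead to onProcessingTime.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in for Flink's TriggerResult (only two outcomes needed here).
enum Result { CONTINUE, FIRE }

// Hypothetical, simplified trigger contract: one callback per kind of event.
interface SimpleTrigger {
    Result onElement(long timestamp, Dispatcher ctx);
    Result onEventTime(long time, Dispatcher ctx);
    Result onProcessingTime(long time, Dispatcher ctx);
}

// Hypothetical toy context/dispatcher: keeps the two timer kinds apart and
// calls the matching trigger callback when a timer becomes due.
class Dispatcher {
    final TreeSet<Long> eventTimers = new TreeSet<>();
    final TreeSet<Long> processingTimers = new TreeSet<>();
    final List<String> log = new ArrayList<>();

    void registerEventTimeTimer(long time) { eventTimers.add(time); }
    void registerProcessingTimeTimer(long time) { processingTimers.add(time); }

    // A watermark advance only looks at event-time timers -> onEventTime.
    void advanceWatermark(long watermark, SimpleTrigger trigger) {
        while (!eventTimers.isEmpty() && eventTimers.first() <= watermark) {
            long t = eventTimers.pollFirst();
            log.add("onEventTime(" + t + ") -> " + trigger.onEventTime(t, this));
        }
    }

    // A wall-clock advance only looks at processing-time timers -> onProcessingTime.
    void advanceProcessingTime(long now, SimpleTrigger trigger) {
        while (!processingTimers.isEmpty() && processingTimers.first() <= now) {
            long t = processingTimers.pollFirst();
            log.add("onProcessingTime(" + t + ") -> " + trigger.onProcessingTime(t, this));
        }
    }
}

// Hypothetical example trigger: asks for an event-time timer at a fixed window end.
class EndOfWindowTrigger implements SimpleTrigger {
    private final long windowEnd;

    EndOfWindowTrigger(long windowEnd) { this.windowEnd = windowEnd; }

    public Result onElement(long timestamp, Dispatcher ctx) {
        ctx.registerEventTimeTimer(windowEnd);
        return Result.CONTINUE;
    }

    public Result onEventTime(long time, Dispatcher ctx) { return Result.FIRE; }

    public Result onProcessingTime(long time, Dispatcher ctx) { return Result.CONTINUE; }
}
```

In this model, advancing only the processing-time clock leaves the event-time timer untouched. The trigger is consulted through onEventTime once the watermark reaches the registered timestamp.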

TriggerResult
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java
public enum TriggerResult {

    CONTINUE(false, false),

    FIRE_AND_PURGE(true, true),

    FIRE(true, false),

    PURGE(false, true);

    // ------------------------------------------------------------------------

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return fire;
    }

    public boolean isPurge() {
        return purge;
    }
}

TriggerResult is the enum of actions a trigger returns from onElement, onProcessingTime, and onEventTime. It has two properties, fire and purge, and four constants: CONTINUE, FIRE_AND_PURGE, FIRE, and PURGE.
fire indicates whether the window's computation should be triggered. purge indicates whether the window's contents should be cleared.
CONTINUE does nothing to the window. FIRE_AND_PURGE triggers the window's computation and then clears its contents. FIRE only triggers the computation and keeps the contents. PURGE clears the contents without triggering the computation.
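A small, self-contained sketch of the fire/purge semantics follows. The TriggerResult enum re-declares the four constants and flags quoted above so that it compiles without Flink. WindowBuffer is a hypothetical helper, and summing integers is an arbitrary choice of window function.

```java
import java.util.ArrayList;
import java.util.List;

// Local copy of the TriggerResult constants and (fire, purge) flags, not the Flink class.
enum TriggerResult {
    CONTINUE(false, false),
    FIRE_AND_PURGE(true, true),
    FIRE(true, false),
    PURGE(false, true);

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }

    boolean isFire() { return fire; }
    boolean isPurge() { return purge; }
}

// Hypothetical helper showing how a window operator could act on a result:
// fire  -> evaluate the window function (here: a sum) over the current contents,
// purge -> drop the window contents afterwards.
class WindowBuffer {
    final List<Integer> contents = new ArrayList<>();
    final List<Integer> emitted = new ArrayList<>();

    void handle(TriggerResult result) {
        if (result.isFire()) {
            int sum = 0;
            for (int v : contents) {
                sum += v;
            }
            emitted.add(sum);
        }
        if (result.isPurge()) {
            contents.clear();
        }
    }
}
```

FIRE keeps the contents, so a later firing sees the accumulated elements again. With contents 1 and 2, FIRE emits 3. After adding 4, FIRE_AND_PURGE emits 7 (1 + 2 + 4) and then empties the buffer.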

EventTimeTrigger
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {}

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
            OnMergeContext ctx) {
        // only register a timer if the watermark is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the watermark is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}

EventTimeTrigger extends Trigger with element type Object and window type TimeWindow. SlidingEventTimeWindows, TumblingEventTimeWindows, EventTimeSessionWindows, and DynamicEventTimeSessionWindows all use EventTimeTrigger by default.
onElement returns TriggerResult.FIRE if window.maxTimestamp() is less than or equal to ctx.getCurrentWatermark(). Otherwise it calls ctx.registerEventTimeTimer(window.maxTimestamp()) and returns TriggerResult.CONTINUE. onEventTime returns TriggerResult.FIRE when time equals window.maxTimestamp() and TriggerResult.CONTINUE otherwise. onProcessingTime always returns TriggerResult.CONTINUE.
canMerge returns true. onMerge calls ctx.registerEventTimeTimer(windowMaxTimestamp) only when window.maxTimestamp() is greater than ctx.getCurrentWatermark(), so that a merged window whose end the watermark has already passed is not fired twice. clear calls ctx.deleteEventTimeTimer(window.maxTimestamp()).
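The decisions above can be traced with a Flink-free model of a single window [start, end). As with Flink's TimeWindow, maxTimestamp is end - 1. The class EventTimeTriggerModel, its fields and its Action enum are hypothetical stand-ins. The currentWatermark field plays the role of ctx.getCurrentWatermark(), and a TreeSet plays the role of the registered event-time timers.

```java
import java.util.TreeSet;

// Hypothetical Flink-free model of EventTimeTrigger for one window [start, end).
// As in TimeWindow, maxTimestamp is end - 1.
class EventTimeTriggerModel {
    enum Action { CONTINUE, FIRE }

    final long maxTimestamp;
    long currentWatermark = Long.MIN_VALUE;            // stands in for ctx.getCurrentWatermark()
    final TreeSet<Long> eventTimers = new TreeSet<>(); // stands in for registered event-time timers

    EventTimeTriggerModel(long windowEnd) { this.maxTimestamp = windowEnd - 1; }

    Action onElement() {
        if (maxTimestamp <= currentWatermark) {
            return Action.FIRE;          // watermark already past the window: fire immediately
        }
        eventTimers.add(maxTimestamp);   // a set, so re-registering keeps a single timer
        return Action.CONTINUE;
    }

    Action onEventTime(long time) {
        return time == maxTimestamp ? Action.FIRE : Action.CONTINUE;
    }

    void onMerge() {
        // only register while the watermark has not passed the merged window;
        // otherwise onElement already fires and the window would fire twice
        if (maxTimestamp > currentWatermark) {
            eventTimers.add(maxTimestamp);
        }
    }

    void clear() { eventTimers.remove(maxTimestamp); }
}
```

For a window ending at 10 with the watermark at 5, an element registers a timer at 9, and onEventTime(9) returns FIRE. Once the watermark is at 12, a late element for the same window gets FIRE straight from onElement, and onMerge no longer registers a timer.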

ProcessingTimeTrigger
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {}

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
            OnMergeContext ctx) {
        // only register a timer if the time is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the time is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "ProcessingTimeTrigger()";
    }

    public static ProcessingTimeTrigger create() {
        return new ProcessingTimeTrigger();
    }

}

ProcessingTimeTrigger extends Trigger with element type Object and window type TimeWindow. SlidingProcessingTimeWindows, TumblingProcessingTimeWindows, ProcessingTimeSessionWindows, and DynamicProcessingTimeSessionWindows all use ProcessingTimeTrigger by default.
onElement calls ctx.registerProcessingTimeTimer(window.maxTimestamp()) and returns TriggerResult.CONTINUE. onEventTime returns TriggerResult.CONTINUE. onProcessingTime unconditionally returns TriggerResult.FIRE.
canMerge returns true. onMerge calls ctx.registerProcessingTimeTimer(windowMaxTimestamp) only when window.maxTimestamp() is greater than ctx.getCurrentProcessingTime(); note that it compares against the current processing time, not the watermark. clear calls ctx.deleteProcessingTimeTimer(window.maxTimestamp()).
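A Flink-free model of the same behaviour for one window [start, end) follows, with maxTimestamp = end - 1 as in TimeWindow. ProcessingTimeTriggerModel is a hypothetical name, and its advanceTo method is a hypothetical driver standing in for the wall clock firing due processing-time timers.

```java
import java.util.TreeSet;

// Hypothetical Flink-free model of ProcessingTimeTrigger for one window [start, end).
class ProcessingTimeTriggerModel {
    enum Action { CONTINUE, FIRE }

    final long maxTimestamp;
    final TreeSet<Long> processingTimers = new TreeSet<>();

    ProcessingTimeTriggerModel(long windowEnd) { this.maxTimestamp = windowEnd - 1; }

    Action onElement() {
        processingTimers.add(maxTimestamp); // always registers; no watermark check
        return Action.CONTINUE;
    }

    Action onProcessingTime(long time) {
        return Action.FIRE; // no comparison with maxTimestamp, unlike EventTimeTrigger.onEventTime
    }

    void onMerge(long currentProcessingTime) {
        // compares against processing time, not the watermark
        if (maxTimestamp > currentProcessingTime) {
            processingTimers.add(maxTimestamp);
        }
    }

    // Hypothetical driver: fires every timer due at 'now' and returns how many fired.
    int advanceTo(long now) {
        int fired = 0;
        while (!processingTimers.isEmpty() && processingTimers.first() <= now) {
            long t = processingTimers.pollFirst();
            if (onProcessingTime(t) == Action.FIRE) {
                fired++;
            }
        }
        return fired;
    }
}
```

For a window ending at 1000, nothing fires at processing time 500, and exactly one firing happens at 999. A merge performed when processing time is already 2000 registers no new timer.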

NeverTrigger
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@Internal
public static class NeverTrigger extends Trigger<Object, GlobalWindow> {
    private static final long serialVersionUID = 1L;

    @Override
    public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}

    @Override
    public void onMerge(GlobalWindow window, OnMergeContext ctx) {
    }
}
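NeverTrigger is a static nested class of GlobalWindows. Its onElement, onEventTime, and onProcessingTime all return TriggerResult.CONTINUE, and its clear and onMerge do nothing, so it never fires a window on its own. GlobalWindows uses NeverTrigger by default.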
CountTrigger
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@PublicEvolving
public class CountTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long maxCount;

    private final ReducingStateDescriptor<Long> stateDesc =
        new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private CountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
    }

    @Override
    public String toString() {
        return "CountTrigger(" + maxCount + ")";
    }

    public static <W extends Window> CountTrigger<W> of(long maxCount) {
        return new CountTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }

    }
}
CountTrigger extends Trigger with element type Object, while the window type W stays generic. It defines maxCount and a ReducingStateDescriptor; the ReducingStateDescriptor keeps a per-window count using CountTrigger's own Sum reduce function.
In onElement, once the count reaches or exceeds maxCount, the count is cleared and TriggerResult.FIRE is returned; otherwise TriggerResult.CONTINUE is returned. onEventTime and onProcessingTime both return TriggerResult.CONTINUE.
canMerge returns true. onMerge calls ctx.mergePartitionedState(stateDesc). clear calls ctx.getPartitionedState(stateDesc).clear().
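The counting and merging behaviour can be illustrated with a Flink-free model. A HashMap keyed by window stands in for the per-window ReducingState<Long>. Map.merge with Long::sum plays the role of the Sum reduce function. CountTriggerModel, count and the list-based onMerge signature are hypothetical, and windows are identified by plain strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical Flink-free model of CountTrigger: one running count per window.
class CountTriggerModel<W> {
    enum Action { CONTINUE, FIRE }

    private static final BinaryOperator<Long> SUM = Long::sum; // plays the role of Sum.reduce

    private final long maxCount;
    private final Map<W, Long> countState = new HashMap<>();   // stands in for ReducingState<Long>

    CountTriggerModel(long maxCount) { this.maxCount = maxCount; }

    Action onElement(W window) {
        long count = countState.merge(window, 1L, SUM); // like count.add(1L) then count.get()
        if (count >= maxCount) {
            countState.remove(window);                   // like count.clear()
            return Action.FIRE;
        }
        return Action.CONTINUE;
    }

    // Like mergePartitionedState: fold the counts of the merged windows into the target.
    void onMerge(W target, Iterable<W> mergedWindows) {
        for (W w : mergedWindows) {
            Long c = countState.remove(w);
            if (c != null) {
                countState.merge(target, c, SUM);
            }
        }
    }

    void clear(W window) { countState.remove(window); }

    long count(W window) { return countState.getOrDefault(window, 0L); }
}
```

Counts are additive, which is why merging them is straightforward and canMerge can return true. Because onMerge returns void, the merged count is only compared with maxCount when the next element arrives. In the model, windows "a" (count 2) and "b" (count 1) merge into "ab" with count 3, and the next element for "ab" fires.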
PurgingTrigger
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@PublicEvolving
public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;

    private Trigger<T, W> nestedTrigger;

    private PurgingTrigger(Trigger<T, W> nestedTrigger) {
        this.nestedTrigger = nestedTrigger;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        nestedTrigger.clear(window, ctx);
    }

    @Override
    public boolean canMerge() {
        return nestedTrigger.canMerge();
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        nestedTrigger.onMerge(window, ctx);
    }

    @Override
    public String toString() {
        return "PurgingTrigger(" + nestedTrigger.toString() + ")";
    }

    public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
        return new PurgingTrigger<>(nestedTrigger);
    }

    @VisibleForTesting
    public Trigger<T, W> getNestedTrigger() {
        return nestedTrigger;
    }
}
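PurgingTrigger is a wrapper around a nestedTrigger. Its onElement, onEventTime, and onProcessingTime call the corresponding method of nestedTrigger. Whenever the returned result's isFire() is true, they return TriggerResult.FIRE_AND_PURGE instead; other results pass through unchanged. canMerge, onMerge, and clear are all delegated to nestedTrigger.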
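The wrapping rule fits in a few lines of plain Java. The TriggerResult enum below is a local copy of the constants quoted earlier, so the sketch compiles without Flink. ElementTrigger is a hypothetical one-method trigger shape, and PurgingWrapper is a hypothetical name for the decorator.

```java
// Local copy of the TriggerResult constants, so the sketch compiles without Flink.
enum TriggerResult {
    CONTINUE(false, false), FIRE_AND_PURGE(true, true), FIRE(true, false), PURGE(false, true);

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }

    boolean isFire() { return fire; }
    boolean isPurge() { return purge; }
}

// Hypothetical one-method trigger shape, just enough to show the decorator.
interface ElementTrigger {
    TriggerResult onElement(long timestamp);
}

// Hypothetical decorator in the style of PurgingTrigger: delegate, then turn any
// firing result into FIRE_AND_PURGE; non-firing results pass through unchanged.
class PurgingWrapper implements ElementTrigger {
    private final ElementTrigger nested;

    PurgingWrapper(ElementTrigger nested) { this.nested = nested; }

    @Override
    public TriggerResult onElement(long timestamp) {
        TriggerResult result = nested.onElement(timestamp);
        return result.isFire() ? TriggerResult.FIRE_AND_PURGE : result;
    }
}
```

Only results with isFire() true are rewritten. A nested FIRE becomes FIRE_AND_PURGE, while CONTINUE and PURGE are returned as they are.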
Summary

Trigger takes two type parameters, the element type and the window type. It defines onElement, onProcessingTime, onEventTime, canMerge, onMerge, and clear, of which onElement, onProcessingTime, and onEventTime must return a TriggerResult. TriggerResult is the enum of actions returned from those three callbacks. It has two properties: fire (whether to trigger the window's computation) and purge (whether to clear the window's contents). It has four constants: CONTINUE, FIRE_AND_PURGE, FIRE, and PURGE.
SlidingEventTimeWindows, TumblingEventTimeWindows, EventTimeSessionWindows, and DynamicEventTimeSessionWindows use EventTimeTrigger by default. SlidingProcessingTimeWindows, TumblingProcessingTimeWindows, ProcessingTimeSessionWindows, and DynamicProcessingTimeSessionWindows use ProcessingTimeTrigger by default. GlobalWindows uses NeverTrigger by default.
CountTrigger is mainly used for count-based windows. It counts elements per window through a ReducingStateDescriptor. In onElement, once the count reaches maxCount, it clears the count and returns TriggerResult.FIRE; otherwise it returns TriggerResult.CONTINUE. PurgingTrigger wraps a nestedTrigger. Its onElement, onEventTime, and onProcessingTime return TriggerResult.FIRE_AND_PURGE whenever the nested result's isFire() is true. Its canMerge, onMerge, and clear are delegated to nestedTrigger.

