聊聊flink的Triggers


本文主要研究一下flink的Triggers
Trigger
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {

private static final long serialVersionUID = -4104633972991191369L;

public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;

public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;

public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

public boolean canMerge() {
return false;
}

public void onMerge(W window, OnMergeContext ctx) throws Exception {
throw new UnsupportedOperationException(“This trigger does not support merging.”);
}

public abstract void clear(W window, TriggerContext ctx) throws Exception;

// ————————————————————————

public interface TriggerContext {

long getCurrentProcessingTime();

MetricGroup getMetricGroup();

long getCurrentWatermark();

void registerProcessingTimeTimer(long time);

void registerEventTimeTimer(long time);

void deleteProcessingTimeTimer(long time);

void deleteEventTimeTimer(long time);

<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);

@Deprecated
<S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);

@Deprecated
<S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
}

public interface OnMergeContext extends TriggerContext {
<S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
}
}

Trigger接收两个泛型,一个是element类型,一个是窗口类型;它定义了onElement、onProcessingTime、onEventTime、canMerge、onMerge、clear几个方法,其中onElement、onProcessingTime、onEventTime均需要返回TriggerResult
onElement在每个element添加到window的时候会被回调;onProcessingTime在注册的event-time timer触发时会被回调;onEventTime在注册的processing-time timer触发时会被回调
canMerge用于标识是否支持trigger state的合并,默认返回false;onMerge在多个window合并的时候会被触发;clear用于清除TriggerContext中存储的相关state
Trigger还定义了TriggerContext及OnMergeContext;TriggerContext定义了注册及删除EventTimeTimer、ProcessingTimeTimer方法,同时还定义了getCurrentProcessingTime、getMetricGroup、getCurrentWatermark、getPartitionedState、getKeyValueState、getKeyValueState方法
OnMergeContext继承了TriggerContext,它多定义了mergePartitionedState方法

TriggerResult
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java
public enum TriggerResult {

CONTINUE(false, false),

FIRE_AND_PURGE(true, true),

FIRE(true, false),

PURGE(false, true);

// ————————————————————————

private final boolean fire;
private final boolean purge;

TriggerResult(boolean fire, boolean purge) {
this.purge = purge;
this.fire = fire;
}

public boolean isFire() {
return fire;
}

public boolean isPurge() {
return purge;
}
}

TriggerResult用于表示trigger在onElement、onProcessingTime、onEventTime被回调时返回的action枚举,它有fire、purge两个属性,CONTINUE、FIRE_AND_PURGE、FIRE、PURGE五个枚举
fire表示是否要触发window的computation操作;而purge表示是否要清理window的窗口数据
CONTINUE表示不对window做任何操作;FIRE_AND_PURGE表示要触发window的computation操作然后清理window的窗口数据;FIRE表示仅仅触发window的computation操作但不清理window的窗口数据;PURGE表示不触发window的computation操作但是要清理window的窗口数据

EventTimeTrigger
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;

private EventTimeTrigger() {}

@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}

@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}

@Override
public boolean canMerge() {
return true;
}

@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
// only register a timer if the watermark is not yet past the end of the merged window
// this is in line with the logic in onElement(). If the watermark is past the end of
// the window onElement() will fire and setting a timer here would fire the window twice.
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
ctx.registerEventTimeTimer(windowMaxTimestamp);
}
}

@Override
public String toString() {
return “EventTimeTrigger()”;
}

public static EventTimeTrigger create() {
return new EventTimeTrigger();
}
}

EventTimeTrigger继承了Trigger,element类型为Object,窗口类型为TimeWindow;SlidingEventTimeWindows、TumblingEventTimeWindows、EventTimeSessionWindows、DynamicEventTimeSessionWindows默认都使用EventTimeTrigger
onElement在window.maxTimestamp()小于等于ctx.getCurrentWatermark()的时候,返回TriggerResult.FIRE,否则执行ctx.registerEventTimeTimer(window.maxTimestamp()),然后返回TriggerResult.CONTINUE;onEventTime在time等于window.maxTimestamp()的时候返回TriggerResult.FIRE,否则返回TriggerResult.CONTINUE;onProcessingTime则返回TriggerResult.CONTINUE
canMerge返回true;onMerge在window.maxTimestamp()大于ctx.getCurrentWatermark()的时候会执行ctx.registerEventTimeTimer(windowMaxTimestamp);clear则执行ctx.deleteEventTimeTimer(window.maxTimestamp())

ProcessingTimeTrigger
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;

private ProcessingTimeTrigger() {}

@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
ctx.registerProcessingTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE;
}

@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteProcessingTimeTimer(window.maxTimestamp());
}

@Override
public boolean canMerge() {
return true;
}

@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
// only register a timer if the time is not yet past the end of the merged window
// this is in line with the logic in onElement(). If the time is past the end of
// the window onElement() will fire and setting a timer here would fire the window twice.
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
ctx.registerProcessingTimeTimer(windowMaxTimestamp);
}
}

@Override
public String toString() {
return “ProcessingTimeTrigger()”;
}

public static ProcessingTimeTrigger create() {
return new ProcessingTimeTrigger();
}

}

ProcessingTimeTrigger继承了Trigger,element类型为Object,窗口类型为TimeWindow;SlidingProcessingTimeWindows、TumblingProcessingTimeWindows、ProcessingTimeSessionWindows、DynamicProcessingTimeSessionWindows默认都使用ProcessingTimeTrigger
onElement执行ctx.registerProcessingTimeTimer(window.maxTimestamp()),然后返回TriggerResult.CONTINUE;onEventTime返回TriggerResult.CONTINUE;onProcessingTime则返回TriggerResult.FIRE
canMerge返回true;onMerge在window.maxTimestamp()大于ctx.getCurrentWatermark()的时候会执行ctx.registerProcessingTimeTimer(windowMaxTimestamp);clear则执行ctx.deleteProcessingTimeTimer(window.maxTimestamp())

NeverTrigger
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@Internal
public static class NeverTrigger extends Trigger<Object, GlobalWindow> {
private static final long serialVersionUID = 1L;

@Override
public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}

@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}

@Override
public void onMerge(GlobalWindow window, OnMergeContext ctx) {
}
}
NeverTrigger的onElement、onEventTime、onProcessingTime均返回TriggerResult.CONTINUE;GlobalWindows默认使用的是NeverTrigger
CountTrigger
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@PublicEvolving
public class CountTrigger<W extends Window> extends Trigger<Object, W> {
private static final long serialVersionUID = 1L;

private final long maxCount;

private final ReducingStateDescriptor<Long> stateDesc =
new ReducingStateDescriptor<>(“count”, new Sum(), LongSerializer.INSTANCE);

private CountTrigger(long maxCount) {
this.maxCount = maxCount;
}

@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
count.add(1L);
if (count.get() >= maxCount) {
count.clear();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}

@Override
public void clear(W window, TriggerContext ctx) throws Exception {
ctx.getPartitionedState(stateDesc).clear();
}

@Override
public boolean canMerge() {
return true;
}

@Override
public void onMerge(W window, OnMergeContext ctx) throws Exception {
ctx.mergePartitionedState(stateDesc);
}

@Override
public String toString() {
return “CountTrigger(” + maxCount + “)”;
}

public static <W extends Window> CountTrigger<W> of(long maxCount) {
return new CountTrigger<>(maxCount);
}

private static class Sum implements ReduceFunction<Long> {
private static final long serialVersionUID = 1L;

@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}

}
}
CountTrigger继承了Trigger,指定了element类型为Object类型;它定义了maxCount及ReducingStateDescriptor;其中ReducingStateDescriptor用于窗口计数(它使用的是自己定义的Sum函数),在onElement方法里头,当计数大于等于maxCount时,则会清空计数,然后返回TriggerResult.FIRE,否则返回TriggerResult.CONTINUE;onEventTime、onProcessingTime均返回TriggerResult.CONTINUE;canMerge返回true;onMerge执行的是ctx.mergePartitionedState(stateDesc);clear执行的是ctx.getPartitionedState(stateDesc).clear()
PurgingTrigger
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@PublicEvolving
public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
private static final long serialVersionUID = 1L;

private Trigger<T, W> nestedTrigger;

private PurgingTrigger(Trigger<T, W> nestedTrigger) {
this.nestedTrigger = nestedTrigger;
}

@Override
public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}

@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}

@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx);
return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}

@Override
public void clear(W window, TriggerContext ctx) throws Exception {
nestedTrigger.clear(window, ctx);
}

@Override
public boolean canMerge() {
return nestedTrigger.canMerge();
}

@Override
public void onMerge(W window, OnMergeContext ctx) throws Exception {
nestedTrigger.onMerge(window, ctx);
}

@Override
public String toString() {
return “PurgingTrigger(” + nestedTrigger.toString() + “)”;
}

public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
return new PurgingTrigger<>(nestedTrigger);
}

@VisibleForTesting
public Trigger<T, W> getNestedTrigger() {
return nestedTrigger;
}
}
PurgingTrigger是包装型的Trigger,它包装了nestedTrigger,其onElement、onEventTime、onProcessingTime根据nestedTrigger的返回结果,在triggerResult.isFire()为true的时候,包装返回TriggerResult.FIRE_AND_PURGE;canMerge、onMerge、clear等方法均是委托给nestedTrigger处理
小结

Trigger接收两个泛型,一个是element类型,一个是窗口类型;它定义了onElement、onProcessingTime、onEventTime、canMerge、onMerge、clear几个方法,其中onElement、onProcessingTime、onEventTime均需要返回TriggerResult;TriggerResult用于表示trigger在onElement、onProcessingTime、onEventTime被回调时返回的action枚举,它有fire、purge两个属性(fire表示是否要触发window的computation操作;而purge表示是否要清理window的窗口数据),CONTINUE、FIRE_AND_PURGE、FIRE、PURGE五个枚举
SlidingEventTimeWindows、TumblingEventTimeWindows、EventTimeSessionWindows、DynamicEventTimeSessionWindows默认都使用EventTimeTrigger;SlidingProcessingTimeWindows、TumblingProcessingTimeWindows、ProcessingTimeSessionWindows、DynamicProcessingTimeSessionWindows默认都使用ProcessingTimeTrigger;GlobalWindows默认使用的是NeverTrigger
CountTrigger主要用于计数的窗口类型,它使用ReducingStateDescriptor来进行窗口计数,在onElement方法里头,当计数大于等于maxCount时,则会清空计数,然后返回TriggerResult.FIRE,否则返回TriggerResult.CONTINUE;PurgingTrigger是包装型的Trigger,它包装了nestedTrigger,其onElement、onEventTime、onProcessingTime根据nestedTrigger的返回结果,在triggerResult.isFire()为true的时候,包装返回TriggerResult.FIRE_AND_PURGE;canMerge、onMerge、clear等方法均是委托给nestedTrigger处理

doc
Triggers

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理