
A Look at Flink's TimerService


This article takes a look at Flink's TimerService.
TimerService
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/TimerService.java
@PublicEvolving
public interface TimerService {

String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";

String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";

long currentProcessingTime();

long currentWatermark();

void registerProcessingTimeTimer(long time);

void registerEventTimeTimer(long time);

void deleteProcessingTimeTimer(long time);

void deleteEventTimeTimer(long time);
}
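The TimerService interface declares six methods: currentProcessingTime, currentWatermark, registerProcessingTimeTimer, registerEventTimeTimer, deleteProcessingTimeTimer, and deleteEventTimeTimer. As its two message constants indicate, registering and deleting timers is only supported on keyed streams.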
SimpleTimerService
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/SimpleTimerService.java
@Internal
public class SimpleTimerService implements TimerService {

private final InternalTimerService<VoidNamespace> internalTimerService;

public SimpleTimerService(InternalTimerService<VoidNamespace> internalTimerService) {
this.internalTimerService = internalTimerService;
}

@Override
public long currentProcessingTime() {
return internalTimerService.currentProcessingTime();
}

@Override
public long currentWatermark() {
return internalTimerService.currentWatermark();
}

@Override
public void registerProcessingTimeTimer(long time) {
internalTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, time);
}

@Override
public void registerEventTimeTimer(long time) {
internalTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, time);
}

@Override
public void deleteProcessingTimeTimer(long time) {
internalTimerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, time);
}

@Override
public void deleteEventTimeTimer(long time) {
internalTimerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, time);
}
}
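SimpleTimerService implements TimerService by delegating every call to an InternalTimerService<VoidNamespace>; the register and delete methods pass VoidNamespace.INSTANCE as the namespace.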
InternalTimerService
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimerService.java
@Internal
public interface InternalTimerService<N> {

long currentProcessingTime();

long currentWatermark();

void registerProcessingTimeTimer(N namespace, long time);

void deleteProcessingTimeTimer(N namespace, long time);

void registerEventTimeTimer(N namespace, long time);

void deleteEventTimeTimer(N namespace, long time);
}
InternalTimerService is the internal counterpart of the TimerService interface. The difference is the namespace: registerProcessingTimeTimer, deleteProcessingTimeTimer, registerEventTimeTimer, and deleteEventTimeTimer each take an additional namespace parameter.
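To make the role of the namespace concrete, here is a minimal sketch in plain Java. It is not Flink code: SketchTimer, SketchInternalTimerService, and SketchSimpleTimerService are hypothetical stand-ins, the namespace type is fixed to String, and a HashSet replaces Flink's priority queue. The assumption it illustrates is that a timer is identified by the combination of timestamp, key, and namespace, so registering the same combination twice leaves a single timer.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for a timer: identified by (timestamp, key, namespace).
final class SketchTimer {
    final long timestamp;
    final String key;
    final String namespace;

    SketchTimer(long timestamp, String key, String namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchTimer)) {
            return false;
        }
        SketchTimer other = (SketchTimer) o;
        return timestamp == other.timestamp
                && key.equals(other.key)
                && namespace.equals(other.namespace);
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, key, namespace);
    }
}

// Hypothetical stand-in for InternalTimerService<N>, with N fixed to String.
final class SketchInternalTimerService {
    private final Set<SketchTimer> eventTimeTimers = new HashSet<>();
    private String currentKey;

    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    void registerEventTimeTimer(String namespace, long time) {
        eventTimeTimers.add(new SketchTimer(time, currentKey, namespace));
    }

    void deleteEventTimeTimer(String namespace, long time) {
        eventTimeTimers.remove(new SketchTimer(time, currentKey, namespace));
    }

    int numEventTimeTimers() {
        return eventTimeTimers.size();
    }
}

// Hypothetical stand-in for SimpleTimerService: hides the namespace from the
// caller by always passing the same constant.
final class SketchSimpleTimerService {
    private static final String VOID_NAMESPACE = "void";
    private final SketchInternalTimerService internal;

    SketchSimpleTimerService(SketchInternalTimerService internal) {
        this.internal = internal;
    }

    void registerEventTimeTimer(long time) {
        internal.registerEventTimeTimer(VOID_NAMESPACE, time);
    }

    void deleteEventTimeTimer(long time) {
        internal.deleteEventTimeTimer(VOID_NAMESPACE, time);
    }
}

final class NamespaceSketchDemo {
    static int run() {
        SketchInternalTimerService internal = new SketchInternalTimerService();
        SketchSimpleTimerService simple = new SketchSimpleTimerService(internal);
        internal.setCurrentKey("user-1");
        simple.registerEventTimeTimer(1000L);
        simple.registerEventTimeTimer(1000L);               // same (time, key, namespace): no new timer
        internal.registerEventTimeTimer("window-A", 1000L); // different namespace: a separate timer
        return internal.numEventTimeTimers();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 2
    }
}
```

In this model the user-facing wrapper never sees a namespace, while a caller of the internal service (for example a windowing operator) can keep several timers for the same key and timestamp apart by namespace.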
InternalTimerServiceImpl
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {

private final ProcessingTimeService processingTimeService;

private final KeyContext keyContext;

private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue;

private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;

private final KeyGroupRange localKeyGroupRange;

private final int localKeyGroupRangeStartIdx;

private long currentWatermark = Long.MIN_VALUE;

private ScheduledFuture<?> nextTimer;

// Variables to be set when the service is started.

private TypeSerializer<K> keySerializer;

private TypeSerializer<N> namespaceSerializer;

private Triggerable<K, N> triggerTarget;

private volatile boolean isInitialized;

private TypeSerializer<K> keyDeserializer;

private TypeSerializer<N> namespaceDeserializer;

private InternalTimersSnapshot<K, N> restoredTimersSnapshot;

InternalTimerServiceImpl(
KeyGroupRange localKeyGroupRange,
KeyContext keyContext,
ProcessingTimeService processingTimeService,
KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue,
KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue) {

this.keyContext = checkNotNull(keyContext);
this.processingTimeService = checkNotNull(processingTimeService);
this.localKeyGroupRange = checkNotNull(localKeyGroupRange);
this.processingTimeTimersQueue = checkNotNull(processingTimeTimersQueue);
this.eventTimeTimersQueue = checkNotNull(eventTimeTimersQueue);

// find the starting index of the local key-group range
int startIdx = Integer.MAX_VALUE;
for (Integer keyGroupIdx : localKeyGroupRange) {
startIdx = Math.min(keyGroupIdx, startIdx);
}
this.localKeyGroupRangeStartIdx = startIdx;
}

public void startTimerService(
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
Triggerable<K, N> triggerTarget) {

if (!isInitialized) {

if (keySerializer == null || namespaceSerializer == null) {
throw new IllegalArgumentException("The TimersService serializers cannot be null.");
}

if (this.keySerializer != null || this.namespaceSerializer != null || this.triggerTarget != null) {
throw new IllegalStateException("The TimerService has already been initialized.");
}

// the following is the case where we restore
if (restoredTimersSnapshot != null) {
CompatibilityResult<K> keySerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
this.keyDeserializer,
null,
restoredTimersSnapshot.getKeySerializerConfigSnapshot(),
keySerializer);

CompatibilityResult<N> namespaceSerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
this.namespaceDeserializer,
null,
restoredTimersSnapshot.getNamespaceSerializerConfigSnapshot(),
namespaceSerializer);

if (keySerializerCompatibility.isRequiresMigration() || namespaceSerializerCompatibility.isRequiresMigration()) {
throw new IllegalStateException("Tried to initialize restored TimerService " +
"with incompatible serializers than those used to snapshot its state.");
}
}

this.keySerializer = keySerializer;
this.namespaceSerializer = namespaceSerializer;
this.keyDeserializer = null;
this.namespaceDeserializer = null;

this.triggerTarget = Preconditions.checkNotNull(triggerTarget);

// re-register the restored timers (if any)
final InternalTimer<K, N> headTimer = processingTimeTimersQueue.peek();
if (headTimer != null) {
nextTimer = processingTimeService.registerTimer(headTimer.getTimestamp(), this);
}
this.isInitialized = true;
} else {
if (!(this.keySerializer.equals(keySerializer) && this.namespaceSerializer.equals(namespaceSerializer))) {
throw new IllegalArgumentException("Already initialized Timer Service " +
"tried to be initialized with different key and namespace serializers.");
}
}
}

@Override
public long currentProcessingTime() {
return processingTimeService.getCurrentProcessingTime();
}

@Override
public long currentWatermark() {
return currentWatermark;
}

@Override
public void registerProcessingTimeTimer(N namespace, long time) {
InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
// check if we need to re-schedule our timer to earlier
if (time < nextTriggerTime) {
if (nextTimer != null) {
nextTimer.cancel(false);
}
nextTimer = processingTimeService.registerTimer(time, this);
}
}
}

@Override
public void registerEventTimeTimer(N namespace, long time) {
eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}

@Override
public void deleteProcessingTimeTimer(N namespace, long time) {
processingTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}

@Override
public void deleteEventTimeTimer(N namespace, long time) {
eventTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}

@Override
public void onProcessingTime(long time) throws Exception {
// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
// inside the callback.
nextTimer = null;

InternalTimer<K, N> timer;

while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
processingTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onProcessingTime(timer);
}

if (timer != null && nextTimer == null) {
nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
}
}

public void advanceWatermark(long time) throws Exception {
currentWatermark = time;

InternalTimer<K, N> timer;

while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
eventTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onEventTime(timer);
}
}

public InternalTimersSnapshot<K, N> snapshotTimersForKeyGroup(int keyGroupIdx) {
return new InternalTimersSnapshot<>(
keySerializer,
keySerializer.snapshotConfiguration(),
namespaceSerializer,
namespaceSerializer.snapshotConfiguration(),
eventTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx),
processingTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx));
}

@SuppressWarnings("unchecked")
public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> restoredSnapshot, int keyGroupIdx) {
this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) restoredSnapshot;

if (areSnapshotSerializersIncompatible(restoredSnapshot)) {
throw new IllegalArgumentException("Tried to restore timers " +
"for the same service with different serializers.");
}

this.keyDeserializer = restoredTimersSnapshot.getKeySerializer();
this.namespaceDeserializer = restoredTimersSnapshot.getNamespaceSerializer();

checkArgument(localKeyGroupRange.contains(keyGroupIdx),
"Key Group " + keyGroupIdx + " does not belong to the local range.");

// restore the event time timers
eventTimeTimersQueue.addAll(restoredTimersSnapshot.getEventTimeTimers());

// restore the processing time timers
processingTimeTimersQueue.addAll(restoredTimersSnapshot.getProcessingTimeTimers());
}

@VisibleForTesting
public int numProcessingTimeTimers() {
return this.processingTimeTimersQueue.size();
}

@VisibleForTesting
public int numEventTimeTimers() {
return this.eventTimeTimersQueue.size();
}

@VisibleForTesting
public int numProcessingTimeTimers(N namespace) {
return countTimersInNamespaceInternal(namespace, processingTimeTimersQueue);
}

@VisibleForTesting
public int numEventTimeTimers(N namespace) {
return countTimersInNamespaceInternal(namespace, eventTimeTimersQueue);
}

private int countTimersInNamespaceInternal(N namespace, InternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue) {
int count = 0;
try (final CloseableIterator<TimerHeapInternalTimer<K, N>> iterator = queue.iterator()) {
while (iterator.hasNext()) {
final TimerHeapInternalTimer<K, N> timer = iterator.next();
if (timer.getNamespace().equals(namespace)) {
count++;
}
}
} catch (Exception e) {
throw new FlinkRuntimeException("Exception when closing iterator.", e);
}
return count;
}

@VisibleForTesting
int getLocalKeyGroupRangeStartIdx() {
return this.localKeyGroupRangeStartIdx;
}

@VisibleForTesting
List<Set<TimerHeapInternalTimer<K, N>>> getEventTimeTimersPerKeyGroup() {
return partitionElementsByKeyGroup(eventTimeTimersQueue);
}

@VisibleForTesting
List<Set<TimerHeapInternalTimer<K, N>>> getProcessingTimeTimersPerKeyGroup() {
return partitionElementsByKeyGroup(processingTimeTimersQueue);
}

private <T> List<Set<T>> partitionElementsByKeyGroup(KeyGroupedInternalPriorityQueue<T> keyGroupedQueue) {
List<Set<T>> result = new ArrayList<>(localKeyGroupRange.getNumberOfKeyGroups());
for (int keyGroup : localKeyGroupRange) {
result.add(Collections.unmodifiableSet(keyGroupedQueue.getSubsetForKeyGroup(keyGroup)));
}
return result;
}

private boolean areSnapshotSerializersIncompatible(InternalTimersSnapshot<?, ?> restoredSnapshot) {
return (this.keyDeserializer != null && !this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) ||
(this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredSnapshot.getNamespaceSerializer()));
}
}

InternalTimerServiceImpl implements both InternalTimerService and ProcessingTimeCallback (the interface that declares the onProcessingTime method).
startTimerService mainly initializes the keySerializer, namespaceSerializer, and triggerTarget fields, and re-registers a processing-time timer for the head of the restored queue, if there is one. registerEventTimeTimer and deleteEventTimeTimer operate on eventTimeTimersQueue; registerProcessingTimeTimer and deleteProcessingTimeTimer operate on processingTimeTimersQueue. Both queues are of type KeyGroupedInternalPriorityQueue with TimerHeapInternalTimer elements.
Event-time timers fire in advanceWatermark: AbstractStreamOperator's processWatermark method calls InternalTimeServiceManager's advanceWatermark, which in turn calls InternalTimerServiceImpl's advanceWatermark. That method removes every event-time timer whose timestamp is less than or equal to the given time, sets the current key to the timer's key, and calls triggerTarget.onEventTime. Processing-time timers fire in onProcessingTime: the scheduled TriggerTask and RepeatedTriggerTask of SystemProcessingTimeService call back ProcessingTimeCallback's onProcessingTime. That method removes every processing-time timer whose timestamp is less than or equal to the given time, sets the current key, and calls triggerTarget.onProcessingTime; if timers remain, it schedules the next callback for the new head of the queue.
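The firing and re-scheduling logic described above can be sketched with a plain java.util.PriorityQueue. This is a simplified model under stated assumptions, not Flink's implementation: MiniTimer, MiniTimerService, and MiniTimerDemo are hypothetical names, there is no namespace or deduplication, and the field scheduledWakeUp stands in for the ScheduledFuture held in nextTimer (Long.MAX_VALUE meaning nothing is scheduled).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, simplified timer: just a timestamp and a key.
final class MiniTimer {
    final long timestamp;
    final String key;

    MiniTimer(long timestamp, String key) {
        this.timestamp = timestamp;
        this.key = key;
    }
}

// Simplified model of the two timer queues and their firing loops.
final class MiniTimerService {
    private static final Comparator<MiniTimer> BY_TIME =
            Comparator.comparingLong(t -> t.timestamp);

    private final PriorityQueue<MiniTimer> eventTimeTimers = new PriorityQueue<>(BY_TIME);
    private final PriorityQueue<MiniTimer> processingTimeTimers = new PriorityQueue<>(BY_TIME);

    // Stand-in for nextTimer: when the scheduler would next call onProcessingTime.
    private long scheduledWakeUp = Long.MAX_VALUE;
    private long currentWatermark = Long.MIN_VALUE;

    // Records fired timers in order, in place of calling a Triggerable.
    final List<String> fired = new ArrayList<>();

    void registerEventTimeTimer(String key, long time) {
        eventTimeTimers.add(new MiniTimer(time, key));
    }

    void registerProcessingTimeTimer(String key, long time) {
        MiniTimer oldHead = processingTimeTimers.peek();
        processingTimeTimers.add(new MiniTimer(time, key));
        long nextTriggerTime = oldHead != null ? oldHead.timestamp : Long.MAX_VALUE;
        // Re-schedule only if the new timer is earlier than the previous head.
        if (time < nextTriggerTime) {
            scheduledWakeUp = time;
        }
    }

    void advanceWatermark(long time) {
        currentWatermark = time;
        MiniTimer timer;
        // Fire every event-time timer at or before the watermark, earliest first.
        while ((timer = eventTimeTimers.peek()) != null && timer.timestamp <= time) {
            eventTimeTimers.poll();
            fired.add("event:" + timer.key + "@" + timer.timestamp);
        }
    }

    void onProcessingTime(long time) {
        scheduledWakeUp = Long.MAX_VALUE;
        MiniTimer timer;
        while ((timer = processingTimeTimers.peek()) != null && timer.timestamp <= time) {
            processingTimeTimers.poll();
            fired.add("proc:" + timer.key + "@" + timer.timestamp);
        }
        // If timers remain, wake up again at the new head of the queue.
        if (timer != null) {
            scheduledWakeUp = timer.timestamp;
        }
    }

    long scheduledWakeUp() {
        return scheduledWakeUp;
    }

    long currentWatermark() {
        return currentWatermark;
    }
}

final class MiniTimerDemo {
    static MiniTimerService run() {
        MiniTimerService svc = new MiniTimerService();
        svc.registerEventTimeTimer("a", 30L);
        svc.registerEventTimeTimer("b", 10L);
        svc.registerEventTimeTimer("c", 20L);
        svc.advanceWatermark(20L);                 // fires b@10, then c@20; a@30 stays queued

        svc.registerProcessingTimeTimer("a", 500L); // wake-up scheduled at 500
        svc.registerProcessingTimeTimer("b", 200L); // earlier than head: wake-up moves to 200
        svc.registerProcessingTimeTimer("c", 300L); // later than head: wake-up stays at 200
        svc.onProcessingTime(200L);                 // fires b@200, next wake-up at 300
        return svc;
    }

    public static void main(String[] args) {
        MiniTimerService svc = run();
        System.out.println(svc.fired);
        System.out.println(svc.scheduledWakeUp());
    }
}
```

The asymmetry is the point of the sketch: event-time timers need no scheduling because each incoming watermark drives the loop, whereas processing-time timers depend on a single pending wake-up that always tracks the head of the queue.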

Triggerable
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/Triggerable.java
@Internal
public interface Triggerable<K, N> {

/**
* Invoked when an event-time timer fires.
*/
void onEventTime(InternalTimer<K, N> timer) throws Exception;

/**
* Invoked when a processing-time timer fires.
*/
void onProcessingTime(InternalTimer<K, N> timer) throws Exception;
}
The Triggerable interface declares the onEventTime and onProcessingTime methods that InternalTimerService calls when a timer fires. Operators such as WindowOperator, IntervalJoinOperator, KeyedProcessOperator, and KeyedCoProcessOperator implement Triggerable so that they can react to these callbacks.
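The callback contract can be illustrated with a small stand-alone model. The names below (SketchTriggerable, CountingOperator, TriggerableSketchDemo) are hypothetical and the per-key state is a plain HashMap; the sketch only shows why the timer service sets the current key before invoking the callback, as the keyContext.setCurrentKey(timer.getKey()) call in InternalTimerServiceImpl does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, reduced version of the callback interface (event time only).
interface SketchTriggerable {
    void onEventTime(String timerKey, long timestamp);
}

// A toy "operator" that counts elements per key and emits the count when a timer fires.
final class CountingOperator implements SketchTriggerable {
    private String currentKey;
    private final Map<String, Integer> countPerKey = new HashMap<>();
    final List<String> output = new ArrayList<>();

    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    void processElement(String key) {
        setCurrentKey(key);
        countPerKey.merge(currentKey, 1, Integer::sum);
    }

    @Override
    public void onEventTime(String timerKey, long timestamp) {
        // State is looked up through the current key, the way keyed state would be,
        // so the caller must have set the key context before this callback runs.
        int count = countPerKey.getOrDefault(currentKey, 0);
        output.add(currentKey + "=" + count);
        countPerKey.remove(currentKey);
    }
}

final class TriggerableSketchDemo {
    // The timer service's part of the contract: set the key, then call back.
    static void fire(CountingOperator op, String timerKey, long timestamp) {
        op.setCurrentKey(timerKey);
        op.onEventTime(timerKey, timestamp);
    }

    static List<String> run() {
        CountingOperator op = new CountingOperator();
        op.processElement("a");
        op.processElement("b");
        op.processElement("a");
        fire(op, "a", 100L);
        fire(op, "b", 100L);
        return op.output;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a=2, b=1]
    }
}
```

Without the setCurrentKey call in fire, the callback for key "a" would read whatever key happened to be current from the last processed element, which is the situation the real implementation avoids.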
Summary

The TimerService interface declares currentProcessingTime, currentWatermark, registerProcessingTimeTimer, registerEventTimeTimer, deleteProcessingTimeTimer, and deleteEventTimeTimer. Its implementation, SimpleTimerService, delegates to an InternalTimerService.
InternalTimerService is the internal counterpart of TimerService and adds a namespace parameter to registerProcessingTimeTimer, deleteProcessingTimeTimer, registerEventTimeTimer, and deleteEventTimeTimer. Its implementation is InternalTimerServiceImpl, which also implements ProcessingTimeCallback (the interface that declares onProcessingTime). registerEventTimeTimer and deleteEventTimeTimer operate on eventTimeTimersQueue, while registerProcessingTimeTimer and deleteProcessingTimeTimer operate on processingTimeTimersQueue; both queues are of type KeyGroupedInternalPriorityQueue with TimerHeapInternalTimer elements.
InternalTimerServiceImpl fires event-time timers in advanceWatermark: AbstractStreamOperator's processWatermark calls InternalTimeServiceManager's advanceWatermark, which calls InternalTimerServiceImpl's advanceWatermark. It removes the event-time timers whose timestamp is less than or equal to the given time and calls triggerTarget.onEventTime for each.
InternalTimerServiceImpl fires processing-time timers in onProcessingTime: the scheduled TriggerTask and RepeatedTriggerTask of SystemProcessingTimeService call back ProcessingTimeCallback's onProcessingTime. It removes the processing-time timers whose timestamp is less than or equal to the given time and calls triggerTarget.onProcessingTime for each.
The Triggerable interface declares the onEventTime and onProcessingTime methods that InternalTimerService calls. Operators such as WindowOperator, IntervalJoinOperator, KeyedProcessOperator, and KeyedCoProcessOperator implement Triggerable so that they can react to timer callbacks.

doc
TimerService
