序本文主要研究一下flink的InternalTimeServiceManagerInternalTimeServiceManagerflink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java@Internalpublic class InternalTimeServiceManager<K> { @VisibleForTesting static final String TIMER_STATE_PREFIX = “timer_state”; @VisibleForTesting static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + “/processing”; @VisibleForTesting static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + “/event_”; private final KeyGroupRange localKeyGroupRange; private final KeyContext keyContext; private final PriorityQueueSetFactory priorityQueueSetFactory; private final ProcessingTimeService processingTimeService; private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices; private final boolean useLegacySynchronousSnapshots; InternalTimeServiceManager( KeyGroupRange localKeyGroupRange, KeyContext keyContext, PriorityQueueSetFactory priorityQueueSetFactory, ProcessingTimeService processingTimeService, boolean useLegacySynchronousSnapshots) { this.localKeyGroupRange = Preconditions.checkNotNull(localKeyGroupRange); this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory); this.keyContext = Preconditions.checkNotNull(keyContext); this.processingTimeService = Preconditions.checkNotNull(processingTimeService); this.useLegacySynchronousSnapshots = useLegacySynchronousSnapshots; this.timerServices = new HashMap<>(); } @SuppressWarnings(“unchecked”) public <N> InternalTimerService<N> getInternalTimerService( String name, TimerSerializer<K, N> timerSerializer, Triggerable<K, N> triggerable) { InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService(name, timerSerializer); timerService.startTimerService( timerSerializer.getKeySerializer(), timerSerializer.getNamespaceSerializer(), triggerable); return timerService; } @SuppressWarnings(“unchecked”) <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) { InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name); if (timerService == null) { timerService = new InternalTimerServiceImpl<>( localKeyGroupRange, keyContext, processingTimeService, createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer), createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer)); timerServices.put(name, timerService); } return timerService; } Map<String, InternalTimerServiceImpl<K, ?>> getRegisteredTimerServices() { return Collections.unmodifiableMap(timerServices); } private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue( String name, TimerSerializer<K, N> timerSerializer) { return priorityQueueSetFactory.create( name, timerSerializer); } public void advanceWatermark(Watermark watermark) throws Exception { for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) { service.advanceWatermark(watermark.getTimestamp()); } } ////////////////// Fault Tolerance Methods /////////////////// public void snapshotStateForKeyGroup(DataOutputView stream, int keyGroupIdx) throws IOException { Preconditions.checkState(useLegacySynchronousSnapshots); InternalTimerServiceSerializationProxy<K> serializationProxy = new InternalTimerServiceSerializationProxy<>(this, keyGroupIdx); serializationProxy.write(stream); } public void restoreStateForKeyGroup( InputStream stream, int keyGroupIdx, ClassLoader userCodeClassLoader) throws IOException { InternalTimerServiceSerializationProxy<K> serializationProxy = new InternalTimerServiceSerializationProxy<>( this, userCodeClassLoader, keyGroupIdx); serializationProxy.read(stream); } //////////////////// Methods used ONLY IN TESTS //////////////////// @VisibleForTesting public int numProcessingTimeTimers() { int count = 0; for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) { count += timerService.numProcessingTimeTimers(); } return count; } @VisibleForTesting public int numEventTimeTimers() { int count = 0; for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) { count += timerService.numEventTimeTimers(); } return count; }}InternalTimeServiceManager用于管理所有keyed operators要使用的timerService,它在内存使用map维护了timerService的名称与InternalTimerServiceImpl的映射getInternalTimerService方法首先调用registerOrGetTimerService方法获取或创建指定name的InternalTimerServiceImpl,之后调用timerService.startTimerService进行初始化然后返回registerOrGetTimerService方法先从名为timerServices的map中查找指定name的InternalTimerServiceImpl,没有就创建一个,然后放入到名为timerServices的map中;创建InternalTimerServiceImpl的时候,这里使用createTimerPriorityQueue来创建KeyGroupedInternalPriorityQueue类型的processingTimeTimersQueue及eventTimeTimersQueue;createTimerPriorityQueue是通过priorityQueueSetFactory来创建的PriorityQueueSetFactoryflink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityQueueSetFactory.javapublic interface PriorityQueueSetFactory { @Nonnull <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create( @Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer);}PriorityQueueSetFactory定义了create方法,创建的是KeyGroupedInternalPriorityQueue,其中T的泛型要求是同时继承或实现HeapPriorityQueueElement、PriorityComparable、Keyed这三个接口HeapPriorityQueueElementflink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueElement.java@Internalpublic interface HeapPriorityQueueElement { /** * The index that indicates that a {@link HeapPriorityQueueElement} object is not contained in and managed by any * {@link HeapPriorityQueue}. We do not strictly enforce that internal indexes must be reset to this value when * elements are removed from a {@link HeapPriorityQueue}. / int NOT_CONTAINED = Integer.MIN_VALUE; /* * Returns the current index of this object in the internal array of {@link HeapPriorityQueue}. / int getInternalIndex(); /* * Sets the current index of this object in the {@link HeapPriorityQueue} and should only be called by the owning * {@link HeapPriorityQueue}. * * @param newIndex the new index in the timer heap. / void setInternalIndex(int newIndex);}HeapPriorityQueueElement接口定义了HeapPriorityQueue所要求的元素类型,它定义了getInternalIndex、setInternalIndex方法PriorityComparableflink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityComparable.javapublic interface PriorityComparable<T> { int comparePriorityTo(@Nonnull T other);}PriorityComparable定义了comparePriorityTo方法,用于根据priority来进行比对Keyedflink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/Keyed.javapublic interface Keyed<K> { K getKey();}Keyed接口定义了getKey方法,用于返回该对象的keyInternalTimerflink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimer.java@Internalpublic interface InternalTimer<K, N> extends PriorityComparable<InternalTimer<?, ?>>, Keyed<K> { /* Function to extract the key from a {@link InternalTimer}. / KeyExtractorFunction<InternalTimer<?, ?>> KEY_EXTRACTOR_FUNCTION = InternalTimer::getKey; /* Function to compare instances of {@link InternalTimer}. / PriorityComparator<InternalTimer<?, ?>> TIMER_COMPARATOR = (left, right) -> Long.compare(left.getTimestamp(), right.getTimestamp()); /* * Returns the timestamp of the timer. This value determines the point in time when the timer will fire. / long getTimestamp(); /* * Returns the key that is bound to this timer. / @Nonnull @Override K getKey(); /* * Returns the namespace that is bound to this timer. / @Nonnull N getNamespace();}InternalTimer继承了PriorityComparable、Keyed接口,它定义了getTimestamp、getKey、getNamespace方法,同时内置了KEY_EXTRACTOR_FUNCTION、TIMER_COMPARATORTimerHeapInternalTimerflink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java@Internalpublic final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>, HeapPriorityQueueElement { /* The key for which the timer is scoped. / @Nonnull private final K key; /* The namespace for which the timer is scoped. / @Nonnull private final N namespace; /* The expiration timestamp. */ private final long timestamp; private transient int timerHeapIndex; TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) { this.timestamp = timestamp; this.key = key; this.namespace = namespace; this.timerHeapIndex = NOT_CONTAINED; } @Override public long getTimestamp() { return timestamp; } @Nonnull @Override public K getKey() { return key; } @Nonnull @Override public N getNamespace() { return namespace; } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o instanceof InternalTimer) { InternalTimer<?, ?> timer = (InternalTimer<?, ?>) o; return timestamp == timer.getTimestamp() && key.equals(timer.getKey()) && namespace.equals(timer.getNamespace()); } return false; } @Override public int getInternalIndex() { return timerHeapIndex; } @Override public void setInternalIndex(int newIndex) { this.timerHeapIndex = newIndex; } void removedFromTimerQueue() { setInternalIndex(NOT_CONTAINED); } @Override public int hashCode() { int result = (int) (timestamp ^ (timestamp >>> 32)); result = 31 * result + key.hashCode(); result = 31 * result + namespace.hashCode(); return result; } @Override public String toString() { return “Timer{” + “timestamp=” + timestamp + “, key=” + key + “, namespace=” + namespace + ‘}’; } @Override public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) { return Long.compare(timestamp, other.getTimestamp()); }}TimerHeapInternalTimer实现了InternalTimer及HeapPriorityQueueElement接口;这里removedFromTimerQueue接口是调用setInternalIndex(NOT_CONTAINED),即改动其index为NOT_CONTAINED,逻辑删除HeapPriorityQueueSetFactoryflink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.javapublic class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory { @Nonnull private final KeyGroupRange keyGroupRange; @Nonnegative private final int totalKeyGroups; @Nonnegative private final int minimumCapacity; public HeapPriorityQueueSetFactory( @Nonnull KeyGroupRange keyGroupRange, @Nonnegative int totalKeyGroups, @Nonnegative int minimumCapacity) { this.keyGroupRange = keyGroupRange; this.totalKeyGroups = totalKeyGroups; this.minimumCapacity = minimumCapacity; } @Nonnull @Override public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> HeapPriorityQueueSet<T> create( @Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) { return new HeapPriorityQueueSet<>( PriorityComparator.forPriorityComparableObjects(), KeyExtractorFunction.forKeyedObjects(), minimumCapacity, keyGroupRange, totalKeyGroups); }}HeapPriorityQueueSetFactory实现了PriorityQueueSetFactory接口,其create方法创建的是HeapPriorityQueueSet小结InternalTimeServiceManager用于管理所有keyed operators要使用的timerService,它在内存使用map维护了timerService的名称与InternalTimerServiceImpl的映射;getInternalTimerService方法首先调用registerOrGetTimerService方法获取或创建指定name的InternalTimerServiceImpl,之后调用timerService.startTimerService进行初始化然后返回registerOrGetTimerService方法先从名为timerServices的map中查找指定name的InternalTimerServiceImpl,没有就创建一个,然后放入到名为timerServices的map中;创建InternalTimerServiceImpl的时候,这里使用createTimerPriorityQueue来创建KeyGroupedInternalPriorityQueue类型的processingTimeTimersQueue及eventTimeTimersQueue;createTimerPriorityQueue是通过priorityQueueSetFactory来创建的PriorityQueueSetFactory定义了create方法,创建的是KeyGroupedInternalPriorityQueue,其中T的泛型要求是同时继承或实现HeapPriorityQueueElement、PriorityComparable、Keyed这三个接口(InternalTimer继承了PriorityComparable、Keyed接口,TimerHeapInternalTimer实现了InternalTimer及HeapPriorityQueueElement接口);HeapPriorityQueueSetFactory实现了PriorityQueueSetFactory接口,其create方法创建的是HeapPriorityQueueSetdocInternalTimeServiceManager