聊聊flink的InternalTimeServiceManager

39次阅读

共计 10866 个字符,预计需要花费 28 分钟才能阅读完成。


本文主要研究一下 flink 的 InternalTimeServiceManager
InternalTimeServiceManager
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
@Internal
public class InternalTimeServiceManager<K> {

@VisibleForTesting
static final String TIMER_STATE_PREFIX = “_timer_state”;
@VisibleForTesting
static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + “/processing_”;
@VisibleForTesting
static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + “/event_”;

private final KeyGroupRange localKeyGroupRange;
private final KeyContext keyContext;

private final PriorityQueueSetFactory priorityQueueSetFactory;
private final ProcessingTimeService processingTimeService;

private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;

private final boolean useLegacySynchronousSnapshots;

InternalTimeServiceManager(
KeyGroupRange localKeyGroupRange,
KeyContext keyContext,
PriorityQueueSetFactory priorityQueueSetFactory,
ProcessingTimeService processingTimeService, boolean useLegacySynchronousSnapshots) {

this.localKeyGroupRange = Preconditions.checkNotNull(localKeyGroupRange);
this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory);
this.keyContext = Preconditions.checkNotNull(keyContext);
this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
this.useLegacySynchronousSnapshots = useLegacySynchronousSnapshots;

this.timerServices = new HashMap<>();
}

@SuppressWarnings(“unchecked”)
public <N> InternalTimerService<N> getInternalTimerService(
String name,
TimerSerializer<K, N> timerSerializer,
Triggerable<K, N> triggerable) {

InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService(name, timerSerializer);

timerService.startTimerService(
timerSerializer.getKeySerializer(),
timerSerializer.getNamespaceSerializer(),
triggerable);

return timerService;
}

@SuppressWarnings(“unchecked”)
<N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name);
if (timerService == null) {

timerService = new InternalTimerServiceImpl<>(
localKeyGroupRange,
keyContext,
processingTimeService,
createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));

timerServices.put(name, timerService);
}
return timerService;
}

Map<String, InternalTimerServiceImpl<K, ?>> getRegisteredTimerServices() {
return Collections.unmodifiableMap(timerServices);
}

private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(
String name,
TimerSerializer<K, N> timerSerializer) {
return priorityQueueSetFactory.create(
name,
timerSerializer);
}

public void advanceWatermark(Watermark watermark) throws Exception {
for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
service.advanceWatermark(watermark.getTimestamp());
}
}

////////////////// Fault Tolerance Methods ///////////////////

public void snapshotStateForKeyGroup(DataOutputView stream, int keyGroupIdx) throws IOException {
Preconditions.checkState(useLegacySynchronousSnapshots);
InternalTimerServiceSerializationProxy<K> serializationProxy =
new InternalTimerServiceSerializationProxy<>(this, keyGroupIdx);

serializationProxy.write(stream);
}

public void restoreStateForKeyGroup(
InputStream stream,
int keyGroupIdx,
ClassLoader userCodeClassLoader) throws IOException {

InternalTimerServiceSerializationProxy<K> serializationProxy =
new InternalTimerServiceSerializationProxy<>(
this,
userCodeClassLoader,
keyGroupIdx);

serializationProxy.read(stream);
}

//////////////////// Methods used ONLY IN TESTS ////////////////////

@VisibleForTesting
public int numProcessingTimeTimers() {
int count = 0;
for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
count += timerService.numProcessingTimeTimers();
}
return count;
}

@VisibleForTesting
public int numEventTimeTimers() {
int count = 0;
for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
count += timerService.numEventTimeTimers();
}
return count;
}
}

InternalTimeServiceManager 用于管理所有 keyed operators 要使用的 timerService,它在内存中使用 map 维护了 timerService 的名称与 InternalTimerServiceImpl 的映射
getInternalTimerService 方法首先调用 registerOrGetTimerService 方法获取或创建指定 name 的 InternalTimerServiceImpl,之后调用 timerService.startTimerService 进行初始化然后返回
registerOrGetTimerService 方法先从名为 timerServices 的 map 中查找指定 name 的 InternalTimerServiceImpl,没有就创建一个,然后放入到名为 timerServices 的 map 中;创建 InternalTimerServiceImpl 的时候,这里使用 createTimerPriorityQueue 来创建 KeyGroupedInternalPriorityQueue 类型的 processingTimeTimersQueue 及 eventTimeTimersQueue;createTimerPriorityQueue 内部则是委托 priorityQueueSetFactory 来创建队列的

PriorityQueueSetFactory
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
/**
 * Factory for {@link KeyGroupedInternalPriorityQueue} instances.
 *
 * <p>Queue elements must simultaneously implement {@link HeapPriorityQueueElement},
 * {@link PriorityComparable} and {@link Keyed}.
 */
public interface PriorityQueueSetFactory {

    /**
     * Creates a new key-grouped priority queue.
     *
     * @param stateName name of the queue's state
     * @param byteOrderedElementSerializer serializer for the queue elements (the name suggests it
     *     must produce byte-ordered output — confirm with the implementation in use)
     * @param <T> element type
     * @return the created queue, never {@code null}
     */
    @Nonnull
    <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
    KeyGroupedInternalPriorityQueue<T> create(
            @Nonnull String stateName,
            @Nonnull TypeSerializer<T> byteOrderedElementSerializer);
}
PriorityQueueSetFactory 定义了 create 方法,创建的是 KeyGroupedInternalPriorityQueue,其中泛型 T 要求同时实现(或继承)HeapPriorityQueueElement、PriorityComparable、Keyed 这三个接口
HeapPriorityQueueElement
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueElement.java
/**
 * Contract for elements stored in a {@code HeapPriorityQueue}: the queue records each element's
 * position in its internal array on the element itself.
 */
@Internal
public interface HeapPriorityQueueElement {

    /**
     * Sentinel index meaning the element is neither contained in nor managed by any
     * {@code HeapPriorityQueue}. Resetting an element's index to this value when it is removed
     * from a queue is not strictly enforced.
     */
    int NOT_CONTAINED = Integer.MIN_VALUE;

    /**
     * @return this element's current index in the internal array of its {@code HeapPriorityQueue}
     */
    int getInternalIndex();

    /**
     * Records this element's current index in its {@code HeapPriorityQueue}. Only the owning
     * queue should call this.
     *
     * @param newIndex the new index in the timer heap.
     */
    void setInternalIndex(int newIndex);
}
HeapPriorityQueueElement 接口定义了 HeapPriorityQueue 所要求的元素类型,它定义了 getInternalIndex、setInternalIndex 方法
PriorityComparable
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityComparable.java
/**
 * An object whose priority can be compared with that of another object of type {@code T}.
 *
 * @param <T> type of the object being compared against
 */
public interface PriorityComparable<T> {

    /**
     * Compares this object's priority with that of {@code other}.
     *
     * @param other the object to compare against, never {@code null}
     * @return the comparison result (presumably negative / zero / positive, as with
     *     {@code Comparable#compareTo} — confirm)
     */
    int comparePriorityTo(@Nonnull T other);
}
PriorityComparable 定义了 comparePriorityTo 方法,用于按优先级与另一个对象进行比较
Keyed
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/Keyed.java
/**
 * An object that carries a key.
 *
 * @param <K> the key type
 */
public interface Keyed<K> {

    /**
     * @return the key of this object
     */
    K getKey();
}
Keyed 接口定义了 getKey 方法,用于返回该对象的 key
InternalTimer
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimer.java
/**
 * A timer scoped to a key and a namespace that fires at {@link #getTimestamp()}.
 *
 * <p>Timers compare by priority against any other {@code InternalTimer} and expose their key
 * through {@link Keyed}.
 *
 * @param <K> key type
 * @param <N> namespace type
 */
@Internal
public interface InternalTimer<K, N> extends PriorityComparable<InternalTimer<?, ?>>, Keyed<K> {

    /** Extracts the key of an {@link InternalTimer}. */
    KeyExtractorFunction<InternalTimer<?, ?>> KEY_EXTRACTOR_FUNCTION = InternalTimer::getKey;

    /** Orders {@link InternalTimer}s by ascending timestamp. */
    PriorityComparator<InternalTimer<?, ?>> TIMER_COMPARATOR =
            (first, second) -> Long.compare(first.getTimestamp(), second.getTimestamp());

    /**
     * @return the point in time at which this timer fires
     */
    long getTimestamp();

    /**
     * @return the key this timer is bound to
     */
    @Nonnull
    @Override
    K getKey();

    /**
     * @return the namespace this timer is bound to
     */
    @Nonnull
    N getNamespace();
}
InternalTimer 继承了 PriorityComparable、Keyed 接口,它定义了 getTimestamp、getKey、getNamespace 方法,同时内置了 KEY_EXTRACTOR_FUNCTION、TIMER_COMPARATOR
TimerHeapInternalTimer
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
@Internal
public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>, HeapPriorityQueueElement {

/** The key for which the timer is scoped. */
@Nonnull
private final K key;

/** The namespace for which the timer is scoped. */
@Nonnull
private final N namespace;

/** The expiration timestamp. */
private final long timestamp;

private transient int timerHeapIndex;

TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) {
this.timestamp = timestamp;
this.key = key;
this.namespace = namespace;
this.timerHeapIndex = NOT_CONTAINED;
}

@Override
public long getTimestamp() {
return timestamp;
}

@Nonnull
@Override
public K getKey() {
return key;
}

@Nonnull
@Override
public N getNamespace() {
return namespace;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (o instanceof InternalTimer) {
InternalTimer<?, ?> timer = (InternalTimer<?, ?>) o;
return timestamp == timer.getTimestamp()
&& key.equals(timer.getKey())
&& namespace.equals(timer.getNamespace());
}

return false;
}

@Override
public int getInternalIndex() {
return timerHeapIndex;
}

@Override
public void setInternalIndex(int newIndex) {
this.timerHeapIndex = newIndex;
}

void removedFromTimerQueue() {
setInternalIndex(NOT_CONTAINED);
}

@Override
public int hashCode() {
int result = (int) (timestamp ^ (timestamp >>> 32));
result = 31 * result + key.hashCode();
result = 31 * result + namespace.hashCode();
return result;
}

@Override
public String toString() {
return “Timer{” +
“timestamp=” + timestamp +
“, key=” + key +
“, namespace=” + namespace +
‘}’;
}

@Override
public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
return Long.compare(timestamp, other.getTimestamp());
}
}
TimerHeapInternalTimer 实现了 InternalTimer 及 HeapPriorityQueueElement 接口;这里 removedFromTimerQueue 方法调用 setInternalIndex(NOT_CONTAINED),即将其 index 重置为 NOT_CONTAINED,以标记该 timer 已不在任何 HeapPriorityQueue 中(逻辑上的移除)
HeapPriorityQueueSetFactory
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
/**
 * {@link PriorityQueueSetFactory} that produces heap-based {@link HeapPriorityQueueSet}s.
 *
 * <p>Note: {@link #create} uses neither its {@code stateName} nor its serializer argument. Every
 * queue is configured solely from this factory's key-group range, total key-group count and
 * minimum capacity.
 */
public class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory {

    /** Key-group range passed to every created queue. */
    @Nonnull
    private final KeyGroupRange keyGroupRange;

    /** Total key-group count passed to every created queue. */
    @Nonnegative
    private final int totalKeyGroups;

    /** Minimum capacity passed to every created queue. */
    @Nonnegative
    private final int minimumCapacity;

    /**
     * @param keyGroupRange key-group range for the created queues
     * @param totalKeyGroups total number of key groups
     * @param minimumCapacity minimum capacity of each created queue
     */
    public HeapPriorityQueueSetFactory(
            @Nonnull KeyGroupRange keyGroupRange,
            @Nonnegative int totalKeyGroups,
            @Nonnegative int minimumCapacity) {

        this.minimumCapacity = minimumCapacity;
        this.totalKeyGroups = totalKeyGroups;
        this.keyGroupRange = keyGroupRange;
    }

    @Nonnull
    @Override
    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> HeapPriorityQueueSet<T> create(
            @Nonnull String stateName,
            @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {

        // Presumably orders elements via their own comparePriorityTo and groups them by their own
        // getKey (per the factory-method names) — confirm against PriorityComparator /
        // KeyExtractorFunction.
        final HeapPriorityQueueSet<T> queueSet = new HeapPriorityQueueSet<>(
                PriorityComparator.forPriorityComparableObjects(),
                KeyExtractorFunction.forKeyedObjects(),
                minimumCapacity,
                keyGroupRange,
                totalKeyGroups);
        return queueSet;
    }
}
HeapPriorityQueueSetFactory 实现了 PriorityQueueSetFactory 接口,其 create 方法创建的是 HeapPriorityQueueSet
小结

InternalTimeServiceManager 用于管理所有 keyed operators 要使用的 timerService,它在内存中使用 map 维护了 timerService 的名称与 InternalTimerServiceImpl 的映射;getInternalTimerService 方法首先调用 registerOrGetTimerService 方法获取或创建指定 name 的 InternalTimerServiceImpl,之后调用 timerService.startTimerService 进行初始化然后返回
registerOrGetTimerService 方法先从名为 timerServices 的 map 中查找指定 name 的 InternalTimerServiceImpl,没有就创建一个,然后放入到名为 timerServices 的 map 中;创建 InternalTimerServiceImpl 的时候,这里使用 createTimerPriorityQueue 来创建 KeyGroupedInternalPriorityQueue 类型的 processingTimeTimersQueue 及 eventTimeTimersQueue;createTimerPriorityQueue 内部则是委托 priorityQueueSetFactory 来创建队列的
PriorityQueueSetFactory 定义了 create 方法,创建的是 KeyGroupedInternalPriorityQueue,其中泛型 T 要求同时实现(或继承)HeapPriorityQueueElement、PriorityComparable、Keyed 这三个接口 (InternalTimer 继承了 PriorityComparable、Keyed 接口,TimerHeapInternalTimer 实现了 InternalTimer 及 HeapPriorityQueueElement 接口);HeapPriorityQueueSetFactory 实现了 PriorityQueueSetFactory 接口,其 create 方法创建的是 HeapPriorityQueueSet

doc
InternalTimeServiceManager

正文完
 0