序本文主要研究一下flink的AbstractTtlStateInternalKvStateflink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/internal/InternalKvState.java/** * The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the * {@link State} being the root of the public API state hierarchy. * * <p>The internal state classes give access to the namespace getters and setters and access to * additional functionality, like raw value access or state merging. * * <p>The public API state hierarchy is intended to be programmed against by Flink applications. * The internal state hierarchy holds all the auxiliary methods that are used by the runtime and not * intended to be used by user applications. These internal methods are considered of limited use to users and * only confusing, and are usually not regarded as stable across releases. * * <p>Each specific type in the internal state hierarchy extends the type from the public * state hierarchy: * * <pre> * State * | * +——————-InternalKvState * | | * MergingState | * | | * +—————–InternalMergingState * | | * +——–+——+ | * | | | * ReducingState ListState +—–+—————–+ * | | | | * +———–+ +———– —————–InternalListState * | | * +———InternalReducingState * </pre> * * @param <K> The type of key the state is associated to * @param <N> The type of the namespace * @param <V> The type of values kept internally in state /public interface InternalKvState<K, N, V> extends State { TypeSerializer<K> getKeySerializer(); TypeSerializer<N> getNamespaceSerializer(); TypeSerializer<V> getValueSerializer(); void setCurrentNamespace(N namespace); byte[] getSerializedValue( final byte[] serializedKeyAndNamespace, final TypeSerializer<K> safeKeySerializer, final TypeSerializer<N> safeNamespaceSerializer, final TypeSerializer<V> safeValueSerializer) throws Exception;}InternalKvState接口定义内部的kvState要实现的方法,这里主要是getKeySerializer、getNamespaceSerializer、getValueSerializer、setCurrentNamespace、getSerializedValueAbstractTtlStateflink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/AbstractTtlState.java/* * Base class for TTL logic wrappers of state objects. * * @param <K> The type of key the state is associated to * @param <N> The type of the namespace * @param <SV> The type of values kept internally in state without TTL * @param <TTLSV> The type of values kept internally in state with TTL * @param <S> Type of originally wrapped state object /abstract class AbstractTtlState<K, N, SV, TTLSV, S extends InternalKvState<K, N, TTLSV>> extends AbstractTtlDecorator<S> implements InternalKvState<K, N, SV> { private final TypeSerializer<SV> valueSerializer; AbstractTtlState(S original, StateTtlConfig config, TtlTimeProvider timeProvider, TypeSerializer<SV> valueSerializer) { super(original, config, timeProvider); this.valueSerializer = valueSerializer; } <SE extends Throwable, CE extends Throwable, T> T getWithTtlCheckAndUpdate( SupplierWithException<TtlValue<T>, SE> getter, ThrowingConsumer<TtlValue<T>, CE> updater) throws SE, CE { return getWithTtlCheckAndUpdate(getter, updater, original::clear); } @Override public TypeSerializer<K> getKeySerializer() { return original.getKeySerializer(); } @Override public TypeSerializer<N> getNamespaceSerializer() { return original.getNamespaceSerializer(); } @Override public TypeSerializer<SV> getValueSerializer() { return valueSerializer; } @Override public void setCurrentNamespace(N namespace) { original.setCurrentNamespace(namespace); } @Override public byte[] getSerializedValue( byte[] serializedKeyAndNamespace, TypeSerializer<K> safeKeySerializer, TypeSerializer<N> safeNamespaceSerializer, TypeSerializer<SV> safeValueSerializer) { throw new FlinkRuntimeException(“Queryable state is not currently supported with TTL.”); } @Override public void clear() { original.clear(); }}AbstractTtlState实现了InternalKvState接口的方法,同时继承了AbstractTtlDecorator;它提供了getWithTtlCheckAndUpdate方法,该方法主要是调用AbstractTtlDecorator的getWithTtlCheckAndUpdate来实现TTL逻辑AbstractTtlDecoratorflink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java/* * Base class for TTL logic wrappers. * * @param <T> Type of originally wrapped object /abstract class AbstractTtlDecorator<T> { /* Wrapped original state handler. / final T original; final StateTtlConfig config; final TtlTimeProvider timeProvider; /* Whether to renew expiration timestamp on state read access. / final boolean updateTsOnRead; /* Whether to renew expiration timestamp on state read access. / final boolean returnExpired; /* State value time to live in milliseconds. / final long ttl; AbstractTtlDecorator( T original, StateTtlConfig config, TtlTimeProvider timeProvider) { Preconditions.checkNotNull(original); Preconditions.checkNotNull(config); Preconditions.checkNotNull(timeProvider); this.original = original; this.config = config; this.timeProvider = timeProvider; this.updateTsOnRead = config.getUpdateType() == StateTtlConfig.UpdateType.OnReadAndWrite; this.returnExpired = config.getStateVisibility() == StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp; this.ttl = config.getTtl().toMilliseconds(); } <V> V getUnexpired(TtlValue<V> ttlValue) { return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); } <V> boolean expired(TtlValue<V> ttlValue) { return TtlUtils.expired(ttlValue, ttl, timeProvider); } <V> TtlValue<V> wrapWithTs(V value) { return TtlUtils.wrapWithTs(value, timeProvider.currentTimestamp()); } <V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) { return wrapWithTs(ttlValue.getUserValue()); } <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> V getWithTtlCheckAndUpdate( SupplierWithException<TtlValue<V>, SE> getter, ThrowingConsumer<TtlValue<V>, CE> updater, ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE { TtlValue<V> ttlValue = getWrappedWithTtlCheckAndUpdate(getter, updater, stateClear); return ttlValue == null ? null : ttlValue.getUserValue(); } <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate( SupplierWithException<TtlValue<V>, SE> getter, ThrowingConsumer<TtlValue<V>, CE> updater, ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE { TtlValue<V> ttlValue = getter.get(); if (ttlValue == null) { return null; } else if (expired(ttlValue)) { stateClear.run(); if (!returnExpired) { return null; } } else if (updateTsOnRead) { updater.accept(rewrapWithNewTs(ttlValue)); } return ttlValue; }}AbstractTtlDecorator对TTL逻辑进行了封装,其主要的逻辑在getWrappedWithTtlCheckAndUpdate方法,它在每次访问的时候对于非null的value会先判断下是否expired(TtlUtils.expired(ttlValue, ttl, timeProvider)),如果过期了则调用stateClear(ThrowingRunnable类型,这里是original::clear),对于非returnExpired的则直接返回null;对于没有expired的,则判断是否updateTsOnRead,若是则调用updater进行处理,最后返回ttlValueTtlUtils.expiredflink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/TtlUtils.java/* Common functions related to State TTL. /class TtlUtils { static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, TtlTimeProvider timeProvider) { return expired(ttlValue, ttl, timeProvider.currentTimestamp()); } static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, long currentTimestamp) { return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp); } private static boolean expired(long ts, long ttl, long currentTimestamp) { return getExpirationTimestamp(ts, ttl) <= currentTimestamp; } private static long getExpirationTimestamp(long ts, long ttl) { long ttlWithoutOverflow = ts > 0 ? Math.min(Long.MAX_VALUE - ts, ttl) : ttl; return ts + ttlWithoutOverflow; } //……}TtlUtils的expired方法主要是通过getExpirationTimestamp获取过期时间,然后跟currentTimestamp进行比较;而getExpirationTimestamp这里是根据ttlValue.getLastAccessTimestamp()及ttl值进行判断,这里利用Long.MAX_VALUE处理了overflow的情况,防止最后的值超出long类型的最大范围ThrowingRunnableflink-core-1.7.0-sources.jar!/org/apache/flink/util/function/ThrowingRunnable.java/* * Similar to a {@link Runnable}, this interface is used to capture a block of code * to be executed. In contrast to {@code Runnable}, this interface allows throwing * checked exceptions. /@PublicEvolving@FunctionalInterfacepublic interface ThrowingRunnable<E extends Throwable> { /* * The work method. * * @throws E Exceptions may be thrown. / void run() throws E; /* * Converts a {@link ThrowingRunnable} into a {@link Runnable} which throws all checked exceptions * as unchecked. * * @param throwingRunnable to convert into a {@link Runnable} * @return {@link Runnable} which throws all checked exceptions as unchecked. */ static Runnable unchecked(ThrowingRunnable<?> throwingRunnable) { return () -> { try { throwingRunnable.run(); } catch (Throwable t) { ExceptionUtils.rethrow(t); } }; }}stateClear是ThrowingRunnable类型,它与Runnable不同,ThrowingRunnable允许抛出checked exceptions,它提供了一个unchecked的静态方法,用于将非Error及非RuntimeException的转为RuntimeException抛出来,从而将ThrowingRunnable转换为Runnable小结InternalKvState接口定义内部的kvState要实现的方法,这里主要是getKeySerializer、getNamespaceSerializer、getValueSerializer、setCurrentNamespace、getSerializedValueAbstractTtlState实现了InternalKvState接口的方法,同时继承了AbstractTtlDecorator;它提供了getWithTtlCheckAndUpdate方法,该方法主要是调用AbstractTtlDecorator的getWithTtlCheckAndUpdate来实现TTL逻辑AbstractTtlDecorator的getWrappedWithTtlCheckAndUpdate方法,在每次访问的时候对于非null的value会先判断下是否expired(TtlUtils.expired(ttlValue, ttl, timeProvider)),如果过期了则调用stateClear(ThrowingRunnable类型,这里是original::clear),对于非returnExpired的则直接返回null;对于没有expired的,则判断是否updateTsOnRead,若是则调用updater进行处理,最后返回ttlValuedocState Time-To-Live (TTL)