聊聊flink的StateTtlConfig

31次阅读

共计 15116 个字符,预计需要花费 38 分钟才能阅读完成。


本文主要研究一下 flink 的 StateTtlConfig
示例
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// TTL of 1 second; the last-access timestamp is set on creation and refreshed on every write,
// and values that have already expired are never returned to the user.
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

// Enable TTL on the descriptor; the keyed state backend reads this config when it creates the state.
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
这里利用 builder 创建 StateTtlConfig,之后通过 StateDescriptor 的 enableTimeToLive 方法传递该 config
StateTtlConfig
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/StateTtlConfig.java
/**
* Configuration of state TTL logic.
*
* <p>Note: The map state with TTL currently supports {@code null} user values
* only if the user value serializer can handle {@code null} values.
* If the serializer does not support {@code null} values,
* it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
* at the cost of an extra byte in the serialized form.
*/
public class StateTtlConfig implements Serializable {

private static final long serialVersionUID = -7592693245044289793L;

public static final StateTtlConfig DISABLED =
newBuilder(Time.milliseconds(Long.MAX_VALUE)).setUpdateType(UpdateType.Disabled).build();

/**
* This option value configures when to update last access timestamp which prolongs state TTL.
*/
public enum UpdateType {
/** TTL is disabled. State does not expire. */
Disabled,
/** Last access timestamp is initialised when state is created and updated on every write operation. */
OnCreateAndWrite,
/** The same as <code>OnCreateAndWrite</code> but also updated on read. */
OnReadAndWrite
}

/**
* This option configures whether expired user value can be returned or not.
*/
public enum StateVisibility {
/** Return expired user value if it is not cleaned up yet. */
ReturnExpiredIfNotCleanedUp,
/** Never return expired user value. */
NeverReturnExpired
}

/**
* This option configures time scale to use for ttl.
*/
public enum TimeCharacteristic {
/** Processing time, see also <code>TimeCharacteristic.ProcessingTime</code>. */
ProcessingTime
}

private final UpdateType updateType;
private final StateVisibility stateVisibility;
private final TimeCharacteristic timeCharacteristic;
private final Time ttl;
private final CleanupStrategies cleanupStrategies;

private StateTtlConfig(
UpdateType updateType,
StateVisibility stateVisibility,
TimeCharacteristic timeCharacteristic,
Time ttl,
CleanupStrategies cleanupStrategies) {
this.updateType = Preconditions.checkNotNull(updateType);
this.stateVisibility = Preconditions.checkNotNull(stateVisibility);
this.timeCharacteristic = Preconditions.checkNotNull(timeCharacteristic);
this.ttl = Preconditions.checkNotNull(ttl);
this.cleanupStrategies = cleanupStrategies;
Preconditions.checkArgument(ttl.toMilliseconds() > 0,
“TTL is expected to be positive”);
}

@Nonnull
public UpdateType getUpdateType() {
return updateType;
}

@Nonnull
public StateVisibility getStateVisibility() {
return stateVisibility;
}

@Nonnull
public Time getTtl() {
return ttl;
}

@Nonnull
public TimeCharacteristic getTimeCharacteristic() {
return timeCharacteristic;
}

public boolean isEnabled() {
return updateType != UpdateType.Disabled;
}

@Nonnull
public CleanupStrategies getCleanupStrategies() {
return cleanupStrategies;
}

@Override
public String toString() {
return “StateTtlConfig{” +
“updateType=” + updateType +
“, stateVisibility=” + stateVisibility +
“, timeCharacteristic=” + timeCharacteristic +
“, ttl=” + ttl +
‘}’;
}

@Nonnull
public static Builder newBuilder(@Nonnull Time ttl) {
return new Builder(ttl);
}

/**
* Builder for the {@link StateTtlConfig}.
*/
public static class Builder {

private UpdateType updateType = OnCreateAndWrite;
private StateVisibility stateVisibility = NeverReturnExpired;
private TimeCharacteristic timeCharacteristic = ProcessingTime;
private Time ttl;
private CleanupStrategies cleanupStrategies = new CleanupStrategies();

public Builder(@Nonnull Time ttl) {
this.ttl = ttl;
}

/**
* Sets the ttl update type.
*
* @param updateType The ttl update type configures when to update last access timestamp which prolongs state TTL.
*/
@Nonnull
public Builder setUpdateType(UpdateType updateType) {
this.updateType = updateType;
return this;
}

@Nonnull
public Builder updateTtlOnCreateAndWrite() {
return setUpdateType(UpdateType.OnCreateAndWrite);
}

@Nonnull
public Builder updateTtlOnReadAndWrite() {
return setUpdateType(UpdateType.OnReadAndWrite);
}

/**
* Sets the state visibility.
*
* @param stateVisibility The state visibility configures whether expired user value can be returned or not.
*/
@Nonnull
public Builder setStateVisibility(@Nonnull StateVisibility stateVisibility) {
this.stateVisibility = stateVisibility;
return this;
}

@Nonnull
public Builder returnExpiredIfNotCleanedUp() {
return setStateVisibility(StateVisibility.ReturnExpiredIfNotCleanedUp);
}

@Nonnull
public Builder neverReturnExpired() {
return setStateVisibility(StateVisibility.NeverReturnExpired);
}

/**
* Sets the time characteristic.
*
* @param timeCharacteristic The time characteristic configures time scale to use for ttl.
*/
@Nonnull
public Builder setTimeCharacteristic(@Nonnull TimeCharacteristic timeCharacteristic) {
this.timeCharacteristic = timeCharacteristic;
return this;
}

@Nonnull
public Builder useProcessingTime() {
return setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
}

/** Cleanup expired state in full snapshot on checkpoint. */
@Nonnull
public Builder cleanupFullSnapshot() {
cleanupStrategies.strategies.put(
CleanupStrategies.Strategies.FULL_STATE_SCAN_SNAPSHOT,
new CleanupStrategies.CleanupStrategy() {});
return this;
}

/**
* Sets the ttl time.
* @param ttl The ttl time.
*/
@Nonnull
public Builder setTtl(@Nonnull Time ttl) {
this.ttl = ttl;
return this;
}

@Nonnull
public StateTtlConfig build() {
return new StateTtlConfig(
updateType,
stateVisibility,
timeCharacteristic,
ttl,
cleanupStrategies);
}
}

/**
* TTL cleanup strategies.
*
* <p>This class configures when to cleanup expired state with TTL.
* By default, state is always cleaned up on explicit read access if found expired.
* Currently cleanup of state full snapshot can be additionally activated.
*/
public static class CleanupStrategies implements Serializable {
private static final long serialVersionUID = -1617740467277313524L;

/** Fixed strategies ordinals in {@code strategies} config field. */
enum Strategies {
FULL_STATE_SCAN_SNAPSHOT
}

/** Base interface for cleanup strategies configurations. */
interface CleanupStrategy extends Serializable {

}

final EnumMap<Strategies, CleanupStrategy> strategies = new EnumMap<>(Strategies.class);

public boolean inFullSnapshot() {
return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
}
}
}

StateTtlConfig 用于设置 state 的 TTL 属性,这里定义了三个枚举,分别是 UpdateType(Disabled、OnCreateAndWrite、OnReadAndWrite)、StateVisibility(ReturnExpiredIfNotCleanedUp、NeverReturnExpired)、TimeCharacteristic(ProcessingTime)
StateTtlConfig 定义了 CleanupStrategies,即 TTL state 的清理策略:默认情况下,显式读取 state 时若发现其已过期就会将其清理;此外目前还可以通过 Builder 的 cleanupFullSnapshot() 额外开启 FULL_STATE_SCAN_SNAPSHOT 策略,即在 checkpoint 生成 full snapshot 时清理其中已过期的 state
StateTtlConfig 还提供了一个 Builder,用于便捷地设置 UpdateType、StateVisibility、TimeCharacteristic、TTL 时长(Time)以及 CleanupStrategies

AbstractKeyedStateBackend
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
/**
* @see KeyedStateBackend
*/
@Override
@SuppressWarnings(“unchecked”)
public <N, S extends State, V> S getOrCreateKeyedState(
final TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, V> stateDescriptor) throws Exception {
checkNotNull(namespaceSerializer, “Namespace serializer”);
checkNotNull(keySerializer, “State key serializer has not been configured in the config. ” +
“This operation cannot use partitioned state.”);

InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
if (kvState == null) {
if (!stateDescriptor.isSerializerInitialized()) {
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
}
kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
keyValueStatesByName.put(stateDescriptor.getName(), kvState);
publishQueryableStateIfEnabled(stateDescriptor, kvState);
}
return (S) kvState;
}
AbstractKeyedStateBackend 的 getOrCreateKeyedState 方法里头使用 TtlStateFactory.createStateAndWrapWithTtlIfEnabled 来创建 InternalKvState
TtlStateFactory
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
/**
* This state factory wraps state objects, produced by backends, with TTL logic.
*/
public class TtlStateFactory<N, SV, S extends State, IS extends S> {
public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, SV> stateDesc,
KeyedStateFactory originalStateFactory,
TtlTimeProvider timeProvider) throws Exception {
Preconditions.checkNotNull(namespaceSerializer);
Preconditions.checkNotNull(stateDesc);
Preconditions.checkNotNull(originalStateFactory);
Preconditions.checkNotNull(timeProvider);
return stateDesc.getTtlConfig().isEnabled() ?
new TtlStateFactory<N, SV, S, IS>(
namespaceSerializer, stateDesc, originalStateFactory, timeProvider)
.createState() :
originalStateFactory.createInternalState(namespaceSerializer, stateDesc);
}

private final Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> stateFactories;

private final TypeSerializer<N> namespaceSerializer;
private final StateDescriptor<S, SV> stateDesc;
private final KeyedStateFactory originalStateFactory;
private final StateTtlConfig ttlConfig;
private final TtlTimeProvider timeProvider;
private final long ttl;

private TtlStateFactory(
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, SV> stateDesc,
KeyedStateFactory originalStateFactory,
TtlTimeProvider timeProvider) {
this.namespaceSerializer = namespaceSerializer;
this.stateDesc = stateDesc;
this.originalStateFactory = originalStateFactory;
this.ttlConfig = stateDesc.getTtlConfig();
this.timeProvider = timeProvider;
this.ttl = ttlConfig.getTtl().toMilliseconds();
this.stateFactories = createStateFactories();
}

private Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> createStateFactories() {
return Stream.of(
Tuple2.of(ValueStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createValueState),
Tuple2.of(ListStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createListState),
Tuple2.of(MapStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createMapState),
Tuple2.of(ReducingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createReducingState),
Tuple2.of(AggregatingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createAggregatingState),
Tuple2.of(FoldingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createFoldingState)
).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
}

private IS createState() throws Exception {
SupplierWithException<IS, Exception> stateFactory = stateFactories.get(stateDesc.getClass());
if (stateFactory == null) {
String message = String.format(“State %s is not supported by %s”,
stateDesc.getClass(), TtlStateFactory.class);
throw new FlinkRuntimeException(message);
}
return stateFactory.get();
}

@SuppressWarnings(“unchecked”)
private IS createValueState() throws Exception {
ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()));
return (IS) new TtlValueState<>(
originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
ttlConfig, timeProvider, stateDesc.getSerializer());
}

@SuppressWarnings(“unchecked”)
private <T> IS createListState() throws Exception {
ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
return (IS) new TtlListState<>(
originalStateFactory.createInternalState(
namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
ttlConfig, timeProvider, listStateDesc.getSerializer());
}

@SuppressWarnings(“unchecked”)
private <UK, UV> IS createMapState() throws Exception {
MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(
stateDesc.getName(),
mapStateDesc.getKeySerializer(),
new TtlSerializer<>(mapStateDesc.getValueSerializer()));
return (IS) new TtlMapState<>(
originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
ttlConfig, timeProvider, mapStateDesc.getSerializer());
}

@SuppressWarnings(“unchecked”)
private IS createReducingState() throws Exception {
ReducingStateDescriptor<SV> reducingStateDesc = (ReducingStateDescriptor<SV>) stateDesc;
ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor = new ReducingStateDescriptor<>(
stateDesc.getName(),
new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
new TtlSerializer<>(stateDesc.getSerializer()));
return (IS) new TtlReducingState<>(
originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
ttlConfig, timeProvider, stateDesc.getSerializer());
}

@SuppressWarnings(“unchecked”)
private <IN, OUT> IS createAggregatingState() throws Exception {
AggregatingStateDescriptor<IN, SV, OUT> aggregatingStateDescriptor =
(AggregatingStateDescriptor<IN, SV, OUT>) stateDesc;
TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction = new TtlAggregateFunction<>(
aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider);
AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
return (IS) new TtlAggregatingState<>(
originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction);
}

@SuppressWarnings({“deprecation”, “unchecked”})
private <T> IS createFoldingState() throws Exception {
FoldingStateDescriptor<T, SV> foldingStateDescriptor = (FoldingStateDescriptor<T, SV>) stateDesc;
SV initAcc = stateDesc.getDefaultValue();
TtlValue<SV> ttlInitAcc = initAcc == null ? null : new TtlValue<>(initAcc, Long.MAX_VALUE);
FoldingStateDescriptor<T, TtlValue<SV>> ttlDescriptor = new FoldingStateDescriptor<>(
stateDesc.getName(),
ttlInitAcc,
new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc),
new TtlSerializer<>(stateDesc.getSerializer()));
return (IS) new TtlFoldingState<>(
originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
ttlConfig, timeProvider, stateDesc.getSerializer());
}

//……
}

TtlStateFactory 的 createStateAndWrapWithTtlIfEnabled 方法这里会根据 stateDesc.getTtlConfig().isEnabled()来创建 state,如果开启 ttl 则调用 new TtlStateFactory<N, SV, S, IS>(namespaceSerializer, stateDesc, originalStateFactory, timeProvider).createState(),否则调用 originalStateFactory.createInternalState(namespaceSerializer, stateDesc)
这里 createStateFactories 创建了不同类型的 StateDescriptor 对应创建方法的 map,在 createState 的时候,根据指定类型自动调用对应的 SupplierWithException,省去 if else 的判断
ValueStateDescriptor 对应 createValueState 方法,创建的是 TtlValueState;ListStateDescriptor 对应 createListState 方法,创建的是 TtlListState;MapStateDescriptor 对应 createMapState 方法,创建的是 TtlMapState;ReducingStateDescriptor 对应 createReducingState 方法,创建的是 TtlReducingState;AggregatingStateDescriptor 对应 createAggregatingState 方法,创建的是 TtlAggregatingState;FoldingStateDescriptor 对应 createFoldingState 方法,创建的是 TtlFoldingState

小结

StateTtlConfig 用于设置 state 的 TTL 属性,主要包括 UpdateType、StateVisibility、TimeCharacteristic、TTL 时长(Time)以及 CleanupStrategies 这几项
AbstractKeyedStateBackend 的 getOrCreateKeyedState 方法里头使用 TtlStateFactory.createStateAndWrapWithTtlIfEnabled 来创建 InternalKvState
TtlStateFactory 的 createStateAndWrapWithTtlIfEnabled 方法这里会根据 stateDesc.getTtlConfig().isEnabled()来创建对应的 state;TtlStateFactory 的 createState 会根据不同类型的 StateDescriptor 创建对应类型的 ttl state

doc
State Time-To-Live (TTL)

正文完
 0