[case51] A look at Flink's StateDescriptor



This article takes a look at Flink's StateDescriptor.
RuntimeContext.getState
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/RuntimeContext.java
/**
* A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
* of the function will have a context through which it can access static contextual information (such as
* the current parallelism) and other constructs like accumulators and broadcast variables.
*
* <p>A function can, during runtime, obtain the RuntimeContext via a call to
* {@link AbstractRichFunction#getRuntimeContext()}.
*/
@Public
public interface RuntimeContext {
//……

@PublicEvolving
<T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties);

@PublicEvolving
<T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);

@PublicEvolving
<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);

@PublicEvolving
<IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);

@PublicEvolving
@Deprecated
<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);

@PublicEvolving
<UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);

}
RuntimeContext provides one getter per kind of state, each taking the corresponding StateDescriptor: getState takes a ValueStateDescriptor and returns a ValueState; getListState takes a ListStateDescriptor and returns a ListState; getReducingState takes a ReducingStateDescriptor and returns a ReducingState; getAggregatingState takes an AggregatingStateDescriptor and returns an AggregatingState; getFoldingState (deprecated) takes a FoldingStateDescriptor and returns a FoldingState; getMapState takes a MapStateDescriptor and returns a MapState.
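The "descriptor in, typed state handle out" shape of these getters can be sketched without Flink on the classpath. Everything below (ValueDesc, ListDesc, ValueHandle, ListHandle, ToyContext) is a hypothetical stand-in invented for illustration; it only mirrors the method signatures above and says nothing about how Flink's state backends actually create or store state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical descriptors: a name plus a type parameter for the value type. */
class ValueDesc<T> {
    final String name;
    ValueDesc(String name) { this.name = name; }
}

class ListDesc<T> {
    final String name;
    ListDesc(String name) { this.name = name; }
}

/** Hypothetical state handles returned by the context. */
class ValueHandle<T> {
    private T value;
    T value() { return value; }
    void update(T newValue) { value = newValue; }
}

class ListHandle<T> {
    private final List<T> items = new ArrayList<>();
    void add(T item) { items.add(item); }
    List<T> get() { return items; }
}

/** Toy context: one typed getter per state kind, keyed by kind and descriptor name. */
class ToyContext {
    private final Map<String, Object> states = new HashMap<>();

    @SuppressWarnings("unchecked")
    <T> ValueHandle<T> getState(ValueDesc<T> desc) {
        // The same descriptor name always resolves to the same handle.
        return (ValueHandle<T>) states.computeIfAbsent("value:" + desc.name, k -> new ValueHandle<T>());
    }

    @SuppressWarnings("unchecked")
    <T> ListHandle<T> getListState(ListDesc<T> desc) {
        return (ListHandle<T>) states.computeIfAbsent("list:" + desc.name, k -> new ListHandle<T>());
    }
}
```

In this sketch the descriptor's type parameter is what lets the compiler hand back a ValueHandle<Long> or a ListHandle<String> without a cast at the call site, which is the same role the generic parameters play in the RuntimeContext signatures.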
StateDescriptor
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/StateDescriptor.java
/**
* Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
* {@link State} in stateful operations.
*
* <p>Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}.
*
* @param <S> The type of the State objects created from this {@code StateDescriptor}.
* @param <T> The type of the value of the state object described by this state descriptor.
*/
@PublicEvolving
public abstract class StateDescriptor<S extends State, T> implements Serializable {

/**
* An enumeration of the types of supported states. Used to identify the state type
* when writing and restoring checkpoints and savepoints.
*/
// IMPORTANT: Do not change the order of the elements in this enum, ordinal is used in serialization
public enum Type {
/**
* @deprecated Enum for migrating from old checkpoints/savepoint versions.
*/
@Deprecated
UNKNOWN,
VALUE,
LIST,
REDUCING,
FOLDING,
AGGREGATING,
MAP
}

private static final long serialVersionUID = 1L;

// ------------------------------------------------------------------------

/** Name that uniquely identifies state created from this StateDescriptor. */
protected final String name;

/** The serializer for the type. May be eagerly initialized in the constructor,
* or lazily once the {@link #initializeSerializerUnlessSet(ExecutionConfig)} method
* is called. */
@Nullable
protected TypeSerializer<T> serializer;

/** The type information describing the value type. Only used to if the serializer
* is created lazily. */
@Nullable
private TypeInformation<T> typeInfo;

/** Name for queries against state created from this StateDescriptor. */
@Nullable
private String queryableStateName;

/** Name for queries against state created from this StateDescriptor. */
@Nonnull
private StateTtlConfig ttlConfig = StateTtlConfig.DISABLED;

/** The default value returned by the state when no other value is bound to a key. */
@Nullable
protected transient T defaultValue;

// ------------------------------------------------------------------------

/**
* Create a new {@code StateDescriptor} with the given name and the given type serializer.
*
* @param name The name of the {@code StateDescriptor}.
* @param serializer The type serializer for the values in the state.
* @param defaultValue The default value that will be set when requesting state without setting
* a value before.
*/
protected StateDescriptor(String name, TypeSerializer<T> serializer, @Nullable T defaultValue) {
this.name = checkNotNull(name, "name must not be null");
this.serializer = checkNotNull(serializer, "serializer must not be null");
this.defaultValue = defaultValue;
}

/**
* Create a new {@code StateDescriptor} with the given name and the given type information.
*
* @param name The name of the {@code StateDescriptor}.
* @param typeInfo The type information for the values in the state.
* @param defaultValue The default value that will be set when requesting state without setting
* a value before.
*/
protected StateDescriptor(String name, TypeInformation<T> typeInfo, @Nullable T defaultValue) {
this.name = checkNotNull(name, "name must not be null");
this.typeInfo = checkNotNull(typeInfo, "type information must not be null");
this.defaultValue = defaultValue;
}

/**
* Create a new {@code StateDescriptor} with the given name and the given type information.
*
* <p>If this constructor fails (because it is not possible to describe the type via a class),
* consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor.
*
* @param name The name of the {@code StateDescriptor}.
* @param type The class of the type of values in the state.
* @param defaultValue The default value that will be set when requesting state without setting
* a value before.
*/
protected StateDescriptor(String name, Class<T> type, @Nullable T defaultValue) {
this.name = checkNotNull(name, "name must not be null");
checkNotNull(type, "type class must not be null");

try {
this.typeInfo = TypeExtractor.createTypeInfo(type);
} catch (Exception e) {
throw new RuntimeException(
"Could not create the type information for '" + type.getName() + "'. " +
"The most common reason is failure to infer the generic type information, due to Java's type erasure. " +
"In that case, please pass a 'TypeHint' instead of a class to describe the type. " +
"For example, to describe 'Tuple2<String, String>' as a generic type, use " +
"'new PravegaDeserializationSchema<>(new TypeHint<Tuple2<String, String>>(){}, serializer);'", e);
}

this.defaultValue = defaultValue;
}

// ------------------------------------------------------------------------

/**
* Returns the name of this {@code StateDescriptor}.
*/
public String getName() {
return name;
}

/**
* Returns the default value.
*/
public T getDefaultValue() {
if (defaultValue != null) {
if (serializer != null) {
return serializer.copy(defaultValue);
} else {
throw new IllegalStateException("Serializer not yet initialized.");
}
} else {
return null;
}
}

/**
* Returns the {@link TypeSerializer} that can be used to serialize the value in the state.
* Note that the serializer may initialized lazily and is only guaranteed to exist after
* calling {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
*/
public TypeSerializer<T> getSerializer() {
if (serializer != null) {
return serializer.duplicate();
} else {
throw new IllegalStateException("Serializer not yet initialized.");
}
}

/**
* Sets the name for queries of state created from this descriptor.
*
* <p>If a name is set, the created state will be published for queries
* during runtime. The name needs to be unique per job. If there is another
* state instance published under the same name, the job will fail during runtime.
*
* @param queryableStateName State name for queries (unique name per job)
* @throws IllegalStateException If queryable state name already set
*/
public void setQueryable(String queryableStateName) {
Preconditions.checkArgument(
ttlConfig.getUpdateType() == StateTtlConfig.UpdateType.Disabled,
"Queryable state is currently not supported with TTL");
if (this.queryableStateName == null) {
this.queryableStateName = Preconditions.checkNotNull(queryableStateName, "Registration name");
} else {
throw new IllegalStateException("Queryable state name already set");
}
}

/**
* Returns the queryable state name.
*
* @return Queryable state name or <code>null</code> if not set.
*/
@Nullable
public String getQueryableStateName() {
return queryableStateName;
}

/**
* Returns whether the state created from this descriptor is queryable.
*
* @return <code>true</code> if state is queryable, <code>false</code>
* otherwise.
*/
public boolean isQueryable() {
return queryableStateName != null;
}

/**
* Configures optional activation of state time-to-live (TTL).
*
* <p>State user value will expire, become unavailable and be cleaned up in storage
* depending on configured {@link StateTtlConfig}.
*
* @param ttlConfig configuration of state TTL
*/
public void enableTimeToLive(StateTtlConfig ttlConfig) {
Preconditions.checkNotNull(ttlConfig);
Preconditions.checkArgument(
ttlConfig.getUpdateType() != StateTtlConfig.UpdateType.Disabled &&
queryableStateName == null,
"Queryable state is currently not supported with TTL");
this.ttlConfig = ttlConfig;
}

@Nonnull
@Internal
public StateTtlConfig getTtlConfig() {
return ttlConfig;
}

// ------------------------------------------------------------------------

/**
* Checks whether the serializer has been initialized. Serializer initialization is lazy,
* to allow parametrization of serializers with an {@link ExecutionConfig} via
* {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
*
* @return True if the serializers have been initialized, false otherwise.
*/
public boolean isSerializerInitialized() {
return serializer != null;
}

/**
* Initializes the serializer, unless it has been initialized before.
*
* @param executionConfig The execution config to use when creating the serializer.
*/
public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
if (serializer == null) {
checkState(typeInfo != null, "no serializer and no type info");

// instantiate the serializer
serializer = typeInfo.createSerializer(executionConfig);

// we can drop the type info now, no longer needed
typeInfo = null;
}
}

// ------------------------------------------------------------------------
// Standard Utils
// ------------------------------------------------------------------------

@Override
public final int hashCode() {
return name.hashCode() + 31 * getClass().hashCode();
}

@Override
public final boolean equals(Object o) {
if (o == this) {
return true;
}
else if (o != null && o.getClass() == this.getClass()) {
final StateDescriptor<?, ?> that = (StateDescriptor<?, ?>) o;
return this.name.equals(that.name);
}
else {
return false;
}
}

@Override
public String toString() {
return getClass().getSimpleName() +
"{name=" + name +
", defaultValue=" + defaultValue +
", serializer=" + serializer +
(isQueryable() ? ", queryableStateName=" + queryableStateName + "" : "") +
'}';
}

public abstract Type getType();

// ------------------------------------------------------------------------
// Serialization
// ------------------------------------------------------------------------

private void writeObject(final ObjectOutputStream out) throws IOException {
// write all the non-transient fields
out.defaultWriteObject();

// write the non-serializable default value field
if (defaultValue == null) {
// we don't have a default value
out.writeBoolean(false);
} else {
// we have a default value
out.writeBoolean(true);

byte[] serializedDefaultValue;
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos)) {

TypeSerializer<T> duplicateSerializer = serializer.duplicate();
duplicateSerializer.serialize(defaultValue, outView);

outView.flush();
serializedDefaultValue = baos.toByteArray();
}
catch (Exception e) {
throw new IOException("Unable to serialize default value of type " +
defaultValue.getClass().getSimpleName() + ".", e);
}

out.writeInt(serializedDefaultValue.length);
out.write(serializedDefaultValue);
}
}

private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
// read the non-transient fields
in.defaultReadObject();

// read the default value field
boolean hasDefaultValue = in.readBoolean();
if (hasDefaultValue) {
int size = in.readInt();

byte[] buffer = new byte[size];

in.readFully(buffer);

try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) {

defaultValue = serializer.deserialize(inView);
}
catch (Exception e) {
throw new IOException("Unable to deserialize default value.", e);
}
} else {
defaultValue = null;
}
}
}

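StateDescriptor is the base class of ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, FoldingStateDescriptor, AggregatingStateDescriptor, and MapStateDescriptor. It declares an abstract getType method that returns a Type enum value (VALUE, LIST, REDUCING, FOLDING, AGGREGATING, MAP, plus the deprecated UNKNOWN), which each subclass uses to report its own state type.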
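StateDescriptor provides three protected constructors. Each takes a name and a defaultValue, plus one of a TypeSerializer, a TypeInformation, or a Class describing the value type. With a TypeInformation or a Class, the serializer is not created in the constructor but lazily, when initializeSerializerUnlessSet is called.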
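The eager-versus-lazy split can be illustrated with a small stand-alone sketch. LazyDescriptor is a hypothetical class written for this article, not part of Flink: a UnaryOperator "copier" plays the role of the TypeSerializer, a Supplier plays the role of the TypeInformation, and the ExecutionConfig parameter is left out entirely. It mirrors two behaviours visible in the source above: the default value cannot be handed out before initialization, and every caller receives a copy.

```java
import java.util.Objects;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for a descriptor whose "serializer" may be created lazily. */
class LazyDescriptor<T> {
    private final String name;
    private final T defaultValue;
    private UnaryOperator<T> copier;            // stands in for the serializer
    private Supplier<UnaryOperator<T>> factory; // stands in for the type information

    private LazyDescriptor(String name, UnaryOperator<T> copier,
                           Supplier<UnaryOperator<T>> factory, T defaultValue) {
        this.name = Objects.requireNonNull(name, "name must not be null");
        this.copier = copier;
        this.factory = factory;
        this.defaultValue = defaultValue;
    }

    /** Eager variant: the copier is available immediately. */
    static <T> LazyDescriptor<T> eager(String name, UnaryOperator<T> copier, T defaultValue) {
        return new LazyDescriptor<>(name, Objects.requireNonNull(copier), null, defaultValue);
    }

    /** Lazy variant: only a factory is stored until initializeUnlessSet() runs. */
    static <T> LazyDescriptor<T> lazy(String name, Supplier<UnaryOperator<T>> factory, T defaultValue) {
        return new LazyDescriptor<>(name, null, Objects.requireNonNull(factory), defaultValue);
    }

    String getName() { return name; }

    boolean isInitialized() { return copier != null; }

    void initializeUnlessSet() {
        if (copier == null) {
            copier = factory.get();
            factory = null; // the factory is no longer needed once the copier exists
        }
    }

    T getDefaultValue() {
        if (defaultValue == null) {
            return null;
        }
        if (copier == null) {
            throw new IllegalStateException("Copier not yet initialized.");
        }
        return copier.apply(defaultValue); // hand out a copy, never the shared instance
    }
}
```

Returning a copy matters when the default value is mutable: without it, one caller mutating the returned object would silently change the default seen by every other caller.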
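StateDescriptor overrides equals and hashCode (both final, and based only on the concrete class and the name). It also implements Serializable, and customizes the serialization process through writeObject and readObject so that the transient defaultValue is written and read with the TypeSerializer.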
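Both points, name-plus-class identity and hand-written serialization of a transient field, can be tried out with plain Java serialization. The sketch below is a simplified stand-in, not Flink's class: Counter, SketchDescriptor, and ListSketchDescriptor are hypothetical names, and the default value is encoded as a single long rather than through a TypeSerializer with a length-prefixed byte array.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical value type that is deliberately NOT Serializable. */
final class Counter {
    final long count;
    Counter(long count) { this.count = count; }
}

/** Simplified stand-in for a state descriptor. */
class SketchDescriptor implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String name;
    // transient: skipped by defaultWriteObject(), so it is written by hand below
    private transient Counter defaultValue;

    SketchDescriptor(String name, Counter defaultValue) {
        this.name = Objects.requireNonNull(name, "name must not be null");
        this.defaultValue = defaultValue;
    }

    Counter getDefaultValue() { return defaultValue; }

    // Identity is the concrete class plus the name; the default value is ignored.
    @Override
    public final int hashCode() {
        return name.hashCode() + 31 * getClass().hashCode();
    }

    @Override
    public final boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        return o != null && o.getClass() == getClass()
                && name.equals(((SketchDescriptor) o).name);
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();               // writes the non-transient fields (name)
        out.writeBoolean(defaultValue != null); // presence flag
        if (defaultValue != null) {
            out.writeLong(defaultValue.count);  // hand-written encoding of the value
        }
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        defaultValue = in.readBoolean() ? new Counter(in.readLong()) : null;
    }

    /** Serializes and deserializes a descriptor in memory. */
    static SketchDescriptor roundTrip(SketchDescriptor d) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(d);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SketchDescriptor) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}

/** A second descriptor kind: same name, different class, therefore not equal. */
class ListSketchDescriptor extends SketchDescriptor {
    private static final long serialVersionUID = 1L;
    ListSketchDescriptor(String name) { super(name, null); }
}
```

Two SketchDescriptor instances named "count" compare equal even when their default values differ, while a ListSketchDescriptor named "count" does not equal either of them; a round trip through roundTrip preserves the default value even though Counter itself is not Serializable.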

Summary

RuntimeContext provides one getter per kind of state, each taking the corresponding StateDescriptor: getState, getListState, getReducingState, getAggregatingState, getFoldingState, and getMapState.
StateDescriptor is the base class of ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, FoldingStateDescriptor, AggregatingStateDescriptor, and MapStateDescriptor. It declares an abstract getType method that returns a Type enum value (VALUE, LIST, REDUCING, FOLDING, AGGREGATING, MAP), which each subclass uses to report its own state type.
StateDescriptor overrides equals and hashCode; it also implements Serializable, and customizes the serialization process through writeObject and readObject.

doc
Using Managed Keyed State
