A Closer Look at Flink's Managed Keyed State

This article takes a look at Flink's Managed Keyed State.
State
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/State.java
/**
 * Interface that different types of partitioned state must implement.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 */
@PublicEvolving
public interface State {

    /**
     * Removes the value mapped under the current key.
     */
    void clear();
}
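State is the interface that every type of partitioned (keyed) state must implement. It defines a single method, clear, which removes the value mapped under the current key.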
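To make "the value mapped under the current key" concrete, here is a minimal in-memory sketch in plain Java with no Flink dependency. The class KeyScopedCell and its setCurrentKey method are hypothetical stand-ins for what Flink's keyed state backend does when the runtime sets the key of the element being processed; this is an illustration of the contract, not Flink's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model, NOT Flink's API: one "state" whose value is scoped
// to whichever key the runtime has set as current.
class KeyScopedCell<K, V> {
    private final Map<K, V> perKey = new HashMap<>();
    private K currentKey;

    // In Flink the runtime sets the current key before invoking the user function.
    void setCurrentKey(K key) { this.currentKey = key; }

    V get() { return perKey.get(currentKey); }

    void set(V value) { perKey.put(currentKey, value); }

    // Mirrors the State#clear contract: removes only the current key's value.
    void clear() { perKey.remove(currentKey); }
}

class KeyScopedCellDemo {
    public static void main(String[] args) {
        KeyScopedCell<String, Integer> cell = new KeyScopedCell<>();
        cell.setCurrentKey("a");
        cell.set(1);
        cell.setCurrentKey("b");
        cell.set(2);
        cell.clear();                   // removes b's value only
        cell.setCurrentKey("a");
        System.out.println(cell.get()); // prints 1
    }
}
```

The point of the design is that user code never passes the key explicitly: the same state handle reads and writes a different slot depending on the element currently being processed.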
ValueState
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/ValueState.java
@PublicEvolving
public interface ValueState<T> extends State {

    /**
     * Returns the current value for the state. When the state is not
     * partitioned the returned value is the same for all inputs in a given
     * operator instance. If state partitioning is applied, the value returned
     * depends on the current operator input, as the operator maintains an
     * independent state for each partition.
     *
     * <p>If you didn't specify a default value when creating the {@link ValueStateDescriptor}
     * this will return {@code null} when to value was previously set using {@link #update(Object)}.
     *
     * @return The state value corresponding to the current input.
     *
     * @throws IOException Thrown if the system cannot access the state.
     */
    T value() throws IOException;

    /**
     * Updates the operator state accessible by {@link #value()} to the given
     * value. The next time {@link #value()} is called (for the same state
     * partition) the returned state will represent the updated value. When a
     * partitioned state is updated with null, the state for the current key
     * will be removed and the default value is returned on the next access.
     *
     * @param value The new value for the state.
     *
     * @throws IOException Thrown if the system cannot access the state.
     */
    void update(T value) throws IOException;

}
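ValueState extends State and defines two methods: value reads the value for the current key, and update sets it. Per the Javadoc, value returns null if no default value was specified on the ValueStateDescriptor and nothing has been set yet, and calling update(null) removes the state for the current key.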
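The following is a minimal in-memory sketch of the ValueState contract described above: a per-key value, a fallback default, and update(null) removing the entry. ValueStateModel and setCurrentKey are hypothetical names and this is not Flink's heap or RocksDB backend; in a real job the handle would come from getRuntimeContext().getState(...) with a ValueStateDescriptor inside a rich function applied to a KeyedStream.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the ValueState contract quoted above.
class ValueStateModel<K, T> {
    private final Map<K, T> values = new HashMap<>();
    private final T defaultValue; // may be null, like a descriptor without a default
    private K currentKey;

    ValueStateModel(T defaultValue) { this.defaultValue = defaultValue; }

    void setCurrentKey(K key) { currentKey = key; }

    // value(): the stored value for the current key, or the default if none is set.
    T value() {
        T v = values.get(currentKey);
        return v != null ? v : defaultValue;
    }

    // update(null) removes the entry, so the next value() returns the default again.
    void update(T value) {
        if (value == null) {
            values.remove(currentKey);
        } else {
            values.put(currentKey, value);
        }
    }

    void clear() { values.remove(currentKey); }
}

class ValueStateModelDemo {
    public static void main(String[] args) {
        // A per-key counter, the typical ValueState use case.
        ValueStateModel<String, Long> count = new ValueStateModel<>(0L);
        for (String key : new String[] {"a", "b", "a"}) {
            count.setCurrentKey(key);
            count.update(count.value() + 1);
        }
        count.setCurrentKey("a");
        System.out.println(count.value()); // prints 2
    }
}
```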
AppendingState
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/AppendingState.java
@PublicEvolving
public interface AppendingState<IN, OUT> extends State {

    /**
     * Returns the current value for the state. When the state is not
     * partitioned the returned value is the same for all inputs in a given
     * operator instance. If state partitioning is applied, the value returned
     * depends on the current operator input, as the operator maintains an
     * independent state for each partition.
     *
     * <p><b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this method
     * should return {@code null}.
     *
     * @return The operator state value corresponding to the current input or {@code null}
     * if the state is empty.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    OUT get() throws Exception;

    /**
     * Updates the operator state accessible by {@link #get()} by adding the given value
     * to the list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null is passed in, the state value will remain unchanged.
     *
     * @param value The new value for the state.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    void add(IN value) throws Exception;

}
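AppendingState extends State and defines two methods, get and add. It takes two type parameters: IN, the type of the values passed to add, and OUT, the type returned by get. Per the Javadoc, get returns null when the state is empty.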
FoldingState
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/FoldingState.java
@PublicEvolving
@Deprecated
public interface FoldingState<T, ACC> extends AppendingState<T, ACC> {}
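FoldingState extends AppendingState, with the OUT type parameter bound to ACC, the accumulated value. FoldingState was marked as deprecated in Flink 1.4 and will be removed in a later release; use AggregatingState instead.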
MergingState
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/MergingState.java
/**
 * Extension of {@link AppendingState} that allows merging of state. That is, two instances
 * of {@link MergingState} can be combined into a single instance that contains all the
 * information of the two merged states.
 *
 * @param <IN> Type of the value that can be added to the state.
 * @param <OUT> Type of the value that can be retrieved from the state.
 */
@PublicEvolving
public interface MergingState<IN, OUT> extends AppendingState<IN, OUT> {}
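MergingState extends AppendingState without adding any methods; its name signals that two instances of this state can be merged into one that holds the information of both. It has three subinterfaces: ListState, ReducingState, and AggregatingState.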
ListState
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/ListState.java
@PublicEvolving
public interface ListState<T> extends MergingState<T, Iterable<T>> {

/**
* Updates the operator state accessible by {@link #get()} by updating existing values to
* to the given list of values. The next time {@link #get()} is called (for the same state
* partition) the returned state will represent the updated list.
*
* <p>If null or an empty list is passed in, the state value will be null.
*
* @param values The new values for the state.
*
* @throws Exception The method may forward exception thrown internally (by I/O or functions).
*/
void update(List<T> values) throws Exception;

/**
* Updates the operator state accessible by {@link #get()} by adding the given values
* to existing list of values. The next time {@link #get()} is called (for the same state
* partition) the returned state will represent the updated list.
*
* <p>If null or an empty list is passed in, the state value remains unchanged.
*
* @param values The new values to be added to the state.
*
* @throws Exception The method may forward exception thrown internally (by I/O or functions).
*/
void addAll(List<T> values) throws Exception;
}
ListState extends MergingState with OUT bound to Iterable<T>, and is used by operators to store partitioned list state. Besides the inherited get and add, it declares two methods: update replaces the entire list, and if the argument is null or empty the state is cleared; addAll appends the given values, and if the argument is null or empty the state is left unchanged.
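The difference between update (replace) and addAll (append) is easiest to see side by side. Below is a hypothetical in-memory model that follows the quoted Javadoc; ListStateModel and setCurrentKey are illustrative names, not Flink classes, and real backends can be stricter about null elements, so this sketch does not rely on adding nulls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the ListState contract, scoped by current key.
class ListStateModel<K, T> {
    private final Map<K, List<T>> lists = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) { currentKey = key; }

    // get(): null when the state is empty (AppendingState contract).
    Iterable<T> get() { return lists.get(currentKey); }

    // add(): appends a single element.
    void add(T value) {
        lists.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(value);
    }

    // update(): replaces the whole list; null or empty clears the state.
    void update(List<T> values) {
        if (values == null || values.isEmpty()) {
            lists.remove(currentKey);
        } else {
            lists.put(currentKey, new ArrayList<>(values));
        }
    }

    // addAll(): appends; null or empty leaves the state unchanged.
    void addAll(List<T> values) {
        if (values == null || values.isEmpty()) {
            return;
        }
        lists.computeIfAbsent(currentKey, k -> new ArrayList<>()).addAll(values);
    }
}

class ListStateModelDemo {
    public static void main(String[] args) {
        ListStateModel<String, Integer> s = new ListStateModel<>();
        s.setCurrentKey("k");
        s.add(1);
        s.addAll(Arrays.asList(2, 3));
        System.out.println(s.get()); // prints [1, 2, 3]
        s.update(Arrays.asList(9));
        System.out.println(s.get()); // prints [9]
        s.update(null);
        System.out.println(s.get()); // prints null
    }
}
```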
ReducingState
flink-core/1.7.0/flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/ReducingState.java
@PublicEvolving
public interface ReducingState<T> extends MergingState<T, T> {}
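ReducingState extends MergingState with IN and OUT bound to the same type T: each added value is combined with the stored value using the ReduceFunction given in its ReducingStateDescriptor, so get returns a single value of that same type.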
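A hypothetical in-memory model of ReducingState shows why IN and OUT must match: the reduce function takes two T values and returns a T. Here java.util.function.BinaryOperator stands in for Flink's ReduceFunction so the sketch compiles without Flink; ReducingStateModel is an illustrative name, not a Flink class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical model of ReducingState<T>: every add() folds the new element
// into the stored value for the current key.
class ReducingStateModel<K, T> {
    private final Map<K, T> reduced = new HashMap<>();
    private final BinaryOperator<T> reduceFunction; // stand-in for Flink's ReduceFunction
    private K currentKey;

    ReducingStateModel(BinaryOperator<T> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    void setCurrentKey(K key) { currentKey = key; }

    // null when nothing has been added for the current key.
    T get() { return reduced.get(currentKey); }

    void add(T value) {
        T current = reduced.get(currentKey);
        reduced.put(currentKey, current == null ? value : reduceFunction.apply(current, value));
    }

    void clear() { reduced.remove(currentKey); }
}

class ReducingStateModelDemo {
    public static void main(String[] args) {
        // Running maximum per sensor.
        ReducingStateModel<String, Integer> max = new ReducingStateModel<>(Integer::max);
        max.setCurrentKey("sensor-1");
        for (int reading : new int[] {3, 8, 5}) {
            max.add(reading);
        }
        System.out.println(max.get()); // prints 8
    }
}
```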
AggregatingState
flink-core/1.7.0/flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/AggregatingState.java
@PublicEvolving
public interface AggregatingState<IN, OUT> extends MergingState<IN, OUT> {}
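AggregatingState also extends MergingState, but unlike ReducingState its IN and OUT types may differ; the aggregation logic comes from the AggregateFunction<IN, ACC, OUT> given in its AggregatingStateDescriptor.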
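Because IN and OUT can differ, AggregatingState keeps an intermediate accumulator per key. The sketch below declares a local interface AggFn with the same four methods as Flink's AggregateFunction (createAccumulator, add, getResult, merge) so it compiles without Flink; AggFn, AverageFn and AggregatingStateModel are hypothetical names. The average example (IN = Long, ACC = long[] {sum, count}, OUT = Double) is also the kind of logic that FoldingState users would move to AggregatingState.

```java
import java.util.HashMap;
import java.util.Map;

// Local stand-in mirroring the four methods of Flink's AggregateFunction<IN, ACC, OUT>.
interface AggFn<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Average: IN = Long, ACC = long[] {sum, count}, OUT = Double, so IN != OUT.
class AverageFn implements AggFn<Long, long[], Double> {
    public long[] createAccumulator() { return new long[] {0L, 0L}; }
    public long[] add(Long value, long[] acc) { acc[0] += value; acc[1]++; return acc; }
    public Double getResult(long[] acc) { return acc[1] == 0 ? null : (double) acc[0] / acc[1]; }
    public long[] merge(long[] a, long[] b) { a[0] += b[0]; a[1] += b[1]; return a; }
}

// Hypothetical model of AggregatingState<IN, OUT>: stores one ACC per key,
// exposes only add(IN) and get() returning OUT.
class AggregatingStateModel<K, IN, ACC, OUT> {
    private final Map<K, ACC> accumulators = new HashMap<>();
    private final AggFn<IN, ACC, OUT> fn;
    private K currentKey;

    AggregatingStateModel(AggFn<IN, ACC, OUT> fn) { this.fn = fn; }

    void setCurrentKey(K key) { currentKey = key; }

    void add(IN value) {
        ACC acc = accumulators.computeIfAbsent(currentKey, k -> fn.createAccumulator());
        accumulators.put(currentKey, fn.add(value, acc));
    }

    // null when nothing has been added for the current key.
    OUT get() {
        ACC acc = accumulators.get(currentKey);
        return acc == null ? null : fn.getResult(acc);
    }
}

class AggregatingStateModelDemo {
    public static void main(String[] args) {
        AggregatingStateModel<String, Long, long[], Double> avg =
            new AggregatingStateModel<>(new AverageFn());
        avg.setCurrentKey("user-1");
        avg.add(4L);
        avg.add(6L);
        avg.add(11L);
        System.out.println(avg.get()); // prints 7.0
    }
}
```

The merge method is what lets two accumulators be combined into one, which is the capability the MergingState name refers to.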
MapState
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/MapState.java
@PublicEvolving
public interface MapState<UK, UV> extends State {

    /**
     * Returns the current value associated with the given key.
     *
     * @param key The key of the mapping
     * @return The value of the mapping with the given key
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    UV get(UK key) throws Exception;

    /**
     * Associates a new value with the given key.
     *
     * @param key The key of the mapping
     * @param value The new value of the mapping
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    void put(UK key, UV value) throws Exception;

    /**
     * Copies all of the mappings from the given map into the state.
     *
     * @param map The mappings to be stored in this state
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    void putAll(Map<UK, UV> map) throws Exception;

    /**
     * Deletes the mapping of the given key.
     *
     * @param key The key of the mapping
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    void remove(UK key) throws Exception;

    /**
     * Returns whether there exists the given mapping.
     *
     * @param key The key of the mapping
     * @return True if there exists a mapping whose key equals to the given key
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    boolean contains(UK key) throws Exception;

    /**
     * Returns all the mappings in the state.
     *
     * @return An iterable view of all the key-value pairs in the state.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterable<Map.Entry<UK, UV>> entries() throws Exception;

    /**
     * Returns all the keys in the state.
     *
     * @return An iterable view of all the keys in the state.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterable<UK> keys() throws Exception;

    /**
     * Returns all the values in the state.
     *
     * @return An iterable view of all the values in the state.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterable<UV> values() throws Exception;

    /**
     * Iterates over all the mappings in the state.
     *
     * @return An iterator over all the mappings in the state
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterator<Map.Entry<UK, UV>> iterator() throws Exception;
}
MapState extends State directly and takes two type parameters, UK and UV, the types of the map's keys and values respectively.
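MapState involves two levels of keys: the stream key supplied by the runtime, and the user key UK inside the map. The hypothetical in-memory model below (MapStateModel and setCurrentKey are illustrative names, not Flink classes) gives each stream key its own map, so get, put, contains and remove only ever see the current key's entries.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of MapState<UK, UV>: one user map per stream key K.
class MapStateModel<K, UK, UV> {
    private final Map<K, Map<UK, UV>> maps = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) { currentKey = key; }

    // Map for the current stream key (created lazily, even on reads).
    private Map<UK, UV> current() {
        return maps.computeIfAbsent(currentKey, k -> new HashMap<>());
    }

    UV get(UK key) { return current().get(key); }
    void put(UK key, UV value) { current().put(key, value); }
    void putAll(Map<UK, UV> map) { current().putAll(map); }
    void remove(UK key) { current().remove(key); }
    boolean contains(UK key) { return current().containsKey(key); }
    Iterable<Map.Entry<UK, UV>> entries() { return current().entrySet(); }
    Iterable<UK> keys() { return current().keySet(); }
    Iterable<UV> values() { return current().values(); }
    void clear() { maps.remove(currentKey); }
}

class MapStateModelDemo {
    public static void main(String[] args) {
        // Stream key = user id, map key = page, map value = view count.
        MapStateModel<String, String, Integer> views = new MapStateModel<>();
        String[][] events = {{"alice", "home"}, {"alice", "cart"}, {"alice", "home"}, {"bob", "home"}};
        for (String[] e : events) {
            views.setCurrentKey(e[0]);
            Integer n = views.get(e[1]);
            views.put(e[1], n == null ? 1 : n + 1);
        }
        views.setCurrentKey("alice");
        System.out.println(views.get("home"));      // prints 2
        System.out.println(views.contains("cart")); // prints true
    }
}
```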
Summary

Flink provides several kinds of Managed Keyed State: ValueState<T>, ListState<T>, ReducingState<T>, AggregatingState<IN, OUT>, FoldingState<T, ACC>, and MapState<UK, UV>.
ValueState<T> and MapState<UK, UV> extend the State interface directly; FoldingState extends AppendingState<IN, OUT> (which itself extends State directly); ListState, ReducingState, and AggregatingState extend MergingState<IN, OUT> (which extends AppendingState).
FoldingState was marked as deprecated in Flink 1.4 and will be removed in a later release; use AggregatingState instead.

doc
Using Managed Keyed State
