文:王东阳
前言
在Flink中依据数据集是否依据Key进行分区,将状态分为Keyed State和Operator State(Non-keyed State)两种类型 ,在之前的文章《Flink中基于KeyedState的计算开发方法》曾经具体介绍了Keyed State的概念和用法,本文将持续介绍Operator State。
Operator State与Keyed State不同的是,Operator State只和并行的算子实例绑定,和数据元素中的key无关,每个算子实例中持有所有数据元素中的一部分状态数据。Operator State反对当算子实例并行度发生变化时主动重新分配状态数据, OperatorState目前只反对应用ListState。
Operator State与并行的操作算子实例相关联,例如在Kafka Connector中,每个Kafka生产端算子实例都对应到Kafka的一个分区中,保护Topic分区和Offsets偏移量作为算子的Operator State 在Flink中能够通过 Checkpointed-Function 或者 ListCheckpointed<T extends Serializable>两个接口来定义操作Operator State的函数。
Operator State开发实战
本章节将通过理论的我的项目代码演示Operator State在两种不同计算场景下的开发方法。
在样例中将演示Operator State如何交融进入Flink 的DataStream API,让用户在开发Flink利用的时候,能够将长期数据保留在State中,从State中读取数据,在运行的时候,在运行层面上与算子、Function体系交融,主动对State进行备份Checkpoint,一旦出现异常可能从保留的State中复原状态,实现Exactly-Once 。
通过CheckpointedFunction接口操作Operator State
CheckpointedFunction接口定义如代码所示,须要实现两个办法,当checkpoint触发时就会调用snapshotState()办法,当初始化自定义函数的时候会调用initializeState()办法,其中包含第一次初始化函数和从之前的checkpoints中复原状态数据,同时initializeState()办法中须要蕴含两套逻辑,
一个是不同类型状态数据初始化的逻辑,
一个是从之前的状态中复原数据的逻辑
@Publicpublic interface CheckpointedFunction { // 每当 checkpoint 触发的时候 调用这个办法 void snapshotState(FunctionSnapshotContext var1) throws Exception; // 每次 自定义函数初始化的时候 调用此办法初始化 void initializeState(FunctionInitializationContext var1) throws Exception;}
在每个算子中Managed Operator State都是以List模式存储,算子和算子之间的状态数据互相独立,List存储比拟适宜于状态数据的从新散布,Flink目前反对对Managed Operator State两种重散布的策略,别离是Even-split Redistribution和Union Redistribution。
Even-split Redistribution:每个算子实例中含有局部状态元素的List列表,整个状态数据是所有List列表的合集。当触发 restore/redistribution 动作时,通过将状态数据平均分配成与算子并行度 雷同数量的List列表,每个task实例中有一个List,其能够为空或者含有多个元素
Union Redistribution:每个算子实例中含有所有状态元素的List列表,当触发 restore/redistribution 动作时,每个算子都可能获取到残缺的状态元素列表
实现FlatMapFunction和CheckpointedFunction
在理论我的项目中能够通过实现FlatMapFunction和CheckpointedFunction实现对输出数据中每个key的数据元素数量和算子中的元素数量的统计。如代码所示,通过在initializeState()办法中别离创立keyedState和operatorState两种State,存储基于Key相干的状态值以及基于算子的状态值。
import com.google.common.collect.Lists;import java.io.IOException;import java.util.List;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.state.ListState;import org.apache.flink.api.common.state.ListStateDescriptor;import org.apache.flink.api.common.state.ValueState;import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.runtime.state.FunctionInitializationContext;import org.apache.flink.runtime.state.FunctionSnapshotContext;import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;import org.apache.flink.util.Collector;import org.apache.log4j.Logger;public class CheckpointCount implements FlatMapFunction<Tuple2<Integer, Long>, Tuple3<Integer, Long, Long>>, CheckpointedFunction { private static final Logger logger = Logger.getLogger(CheckpointCount.class); private Long operatorCount; private ValueState<Long> keyedState; private ListState<Long> operatorState; public void CheckpointCount() { } @Override public void flatMap( Tuple2<Integer, Long> integerLongTuple2, Collector<Tuple3<Integer, Long, Long>> collector) throws Exception { if (integerLongTuple2.f0 == 4) { throw new IOException("input "); } if (keyedState.value() == null) { keyedState.update(1L); } else { keyedState.update(keyedState.value() + 1L); } operatorCount = operatorCount + 1; //输入后果,包含id,id对应的数量统计keyedCount,算子输出数据的数量统计 operatorCount collector.collect(Tuple3.of(integerLongTuple2.f0, keyedState.value(), operatorCount)); } @Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { System.out.println("snapshot"); operatorState.clear(); operatorState.add(operatorCount); } @Override public void initializeState(FunctionInitializationContext ctx) throws Exception { System.out.println("initialize"); logger.debug("init"); keyedState = ctx.getKeyedStateStore().getState(new ValueStateDescriptor<Long>("keyedState", Long.class)); operatorState = ctx.getOperatorStateStore().getListState(new ListStateDescriptor<Long>( "operatorState", Long.class)); operatorCount = 0L; if (ctx.isRestored()) { List<Long> op = Lists.newArrayList(operatorState.get()); if (op.size() > 0 ) { operatorCount = op.get(op.size()-1); } System.out.println("restored"); } }}
能够从上述代码中看到的是,在 snapshotState() 办法中清理掉上一次checkpoint中存储的operatorState的数据,而后再增加并更新本次算子中须要checkpoint的operatorCount状态变量。当零碎重启时会调用initializeState办法,从新复原keyedState和operatorState,其中operatorCount数据能够从最新的operatorState中复原。
验证代码
构建验证代码如下:
private static void checkpointOperatorStateWithMapFunction() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(3000); // if commit this line off, only got initializeState // env.getCheckpointConfig().set DataStreamSource<String> localhost = env.socketTextStream("localhost", 1111); SingleOutputStreamOperator<Tuple2<Integer, Long>> inputStream= localhost.map(new MapFunction<String, Tuple2<Integer, Long>>() { @Override public Tuple2<Integer, Long> map(String s) throws Exception { String[] split = s.split(","); return new Tuple2<>(Integer.valueOf(split[0]), Long.valueOf(split[1])); } }); inputStream.keyBy(0).flatMap(new CheckpointCount()).print(); env.execute("checkpoint state for map"); }
通过nc,咱们输出以下数据
DESKTOP-SPIDEIC:~$ nc -lk 11112,12,22,33,13,23,3
失去打印输出
initializesnapshotsnapshotsnapshot(2,1,1)(2,2,2)(2,3,3)snapshot(3,1,4)(3,2,5)snapshot(3,3,6)snapshot
能够看到
因为 keyedState 是跟key相干的,所以当integerLongTuple2.f0从2变为3的时候, keyedState 是从新初始化,从1开始递增
因为 operatorState只跟算子相干的,所以始终在递增
因为代码中应用 env.enableCheckpointing(3000) 开启了checkpoint,能够看到 snapshotState 中的日志打印进去
咱们等(3,3,6)前面的snapshot打印进去后,接下来通过nc持续输出
4,3
因为咱们在代码设置当 if (integerLongTuple2.f0 == 4) 的时候抛出异样,所以此刻flink程序就会退出,而后重启,进入到 initializeState
看到对应的打印如下
initialize
restored
snapshot
snapshot
能够看到初始化(initialize)以及复原(restored)的逻辑都执行到了,咱们通过nc持续输出 3,5 ,能够看到程序打印出 (3,4,7) , 阐明 keyedState 以及operatorCount 都失常复原了之前的值。
对于状态数据重散布策略的应用,能够在创立operatorState的过程中通过相应的办法指定:如果应用Even-split Redistribution策 略,则通过context. getListState(descriptor)获取OperatorState;如果应用Union Redistribution策略,则通过context.getUnionList State(descriptor) 来获取。实例代码中默认应用的Even-split Redistribution策略。
通过CheckpointedFunction构建带缓冲区的Sink
上面咱们再看另外一个例子,构建一个带缓冲的Sink。如代码所示,通过checkpointState保留以后曾经承受的所有元素列表。
实现SinkFunction和CheckpointedFunction
在snapshotState()办法中清理掉上一次checkpoint中存储的operatorState的数据,而后再增加本次算子中须要checkpoint的bufferedElements中的每一个元素。
当零碎重启时会调用initializeState办法,从新复原operatorState,其中bufferedElements中的数据能够从 checkpointState 中复原
invoke 会在算子收到每一个元素时调用
finish 会在算子收到上游所有元素后调用
import java.io.IOException;import java.util.ArrayList;import java.util.List;import org.apache.flink.api.common.state.ListState;import org.apache.flink.api.common.state.ListStateDescriptor;import org.apache.flink.api.common.typeinfo.TypeHint;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.runtime.state.FunctionInitializationContext;import org.apache.flink.runtime.state.FunctionSnapshotContext;import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;import org.apache.flink.streaming.api.functions.sink.SinkFunction;public class BufferingSink implements SinkFunction<Tuple2<Integer, Long>>, CheckpointedFunction { private final int threshold; private transient ListState<Tuple2<Integer, Long>> checkpointState; private List<Tuple2<Integer, Long>> bufferedElements; public BufferingSink(int threshold){ this.bufferedElements = new ArrayList<>(); this.threshold = threshold; } @Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { System.out.println("snapshot"); checkpointState.clear(); // for (Tuple2<Integer, Long> element: bufferedElements) { // checkpointState.add(element); // } checkpointState.addAll(bufferedElements); } @Override public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception { System.out.println("initialize"); ListStateDescriptor<Tuple2<Integer, Long>> descriptor = new ListStateDescriptor<Tuple2<Integer, Long>>( "buffered-elements", TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {}) ); checkpointState = functionInitializationContext.getOperatorStateStore().getListState(descriptor); if (functionInitializationContext.isRestored()) { for (Tuple2<Integer, Long> element : checkpointState.get()){ bufferedElements.add(element); } } } @Override public void invoke( Tuple2<Integer, Long> value, Context context) throws Exception { // called for each element // SinkFunction.super.invoke(value, context); System.out.println(String.format("recv %d %d", value.f0, value.f1)); bufferedElements.add(value); if (value.f0 == 4) { throw new IOException("input "); } } @Override public void finish() throws Exception { // called when every element has received // SinkFunction.super.finish(); for (Tuple2<Integer, Long> element: bufferedElements) { System.out.println(element); } }}
验证代码
构建验证程序如下:
private static void checkpointListStateWithSinkFunction() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(3000); // if commit this line off, only got initializeState DataStreamSource<String> localhost = env.socketTextStream("localhost", 1111); SingleOutputStreamOperator<Tuple2<Integer, Long>> inputStream= localhost.map(new MapFunction<String, Tuple2<Integer, Long>>() { @Override public Tuple2<Integer, Long> map(String s) throws Exception { String[] split = s.split(","); return new Tuple2<>(Integer.valueOf(split[0]), Long.valueOf(split[1])); } }); inputStream.addSink(new BufferingSink(2)); env.execute("checkpoint state for sink"); }
通过 nc 输出如下数据
wdy@DESKTOP-SPIDEIC:~$ nc -lk 1111
2,1
2,2
2,3
flink程序输入如下打印:
initialize
snapshot
recv 2 1
recv 2 2
snapshot
recv 2 3
snapshot
等 recv 2 3 之后看到 snapshot 打印进去后,用于确保最初一条(2,3)也保留到了checkpoint中,通过 nc 输出
4,0
因为咱们在代码设置当 if (integerLongTuple2.f0 == 4) 的时候抛出异样,所以此刻flink程序就会退出,而后重启,进入到 initializeState
flink程序输入如下打印:
recv 4 0
initialize
restored
snapshotrecv 4 0
initialize
restored
snapshot
能够看到 正确进入到initializeState 并且执行了复原逻辑,接下来通过 nc 输出
2,4
flink程序输入如下打印
recv 2 4
snapshot
接下来 Ctrl+C 进行 nc ,进入到 finish()函数中,flink程序输入
(2,1)
(2,2)
(2,3)
(2,4)
表明flink异样退出重启后,正确从checkpointState复原了之前的数据。
通过ListCheckpointed接口定义Operator State
ListCheckpointed接口和CheckpointedFunction接口相比在灵活性上绝对弱一些,只能反对List类型的状态,并且在数据恢复的时候仅反对even-redistribution策略。在ListCheckpointed接口中须要实现以下两个办法来操作Operator State:
public interface ListCheckpointed<T extends Serializable> { List<T> snapshotState(long var1, long var3) throws Exception; void restoreState(List<T> var1) throws Exception;}
其中snapshotState办法定义数据元素List存储到checkpoints的逻辑,restoreState办法则定义从checkpoints中复原状态的逻辑。如果状态数据不反对List模式,则能够在snapshotState办法中返回Collections.singletonList(STATE)。
这个接口在Flink 1.14中曾经不倡议应用,所以本文也不再进行实例演示。
总结
本文介绍了OperateState在MapFunction以及SlinkFunction两种操作场景中的利用,同时展现了如何通过联合CheckpointedFunction主动对State进行备份Checkpoint,从而在工作出现异常时可能从保留的State中复原状态,实现Exactly-Once。
参考资料
《Flink原理、实战与性能优化》5.1 有状态计算
《Flink内核原理与实现》第7章 状态原理
Flink教程(17) Keyed State状态治理之AggregatingState应用案例 求平均值 https://blog.csdn.net/winterkin