文:王东阳

前言

在Flink中依据数据集是否依据Key进行分区,将状态分为Keyed State和Operator State(Non-keyed State)两种类型 ,在之前的文章《Flink中基于KeyedState的计算开发方法》曾经具体介绍了Keyed State的概念和用法,本文将持续介绍Operator State。

Operator State与Keyed State不同的是,Operator State只和并行的算子实例绑定,和数据元素中的key无关,每个算子实例中持有所有数据元素中的一部分状态数据。Operator State反对当算子实例并行度发生变化时主动重新分配状态数据, OperatorState目前只反对应用ListState。

Operator State与并行的操作算子实例相关联,例如在Kafka Connector中,每个Kafka生产端算子实例都对应到Kafka的一个分区中,保护Topic分区和Offsets偏移量作为算子的Operator State 在Flink中能够通过 Checkpointed-Function 或者 ListCheckpointed<T extends Serializable>两个接口来定义操作Operator State的函数。

Operator State开发实战

本章节将通过理论的我的项目代码演示Operator State在两种不同计算场景下的开发方法。

在样例中将演示Operator State如何交融进入Flink 的DataStream API,让用户在开发Flink利用的时候,能够将长期数据保留在State中,从State中读取数据,在运行的时候,在运行层面上与算子、Function体系交融,主动对State进行备份Checkpoint,一旦出现异常可能从保留的State中复原状态,实现Exactly-Once 。

通过CheckpointedFunction接口操作Operator State
CheckpointedFunction接口定义如代码所示,须要实现两个办法,当checkpoint触发时就会调用snapshotState()办法,当初始化自定义函数的时候会调用initializeState()办法,其中包含第一次初始化函数和从之前的checkpoints中复原状态数据,同时initializeState()办法中须要蕴含两套逻辑,

一个是不同类型状态数据初始化的逻辑,
一个是从之前的状态中复原数据的逻辑

@Publicpublic interface CheckpointedFunction {    // 每当 checkpoint 触发的时候 调用这个办法  void snapshotState(FunctionSnapshotContext var1) throws Exception;    // 每次 自定义函数初始化的时候 调用此办法初始化  void initializeState(FunctionInitializationContext var1) throws Exception;}

在每个算子中Managed Operator State都是以List模式存储,算子和算子之间的状态数据互相独立,List存储比拟适宜于状态数据的从新散布,Flink目前反对对Managed Operator State两种重散布的策略,别离是Even-split Redistribution和Union Redistribution。

Even-split Redistribution:每个算子实例中含有局部状态元素的List列表,整个状态数据是所有List列表的合集。当触发 restore/redistribution 动作时,通过将状态数据平均分配成与算子并行度 雷同数量的List列表,每个task实例中有一个List,其能够为空或者含有多个元素
Union Redistribution:每个算子实例中含有所有状态元素的List列表,当触发 restore/redistribution 动作时,每个算子都可能获取到残缺的状态元素列表
实现FlatMapFunction和CheckpointedFunction
在理论我的项目中能够通过实现FlatMapFunction和CheckpointedFunction实现对输出数据中每个key的数据元素数量和算子中的元素数量的统计。如代码所示,通过在initializeState()办法中别离创立keyedState和operatorState两种State,存储基于Key相干的状态值以及基于算子的状态值。

import com.google.common.collect.Lists;import java.io.IOException;import java.util.List;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.state.ListState;import org.apache.flink.api.common.state.ListStateDescriptor;import org.apache.flink.api.common.state.ValueState;import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.runtime.state.FunctionInitializationContext;import org.apache.flink.runtime.state.FunctionSnapshotContext;import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;import org.apache.flink.util.Collector;import org.apache.log4j.Logger;public class CheckpointCount    implements FlatMapFunction<Tuple2<Integer, Long>, Tuple3<Integer, Long, Long>>, CheckpointedFunction {  private static final Logger logger = Logger.getLogger(CheckpointCount.class);  private Long operatorCount;  private ValueState<Long> keyedState;  private ListState<Long> operatorState;  public void CheckpointCount() {  }  @Override  public void flatMap(      Tuple2<Integer, Long> integerLongTuple2, Collector<Tuple3<Integer, Long, Long>> collector) throws Exception {    if (integerLongTuple2.f0 == 4) {      throw new IOException("input ");    }    if (keyedState.value() == null) {      keyedState.update(1L);    } else {      keyedState.update(keyedState.value() + 1L);    }    operatorCount = operatorCount + 1;    //输入后果,包含id,id对应的数量统计keyedCount,算子输出数据的数量统计 operatorCount    collector.collect(Tuple3.of(integerLongTuple2.f0, keyedState.value(), operatorCount));  }  @Override  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {    System.out.println("snapshot");    operatorState.clear();    operatorState.add(operatorCount);  }  @Override  public void initializeState(FunctionInitializationContext ctx) throws Exception {    System.out.println("initialize");    logger.debug("init");    keyedState = ctx.getKeyedStateStore().getState(new ValueStateDescriptor<Long>("keyedState", Long.class));    operatorState = ctx.getOperatorStateStore().getListState(new ListStateDescriptor<Long>(        "operatorState",        Long.class));    operatorCount = 0L;    if (ctx.isRestored()) {      List<Long> op = Lists.newArrayList(operatorState.get());      if (op.size() > 0 ) {        operatorCount = op.get(op.size()-1);      }      System.out.println("restored");    }  }}

能够从上述代码中看到的是,在 snapshotState() 办法中清理掉上一次checkpoint中存储的operatorState的数据,而后再增加并更新本次算子中须要checkpoint的operatorCount状态变量。当零碎重启时会调用initializeState办法,从新复原keyedState和operatorState,其中operatorCount数据能够从最新的operatorState中复原。

验证代码

构建验证代码如下:

private static void checkpointOperatorStateWithMapFunction() throws Exception {    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    env.setParallelism(1);    env.enableCheckpointing(3000); // if commit this line off, only got initializeState    // env.getCheckpointConfig().set    DataStreamSource<String> localhost = env.socketTextStream("localhost", 1111);    SingleOutputStreamOperator<Tuple2<Integer, Long>>  inputStream= localhost.map(new MapFunction<String,        Tuple2<Integer, Long>>() {      @Override      public Tuple2<Integer, Long> map(String s) throws Exception {        String[] split = s.split(",");        return new Tuple2<>(Integer.valueOf(split[0]), Long.valueOf(split[1]));      }    });    inputStream.keyBy(0).flatMap(new CheckpointCount()).print();    env.execute("checkpoint state for map");  }

通过nc,咱们输出以下数据

DESKTOP-SPIDEIC:~$ nc -lk 11112,12,22,33,13,23,3

失去打印输出

initializesnapshotsnapshotsnapshot(2,1,1)(2,2,2)(2,3,3)snapshot(3,1,4)(3,2,5)snapshot(3,3,6)snapshot

能够看到

因为 keyedState 是跟key相干的,所以当integerLongTuple2.f0从2变为3的时候, keyedState 是从新初始化,从1开始递增
因为 operatorState只跟算子相干的,所以始终在递增
因为代码中应用 env.enableCheckpointing(3000) 开启了checkpoint,能够看到 snapshotState 中的日志打印进去
咱们等(3,3,6)前面的snapshot打印进去后,接下来通过nc持续输出

4,3
因为咱们在代码设置当 if (integerLongTuple2.f0 == 4) 的时候抛出异样,所以此刻flink程序就会退出,而后重启,进入到 initializeState

看到对应的打印如下

initialize
restored
snapshot
snapshot

能够看到初始化(initialize)以及复原(restored)的逻辑都执行到了,咱们通过nc持续输出 3,5 ,能够看到程序打印出 (3,4,7) , 阐明 keyedState 以及operatorCount 都失常复原了之前的值。

对于状态数据重散布策略的应用,能够在创立operatorState的过程中通过相应的办法指定:如果应用Even-split Redistribution策 略,则通过context. getListState(descriptor)获取OperatorState;如果应用Union Redistribution策略,则通过context.getUnionList State(descriptor) 来获取。实例代码中默认应用的Even-split Redistribution策略。

通过CheckpointedFunction构建带缓冲区的Sink
上面咱们再看另外一个例子,构建一个带缓冲的Sink。如代码所示,通过checkpointState保留以后曾经承受的所有元素列表。

实现SinkFunction和CheckpointedFunction
在snapshotState()办法中清理掉上一次checkpoint中存储的operatorState的数据,而后再增加本次算子中须要checkpoint的bufferedElements中的每一个元素。
当零碎重启时会调用initializeState办法,从新复原operatorState,其中bufferedElements中的数据能够从 checkpointState 中复原
invoke 会在算子收到每一个元素时调用
finish 会在算子收到上游所有元素后调用

import java.io.IOException;import java.util.ArrayList;import java.util.List;import org.apache.flink.api.common.state.ListState;import org.apache.flink.api.common.state.ListStateDescriptor;import org.apache.flink.api.common.typeinfo.TypeHint;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.runtime.state.FunctionInitializationContext;import org.apache.flink.runtime.state.FunctionSnapshotContext;import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;import org.apache.flink.streaming.api.functions.sink.SinkFunction;public class BufferingSink    implements SinkFunction<Tuple2<Integer, Long>>,    CheckpointedFunction {  private final int threshold;  private transient ListState<Tuple2<Integer, Long>> checkpointState;  private List<Tuple2<Integer, Long>> bufferedElements;  public BufferingSink(int threshold){    this.bufferedElements = new ArrayList<>();    this.threshold = threshold;  }  @Override  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {    System.out.println("snapshot");    checkpointState.clear();    // for (Tuple2<Integer, Long> element: bufferedElements) {    //   checkpointState.add(element);    // }    checkpointState.addAll(bufferedElements);  }  @Override  public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {    System.out.println("initialize");    ListStateDescriptor<Tuple2<Integer, Long>> descriptor =        new ListStateDescriptor<Tuple2<Integer, Long>>(            "buffered-elements",            TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {})        );    checkpointState = functionInitializationContext.getOperatorStateStore().getListState(descriptor);    if (functionInitializationContext.isRestored()) {      for (Tuple2<Integer, Long> element : checkpointState.get()){        bufferedElements.add(element);      }    }  }  @Override  public void invoke(      Tuple2<Integer, Long> value, Context context) throws Exception {    // called for each element    // SinkFunction.super.invoke(value, context);    System.out.println(String.format("recv %d %d", value.f0, value.f1));    bufferedElements.add(value);    if (value.f0 == 4) {      throw new IOException("input ");    }  }  @Override  public void finish() throws Exception {    // called when every element has received    // SinkFunction.super.finish();    for (Tuple2<Integer, Long> element: bufferedElements) {      System.out.println(element);    }  }}

验证代码

构建验证程序如下:

private static void checkpointListStateWithSinkFunction() throws Exception {    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    env.setParallelism(1);    env.enableCheckpointing(3000); // if commit this line off, only got initializeState    DataStreamSource<String> localhost = env.socketTextStream("localhost", 1111);    SingleOutputStreamOperator<Tuple2<Integer, Long>>  inputStream= localhost.map(new MapFunction<String,        Tuple2<Integer, Long>>() {      @Override      public Tuple2<Integer, Long> map(String s) throws Exception {        String[] split = s.split(",");        return new Tuple2<>(Integer.valueOf(split[0]), Long.valueOf(split[1]));      }    });    inputStream.addSink(new BufferingSink(2));    env.execute("checkpoint state for sink");  }

通过 nc 输出如下数据

wdy@DESKTOP-SPIDEIC:~$ nc -lk 1111
2,1
2,2
2,3
flink程序输入如下打印:

initialize
snapshot
recv 2 1
recv 2 2
snapshot
recv 2 3
snapshot
等 recv 2 3 之后看到 snapshot 打印进去后,用于确保最初一条(2,3)也保留到了checkpoint中,通过 nc 输出

4,0
因为咱们在代码设置当 if (integerLongTuple2.f0 == 4) 的时候抛出异样,所以此刻flink程序就会退出,而后重启,进入到 initializeState

flink程序输入如下打印:

recv 4 0
initialize
restored
snapshotrecv 4 0
initialize
restored
snapshot
能够看到 正确进入到initializeState 并且执行了复原逻辑,接下来通过 nc 输出

2,4
flink程序输入如下打印

recv 2 4
snapshot
接下来 Ctrl+C 进行 nc ,进入到 finish()函数中,flink程序输入

(2,1)
(2,2)
(2,3)
(2,4)
表明flink异样退出重启后,正确从checkpointState复原了之前的数据。

通过ListCheckpointed接口定义Operator State
ListCheckpointed接口和CheckpointedFunction接口相比在灵活性上绝对弱一些,只能反对List类型的状态,并且在数据恢复的时候仅反对even-redistribution策略。在ListCheckpointed接口中须要实现以下两个办法来操作Operator State:

public interface ListCheckpointed<T extends Serializable> {  List<T> snapshotState(long var1, long var3) throws Exception;  void restoreState(List<T> var1) throws Exception;}

其中snapshotState办法定义数据元素List存储到checkpoints的逻辑,restoreState办法则定义从checkpoints中复原状态的逻辑。如果状态数据不反对List模式,则能够在snapshotState办法中返回Collections.singletonList(STATE)。

这个接口在Flink 1.14中曾经不倡议应用,所以本文也不再进行实例演示。

总结

本文介绍了OperateState在MapFunction以及SlinkFunction两种操作场景中的利用,同时展现了如何通过联合CheckpointedFunction主动对State进行备份Checkpoint,从而在工作出现异常时可能从保留的State中复原状态,实现Exactly-Once。

参考资料
《Flink原理、实战与性能优化》5.1 有状态计算
《Flink内核原理与实现》第7章 状态原理
Flink教程(17) Keyed State状态治理之AggregatingState应用案例 求平均值 https://blog.csdn.net/winterkin