关于flink:Flink中基于Operator-State-的计算开发方法滴普程序员部落

文:王东阳

前言

在Flink中依据数据集是否依据Key进行分区,将状态分为Keyed State和Operator State(Non-keyed State)两种类型 ,在之前的文章《Flink中基于KeyedState的计算开发方法》曾经具体介绍了Keyed State的概念和用法,本文将持续介绍Operator State。

Operator State与Keyed State不同的是,Operator State只和并行的算子实例绑定,和数据元素中的key无关,每个算子实例中持有所有数据元素中的一部分状态数据。Operator State反对当算子实例并行度发生变化时主动重新分配状态数据, OperatorState目前只反对应用ListState。

Operator State与并行的操作算子实例相关联,例如在Kafka Connector中,每个Kafka生产端算子实例都对应到Kafka的一个分区中,保护Topic分区和Offsets偏移量作为算子的Operator State 在Flink中能够通过 Checkpointed-Function 或者 ListCheckpointed<T extends Serializable>两个接口来定义操作Operator State的函数。

Operator State开发实战

本章节将通过理论的我的项目代码演示Operator State在两种不同计算场景下的开发方法。

在样例中将演示Operator State如何交融进入Flink 的DataStream API,让用户在开发Flink利用的时候,能够将长期数据保留在State中,从State中读取数据,在运行的时候,在运行层面上与算子、Function体系交融,主动对State进行备份Checkpoint,一旦出现异常可能从保留的State中复原状态,实现Exactly-Once 。

通过CheckpointedFunction接口操作Operator State
CheckpointedFunction接口定义如代码所示,须要实现两个办法,当checkpoint触发时就会调用snapshotState()办法,当初始化自定义函数的时候会调用initializeState()办法,其中包含第一次初始化函数和从之前的checkpoints中复原状态数据,同时initializeState()办法中须要蕴含两套逻辑,

一个是不同类型状态数据初始化的逻辑,
一个是从之前的状态中复原数据的逻辑

@Public
public interface CheckpointedFunction {
    // 每当 checkpoint 触发的时候 调用这个办法
  void snapshotState(FunctionSnapshotContext var1) throws Exception;

    // 每次 自定义函数初始化的时候 调用此办法初始化
  void initializeState(FunctionInitializationContext var1) throws Exception;
}

在每个算子中Managed Operator State都是以List模式存储,算子和算子之间的状态数据互相独立,List存储比拟适宜于状态数据的从新散布,Flink目前反对对Managed Operator State两种重散布的策略,别离是Even-split Redistribution和Union Redistribution。

Even-split Redistribution:每个算子实例中含有局部状态元素的List列表,整个状态数据是所有List列表的合集。当触发 restore/redistribution 动作时,通过将状态数据平均分配成与算子并行度 雷同数量的List列表,每个task实例中有一个List,其能够为空或者含有多个元素
Union Redistribution:每个算子实例中含有所有状态元素的List列表,当触发 restore/redistribution 动作时,每个算子都可能获取到残缺的状态元素列表
实现FlatMapFunction和CheckpointedFunction
在理论我的项目中能够通过实现FlatMapFunction和CheckpointedFunction实现对输出数据中每个key的数据元素数量和算子中的元素数量的统计。如代码所示,通过在initializeState()办法中别离创立keyedState和operatorState两种State,存储基于Key相干的状态值以及基于算子的状态值。

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;
import org.apache.log4j.Logger;

public class CheckpointCount
    implements FlatMapFunction<Tuple2<Integer, Long>, Tuple3<Integer, Long, Long>>, CheckpointedFunction {

  private static final Logger logger = Logger.getLogger(CheckpointCount.class);

  private Long operatorCount;
  private ValueState<Long> keyedState;
  private ListState<Long> operatorState;

  public void CheckpointCount() {

  }

  @Override
  public void flatMap(
      Tuple2<Integer, Long> integerLongTuple2, Collector<Tuple3<Integer, Long, Long>> collector) throws Exception {

    if (integerLongTuple2.f0 == 4) {
      throw new IOException("input ");
    }

    if (keyedState.value() == null) {
      keyedState.update(1L);
    } else {
      keyedState.update(keyedState.value() + 1L);
    }

    operatorCount = operatorCount + 1;
    //输入后果,包含id,id对应的数量统计keyedCount,算子输出数据的数量统计 operatorCount
    collector.collect(Tuple3.of(integerLongTuple2.f0, keyedState.value(), operatorCount));
  }

  @Override
  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    System.out.println("snapshot");
    operatorState.clear();
    operatorState.add(operatorCount);
  }

  @Override
  public void initializeState(FunctionInitializationContext ctx) throws Exception {
    System.out.println("initialize");
    logger.debug("init");
    keyedState = ctx.getKeyedStateStore().getState(new ValueStateDescriptor<Long>("keyedState", Long.class));
    operatorState = ctx.getOperatorStateStore().getListState(new ListStateDescriptor<Long>(
        "operatorState",
        Long.class));
    operatorCount = 0L;

    if (ctx.isRestored()) {
      List<Long> op = Lists.newArrayList(operatorState.get());
      if (op.size() > 0 ) {
        operatorCount = op.get(op.size()-1);
      }
      System.out.println("restored");
    }
  }
}

能够从上述代码中看到的是,在 snapshotState() 办法中清理掉上一次checkpoint中存储的operatorState的数据,而后再增加并更新本次算子中须要checkpoint的operatorCount状态变量。当零碎重启时会调用initializeState办法,从新复原keyedState和operatorState,其中operatorCount数据能够从最新的operatorState中复原。

验证代码

构建验证代码如下:

private static void checkpointOperatorStateWithMapFunction() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(3000); // if commit this line off, only got initializeState
    // env.getCheckpointConfig().set

    DataStreamSource<String> localhost = env.socketTextStream("localhost", 1111);
    SingleOutputStreamOperator<Tuple2<Integer, Long>>  inputStream= localhost.map(new MapFunction<String,
        Tuple2<Integer, Long>>() {
      @Override
      public Tuple2<Integer, Long> map(String s) throws Exception {
        String[] split = s.split(",");
        return new Tuple2<>(Integer.valueOf(split[0]), Long.valueOf(split[1]));
      }
    });

    inputStream.keyBy(0).flatMap(new CheckpointCount()).print();
    env.execute("checkpoint state for map");
  }

通过nc,咱们输出以下数据

DESKTOP-SPIDEIC:~$ nc -lk 1111
2,1
2,2
2,3
3,1
3,2
3,3

失去打印输出

initialize
snapshot
snapshot
snapshot
(2,1,1)
(2,2,2)
(2,3,3)
snapshot
(3,1,4)
(3,2,5)
snapshot
(3,3,6)
snapshot

能够看到

因为 keyedState 是跟key相干的,所以当integerLongTuple2.f0从2变为3的时候, keyedState 是从新初始化,从1开始递增
因为 operatorState只跟算子相干的,所以始终在递增
因为代码中应用 env.enableCheckpointing(3000) 开启了checkpoint,能够看到 snapshotState 中的日志打印进去
咱们等(3,3,6)前面的snapshot打印进去后,接下来通过nc持续输出

4,3
因为咱们在代码设置当 if (integerLongTuple2.f0 == 4) 的时候抛出异样,所以此刻flink程序就会退出,而后重启,进入到 initializeState

看到对应的打印如下

initialize
restored
snapshot
snapshot

能够看到初始化(initialize)以及复原(restored)的逻辑都执行到了,咱们通过nc持续输出 3,5 ,能够看到程序打印出 (3,4,7) , 阐明 keyedState 以及operatorCount 都失常复原了之前的值。

对于状态数据重散布策略的应用,能够在创立operatorState的过程中通过相应的办法指定:如果应用Even-split Redistribution策 略,则通过context. getListState(descriptor)获取OperatorState;如果应用Union Redistribution策略,则通过context.getUnionList State(descriptor) 来获取。实例代码中默认应用的Even-split Redistribution策略。

通过CheckpointedFunction构建带缓冲区的Sink
上面咱们再看另外一个例子,构建一个带缓冲的Sink。如代码所示,通过checkpointState保留以后曾经承受的所有元素列表。

实现SinkFunction和CheckpointedFunction
在snapshotState()办法中清理掉上一次checkpoint中存储的operatorState的数据,而后再增加本次算子中须要checkpoint的bufferedElements中的每一个元素。
当零碎重启时会调用initializeState办法,从新复原operatorState,其中bufferedElements中的数据能够从 checkpointState 中复原
invoke 会在算子收到每一个元素时调用
finish 会在算子收到上游所有元素后调用

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;


public class BufferingSink
    implements SinkFunction<Tuple2<Integer, Long>>,
    CheckpointedFunction {

  private final int threshold;
  private transient ListState<Tuple2<Integer, Long>> checkpointState;
  private List<Tuple2<Integer, Long>> bufferedElements;

  public BufferingSink(int threshold){
    this.bufferedElements = new ArrayList<>();
    this.threshold = threshold;
  }

  @Override
  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    System.out.println("snapshot");
    checkpointState.clear();
    // for (Tuple2<Integer, Long> element: bufferedElements) {
    //   checkpointState.add(element);
    // }
    checkpointState.addAll(bufferedElements);
  }

  @Override
  public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    System.out.println("initialize");
    ListStateDescriptor<Tuple2<Integer, Long>> descriptor =
        new ListStateDescriptor<Tuple2<Integer, Long>>(
            "buffered-elements",
            TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {})
        );

    checkpointState = functionInitializationContext.getOperatorStateStore().getListState(descriptor);

    if (functionInitializationContext.isRestored()) {
      for (Tuple2<Integer, Long> element : checkpointState.get()){
        bufferedElements.add(element);
      }
    }

  }

  @Override
  public void invoke(
      Tuple2<Integer, Long> value, Context context) throws Exception {
    // called for each element
    // SinkFunction.super.invoke(value, context);
    System.out.println(String.format("recv %d %d", value.f0, value.f1));
    bufferedElements.add(value);
    if (value.f0 == 4) {
      throw new IOException("input ");
    }

  }

  @Override
  public void finish() throws Exception {
    // called when every element has received
    // SinkFunction.super.finish();
    for (Tuple2<Integer, Long> element: bufferedElements) {
      System.out.println(element);
    }
  }
}

验证代码

构建验证程序如下:

private static void checkpointListStateWithSinkFunction() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(3000); // if commit this line off, only got initializeState

    DataStreamSource<String> localhost = env.socketTextStream("localhost", 1111);
    SingleOutputStreamOperator<Tuple2<Integer, Long>>  inputStream= localhost.map(new MapFunction<String,
        Tuple2<Integer, Long>>() {
      @Override
      public Tuple2<Integer, Long> map(String s) throws Exception {
        String[] split = s.split(",");
        return new Tuple2<>(Integer.valueOf(split[0]), Long.valueOf(split[1]));
      }
    });

    inputStream.addSink(new BufferingSink(2));
    env.execute("checkpoint state for sink");
  }

通过 nc 输出如下数据

wdy@DESKTOP-SPIDEIC:~$ nc -lk 1111
2,1
2,2
2,3
flink程序输入如下打印:

initialize
snapshot
recv 2 1
recv 2 2
snapshot
recv 2 3
snapshot
等 recv 2 3 之后看到 snapshot 打印进去后,用于确保最初一条(2,3)也保留到了checkpoint中,通过 nc 输出

4,0
因为咱们在代码设置当 if (integerLongTuple2.f0 == 4) 的时候抛出异样,所以此刻flink程序就会退出,而后重启,进入到 initializeState

flink程序输入如下打印:

recv 4 0
initialize
restored
snapshotrecv 4 0
initialize
restored
snapshot
能够看到 正确进入到initializeState 并且执行了复原逻辑,接下来通过 nc 输出

2,4
flink程序输入如下打印

recv 2 4
snapshot
接下来 Ctrl+C 进行 nc ,进入到 finish()函数中,flink程序输入

(2,1)
(2,2)
(2,3)
(2,4)
表明flink异样退出重启后,正确从checkpointState复原了之前的数据。

通过ListCheckpointed接口定义Operator State
ListCheckpointed接口和CheckpointedFunction接口相比在灵活性上绝对弱一些,只能反对List类型的状态,并且在数据恢复的时候仅反对even-redistribution策略。在ListCheckpointed接口中须要实现以下两个办法来操作Operator State:

public interface ListCheckpointed<T extends Serializable> {
  List<T> snapshotState(long var1, long var3) throws Exception;

  void restoreState(List<T> var1) throws Exception;
}

其中snapshotState办法定义数据元素List存储到checkpoints的逻辑,restoreState办法则定义从checkpoints中复原状态的逻辑。如果状态数据不反对List模式,则能够在snapshotState办法中返回Collections.singletonList(STATE)。

这个接口在Flink 1.14中曾经不倡议应用,所以本文也不再进行实例演示。

总结

本文介绍了OperateState在MapFunction以及SlinkFunction两种操作场景中的利用,同时展现了如何通过联合CheckpointedFunction主动对State进行备份Checkpoint,从而在工作出现异常时可能从保留的State中复原状态,实现Exactly-Once。

参考资料
《Flink原理、实战与性能优化》5.1 有状态计算
《Flink内核原理与实现》第7章 状态原理
Flink教程(17) Keyed State状态治理之AggregatingState应用案例 求平均值 https://blog.csdn.net/winterkin

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理