乐趣区

关于flink:Flink中基于Operator-State-的计算开发方法滴普程序员部落

文:王东阳

前言

在 Flink 中依据数据集是否依据 Key 进行分区,将状态分为 Keyed State 和 Operator State(Non-keyed State)两种类型,在之前的文章《Flink 中基于 KeyedState 的计算开发方法》曾经具体介绍了 Keyed State 的概念和用法,本文将持续介绍 Operator State。

Operator State 与 Keyed State 不同的是,Operator State 只和并行的算子实例绑定,和数据元素中的 key 无关,每个算子实例中持有所有数据元素中的一部分状态数据。Operator State 反对当算子实例并行度发生变化时主动重新分配状态数据,OperatorState 目前只反对应用 ListState。

Operator State 与并行的操作算子实例相关联,例如在 Kafka Connector 中,每个 Kafka 生产端算子实例都对应到 Kafka 的一个分区中,保护 Topic 分区和 Offsets 偏移量作为算子的 Operator State 在 Flink 中能够通过 Checkpointed-Function 或者 ListCheckpointed<T extends Serializable> 两个接口来定义操作 Operator State 的函数。

Operator State 开发实战

本章节将通过理论的我的项目代码演示 Operator State 在两种不同计算场景下的开发方法。

在样例中将演示 Operator State 如何交融进入 Flink 的 DataStream API,让用户在开发 Flink 利用的时候,能够将长期数据保留在 State 中,从 State 中读取数据,在运行的时候,在运行层面上与算子、Function 体系交融,主动对 State 进行备份 Checkpoint,一旦出现异常可能从保留的 State 中复原状态,实现 Exactly-Once。

通过 CheckpointedFunction 接口操作 Operator State
CheckpointedFunction 接口定义如代码所示,须要实现两个办法,当 checkpoint 触发时就会调用 snapshotState()办法,当初始化自定义函数的时候会调用 initializeState()办法,其中包含第一次初始化函数和从之前的 checkpoints 中复原状态数据,同时 initializeState()办法中须要蕴含两套逻辑,

一个是不同类型状态数据初始化的逻辑,
一个是从之前的状态中复原数据的逻辑

@Public
public interface CheckpointedFunction {
    // 每当 checkpoint 触发的时候 调用这个办法
  void snapshotState(FunctionSnapshotContext var1) throws Exception;

    // 每次 自定义函数初始化的时候 调用此办法初始化
  void initializeState(FunctionInitializationContext var1) throws Exception;
}

在每个算子中 Managed Operator State 都是以 List 模式存储,算子和算子之间的状态数据互相独立,List 存储比拟适宜于状态数据的从新散布,Flink 目前反对对 Managed Operator State 两种重散布的策略,别离是 Even-split Redistribution 和 Union Redistribution。

Even-split Redistribution:每个算子实例中含有局部状态元素的 List 列表,整个状态数据是所有 List 列表的合集。当触发 restore/redistribution 动作时,通过将状态数据平均分配成与算子并行度 雷同数量的 List 列表,每个 task 实例中有一个 List,其能够为空或者含有多个元素
Union Redistribution:每个算子实例中含有所有状态元素的 List 列表,当触发 restore/redistribution 动作时,每个算子都可能获取到残缺的状态元素列表
实现 FlatMapFunction 和 CheckpointedFunction
在理论我的项目中能够通过实现 FlatMapFunction 和 CheckpointedFunction 实现对输出数据中每个 key 的数据元素数量和算子中的元素数量的统计。如代码所示,通过在 initializeState()办法中别离创立 keyedState 和 operatorState 两种 State,存储基于 Key 相干的状态值以及基于算子的状态值。

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;
import org.apache.log4j.Logger;

public class CheckpointCount
    implements FlatMapFunction<Tuple2<Integer, Long>, Tuple3<Integer, Long, Long>>, CheckpointedFunction {private static final Logger logger = Logger.getLogger(CheckpointCount.class);

  private Long operatorCount;
  private ValueState<Long> keyedState;
  private ListState<Long> operatorState;

  public void CheckpointCount() {}

  @Override
  public void flatMap(Tuple2<Integer, Long> integerLongTuple2, Collector<Tuple3<Integer, Long, Long>> collector) throws Exception {if (integerLongTuple2.f0 == 4) {throw new IOException("input");
    }

    if (keyedState.value() == null) {keyedState.update(1L);
    } else {keyedState.update(keyedState.value() + 1L);
    }

    operatorCount = operatorCount + 1;
    // 输入后果,包含 id,id 对应的数量统计 keyedCount,算子输出数据的数量统计 operatorCount
    collector.collect(Tuple3.of(integerLongTuple2.f0, keyedState.value(), operatorCount));
  }

  @Override
  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {System.out.println("snapshot");
    operatorState.clear();
    operatorState.add(operatorCount);
  }

  @Override
  public void initializeState(FunctionInitializationContext ctx) throws Exception {System.out.println("initialize");
    logger.debug("init");
    keyedState = ctx.getKeyedStateStore().getState(new ValueStateDescriptor<Long>("keyedState", Long.class));
    operatorState = ctx.getOperatorStateStore().getListState(new ListStateDescriptor<Long>(
        "operatorState",
        Long.class));
    operatorCount = 0L;

    if (ctx.isRestored()) {List<Long> op = Lists.newArrayList(operatorState.get());
      if (op.size() > 0 ) {operatorCount = op.get(op.size()-1);
      }
      System.out.println("restored");
    }
  }
}

能够从上述代码中看到的是,在 snapshotState() 办法中清理掉上一次 checkpoint 中存储的 operatorState 的数据,而后再增加并更新本次算子中须要 checkpoint 的 operatorCount 状态变量。当零碎重启时会调用 initializeState 办法,从新复原 keyedState 和 operatorState,其中 operatorCount 数据能够从最新的 operatorState 中复原。

验证代码

构建验证代码如下:

private static void checkpointOperatorStateWithMapFunction() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(3000); // if commit this line off, only got initializeState
    // env.getCheckpointConfig().set

    DataStreamSource<String> localhost = env.socketTextStream("localhost", 1111);
    SingleOutputStreamOperator<Tuple2<Integer, Long>>  inputStream= localhost.map(new MapFunction<String,
        Tuple2<Integer, Long>>() {
      @Override
      public Tuple2<Integer, Long> map(String s) throws Exception {String[] split = s.split(",");
        return new Tuple2<>(Integer.valueOf(split[0]), Long.valueOf(split[1]));
      }
    });

    inputStream.keyBy(0).flatMap(new CheckpointCount()).print();
    env.execute("checkpoint state for map");
  }

通过 nc,咱们输出以下数据

DESKTOP-SPIDEIC:~$ nc -lk 1111
2,1
2,2
2,3
3,1
3,2
3,3

失去打印输出

initialize
snapshot
snapshot
snapshot
(2,1,1)
(2,2,2)
(2,3,3)
snapshot
(3,1,4)
(3,2,5)
snapshot
(3,3,6)
snapshot

能够看到

因为 keyedState 是跟 key 相干的,所以当 integerLongTuple2.f0 从 2 变为 3 的时候,keyedState 是从新初始化,从 1 开始递增
因为 operatorState 只跟算子相干的,所以始终在递增
因为代码中应用 env.enableCheckpointing(3000) 开启了 checkpoint,能够看到 snapshotState 中的日志打印进去
咱们等 (3,3,6) 前面的 snapshot 打印进去后,接下来通过 nc 持续输出

4,3
因为咱们在代码设置当 if (integerLongTuple2.f0 == 4) 的时候抛出异样,所以此刻 flink 程序就会退出,而后重启,进入到 initializeState

看到对应的打印如下

initialize
restored
snapshot
snapshot

能够看到初始化 (initialize) 以及复原 (restored) 的逻辑都执行到了,咱们通过 nc 持续输出 3,5,能够看到程序打印出 (3,4,7),阐明 keyedState 以及 operatorCount 都失常复原了之前的值。

对于状态数据重散布策略的应用,能够在创立 operatorState 的过程中通过相应的办法指定:如果应用 Even-split Redistribution 策 略,则通过 context. getListState(descriptor)获取 OperatorState;如果应用 Union Redistribution 策略,则通过 context.getUnionList State(descriptor) 来获取。实例代码中默认应用的 Even-split Redistribution 策略。

通过 CheckpointedFunction 构建带缓冲区的 Sink
上面咱们再看另外一个例子,构建一个带缓冲的 Sink。如代码所示,通过 checkpointState 保留以后曾经承受的所有元素列表。

实现 SinkFunction 和 CheckpointedFunction
在 snapshotState()办法中清理掉上一次 checkpoint 中存储的 operatorState 的数据,而后再增加本次算子中须要 checkpoint 的 bufferedElements 中的每一个元素。
当零碎重启时会调用 initializeState 办法,从新复原 operatorState,其中 bufferedElements 中的数据能够从 checkpointState 中复原
invoke 会在算子收到每一个元素时调用
finish 会在算子收到上游所有元素后调用

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;


public class BufferingSink
    implements SinkFunction<Tuple2<Integer, Long>>,
    CheckpointedFunction {

  private final int threshold;
  private transient ListState<Tuple2<Integer, Long>> checkpointState;
  private List<Tuple2<Integer, Long>> bufferedElements;

  public BufferingSink(int threshold){this.bufferedElements = new ArrayList<>();
    this.threshold = threshold;
  }

  @Override
  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {System.out.println("snapshot");
    checkpointState.clear();
    // for (Tuple2<Integer, Long> element: bufferedElements) {//   checkpointState.add(element);
    // }
    checkpointState.addAll(bufferedElements);
  }

  @Override
  public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {System.out.println("initialize");
    ListStateDescriptor<Tuple2<Integer, Long>> descriptor =
        new ListStateDescriptor<Tuple2<Integer, Long>>(
            "buffered-elements",
            TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {})
        );

    checkpointState = functionInitializationContext.getOperatorStateStore().getListState(descriptor);

    if (functionInitializationContext.isRestored()) {for (Tuple2<Integer, Long> element : checkpointState.get()){bufferedElements.add(element);
      }
    }

  }

  @Override
  public void invoke(Tuple2<Integer, Long> value, Context context) throws Exception {
    // called for each element
    // SinkFunction.super.invoke(value, context);
    System.out.println(String.format("recv %d %d", value.f0, value.f1));
    bufferedElements.add(value);
    if (value.f0 == 4) {throw new IOException("input");
    }

  }

  @Override
  public void finish() throws Exception {
    // called when every element has received
    // SinkFunction.super.finish();
    for (Tuple2<Integer, Long> element: bufferedElements) {System.out.println(element);
    }
  }
}

验证代码

构建验证程序如下:

private static void checkpointListStateWithSinkFunction() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(3000); // if commit this line off, only got initializeState

    DataStreamSource<String> localhost = env.socketTextStream("localhost", 1111);
    SingleOutputStreamOperator<Tuple2<Integer, Long>>  inputStream= localhost.map(new MapFunction<String,
        Tuple2<Integer, Long>>() {
      @Override
      public Tuple2<Integer, Long> map(String s) throws Exception {String[] split = s.split(",");
        return new Tuple2<>(Integer.valueOf(split[0]), Long.valueOf(split[1]));
      }
    });

    inputStream.addSink(new BufferingSink(2));
    env.execute("checkpoint state for sink");
  }

通过 nc 输出如下数据

wdy@DESKTOP-SPIDEIC:~$ nc -lk 1111
2,1
2,2
2,3
flink 程序输入如下打印:

initialize
snapshot
recv 2 1
recv 2 2
snapshot
recv 2 3
snapshot
等 recv 2 3 之后看到 snapshot 打印进去后,用于确保最初一条 (2,3) 也保留到了 checkpoint 中,通过 nc 输出

4,0
因为咱们在代码设置当 if (integerLongTuple2.f0 == 4) 的时候抛出异样,所以此刻 flink 程序就会退出,而后重启,进入到 initializeState

flink 程序输入如下打印:

recv 4 0
initialize
restored
snapshotrecv 4 0
initialize
restored
snapshot
能够看到 正确进入到 initializeState 并且执行了复原逻辑,接下来通过 nc 输出

2,4
flink 程序输入如下打印

recv 2 4
snapshot
接下来 Ctrl+C 进行 nc,进入到 finish()函数中,flink 程序输入

(2,1)
(2,2)
(2,3)
(2,4)
表明 flink 异样退出重启后,正确从 checkpointState 复原了之前的数据。

通过 ListCheckpointed 接口定义 Operator State
ListCheckpointed 接口和 CheckpointedFunction 接口相比在灵活性上绝对弱一些,只能反对 List 类型的状态,并且在数据恢复的时候仅反对 even-redistribution 策略。在 ListCheckpointed 接口中须要实现以下两个办法来操作 Operator State:

public interface ListCheckpointed<T extends Serializable> {List<T> snapshotState(long var1, long var3) throws Exception;

  void restoreState(List<T> var1) throws Exception;
}

其中 snapshotState 办法定义数据元素 List 存储到 checkpoints 的逻辑,restoreState 办法则定义从 checkpoints 中复原状态的逻辑。如果状态数据不反对 List 模式,则能够在 snapshotState 办法中返回 Collections.singletonList(STATE)。

这个接口在 Flink 1.14 中曾经不倡议应用,所以本文也不再进行实例演示。

总结

本文介绍了 OperateState 在 MapFunction 以及 SlinkFunction 两种操作场景中的利用,同时展现了如何通过联合 CheckpointedFunction 主动对 State 进行备份 Checkpoint,从而在工作出现异常时可能从保留的 State 中复原状态,实现 Exactly-Once。

参考资料
《Flink 原理、实战与性能优化》5.1 有状态计算
《Flink 内核原理与实现》第 7 章 状态原理
Flink 教程(17) Keyed State 状态治理之 AggregatingState 应用案例 求平均值 https://blog.csdn.net/winterkin

退出移动版