关于flink:Flink内部Exactly-Once三板斧状态状态后端与检查点

28次阅读

共计 19598 个字符,预计需要花费 49 分钟才能阅读完成。

Flink 是一个分布式的流解决引擎,而流解决的其中一个特点就是 7X24。那么,如何保障 Flink 作业的继续运行呢?Flink 的外部会将利用状态 (state) 存储到本地内存或者嵌入式的 kv 数据库 (RocksDB) 中,因为采纳的是分布式架构,Flink 须要对本地生成的状态进行长久化存储,以防止因利用或者节点机器故障等起因导致数据的失落,Flink 是通过 checkpoint(检查点)的形式将状态写入到近程的长久化存储,从而就能够实现不同语义的后果保障。通过本文,你能够理解到什么是 Flink 的状态,Flink 的状态是怎么存储的,Flink 可抉择的状态后端 (statebackend) 有哪些,什么是全局一致性检查点,Flink 外部如何通过检查点实现 Exactly Once 的后果保障。另外,本文内容较长,倡议关注加珍藏。

<!– more –>

什么是状态

引子

对于什么是状态,咱们先不做过多的剖析。首先看一个代码案例,其中案例 1 是 Spark 的 WordCount 代码,案例 2 是 Flink 的 WorkCount 代码。

  • 案例 1:Spark WC
object WordCount {def main(args:Array[String]){val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  val ssc = new StreamingContext(conf, Seconds(5))
  val lines = ssc.socketTextStream("localhost", 9999)
  val words = lines.flatMap(_.split(" "))
  val pairs = words.map(word => (word, 1))
  val wordCounts = pairs.reduceByKey(_ + _)
  wordCounts.print()
  ssc.start()
  ssc.awaitTermination()}
}

输出:

C:\WINDOWS\system32>nc -lp 9999
hello spark
hello spark

输入:

  • 案例 2:Flink WC
public class WordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String,Integer>> words = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception {String[] splits = value.split("\\s");
                for (String word : splits) {out.collect(Tuple2.of(word, 1));
                }
            }
        });
        words.keyBy(0).sum(1).print();
        env.execute("WC");
    }
}

输出:

C:\WINDOWS\system32>nc -lp 9999
hello Flink
hello Flink

输入:

从下面的两个例子能够看出,在应用 Spark 进行词频统计时,以后的统计后果不受历史统计后果的影响,只计算接管的以后数据的后果,这个就能够了解为无状态的计算。再来看一下 Flink 的例子,能够看出当第二次词频统计时,把第一次的后果值也统计在了一起,即 Flink 把上一次的计算结果保留在了状态里,第二次计算的时候会先拿到上一次的后果状态,而后联合新到来的数据再进行计算,这就能够了解成有状态的计算,如下图所示。

状态的类别

Flink 提供了两种根本类型的状态:别离是 Keyed StateOperator State。依据不同的状态治理形式,每种状态又有两种存在模式,别离为:managed(托管状态)raw(原生状态)。具体如下表格所示。须要留神的是,因为 Flink 举荐应用 managed state,所以下文次要探讨 managed state,对于 raw state,本文不会做过多的探讨。

managed state & raw state 区别

Keyed State & Operator State

Keyed State

Keyed State 只能由作用在 KeyedStream 下面的函数应用,该状态与某个 key 进行绑定,即每一个 key 对应一个 state。Keyed State 依照 key 进行保护和拜访的,Flink 会为每一个 Key 都保护一个状态实例,该状态实例总是位于解决该 key 记录的算子工作上,因而同一个 key 的记录能够拜访到一样的状态。如下图所示,能够通过在一条流上应用 keyBy()办法来生成一个 KeyedStream。Flink 提供了很多种 keyed state,具体如下:

  • ValueState<T>

用于保留类型为 T 的单个值。用户能够通过 ValueState.value()来获取该状态值,通过 ValueState.update()来更新该状态。应用 ValueStateDescriptor 来获取状态句柄。

  • ListState<T>

用于保留类型为 T 的元素列表,即 key 的状态值是一个列表。用户能够应用 ListState.add()或者 ListState.addAll()将新元素增加到列表中,通过 ListState.get()拜访状态元素,该办法会返回一个可遍历所有元素的 Iterable<T> 对象,留神 ListState 不反对删除单个元素,然而用户能够应用 update(List<T> values)来更新整个列表。应用 ListStateDescriptor来获取状态句柄。

  • ReducingState<T>

调用 add()办法增加值时,会立刻返回一个应用 ReduceFunction 聚合后的值,用户能够应用 ReducingState.get()来获取该状态值。应用 ReducingStateDescriptor来获取状态句柄。

  • AggregatingState<IN, OUT>

与 ReducingState<T> 相似,不同的是它应用的是 AggregateFunction 来聚合外部的值,AggregatingState.get()办法会计算最终的后果并将其返回。应用 AggregatingStateDescriptor来获取状态句柄

  • MapState<UK, UV>

用于保留一组 key、value 的映射,相似于 java 的 Map 汇合。用户能够通过 get(UK key)办法获取 key 对应的状态,能够通过 put(UK k,UV value)办法增加一个键值,能够通过 remove(UK key)删除给定 key 的值,能够通过 contains(UK key)判断是否存在对应的 key。应用 MapStateDescriptor来获取状态句柄。

  • FoldingState<T, ACC>

在 Flink 1.4 的版本中标记过期,在将来的版本中会被移除,应用 AggregatingState 进行代替。

值得注意的是,下面的状态原语都反对通过 State.clear()办法来进行革除状态。另外,上述的状态原语仅用于与状态进行交互,真正的状态是存储在状态后端(前面会介绍状态后端)的,通过该状态原语相当于持有了状态的句柄(handle)。

keyed State 应用案例

上面给出一个 MapState 的应用案例,对于 ValueState 的应用状况能够参考官网,具体如下:

public class MapStateExample {

    // 统计每个用户每种行为的个数
    public static class UserBehaviorCnt extends RichFlatMapFunction<Tuple3<Long, String, String>, Tuple3<Long, String, Integer>> {

        // 定义一个 MapState 句柄
        private transient MapState<String, Integer> behaviorCntState;

        // 初始化状态
        @Override
        public void open(Configuration parameters) throws Exception {super.open(parameters);
            MapStateDescriptor<String, Integer> userBehaviorMapStateDesc = new MapStateDescriptor<>(
                    "userBehavior",  // 状态描述符的名称
                    TypeInformation.of(new TypeHint<String>() {}),  // MapState 状态的 key 的数据类型
                    TypeInformation.of(new TypeHint<Integer>() {})  // MapState 状态的 value 的数据类型
            );
            behaviorCntState = getRuntimeContext().getMapState(userBehaviorMapStateDesc); // 获取状态
        }

        @Override
        public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple3<Long, String, Integer>> out) throws Exception {
            Integer behaviorCnt = 1;
            // 如果以后状态包含该行为,则 +1
            if (behaviorCntState.contains(value.f1)) {behaviorCnt = behaviorCntState.get(value.f1) + 1;
            }
            // 更新状态
            behaviorCntState.put(value.f1, behaviorCnt);
            out.collect(Tuple3.of(value.f0, value.f1, behaviorCnt));
        }
    }
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // 模仿数据源[userId,behavior,product]
        DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements(Tuple3.of(1L, "buy", "iphone"),
                Tuple3.of(1L, "cart", "huawei"),
                Tuple3.of(1L, "buy", "logi"),
                Tuple3.of(1L, "fav", "oppo"),
                Tuple3.of(2L, "buy", "huawei"),
                Tuple3.of(2L, "buy", "onemore"),
                Tuple3.of(2L, "fav", "iphone"));
        userBehaviors
                .keyBy(0)
                .flatMap(new UserBehaviorCnt())
                .print();
        env.execute("MapStateExample");
    }
}

后果输入:

状态的生命周期治理(TTL)

对于任何类型 Keyed State 都能够设定状态的生命周期(TTL), 即状态的存活工夫,以确保可能在规定工夫内及时地清理状态数据。如果配置了状态的 TTL,那么当状态过期时,存储的状态会被革除。状态生命周期性能能够通过 StateTtlConfig 配置,而后将 StateTtlConfig 配置传入 StateDescriptor 中的 enableTimeToLive 办法中即可。代码示例如下:

StateTtlConfig ttlConfig = StateTtlConfig
                 // 指定 TTL 时长为 10S
                .newBuilder(Time.seconds(10))
                 // 只对创立和写入操作无效
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                 // 不返回过期的数据
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) 
                .build();

        // 初始化状态
        @Override
        public void open(Configuration parameters) throws Exception {super.open(parameters);
            MapStateDescriptor<String, Integer> userBehaviorMapStateDesc = new MapStateDescriptor<>(
                    "userBehavior",  // 状态描述符的名称
                    TypeInformation.of(new TypeHint<String>() {}),  // MapState 状态的 key 的数据类型
                    TypeInformation.of(new TypeHint<Integer>() {})  // MapState 状态的 value 的数据类型

            );
            // 设置 stateTtlConfig
            userBehaviorMapStateDesc.enableTimeToLive(ttlConfig);
            behaviorCntState = getRuntimeContext().getMapState(userBehaviorMapStateDesc); // 获取状态

        }

在 StateTtlConfig 创立时,newBuilder 办法是必须要指定的,newBuilder 中设定过期工夫的参数。对于其余参数都是可选的或应用默认值。其中 setUpdateType 办法中传入的类型有三种:

public enum UpdateType {
        // 禁用 TTL, 永远不会过期
        Disabled,
        // 创立和写入时更新 TTL
        OnCreateAndWrite,
        // 与 OnCreateAndWrite 相似,然而在读操作时也会更新 TTL
        OnReadAndWrite
    }

值得注意的是,过期的状态数据依据 UpdateType 参数进行配置,只有被写入或者读取的工夫才会更新 TTL,也就是说如果某个状态指标始终不被应用或者更新,则永远不会触发对该状态数据的清理操作,这种状况可能会导致系统中的状态数据越来越大。目前用户能够应用 StateTtlConfig.cleanupFullSnapshot 设定当触发 State Snapshot 的时候清理状态数据,然而改配置不适宜用于 RocksDB 做增量 Checkpointing 的操作。

下面的 StateTtlConfig 创立时,能够指定 setStateVisibility,用于状态的可见性配置,依据过期数据是否被清理来确定是否返回状态数据。

    /**
     * 是否返回过期的数据
     */
    public enum StateVisibility {
        // 如果数据没有被清理,就能够返回
        ReturnExpiredIfNotCleanedUp,
        // 永远不返回过期的数据, 默认值
        NeverReturnExpired
    }

Operator State

Operator State 的作用于是某个算子工作,这意味着所有在同一个并行任务之内的记录都能拜访到雷同的状态。算子状态不能通过其余工作拜访,无论该工作是雷同的算子。如下图所示。

Operator State 是一种 non-keyed state,与并行的操作算子实例相关联,例如在 Kafka Connector 中,每个 Kafka 生产端算子实例都对应到 Kafka 的一个分区中,保护 Topic 分区和 Offsets 偏移量作为算子的 Operator State。在 Flink 中能够实现 ListCheckpointed<T extends Serializable> 接口或者 CheckpointedFunction 接口来实现一个 Operator State。

首先,咱们先看一下这两个接口的具体实现,而后再给出这两种接口的具体应用案例。先看一下 ListCheckpointed 接口的源码,如下:

public interface ListCheckpointed<T extends Serializable> {
    
    /**
     * 获取某个算子实例的以后状态,该状态包含该算子实例之前被调用时的所有后果
     * 以列表的模式返回一个函数状态的快照
     * Flink 触发生成检查点时调用该办法
     * @param checkpointId checkpoint 的 ID, 是一个惟一的、枯燥递增的值
     * @param timestamp Job Manager 触发 checkpoint 时的工夫戳
     * @return  返回一个 operator state list, 如果为 null 时, 返回空 list
     * @throws Exception
     */
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    /**
     * 初始化函数状态时调用,可能是在作业启动时或者故障复原时
     * 依据提供的列表复原函数状态
     * 留神:当实现该办法时,须要在 RichFunction#open()办法之前调用该办法
     * @param state 被复原算子实例的 state 列表,可能为空
     * @throws Exception
     */
    void restoreState(List<T> state) throws Exception;
}

应用 Operator ListState 时,在进行扩缩容时,重散布的策略 (状态复原的模式) 如下图所示:

下面的重散布策略为Even-split Redistribution,即每个算子实例中含有局部状态元素的 List 列表,整个状态数据是所有 List 列表的合集。当触发 restore/redistribution 动作时,通过将状态数据平均分配成与算子并行度雷同数量的 List 列表,每个 task 实例中有一个 List,其能够为空或者含有多个元素。

咱们再来看一下 CheckpointedFunction 接口,源码如下:

public interface CheckpointedFunction {

    /**
     * 会在生成检查点之前调用
     * 该办法的目标是确保检查点开始之前所有状态对象都曾经更新结束
     * @param context 应用 FunctionSnapshotContext 作为参数
     *                从 FunctionSnapshotContext 能够获取 checkpoint 的元数据信息,*                比方 checkpoint 编号,JobManager 在初始化 checkpoint 时的工夫戳
     * @throws Exception
     */
    void snapshotState(FunctionSnapshotContext context) throws Exception;

    /**
     * 在创立 checkpointedFunction 的并行实例时被调用,* 在利用启动或者故障重启时触发该办法的调用
     * @param context 传入 FunctionInitializationContext 对象,*                   能够应用该对象拜访 OperatorStateStore 和 KeyedStateStore 对象,*                   这两个对象能够获取状态的句柄,即通过 Flink runtime 来注册函数状态并返回 state 对象
     *                   比方:ValueState、ListState 等
     * @throws Exception
     */
    void initializeState(FunctionInitializationContext context) throws Exception;
}

CheckpointedFunction 接口是用于指定有状态函数的最底层的接口,该接口提供了用于注册和保护 keyed state 与 operator state 的 hook(即能够同时应用 keyed state 和 operator state),另外也是惟一反对应用 list union state。对于 Union List State, 应用的是 Flink 为 Operator state 提供的另一种重散布的策略:Union Redistribution,即每个算子实例中含有所有状态元素的 List 列表,当触发 restore/redistribution 动作时,每个算子都可能获取到残缺的状态元素列表。具体如下图所示:

ListCheckpointed

ListCheckpointed 接口和 CheckpointedFunction 接口相比在灵活性上绝对弱一些,只能反对 List 类型的状态,并且在数据恢复的时候仅反对 even-redistribution 策略。该接口不像 Flink 提供的 Keyed State(比方 Value State、ListState)那样间接在状态后端 (state backend) 注册,须要将 operator state 实现为成员变量,而后通过接口提供的回调函数与状态后端进行交互。应用代码案例如下:

public class ListCheckpointedExample {
    private static class UserBehaviorCnt extends RichFlatMapFunction<Tuple3<Long, String, String>, Tuple2<String, Long>> implements ListCheckpointed<Long> {
        private Long userBuyBehaviorCnt = 0L;
        @Override
        public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple2<String, Long>> out) throws Exception {if(value.f1.equals("buy")){
                userBuyBehaviorCnt ++;
                out.collect(Tuple2.of("buy",userBuyBehaviorCnt));
            }
        }
        @Override
        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
            // 返回单个元素的 List 汇合,该汇合元素是用户购买行为的数量
            return Collections.singletonList(userBuyBehaviorCnt);
        }
        @Override
        public void restoreState(List<Long> state) throws Exception {
            // 在进行扩缩容之后,进行状态复原,须要把其余 subtask 的状态加在一起
            for (Long cnt : state) {userBuyBehaviorCnt += 1;}
        }
    }
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // 模仿数据源[userId,behavior,product]
        DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements(Tuple3.of(1L, "buy", "iphone"),
                Tuple3.of(1L, "cart", "huawei"),
                Tuple3.of(1L, "buy", "logi"),
                Tuple3.of(1L, "fav", "oppo"),
                Tuple3.of(2L, "buy", "huawei"),
                Tuple3.of(2L, "buy", "onemore"),
                Tuple3.of(2L, "fav", "iphone"));

        userBehaviors
                .flatMap(new UserBehaviorCnt())
                .print();

        env.execute("ListCheckpointedExample");
    }
}

CheckpointedFunction

CheckpointedFunction 接口提供了更加丰盛的操作,比方反对 Union list state,能够拜访 keyedState,对于重散布策略,如果应用 Even-split Redistribution 策略,则通过 context. getListState(descriptor)获取 Operator State;如果应用 UnionRedistribution 策略,则通过 context. getUnionList State(descriptor)来获取。应用案例如下:

public class CheckpointFunctionExample {
    private static class UserBehaviorCnt implements CheckpointedFunction, FlatMapFunction<Tuple3<Long, String, String>, Tuple3<Long, Long, Long>> {
        // 统计每个 operator 实例的用户行为数量的本地变量
        private Long opUserBehaviorCnt = 0L;
        // 每个 key 的 state, 存储 key 对应的相干状态
        private ValueState<Long> keyedCntState;
        // 定义 operator state,存储算子的状态
        private ListState<Long> opCntState;

        @Override
        public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple3<Long, Long, Long>> out) throws Exception {if (value.f1.equals("buy")) {
                // 更新算子状态本地变量值
                opUserBehaviorCnt += 1;
                Long keyedCount = keyedCntState.value();
                // 更新 keyedstate 的状态 , 判断状态是否为 null,否则空指针异样
                keyedCntState.update(keyedCount == null ? 1L : keyedCount + 1);
                // 后果输入
                out.collect(Tuple3.of(value.f0, keyedCntState.value(), opUserBehaviorCnt));
            }
        }
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // 应用 opUserBehaviorCnt 本地变量更新 operator state
            opCntState.clear();
            opCntState.add(opUserBehaviorCnt);
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {

            // 通过 KeyedStateStore, 定义 keyedState 的 StateDescriptor 描述符
            ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("keyedCnt", TypeInformation.of(new TypeHint<Long>() {}));

            // 通过 OperatorStateStore, 定义 OperatorState 的 StateDescriptor 描述符
            ListStateDescriptor opStateDescriptor = new ListStateDescriptor("opCnt", TypeInformation.of(new TypeHint<Long>() {}));
            // 初始化 keyed state 状态值
            keyedCntState = context.getKeyedStateStore().getState(valueStateDescriptor);
            // 初始化 operator state 状态
            opCntState = context.getOperatorStateStore().getListState(opStateDescriptor);
            // 初始化本地变量 operator state
            for (Long state : opCntState.get()) {opUserBehaviorCnt += state;}
        }
    }

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // 模仿数据源[userId,behavior,product]
        DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements(Tuple3.of(1L, "buy", "iphone"),
                Tuple3.of(1L, "cart", "huawei"),
                Tuple3.of(1L, "buy", "logi"),
                Tuple3.of(1L, "fav", "oppo"),
                Tuple3.of(2L, "buy", "huawei"),
                Tuple3.of(2L, "buy", "onemore"),
                Tuple3.of(2L, "fav", "iphone"));

        userBehaviors
                .keyBy(0)
                .flatMap(new UserBehaviorCnt())
                .print();
        env.execute("CheckpointFunctionExample");
    }
}

什么是状态后端

下面应用的状态都须要存储到状态后端 (StateBackend),而后在 checkpoint 触发时,将状态长久化到内部存储系统。Flink 提供了三种类型的状态后端,别离是基于内存的状态后端(MemoryStateBackend、基于文件系统的状态后端(FsStateBackend) 以及基于 RockDB 作为存储介质的RocksDB StateBackend。这三种类型的 StateBackend 都可能无效地存储 Flink 流式计算过程中产生的状态数据,在默认状况下 Flink 应用的是 MemoryStateBackend,区别见下表。上面别离对每种状态后端的特点进行阐明。

状态后端的类别

MemoryStateBackend

MemoryStateBackend 将状态数据全副存储在 JVM 堆内存中,包含用户在应用 DataStream API 中创立的 Key/Value State,窗口中缓存的状态数据,以及触发器等数据。MemoryStateBackend 具备十分疾速和高效的特点,但也具备十分多的限度,最次要的就是内存的容量限度,一旦存储的状态数据过多就会导致系统内存溢出等问题,从而影响整个利用的失常运行。同时如果机器呈现问题,整个主机内存中的状态数据都会失落,进而无奈复原工作中的状态数据。因而从数据安全的角度倡议用户尽可能地防止在生产环境中应用 MemoryStateBackend。Flink 将 MemoryStateBackend 作为默认状态后端。

MemoryStateBackend 比拟适宜用于测试环境中,并用于本地调试和验证,不倡议在生产环境中应用。但如果利用状态数据量不是很大,例如应用了大量的非状态计算算子,也能够在生产环境中使 MemoryStateBackend.

FsStateBackend

FsStateBackend 是基于文件系统的一种状态后端,这里的文件系统能够是本地文件系统,也能够是 HDFS 分布式文件系统。创立 FsStateBackend 的构造函数如下:

FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots)

其中 path 如果为本地门路,其格局为“file:///data/flink/checkpoints”,如果 path 为 HDFS 门路,其格局为“hdfs://nameservice/flink/checkpoints”。FsStateBackend 中第二个 Boolean 类型的参数指定是否以同步的形式进行状态数据记录,默认采纳异步的形式将状态数据同步到文件系统中,异步形式可能尽可能防止在 Checkpoint 的过程中影响流式计算工作。如果用户想采纳同步的形式进行状态数据的检查点数据,则将第二个参数指定为 True 即可。

相比于 MemoryStateBackend, FsStateBackend 更适宜工作状态十分大的状况,例如利用中含有工夫范畴十分长的窗口计算,或 Key/value State 状态数据量十分大的场景,这时零碎内存不足以撑持状态数据的存储。同时 FsStateBackend 最大的益处是绝对比较稳定,在 checkpoint 时,将状态长久化到像 HDFS 分布式文件系统中,能最大水平保障状态数据的安全性。

RocksDBStateBackend

与后面的状态后端不同,RocksDBStateBackend 须要独自引入相干的依赖包。RocksDB 是一个 key/value 的内存存储系统,相似于 HBase,是一种内存磁盘混合的 LSM DB。当写数据时会先写进 write buffer(相似于 HBase 的 memstore),而后在 flush 到磁盘文件,当读取数据时会当初 block cache(相似于 HBase 的 block cache),所以速度会很快。

RocksDBStateBackend 在性能上要比 FsStateBackend 高一些,次要是因为借助于 RocksDB 存储了最新热数据,而后通过异步的形式再同步到文件系统中,但 RocksDBStateBackend 和 MemoryStateBackend 相比性能就会较弱一些。

须要留神 RocksDB 不反对同步的 Checkpoint,构造方法中没有同步快照这个选项。不过 RocksDB 反对增量的 Checkpoint,也是目前惟一增量 Checkpoint 的 Backend,意味着并不需要把所有 sst 文件上传到 Checkpoint 目录,仅须要上传新生成的 sst 文件即可。它的 Checkpoint 存储在内部文件系统(本地或 HDFS),其容量限度只有单个 TaskManager 上 State 总量不超过它的内存 + 磁盘,单 Key 最大 2G,总大小不超过配置的文件系统容量即可。对于超大状态的作业,例如天级窗口聚合等场景下能够使会用该状态后端。

配置状态后端

Flink 默认应用的状态后端是 MemoryStateBackend,所以不须要显示配置。对于其余的状态后端,都须要进行显性配置。在 Flink 中蕴含了两种级别的 StateBackend 配置:一种是在程序中进行配置,该配置只对以后利用无效;另外一种是通过 flink-conf.yaml进行全局配置,一旦配置就会对整个 Flink 集群上的所有利用无效。

  • 利用级别配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

如果应用 RocksDBStateBackend 则须要独自引入 rockdb 依赖库, 如下:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>

应用形式与 FsStateBackend 相似,如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));
  • 集群级别配置

具体的配置项在 flink-conf.yaml 文件中,如下代码所示,参数 state.backend 指明 StateBackend 类型,state.checkpoints.dir 配置具体的状态存储门路,代码中应用 filesystem 作为 StateBackend,而后指定相应的 HDFS 文件门路作为 state 的 checkpoint 文件夹。

# 应用 filesystem 存储
state.backend: filesystem
# checkpoint 存储门路
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

如果想用 RocksDBStateBackend 配置集群级别的状态后端,能够应用上面的配置:

# 操作 RocksDBStateBackend 的线程数量,默认值为 1
state.backend.rocksdb.checkpoint.transfer.thread.num: 1
# 指定 RocksDB 存储状态数据的本地文件门路
state.backend.rocksdb.localdir: /var/rockdb/checkpoints
# 用于指定定时器服务的工厂类实现类,默认为“HEAP”,也能够指定为“RocksDB”state.backend.rocksdb.timer-service.factory: HEAP

什么是 Checkpoint(检查点)

下面解说了 Flink 的状态以及状态后端,状态是存储在状态后端。为了保障 state 容错,Flink 提供了解决故障的措施,这种措施称之为 checkpoint(一致性检查点)。checkpoint 是 Flink 实现容错的外围性能,次要是周期性地触发 checkpoint,将 state 生成快照长久化到内部存储系统(比方 HDFS)。这样一来,如果 Flink 程序呈现故障,那么就能够从上一次 checkpoint 中进行状态复原,从而提供容错保障。另外,通过 checkpoint 机制,Flink 能够实现 Exactly-once 语义(Flink 外部的 Exactly-once, 对于端到端的 exactly_once,Flink 是通过两阶段提交协定实现的)。上面将会详细分析 Flink 的 checkpoint 机制。

检查点的生成

如上图,输出流是用户行为数据,包含购买 (buy) 和退出购物车 (cart) 两种,每种行为数据都有一个偏移量,统计每种行为的个数。

第一步:JobManager checkpoint coordinator 触发 checkpoint。

第二步:假如当生产到 [cart,3] 这条数据时,触发了 checkpoint。那么此时数据源会把生产的偏移量 3 写入长久化存储。

第三步:当写入完结后,source 会将 state handle(状态存储门路)反馈给 JobManager 的 checkpoint coordinator。

第四步:接着算子 count buy 与 count cart 也会进行同样的步骤

第五步:等所有的算子都实现了上述步骤之后,即当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局实现了,向长久化存储中再备份一个 Checkpoint meta 文件,那么整个 checkpoint 也就实现了,如果两头有一个不胜利,那么本次 checkpoin 就宣告失败。

检查点的复原

通过下面的剖析,或者你曾经对 Flink 的 checkpoint 有了初步的意识。那么接下来,咱们看一下是如何从检查点复原的。

  • 工作失败

  • 重启作业

  • 复原检查点

  • 持续解决数据

上述过程具体总结如下:

  • 第一步:重启作业
  • 第二步:从上一次检查点复原状态数据
  • 第三步:持续解决新的数据

Flink 外部 Exactly-Once 实现

Flink 提供了准确一次的解决语义,准确一次的解决语义能够了解为:数据可能会反复计算,然而后果状态只有一个。Flink 通过 Checkpoint 机制实现了准确一次的解决语义,Flink 在触发 Checkpoint 时会向 Source 端插入 checkpoint barrier,checkpoint barriers 是从 source 端插入的,并且会向上游算子进行传递。checkpoint barriers 携带一个 checkpoint ID,用于标识属于哪一个 checkpoint,checkpoint barriers 将流逻辑是哪个分为了两局部。对于双流的状况,通过 barrier 对齐的形式实现准确一次的解决语义。

对于什么是 checkpoint barrier,能够看一下 CheckpointBarrier 类的源码形容,如下:

/**
 * Checkpoint barriers 用来在数据流中实现 checkpoint 对齐的.
 * Checkpoint barrier 由 JobManager 的 checkpoint coordinator 插入到 Source 中,
 * Source 会把 barrier 播送发送到上游算子, 当一个算子接管到了其中一个输出流的 Checkpoint barrier 时,
 * 它就会晓得曾经解决完了本次 checkpoint 与上次 checkpoint 之间的数据.
 * 
 * 一旦某个算子接管到了所有输出流的 checkpoint barrier 时,* 意味着该算子的曾经解决完了截止到以后 checkpoint 的数据,* 能够触发 checkpoint,并将 barrier 向上游传递
 * 
 * 依据用户抉择的解决语义,在 checkpoint 实现之前会缓存后一次 checkpoint 的数据,* 直到本次 checkpoint 实现(exactly once)
 * 
 * checkpoint barrier 的 id 是严格枯燥递增的
 *
 */
    public class CheckpointBarrier extends RuntimeEvent {...}

能够看出 checkpoint barrier 次要性能是实现 checkpoint 对齐的,从而能够实现 Exactly-Once 解决语义。

上面将会对 checkpoint 过程进行合成,具体如下:

图 1,包含两个流,每个工作都会生产一条用户行为数据 (包含购买(buy) 和加购(cart)),数字代表该数据的偏移量,count buy 工作统计购买行为的个数,coun cart 统计加购行为的个数。

图 2,触发 checkpoint,JobManager 会向每个数据源发送一个新的 checkpoint 编号,以此来启动检查点生成流程。

  • 图 3,当 Source 工作收到音讯后,会进行收回数据,而后利用状态后端触发生成本地状态检查点,并把该 checkpoint barrier 以及 checkpoint id 播送至所有传出的数据流分区。状态后端会在 checkpoint 实现之后告诉工作,随后工作会向 Job Manager 发送确认音讯。在将 checkpoint barrier 收回之后,Source 工作恢复正常工作。

  • 图 4,Source 工作收回的 checkpoint barrier 会发送到与之相连的上游算子工作,当工作收到一个新的 checkpoint barrier 时,会持续期待其余输出分区的 checkpoint barrier 到来,这个过程称之为barrier 对齐,checkpoint barrier 到来之前会把到来的数据线缓存起来。

  • 图 5,工作收齐了全副输出分区的 checkpoint barrier 之后,会告诉状态后端开始生成 checkpoint,同时会把 checkpoint barrier 播送至上游算子。

  • 图 6,工作在收回 checkpoint barrier 之后,开始解决因 barrier 对齐产生的缓存数据,在缓存的数据处理完之后,就会持续解决输出流数据。

  • 图 7,最终 checkpoint barrier 会被传送到 sink 端,sink 工作接管到 checkpoint barrier 之后,会向其余算子工作一样,将本身的状态写入 checkpoint,之后向 Job Manager 发送确认音讯。Job Manager 接管到所有工作返回的确认音讯之后,就会将此次检查点标记为实现。

应用案例

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// checkpoint 的工夫距离,如果状态比拟大,能够适当调大该值
env.enableCheckpointing(1000);
// 配置解决语义,默认是 exactly-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 两个 checkpoint 之间的最小工夫距离,避免因 checkpoint 工夫过长,导致 checkpoint 积压
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoint 执行的下限工夫,如果超过该阈值,则会中断 checkpoint
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 最大并行执行的检查点数量,默认为 1,能够指定多个,从而同时登程多个 checkpoint,晋升效率
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 设定周期性内部检查点,将状态数据长久化到内部零碎中,// 应用该形式不会在工作失常进行的过程中清理掉检查点数据
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// allow job recovery fallback to checkpoint when there is a more recent savepoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

总结

本文首先从 Flink 的状态动手,通过 Spark 的 WordCount 和 Flink 的 Work Count 进行阐明什么是状态。接着对状态的分类以及状态的应用进行了具体阐明。而后对 Flink 提供的三种状态后端进行探讨,并给出了状态后端的应用阐明。最初,以图解加文字的模式具体解释了 Flink 的 checkpoint 机制,并给出了应用 Checkpoint 时的程序配置。

  • 关注公众号:大数据技术与数仓
  • 收费支付百 G 大数据资料

正文完
 0