Flink是一个分布式的流解决引擎,而流解决的其中一个特点就是7X24。那么,如何保障Flink作业的继续运行呢?Flink的外部会将利用状态(state)存储到本地内存或者嵌入式的kv数据库(RocksDB)中,因为采纳的是分布式架构,Flink须要对本地生成的状态进行长久化存储,以防止因利用或者节点机器故障等起因导致数据的失落,Flink是通过checkpoint(检查点)的形式将状态写入到近程的长久化存储,从而就能够实现不同语义的后果保障。通过本文,你能够理解到什么是Flink的状态,Flink的状态是怎么存储的,Flink可抉择的状态后端(statebackend)有哪些,什么是全局一致性检查点,Flink外部如何通过检查点实现Exactly Once的后果保障。另外,本文内容较长,倡议关注加珍藏。
<!-- more -->
什么是状态
引子
对于什么是状态,咱们先不做过多的剖析。首先看一个代码案例,其中案例1是Spark的WordCount代码,案例2是Flink的WorkCount代码。
- 案例1:Spark WC
object WordCount { def main(args:Array[String]){ val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(5)) val lines = ssc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination()}}
输出:
C:\WINDOWS\system32>nc -lp 9999hello sparkhello spark
输入:
- 案例2:Flink WC
public class WordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999); SingleOutputStreamOperator<Tuple2<String,Integer>> words = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception { String[] splits = value.split("\\s"); for (String word : splits) { out.collect(Tuple2.of(word, 1)); } } }); words.keyBy(0).sum(1).print(); env.execute("WC"); }}
输出:
C:\WINDOWS\system32>nc -lp 9999hello Flinkhello Flink
输入:
从下面的两个例子能够看出,在应用Spark进行词频统计时,以后的统计后果不受历史统计后果的影响,只计算接管的以后数据的后果,这个就能够了解为无状态的计算。再来看一下Flink的例子,能够看出当第二次词频统计时,把第一次的后果值也统计在了一起,即Flink把上一次的计算结果保留在了状态里,第二次计算的时候会先拿到上一次的后果状态,而后联合新到来的数据再进行计算,这就能够了解成有状态的计算,如下图所示。
状态的类别
Flink提供了两种根本类型的状态:别离是 Keyed State
和Operator State
。依据不同的状态治理形式,每种状态又有两种存在模式,别离为:managed(托管状态)
和raw(原生状态)
。具体如下表格所示。须要留神的是,因为Flink举荐应用managed state,所以下文次要探讨managed state,对于raw state,本文不会做过多的探讨。
managed state & raw state区别
Keyed State & Operator State
Keyed State
Keyed State只能由作用在KeyedStream下面的函数应用,该状态与某个key进行绑定,即每一个key对应一个state。Keyed State依照key进行保护和拜访的,Flink会为每一个Key都保护一个状态实例,该状态实例总是位于解决该key记录的算子工作上,因而同一个key的记录能够拜访到一样的状态。如下图所示,能够通过在一条流上应用keyBy()办法来生成一个KeyedStream。Flink提供了很多种keyed state,具体如下:
- ValueState<T>
用于保留类型为T的单个值。用户能够通过ValueState.value()来获取该状态值,通过ValueState.update()来更新该状态。应用ValueStateDescriptor
来获取状态句柄。
- ListState<T>
用于保留类型为T的元素列表,即key的状态值是一个列表。用户能够应用ListState.add()或者ListState.addAll()将新元素增加到列表中,通过ListState.get()拜访状态元素,该办法会返回一个可遍历所有元素的Iterable<T>对象,留神ListState不反对删除单个元素,然而用户能够应用update(List<T> values)来更新整个列表。应用 ListStateDescriptor
来获取状态句柄。
- ReducingState<T>
调用add()办法增加值时,会立刻返回一个应用ReduceFunction聚合后的值,用户能够应用ReducingState.get()来获取该状态值。应用 ReducingStateDescriptor
来获取状态句柄。
- AggregatingState<IN, OUT>
与ReducingState<T>相似,不同的是它应用的是AggregateFunction来聚合外部的值,AggregatingState.get()办法会计算最终的后果并将其返回。应用 AggregatingStateDescriptor
来获取状态句柄
- MapState<UK, UV>
用于保留一组key、value的映射,相似于java的Map汇合。用户能够通过get(UK key)办法获取key对应的状态,能够通过put(UK k,UV value)办法增加一个键值,能够通过remove(UK key)删除给定key的值,能够通过contains(UK key)判断是否存在对应的key。应用 MapStateDescriptor
来获取状态句柄。
- FoldingState<T, ACC>
在Flink 1.4的版本中标记过期,在将来的版本中会被移除,应用AggregatingState进行代替。
值得注意的是,下面的状态原语都反对通过State.clear()办法来进行革除状态。另外,上述的状态原语仅用于与状态进行交互,真正的状态是存储在状态后端(前面会介绍状态后端)的,通过该状态原语相当于持有了状态的句柄(handle)。
keyed State应用案例
上面给出一个MapState的应用案例,对于ValueState的应用状况能够参考官网,具体如下:
public class MapStateExample { //统计每个用户每种行为的个数 public static class UserBehaviorCnt extends RichFlatMapFunction<Tuple3<Long, String, String>, Tuple3<Long, String, Integer>> { //定义一个MapState句柄 private transient MapState<String, Integer> behaviorCntState; // 初始化状态 @Override public void open(Configuration parameters) throws Exception { super.open(parameters); MapStateDescriptor<String, Integer> userBehaviorMapStateDesc = new MapStateDescriptor<>( "userBehavior", // 状态描述符的名称 TypeInformation.of(new TypeHint<String>() {}), // MapState状态的key的数据类型 TypeInformation.of(new TypeHint<Integer>() {}) // MapState状态的value的数据类型 ); behaviorCntState = getRuntimeContext().getMapState(userBehaviorMapStateDesc); // 获取状态 } @Override public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple3<Long, String, Integer>> out) throws Exception { Integer behaviorCnt = 1; // 如果以后状态包含该行为,则+1 if (behaviorCntState.contains(value.f1)) { behaviorCnt = behaviorCntState.get(value.f1) + 1; } // 更新状态 behaviorCntState.put(value.f1, behaviorCnt); out.collect(Tuple3.of(value.f0, value.f1, behaviorCnt)); } } public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); // 模仿数据源[userId,behavior,product] DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements( Tuple3.of(1L, "buy", "iphone"), Tuple3.of(1L, "cart", "huawei"), Tuple3.of(1L, "buy", "logi"), Tuple3.of(1L, "fav", "oppo"), Tuple3.of(2L, "buy", "huawei"), Tuple3.of(2L, "buy", "onemore"), Tuple3.of(2L, "fav", "iphone")); userBehaviors .keyBy(0) .flatMap(new UserBehaviorCnt()) .print(); env.execute("MapStateExample"); }}
后果输入:
状态的生命周期治理(TTL)
对于任何类型Keyed State都能够设定状态的生命周期(TTL),即状态的存活工夫,以确保可能在规定工夫内及时地清理状态数据。如果配置了状态的TTL,那么当状态过期时,存储的状态会被革除。状态生命周期性能能够通过StateTtlConfig配置,而后将StateTtlConfig配置传入StateDescriptor中的enableTimeToLive办法中即可。代码示例如下:
StateTtlConfig ttlConfig = StateTtlConfig // 指定TTL时长为10S .newBuilder(Time.seconds(10)) // 只对创立和写入操作无效 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 不返回过期的数据 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); // 初始化状态 @Override public void open(Configuration parameters) throws Exception { super.open(parameters); MapStateDescriptor<String, Integer> userBehaviorMapStateDesc = new MapStateDescriptor<>( "userBehavior", // 状态描述符的名称 TypeInformation.of(new TypeHint<String>() {}), // MapState状态的key的数据类型 TypeInformation.of(new TypeHint<Integer>() {}) // MapState状态的value的数据类型 ); // 设置stateTtlConfig userBehaviorMapStateDesc.enableTimeToLive(ttlConfig); behaviorCntState = getRuntimeContext().getMapState(userBehaviorMapStateDesc); // 获取状态 }
在StateTtlConfig创立时,newBuilder办法是必须要指定的,newBuilder中设定过期工夫的参数。对于其余参数都是可选的或应用默认值。其中setUpdateType办法中传入的类型有三种:
public enum UpdateType { //禁用TTL,永远不会过期 Disabled, // 创立和写入时更新TTL OnCreateAndWrite, // 与OnCreateAndWrite相似,然而在读操作时也会更新TTL OnReadAndWrite }
值得注意的是,过期的状态数据依据UpdateType参数进行配置,只有被写入或者读取的工夫才会更新TTL,也就是说如果某个状态指标始终不被应用或者更新,则永远不会触发对该状态数据的清理操作,这种状况可能会导致系统中的状态数据越来越大。目前用户能够应用StateTtlConfig.cleanupFullSnapshot设定当触发State Snapshot的时候清理状态数据,然而改配置不适宜用于RocksDB做增量Checkpointing的操作。
下面的StateTtlConfig创立时,能够指定setStateVisibility,用于状态的可见性配置,依据过期数据是否被清理来确定是否返回状态数据。
/** * 是否返回过期的数据 */ public enum StateVisibility { //如果数据没有被清理,就能够返回 ReturnExpiredIfNotCleanedUp, //永远不返回过期的数据,默认值 NeverReturnExpired }
Operator State
Operator State的作用于是某个算子工作,这意味着所有在同一个并行任务之内的记录都能拜访到雷同的状态 。算子状态不能通过其余工作拜访,无论该工作是雷同的算子。如下图所示。
Operator State是一种non-keyed state,与并行的操作算子实例相关联,例如在Kafka Connector中,每个Kafka生产端算子实例都对应到Kafka的一个分区中,保护Topic分区和Offsets偏移量作为算子的Operator State。在Flink中能够实现ListCheckpointed<T extends Serializable>接口或者CheckpointedFunction 接口来实现一个Operator State。
首先,咱们先看一下这两个接口的具体实现,而后再给出这两种接口的具体应用案例。先看一下ListCheckpointed接口的源码,如下:
public interface ListCheckpointed<T extends Serializable> { /** * 获取某个算子实例的以后状态,该状态包含该算子实例之前被调用时的所有后果 * 以列表的模式返回一个函数状态的快照 * Flink触发生成检查点时调用该办法 * @param checkpointId checkpoint的ID,是一个惟一的、枯燥递增的值 * @param timestamp Job Manager触发checkpoint时的工夫戳 * @return 返回一个operator state list,如果为null时,返回空list * @throws Exception */ List<T> snapshotState(long checkpointId, long timestamp) throws Exception; /** * 初始化函数状态时调用,可能是在作业启动时或者故障复原时 * 依据提供的列表复原函数状态 * 留神:当实现该办法时,须要在RichFunction#open()办法之前调用该办法 * @param state 被复原算子实例的state列表 ,可能为空 * @throws Exception */ void restoreState(List<T> state) throws Exception;}
应用Operator ListState时,在进行扩缩容时,重散布的策略(状态复原的模式)如下图所示:
下面的重散布策略为Even-split Redistribution,即每个算子实例中含有局部状态元素的List列表,整个状态数据是所有List列表的合集。当触发restore/redistribution动作时,通过将状态数据平均分配成与算子并行度雷同数量的List列表,每个task实例中有一个List,其能够为空或者含有多个元素。
咱们再来看一下CheckpointedFunction接口,源码如下:
public interface CheckpointedFunction { /** * 会在生成检查点之前调用 * 该办法的目标是确保检查点开始之前所有状态对象都曾经更新结束 * @param context 应用FunctionSnapshotContext作为参数 * 从FunctionSnapshotContext能够获取checkpoint的元数据信息, * 比方checkpoint编号,JobManager在初始化checkpoint时的工夫戳 * @throws Exception */ void snapshotState(FunctionSnapshotContext context) throws Exception; /** * 在创立checkpointedFunction的并行实例时被调用, * 在利用启动或者故障重启时触发该办法的调用 * @param context 传入FunctionInitializationContext对象, * 能够应用该对象拜访OperatorStateStore和 KeyedStateStore对象, * 这两个对象能够获取状态的句柄,即通过Flink runtime来注册函数状态并返回state对象 * 比方:ValueState、ListState等 * @throws Exception */ void initializeState(FunctionInitializationContext context) throws Exception;}
CheckpointedFunction接口是用于指定有状态函数的最底层的接口,该接口提供了用于注册和保护keyed state 与operator state的hook(即能够同时应用keyed state 和operator state),另外也是惟一反对应用list union state。对于Union List State,应用的是Flink为Operator state提供的另一种重散布的策略:Union Redistribution,即每个算子实例中含有所有状态元素的List列表,当触发restore/redistribution动作时,每个算子都可能获取到残缺的状态元素列表。具体如下图所示:
ListCheckpointed
ListCheckpointed接口和CheckpointedFunction接口相比在灵活性上绝对弱一些,只能反对List类型的状态,并且在数据恢复的时候仅反对even-redistribution策略。该接口不像Flink提供的Keyed State(比方Value State、ListState)那样间接在状态后端(state backend)注册,须要将operator state实现为成员变量,而后通过接口提供的回调函数与状态后端进行交互。应用代码案例如下:
public class ListCheckpointedExample { private static class UserBehaviorCnt extends RichFlatMapFunction<Tuple3<Long, String, String>, Tuple2<String, Long>> implements ListCheckpointed<Long> { private Long userBuyBehaviorCnt = 0L; @Override public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple2<String, Long>> out) throws Exception { if(value.f1.equals("buy")){ userBuyBehaviorCnt ++; out.collect(Tuple2.of("buy",userBuyBehaviorCnt)); } } @Override public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { //返回单个元素的List汇合,该汇合元素是用户购买行为的数量 return Collections.singletonList(userBuyBehaviorCnt); } @Override public void restoreState(List<Long> state) throws Exception { // 在进行扩缩容之后,进行状态复原,须要把其余subtask的状态加在一起 for (Long cnt : state) { userBuyBehaviorCnt += 1; } } } public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); // 模仿数据源[userId,behavior,product] DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements( Tuple3.of(1L, "buy", "iphone"), Tuple3.of(1L, "cart", "huawei"), Tuple3.of(1L, "buy", "logi"), Tuple3.of(1L, "fav", "oppo"), Tuple3.of(2L, "buy", "huawei"), Tuple3.of(2L, "buy", "onemore"), Tuple3.of(2L, "fav", "iphone")); userBehaviors .flatMap(new UserBehaviorCnt()) .print(); env.execute("ListCheckpointedExample"); }}
CheckpointedFunction
CheckpointedFunction接口提供了更加丰盛的操作,比方反对Union list state,能够拜访keyedState,对于重散布策略,如果应用Even-split Redistribution策略,则通过context. getListState(descriptor)获取Operator State;如果应用UnionRedistribution策略,则通过context. getUnionList State(descriptor)来获取。应用案例如下:
public class CheckpointFunctionExample { private static class UserBehaviorCnt implements CheckpointedFunction, FlatMapFunction<Tuple3<Long, String, String>, Tuple3<Long, Long, Long>> { // 统计每个operator实例的用户行为数量的本地变量 private Long opUserBehaviorCnt = 0L; // 每个key的state,存储key对应的相干状态 private ValueState<Long> keyedCntState; // 定义operator state,存储算子的状态 private ListState<Long> opCntState; @Override public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple3<Long, Long, Long>> out) throws Exception { if (value.f1.equals("buy")) { // 更新算子状态本地变量值 opUserBehaviorCnt += 1; Long keyedCount = keyedCntState.value(); // 更新keyedstate的状态 ,判断状态是否为null,否则空指针异样 keyedCntState.update(keyedCount == null ? 1L : keyedCount + 1 ); // 后果输入 out.collect(Tuple3.of(value.f0, keyedCntState.value(), opUserBehaviorCnt)); } } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { // 应用opUserBehaviorCnt本地变量更新operator state opCntState.clear(); opCntState.add(opUserBehaviorCnt); } @Override public void initializeState(FunctionInitializationContext context) throws Exception { // 通过KeyedStateStore,定义keyedState的StateDescriptor描述符 ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("keyedCnt", TypeInformation.of(new TypeHint<Long>() { })); // 通过OperatorStateStore,定义OperatorState的StateDescriptor描述符 ListStateDescriptor opStateDescriptor = new ListStateDescriptor("opCnt", TypeInformation.of(new TypeHint<Long>() { })); // 初始化keyed state状态值 keyedCntState = context.getKeyedStateStore().getState(valueStateDescriptor); // 初始化operator state状态 opCntState = context.getOperatorStateStore().getListState(opStateDescriptor); // 初始化本地变量operator state for (Long state : opCntState.get()) { opUserBehaviorCnt += state; } } } public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); // 模仿数据源[userId,behavior,product] DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements( Tuple3.of(1L, "buy", "iphone"), Tuple3.of(1L, "cart", "huawei"), Tuple3.of(1L, "buy", "logi"), Tuple3.of(1L, "fav", "oppo"), Tuple3.of(2L, "buy", "huawei"), Tuple3.of(2L, "buy", "onemore"), Tuple3.of(2L, "fav", "iphone")); userBehaviors .keyBy(0) .flatMap(new UserBehaviorCnt()) .print(); env.execute("CheckpointFunctionExample"); }}
什么是状态后端
下面应用的状态都须要存储到状态后端(StateBackend),而后在checkpoint触发时,将状态长久化到内部存储系统。Flink提供了三种类型的状态后端,别离是基于内存的状态后端(MemoryStateBackend、基于文件系统的状态后端(FsStateBackend)以及基于RockDB作为存储介质的RocksDB StateBackend。这三种类型的StateBackend都可能无效地存储Flink流式计算过程中产生的状态数据,在默认状况下Flink应用的是MemoryStateBackend,区别见下表。上面别离对每种状态后端的特点进行阐明。
状态后端的类别
MemoryStateBackend
MemoryStateBackend将状态数据全副存储在JVM堆内存中,包含用户在应用DataStream API中创立的Key/Value State,窗口中缓存的状态数据,以及触发器等数据。MemoryStateBackend具备十分疾速和高效的特点,但也具备十分多的限度,最次要的就是内存的容量限度,一旦存储的状态数据过多就会导致系统内存溢出等问题,从而影响整个利用的失常运行。同时如果机器呈现问题,整个主机内存中的状态数据都会失落,进而无奈复原工作中的状态数据。因而从数据安全的角度倡议用户尽可能地防止在生产环境中应用MemoryStateBackend。Flink将MemoryStateBackend作为默认状态后端。
MemoryStateBackend比拟适宜用于测试环境中,并用于本地调试和验证,不倡议在生产环境中应用。但如果利用状态数据量不是很大,例如应用了大量的非状态计算算子,也能够在生产环境中使MemoryStateBackend.
FsStateBackend
FsStateBackend是基于文件系统的一种状态后端,这里的文件系统能够是本地文件系统,也能够是HDFS分布式文件系统。创立FsStateBackend的构造函数如下:
FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots)
其中path如果为本地门路,其格局为“file:///data/flink/checkpoints”,如果path为HDFS门路,其格局为“hdfs://nameservice/flink/checkpoints”。FsStateBackend中第二个Boolean类型的参数指定是否以同步的形式进行状态数据记录,默认采纳异步的形式将状态数据同步到文件系统中,异步形式可能尽可能防止在Checkpoint的过程中影响流式计算工作。如果用户想采纳同步的形式进行状态数据的检查点数据,则将第二个参数指定为True即可。
相比于MemoryStateBackend, FsStateBackend更适宜工作状态十分大的状况,例如利用中含有工夫范畴十分长的窗口计算,或Key/value State状态数据量十分大的场景,这时零碎内存不足以撑持状态数据的存储。同时FsStateBackend最大的益处是绝对比较稳定,在checkpoint时,将状态长久化到像HDFS分布式文件系统中,能最大水平保障状态数据的安全性。
RocksDBStateBackend
与后面的状态后端不同,RocksDBStateBackend须要独自引入相干的依赖包。RocksDB 是一个 key/value 的内存存储系统,相似于HBase,是一种内存磁盘混合的 LSM DB。当写数据时会先写进write buffer(相似于HBase的memstore),而后在flush到磁盘文件,当读取数据时会当初block cache(相似于HBase的block cache),所以速度会很快。
RocksDBStateBackend在性能上要比FsStateBackend高一些,次要是因为借助于RocksDB存储了最新热数据,而后通过异步的形式再同步到文件系统中,但RocksDBStateBackend和MemoryStateBackend相比性能就会较弱一些。
须要留神 RocksDB 不反对同步的 Checkpoint,构造方法中没有同步快照这个选项。不过 RocksDB 反对增量的 Checkpoint,也是目前惟一增量 Checkpoint 的 Backend,意味着并不需要把所有 sst 文件上传到 Checkpoint 目录,仅须要上传新生成的 sst 文件即可。它的 Checkpoint 存储在内部文件系统(本地或HDFS),其容量限度只有单个 TaskManager 上 State 总量不超过它的内存+磁盘,单 Key最大 2G,总大小不超过配置的文件系统容量即可。对于超大状态的作业,例如天级窗口聚合等场景下能够使会用该状态后端。
配置状态后端
Flink默认应用的状态后端是MemoryStateBackend,所以不须要显示配置。对于其余的状态后端,都须要进行显性配置。在Flink中蕴含了两种级别的StateBackend配置:一种是在程序中进行配置,该配置只对以后利用无效;另外一种是通过 flink-conf.yaml
进行全局配置,一旦配置就会对整个Flink集群上的所有利用无效。
- 利用级别配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
如果应用RocksDBStateBackend则须要独自引入rockdb依赖库,如下:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.11</artifactId> <version>1.10.0</version> <scope>provided</scope></dependency>
应用形式与FsStateBackend相似,如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));
- 集群级别配置
具体的配置项在flink-conf.yaml文件中,如下代码所示,参数state.backend指明StateBackend类型,state.checkpoints.dir配置具体的状态存储门路,代码中应用filesystem作为StateBackend,而后指定相应的HDFS文件门路作为state的checkpoint文件夹。
# 应用filesystem存储state.backend: filesystem# checkpoint存储门路state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
如果想用RocksDBStateBackend配置集群级别的状态后端,能够应用上面的配置:
# 操作RocksDBStateBackend的线程数量,默认值为1state.backend.rocksdb.checkpoint.transfer.thread.num: 1# 指定RocksDB存储状态数据的本地文件门路state.backend.rocksdb.localdir: /var/rockdb/checkpoints# 用于指定定时器服务的工厂类实现类,默认为“HEAP”,也能够指定为“RocksDB”state.backend.rocksdb.timer-service.factory: HEAP
什么是Checkpoint(检查点)
下面解说了Flink的状态以及状态后端,状态是存储在状态后端。为了保障state容错,Flink提供了解决故障的措施,这种措施称之为checkpoint(一致性检查点)。checkpoint是Flink实现容错的外围性能,次要是周期性地触发checkpoint,将state生成快照长久化到内部存储系统(比方HDFS)。这样一来,如果Flink程序呈现故障,那么就能够从上一次checkpoint中进行状态复原,从而提供容错保障。另外,通过checkpoint机制,Flink能够实现Exactly-once语义(Flink外部的Exactly-once,对于端到端的exactly_once,Flink是通过两阶段提交协定实现的)。上面将会详细分析Flink的checkpoint机制。
检查点的生成
如上图,输出流是用户行为数据,包含购买(buy)和退出购物车(cart)两种,每种行为数据都有一个偏移量,统计每种行为的个数。
第一步:JobManager checkpoint coordinator 触发checkpoint。
第二步:假如当生产到[cart,3]这条数据时,触发了checkpoint。那么此时数据源会把生产的偏移量3写入长久化存储。
第三步:当写入完结后,source会将state handle(状态存储门路)反馈给JobManager的checkpoint coordinator。
第四步:接着算子count buy与count cart也会进行同样的步骤
第五步:等所有的算子都实现了上述步骤之后,即当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局实现了,向长久化存储中再备份一个 Checkpoint meta 文件,那么整个checkpoint也就实现了,如果两头有一个不胜利,那么本次checkpoin就宣告失败。
检查点的复原
通过下面的剖析,或者你曾经对Flink的checkpoint有了初步的意识。那么接下来,咱们看一下是如何从检查点复原的。
- 工作失败
- 重启作业
- 复原检查点
- 持续解决数据
上述过程具体总结如下:
- 第一步:重启作业
- 第二步:从上一次检查点复原状态数据
- 第三步:持续解决新的数据
Flink外部Exactly-Once实现
Flink提供了准确一次的解决语义,准确一次的解决语义能够了解为:数据可能会反复计算,然而后果状态只有一个。Flink通过Checkpoint机制实现了准确一次的解决语义,Flink在触发Checkpoint时会向Source端插入checkpoint barrier,checkpoint barriers是从source端插入的,并且会向上游算子进行传递。checkpoint barriers携带一个checkpoint ID,用于标识属于哪一个checkpoint,checkpoint barriers将流逻辑是哪个分为了两局部。对于双流的状况,通过barrier对齐的形式实现准确一次的解决语义。
对于什么是checkpoint barrier,能够看一下CheckpointBarrier类的源码形容,如下:
/** * Checkpoint barriers用来在数据流中实现checkpoint对齐的. * Checkpoint barrier由JobManager的checkpoint coordinator插入到Source中, * Source会把barrier播送发送到上游算子,当一个算子接管到了其中一个输出流的Checkpoint barrier时, * 它就会晓得曾经解决完了本次checkpoint与上次checkpoint之间的数据. * * 一旦某个算子接管到了所有输出流的checkpoint barrier时, * 意味着该算子的曾经解决完了截止到以后checkpoint的数据, * 能够触发checkpoint,并将barrier向上游传递 * * 依据用户抉择的解决语义,在checkpoint实现之前会缓存后一次checkpoint的数据, * 直到本次checkpoint实现(exactly once) * * checkpoint barrier的id是严格枯燥递增的 * */ public class CheckpointBarrier extends RuntimeEvent {...}
能够看出checkpoint barrier次要性能是实现checkpoint对齐的,从而能够实现Exactly-Once解决语义。
上面将会对checkpoint过程进行合成,具体如下:
图1,包含两个流,每个工作都会生产一条用户行为数据(包含购买(buy)和加购(cart)),数字代表该数据的偏移量,count buy工作统计购买行为的个数,coun cart统计加购行为的个数。
图2,触发checkpoint,JobManager会向每个数据源发送一个新的checkpoint编号,以此来启动检查点生成流程。
- 图3,当Source工作收到音讯后,会进行收回数据,而后利用状态后端触发生成本地状态检查点,并把该checkpoint barrier以及checkpoint id播送至所有传出的数据流分区。状态后端会在checkpoint实现之后告诉工作,随后工作会向Job Manager发送确认音讯。在将checkpoint barrier收回之后,Source工作恢复正常工作。
- 图4,Source工作收回的checkpoint barrier会发送到与之相连的上游算子工作,当工作收到一个新的checkpoint barrier时,会持续期待其余输出分区的checkpoint barrier到来,这个过程称之为barrier 对齐,checkpoint barrier到来之前会把到来的数据线缓存起来。
- 图5,工作收齐了全副输出分区的checkpoint barrier之后,会告诉状态后端开始生成checkpoint,同时会把checkpoint barrier播送至上游算子。
- 图6,工作在收回checkpoint barrier之后,开始解决因barrier对齐产生的缓存数据,在缓存的数据处理完之后,就会持续解决输出流数据。
- 图7,最终checkpoint barrier会被传送到sink端,sink工作接管到checkpoint barrier之后,会向其余算子工作一样,将本身的状态写入checkpoint,之后向Job Manager发送确认音讯。Job Manager接管到所有工作返回的确认音讯之后,就会将此次检查点标记为实现。
应用案例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// checkpoint的工夫距离,如果状态比拟大,能够适当调大该值env.enableCheckpointing(1000);// 配置解决语义,默认是exactly-onceenv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 两个checkpoint之间的最小工夫距离,避免因checkpoint工夫过长,导致checkpoint积压env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// checkpoint执行的下限工夫,如果超过该阈值,则会中断checkpointenv.getCheckpointConfig().setCheckpointTimeout(60000);// 最大并行执行的检查点数量,默认为1,能够指定多个,从而同时登程多个checkpoint,晋升效率env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 设定周期性内部检查点,将状态数据长久化到内部零碎中,// 应用该形式不会在工作失常进行的过程中清理掉检查点数据env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// allow job recovery fallback to checkpoint when there is a more recent savepointenv.getCheckpointConfig().setPreferCheckpointForRecovery(true);
总结
本文首先从Flink的状态动手,通过Spark的WordCount和Flink的Work Count进行阐明什么是状态。接着对状态的分类以及状态的应用进行了具体阐明。而后对Flink提供的三种状态后端进行探讨,并给出了状态后端的应用阐明。最初,以图解加文字的模式具体解释了Flink的checkpoint机制,并给出了应用Checkpoint时的程序配置。
- 关注公众号:大数据技术与数仓
- 收费支付百G大数据资料