Checkpoint介绍

checkpoint机制是Flink可靠性的基石,能够保障Flink集群在某个算子因为某些起因(如 异样退出)呈现故障时,可能将整个利用流图的状态复原到故障之前的某一状态,保 证利用流图状态的一致性。Flink的checkpoint机制原理来自“Chandy-Lamport algorithm”算法。

每个须要checkpoint的利用在启动时,Flink的JobManager为其创立一个 CheckpointCoordinator(检查点协调器),CheckpointCoordinator全权负责本利用的快照制作。

1) CheckpointCoordinator(检查点协调器) 周期性的向该流利用的所有source算子发送 barrier(屏障)。

2) 当某个source算子收到一个barrier时,便暂停数据处理过程,而后将本人的以后状态制作成快照,并保留到指定的长久化存储中,最初向CheckpointCoordinator报告本人快照制作状况,同时向本身所有上游算子播送该barrier,复原数据处理

3) 上游算子收到barrier之后,会暂停本人的数据处理过程,而后将本身的相干状态制作成快照,并保留到指定的长久化存储中,最初向CheckpointCoordinator报告本身快照状况,同时向本身所有上游算子播送该barrier,复原数据处理。

4) 每个算子依照步骤3一直制作快照并向上游播送,直到最初barrier传递到sink算子,快照制作实现。

5) 当CheckpointCoordinator收到所有算子的报告之后,认为该周期的快照制作胜利; 否则,如果在规定的工夫内没有收到所有算子的报告,则认为本周期快照制作失败。

如果一个算子有两个输出源,则临时阻塞先收到barrier的输出源,等到第二个输出源相 同编号的barrier到来时,再制作本身快照并向上游播送该barrier。具体如下图所示:

1) 假如算子C有A和B两个输出源

2) 在第i个快照周期中,因为某些起因(如解决时延、网络时延等)输出源A收回的 barrier 先到来,这时算子C临时将输出源A的输出通道阻塞,仅收输出源B的数据。

3) 当输出源B收回的barrier到来时,算子C制作本身快照并向 CheckpointCoordinator 报告本身的快照制作状况,而后将两个barrier合并为一个,向上游所有的算子播送。

4) 当因为某些起因呈现故障时,CheckpointCoordinator告诉流图上所有算子对立复原到某个周期的checkpoint状态,而后复原数据流解决。分布式checkpoint机制保障了数据仅被解决一次(Exactly Once)。

长久化存储

MemStateBackend

该长久化存储次要将快照数据保留到JobManager的内存中,仅适宜作为测试以及快照的数据量十分小时应用,并不举荐用作大规模商业部署。

MemoryStateBackend 的局限性

默认状况下,每个状态的大小限度为 5 MB。能够在MemoryStateBackend的构造函数中减少此值。

无论配置的最大状态大小如何,状态都不能大于akka帧的大小(请参阅配置)。

聚合状态必须适宜 JobManager 内存。

倡议MemoryStateBackend 用于

本地开发和调试。

状态很少的作业,例如仅蕴含一次记录性能的作业(Map,FlatMap,Filter,...),kafka的消费者须要很少的状态。

FsStateBackend

该长久化存储次要将快照数据保留到文件系统中,目前反对的文件系统次要是 HDFS和本地文件。如果应用HDFS,则初始化FsStateBackend时,须要传入以 “hdfs://”结尾的门路(即: new FsStateBackend("hdfs:///hacluster/checkpoint")), 如果应用本地文件,则须要传入以“file://”结尾的门路(即:new FsStateBackend("file:///Data"))。在分布式状况下,不举荐应用本地文件。如果某 个算子在节点A上失败,在节点B上复原,应用本地文件时,在B上无奈读取节点 A上的数据,导致状态复原失败。

倡议FsStateBackend:

具备大状态,长窗口,大键 / 值状态的作业。

所有高可用性设置。

RocksDBStateBackend

RocksDBStatBackend介于本地文件和HDFS之间,平时应用RocksDB的性能,将数 据长久化到本地文件中,当制作快照时,将本地数据制作成快照,并长久化到 FsStateBackend中(FsStateBackend不用用户特地指明,只需在初始化时传入HDFS 或本地门路即可,如new RocksDBStateBackend("hdfs:///hacluster/checkpoint")或new RocksDBStateBackend("file:///Data"))。

如果用户应用自定义窗口(window),不举荐用户应用RocksDBStateBackend。在自定义窗口中,状态以ListState的模式保留在StatBackend中,如果一个key值中有多个value值,则RocksDB读取该种ListState十分迟缓,影响性能。用户能够依据利用的具体情况抉择FsStateBackend+HDFS或RocksStateBackend+HDFS。

语法

val env = StreamExecutionEnvironment.getExecutionEnvironment()// start a checkpoint every 1000 msenv.enableCheckpointing(1000)// advanced options:// 设置checkpoint的执行模式,最多执行一次或者至多执行一次env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)// 设置checkpoint的超时工夫env.getCheckpointConfig.setCheckpointTimeout(60000)// 如果在只做快照过程中呈现谬误,是否让整体工作失败:true是  false不是env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)//设置同一时间有多少 个checkpoint能够同时执行 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

批改State Backend的两种形式

第一种:单任务调整

批改当前任务代码

env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);【须要增加第三方依赖】

第二种:全局调整

批改flink-conf.yaml

state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints

留神:state.backend的值能够是上面几种:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)

Checkpoint的高级选项

默认checkpoint性能是disabled的,想要应用的时候须要先启用checkpoint开启之后,默认的checkPointMode是Exactly-once

//配置一秒钟开启一个checkpointenv.enableCheckpointing(1000)//指定checkpoint的执行模式//两种可选://CheckpointingMode.EXACTLY_ONCE:默认值//CheckpointingMode.AT_LEAST_ONCEenv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)个别状况下抉择CheckpointingMode.EXACTLY_ONCE,除非场景要求极低的提早(几毫秒)留神:如果须要保障EXACTLY_ONCE,source和sink要求必须同时保障EXACTLY_ONCE
//如果程序被cancle,保留以前做的checkpointenv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)默认状况下,检查点不被保留,仅用于在故障中复原作业,能够启用内部长久化检查点,同时指定保留策略:ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作业勾销时保留检查点,留神,在这种状况下,您必须在勾销后手动清理检查点状态ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业在被cancel时,删除检查点,检查点仅在作业失败时可用
//设置checkpoint超时工夫env.getCheckpointConfig.setCheckpointTimeout(60000)//Checkpointing的超时工夫,超时工夫内没有实现则被终止
//Checkpointing最小工夫距离,用于指定上一个checkpoint实现之后//最小等多久能够触发另一个checkpoint,当指定这个参数时,maxConcurrentCheckpoints的值为1env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
//设置同一个工夫是否能够有多个checkpoint执行env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)指定运行中的checkpoint最多能够有多少个env.getCheckpointConfig.setFailOnCheckpointingErrors(true)用于指定在checkpoint产生异样的时候,是否应该fail该task,默认是true,如果设置为false,则task会回绝checkpoint而后持续运行

Flink的重启策略

Flink反对不同的重启策略,这些重启策略管制着job失败后如何重启。集群能够通过默认的重启策略来重启,这个默认的重启策略通常在未指定重启策略的状况下应用,而如果Job提交的时候指定了重启策略,这个重启策略就会笼罩掉集群的默认重启策略。

概览

默认的重启策略是通过Flink的 flink-conf.yaml 来指定的,这个配置参数 restart-strategy 定义了哪种策略会被采纳。如果checkpoint未启动,就会采纳 no restart 策略,如果启动了checkpoint机制,然而未指定重启策略的话,就会采纳 fixed-delay 策略,重试 Integer.MAX_VALUE 次。请参考上面的可用重启策略来理解哪些值是反对的。

每个重启策略都有本人的参数来管制它的行为,这些值也能够在配置文件中设置,每个重启策略的形容都蕴含着各自的配置值信息。

除了定义一个默认的重启策略之外,你还能够为每一个Job指定它本人的重启策略,这个重启策略能够在 ExecutionEnvironment 中调用 setRestartStrategy() 办法来程序化地调用,留神这种形式同样实用于 StreamExecutionEnvironment

上面的例子展现了如何为Job设置一个固定提早重启策略,一旦有失败,零碎就会尝试每10秒重启一次,重启3次。

val env = ExecutionEnvironment.getExecutionEnvironment()env.setRestartStrategy(RestartStrategies.fixedDelayRestart(  3, // 重启次数  Time.of(10, TimeUnit.SECONDS) // 延迟时间距离))

固定提早重启策略(Fixed Delay Restart Strategy)

固定提早重启策略会尝试一个给定的次数来重启Job,如果超过了最大的重启次数,Job最终将失败。在间断的两次重启尝试之间,重启策略会期待一个固定的工夫。

重启策略能够配置flink-conf.yaml的上面配置参数来启用,作为默认的重启策略:

restart-strategy: fixed-delay

例子:

restart-strategy.fixed-delay.attempts: 3restart-strategy.fixed-delay.delay: 10 s

固定提早重启也能够在程序中设置:

val env = ExecutionEnvironment.getExecutionEnvironment()env.setRestartStrategy(RestartStrategies.fixedDelayRestart(  3, // 重启次数  Time.of(10, TimeUnit.SECONDS) // 重启工夫距离))

失败率重启策略

失败率重启策略在Job失败后会重启,然而超过失败率后,Job会最终被认定失败。在两个间断的重启尝试之间,重启策略会期待一个固定的工夫。

失败率重启策略能够在flink-conf.yaml中设置上面的配置参数来启用:

restart-strategy:failure-rate

例子:

restart-strategy.failure-rate.max-failures-per-interval: 3restart-strategy.failure-rate.failure-rate-interval: 5 minrestart-strategy.failure-rate.delay: 10 s

失败率重启策略也能够在程序中设置:

val env = ExecutionEnvironment.getExecutionEnvironment()env.setRestartStrategy(RestartStrategies.failureRateRestart(  3, // 每个测量工夫距离最大失败次数  Time.of(5, TimeUnit.MINUTES), //失败率测量的工夫距离  Time.of(10, TimeUnit.SECONDS) // 两次间断重启尝试的工夫距离))

无重启策略

Job间接失败,不会尝试进行重启

restart-strategy: none

无重启策略也能够在程序中设置

val env = ExecutionEnvironment.getExecutionEnvironment()env.setRestartStrategy(RestartStrategies.noRestart())

搜寻公众号:五分钟学大数据,发送 秘籍,即可获取大数据学习秘籍大礼包,深刻钻研大数据技术!