Checkpoint 介绍
checkpoint 机制是 Flink 可靠性的基石,能够保障 Flink 集群在某个算子因为某些起因 (如 异样退出) 呈现故障时,可能将整个利用流图的状态复原到故障之前的某一状态,保 证利用流图状态的一致性。Flink 的 checkpoint 机制原理来自“Chandy-Lamport algorithm”算法。
每个须要 checkpoint 的利用在启动时,Flink 的 JobManager 为其创立一个 CheckpointCoordinator(检查点协调器),CheckpointCoordinator 全权负责本利用的快照制作。
1) CheckpointCoordinator(检查点协调器) 周期性的向该流利用的所有 source 算子发送 barrier(屏障)。
2) 当某个 source 算子收到一个 barrier 时,便暂停数据处理过程,而后将本人的以后状态制作成快照,并保留到指定的长久化存储中,最初向 CheckpointCoordinator 报告本人快照制作状况,同时向本身所有上游算子播送该 barrier,复原数据处理
3) 上游算子收到 barrier 之后,会暂停本人的数据处理过程,而后将本身的相干状态制作成快照,并保留到指定的长久化存储中,最初向 CheckpointCoordinator 报告本身快照状况,同时向本身所有上游算子播送该 barrier,复原数据处理。
4) 每个算子依照步骤 3 一直制作快照并向上游播送,直到最初 barrier 传递到 sink 算子,快照制作实现。
5) 当 CheckpointCoordinator 收到所有算子的报告之后,认为该周期的快照制作胜利; 否则,如果在规定的工夫内没有收到所有算子的报告,则认为本周期快照制作失败。
如果一个算子有两个输出源,则临时阻塞先收到 barrier 的输出源,等到第二个输出源相 同编号的 barrier 到来时,再制作本身快照并向上游播送该 barrier。具体如下图所示:
1) 假如算子 C 有 A 和 B 两个输出源
2) 在第 i 个快照周期中,因为某些起因 (如解决时延、网络时延等) 输出源 A 收回的 barrier 先到来,这时算子 C 临时将输出源 A 的输出通道阻塞,仅收输出源 B 的数据。
3) 当输出源 B 收回的 barrier 到来时,算子 C 制作本身快照并向 CheckpointCoordinator 报告本身的快照制作状况,而后将两个 barrier 合并为一个,向上游所有的算子播送。
4) 当因为某些起因呈现故障时,CheckpointCoordinator 告诉流图上所有算子对立复原到某个周期的 checkpoint 状态,而后复原数据流解决。分布式 checkpoint 机制保障了数据仅被解决一次(Exactly Once)。
长久化存储
MemStateBackend
该长久化存储次要将快照数据保留到 JobManager 的内存中,仅适宜作为测试以及快照的数据量十分小时应用,并不举荐用作大规模商业部署。
MemoryStateBackend 的局限性:
默认状况下,每个状态的大小限度为 5 MB。能够在 MemoryStateBackend 的构造函数中减少此值。
无论配置的最大状态大小如何,状态都不能大于 akka 帧的大小(请参阅配置)。
聚合状态必须适宜 JobManager 内存。
倡议 MemoryStateBackend 用于:
本地开发和调试。
状态很少的作业,例如仅蕴含一次记录性能的作业(Map,FlatMap,Filter,…),kafka 的消费者须要很少的状态。
FsStateBackend
该长久化存储次要将快照数据保留到文件系统中,目前反对的文件系统次要是 HDFS 和本地文件。如果应用 HDFS,则初始化 FsStateBackend 时,须要传入以“hdfs://”结尾的门路(即: new FsStateBackend(“hdfs:///hacluster/checkpoint”)),如果应用本地文件,则须要传入以“file://”结尾的门路(即:new FsStateBackend(“file:///Data”))。在分布式状况下,不举荐应用本地文件。如果某 个算子在节点 A 上失败,在节点 B 上复原,应用本地文件时,在 B 上无奈读取节点 A 上的数据,导致状态复原失败。
倡议 FsStateBackend:
具备大状态,长窗口,大键 / 值状态的作业。
所有高可用性设置。
RocksDBStateBackend
RocksDBStatBackend 介于本地文件和 HDFS 之间,平时应用 RocksDB 的性能,将数 据长久化到本地文件中,当制作快照时,将本地数据制作成快照,并长久化到 FsStateBackend 中 (FsStateBackend 不用用户特地指明,只需在初始化时传入 HDFS 或本地门路即可,如 new RocksDBStateBackend(“hdfs:///hacluster/checkpoint”) 或 new RocksDBStateBackend(“file:///Data”))。
如果用户应用自定义窗口(window),不举荐用户应用 RocksDBStateBackend。在自定义窗口中,状态以 ListState 的模式保留在 StatBackend 中,如果一个 key 值中有多个 value 值,则 RocksDB 读取该种 ListState 十分迟缓,影响性能。用户能够依据利用的具体情况抉择 FsStateBackend+HDFS 或 RocksStateBackend+HDFS。
语法
val env = StreamExecutionEnvironment.getExecutionEnvironment()
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)
// advanced options:
// 设置 checkpoint 的执行模式,最多执行一次或者至多执行一次
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 设置 checkpoint 的超时工夫
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 如果在只做快照过程中呈现谬误,是否让整体工作失败:true 是 false 不是
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
// 设置同一时间有多少 个 checkpoint 能够同时执行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
批改 State Backend 的两种形式
第一种:单任务调整
批改当前任务代码
env.setStateBackend(new FsStateBackend(“hdfs://namenode:9000/flink/checkpoints”));
或者 new MemoryStateBackend()
或者 new RocksDBStateBackend(filebackend, true);【须要增加第三方依赖】
第二种:全局调整
批改 flink-conf.yaml
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
留神:state.backend 的值能够是上面几种:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
Checkpoint 的高级选项
默认 checkpoint 性能是 disabled 的,想要应用的时候须要先启用 checkpoint 开启之后,默认的 checkPointMode 是 Exactly-once
// 配置一秒钟开启一个 checkpoint
env.enableCheckpointing(1000)
// 指定 checkpoint 的执行模式
// 两种可选://CheckpointingMode.EXACTLY_ONCE:默认值
//CheckpointingMode.AT_LEAST_ONCE
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
个别状况下抉择 CheckpointingMode.EXACTLY_ONCE,除非场景要求极低的提早(几毫秒)留神:如果须要保障 EXACTLY_ONCE,source 和 sink 要求必须同时保障 EXACTLY_ONCE
// 如果程序被 cancle,保留以前做的 checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
默认状况下,检查点不被保留,仅用于在故障中复原作业,能够启用内部长久化检查点,同时指定保留策略:
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: 在作业勾销时保留检查点,留神,在这种状况下,您必须在勾销后手动清理检查点状态
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业在被 cancel 时,删除检查点,检查点仅在作业失败时可用
// 设置 checkpoint 超时工夫
env.getCheckpointConfig.setCheckpointTimeout(60000)
//Checkpointing 的超时工夫,超时工夫内没有实现则被终止
//Checkpointing 最小工夫距离,用于指定上一个 checkpoint 实现之后
// 最小等多久能够触发另一个 checkpoint,当指定这个参数时,maxConcurrentCheckpoints 的值为 1
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// 设置同一个工夫是否能够有多个 checkpoint 执行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
指定运行中的 checkpoint 最多能够有多少个
env.getCheckpointConfig.setFailOnCheckpointingErrors(true)
用于指定在 checkpoint 产生异样的时候,是否应该 fail 该 task,默认是 true,如果设置为 false,则 task 会回绝 checkpoint 而后持续运行
Flink 的重启策略
Flink 反对不同的重启策略,这些重启策略管制着 job 失败后如何重启。集群能够通过默认的重启策略来重启,这个默认的重启策略通常在未指定重启策略的状况下应用,而如果 Job 提交的时候指定了重启策略,这个重启策略就会笼罩掉集群的默认重启策略。
概览
默认的重启策略是通过 Flink 的 flink-conf.yaml 来指定的,这个配置参数 restart-strategy 定义了哪种策略会被采纳。如果 checkpoint 未启动,就会采纳 no restart 策略,如果启动了 checkpoint 机制,然而未指定重启策略的话,就会采纳 fixed-delay 策略,重试 Integer.MAX_VALUE 次。请参考上面的可用重启策略来理解哪些值是反对的。
每个重启策略都有本人的参数来管制它的行为,这些值也能够在配置文件中设置,每个重启策略的形容都蕴含着各自的配置值信息。
除了定义一个默认的重启策略之外,你还能够为每一个 Job 指定它本人的重启策略,这个重启策略能够在 ExecutionEnvironment 中调用 setRestartStrategy() 办法来程序化地调用,留神这种形式同样实用于 StreamExecutionEnvironment。
上面的例子展现了如何为 Job 设置一个固定提早重启策略,一旦有失败,零碎就会尝试每 10 秒重启一次,重启 3 次。
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 重启次数
Time.of(10, TimeUnit.SECONDS) // 延迟时间距离
))
固定提早重启策略(Fixed Delay Restart Strategy)
固定提早重启策略会尝试一个给定的次数来重启 Job,如果超过了最大的重启次数,Job 最终将失败。在间断的两次重启尝试之间,重启策略会期待一个固定的工夫。
重启策略能够配置 flink-conf.yaml 的上面配置参数来启用,作为默认的重启策略:
restart-strategy: fixed-delay
例子:
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
固定提早重启也能够在程序中设置:
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 重启次数
Time.of(10, TimeUnit.SECONDS) // 重启工夫距离
))
失败率重启策略
失败率重启策略在 Job 失败后会重启,然而超过失败率后,Job 会最终被认定失败。在两个间断的重启尝试之间,重启策略会期待一个固定的工夫。
失败率重启策略能够在 flink-conf.yaml 中设置上面的配置参数来启用:
restart-strategy:failure-rate
例子:
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
失败率重启策略也能够在程序中设置:
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个测量工夫距离最大失败次数
Time.of(5, TimeUnit.MINUTES), // 失败率测量的工夫距离
Time.of(10, TimeUnit.SECONDS) // 两次间断重启尝试的工夫距离
))
无重启策略
Job 间接失败,不会尝试进行重启
restart-strategy: none
无重启策略也能够在程序中设置
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())