[case49]聊聊flink的checkpoint配置

30次阅读

共计 4432 个字符,预计需要花费 12 分钟才能阅读完成。


本文主要研究下 flink 的 checkpoint 配置
实例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);

使用 StreamExecutionEnvironment.enableCheckpointing 方法来设置开启 checkpoint;具体可以使用 enableCheckpointing(long interval),或者 enableCheckpointing(long interval, CheckpointingMode mode);interval 用于指定 checkpoint 的触发间隔(单位 milliseconds),而 CheckpointingMode 默认是 CheckpointingMode.EXACTLY_ONCE,也可以指定为 CheckpointingMode.AT_LEAST_ONCE
也可以通过 StreamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode 来设置 CheckpointingMode,一般对于超低延迟的应用 (大概几毫秒) 可以使用 CheckpointingMode.AT_LEAST_ONCE,其他大部分应用使用 CheckpointingMode.EXACTLY_ONCE 就可以
checkpointTimeout 用于指定 checkpoint 执行的超时时间(单位 milliseconds),超时没完成就会被 abort 掉
minPauseBetweenCheckpoints 用于指定 checkpoint coordinator 上一个 checkpoint 完成之后最小等多久可以出发另一个 checkpoint,当指定这个参数时,maxConcurrentCheckpoints 的值为 1
maxConcurrentCheckpoints 用于指定运行中的 checkpoint 最多可以有多少个,用于包装 topology 不会花太多的时间在 checkpoints 上面;如果有设置了 minPauseBetweenCheckpoints,则 maxConcurrentCheckpoints 这个参数就不起作用了(大于 1 的值不起作用)
enableExternalizedCheckpoints 用于开启 checkpoints 的外部持久化,但是在 job 失败的时候不会自动清理,需要自己手工清理 state;ExternalizedCheckpointCleanup 用于指定当 job canceled 的时候 externalized checkpoint 该如何清理,DELETE_ON_CANCELLATION 的话,在 job canceled 的时候会自动删除 externalized state,但是如果是 FAILED 的状态则会保留;RETAIN_ON_CANCELLATION 则在 job canceled 的时候会保留 externalized checkpoint state
failOnCheckpointingErrors 用于指定在 checkpoint 发生异常的时候,是否应该 fail 该 task,默认为 true,如果设置为 false,则 task 会拒绝 checkpoint 然后继续运行

flink-conf.yaml 相关配置
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are ‘jobmanager’, ‘filesystem’, ‘rocksdb’, or the
# <class-name-of-factory>.
#
# state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
# state.backend.incremental: false

state.backend 用于指定 checkpoint state 存储的 backend,默认为 none
state.backend.async 用于指定 backend 是否使用异步 snapshot(默认为 true),有些不支持 async 或者只支持 async 的 state backend 可能会忽略这个参数
state.backend.fs.memory-threshold,默认为 1024,用于指定存储于 files 的 state 大小阈值,如果小于该值则会存储在 root checkpoint metadata file
state.backend.incremental,默认为 false,用于指定是否采用增量 checkpoint,有些不支持增量 checkpoint 的 backend 会忽略该配置
state.backend.local-recovery,默认为 false
state.checkpoints.dir,默认为 none,用于指定 checkpoint 的 data files 和 meta data 存储的目录,该目录必须对所有参与的 TaskManagers 及 JobManagers 可见
state.checkpoints.num-retained,默认为 1,用于指定保留的已完成的 checkpoints 个数
state.savepoints.dir,默认为 none,用于指定 savepoints 的默认目录
taskmanager.state.local.root-dirs,默认为 none

小结

可以通过使用 StreamExecutionEnvironment.enableCheckpointing 方法来设置开启 checkpoint;具体可以使用 enableCheckpointing(long interval),或者 enableCheckpointing(long interval, CheckpointingMode mode)
checkpoint 的高级配置可以配置 checkpointTimeout(用于指定 checkpoint 执行的超时时间,单位 milliseconds),minPauseBetweenCheckpoints(用于指定 checkpoint coordinator 上一个 checkpoint 完成之后最小等多久可以出发另一个 checkpoint),maxConcurrentCheckpoints(用于指定运行中的 checkpoint 最多可以有多少个,如果有设置了 minPauseBetweenCheckpoints,则 maxConcurrentCheckpoints 这个参数大于 1 的值不起作用),enableExternalizedCheckpoints(用于开启 checkpoints 的外部持久化,在 job failed 的时候 externalized checkpoint state 无法自动清理,但是在 job canceled 的时候可以配置是删除还是保留 state)
在 flink-conf.yaml 里头也有 checkpoint 的相关配置,主要是 state backend 的配置,比如 state.backend.async、state.backend.incremental、state.checkpoints.dir、state.savepoints.dir 等

doc
Checkpointing

正文完
 0