关于Flink:字节跳动单点恢复功能及-Regional-CheckPoint-优化实践

4次阅读

共计 5880 个字符,预计需要花费 15 分钟才能阅读完成。

作者|廖嘉逸

摘要:本文介绍字节跳动在过来一段时间里做的两个次要的 Feature,一是在 Network 层的单点复原的性能,二是 Checkpoint 层的 Regional Checkpoint。内容包含:

  1. 单点复原机制
  2. Regional Checkpoint
  3. 在 Checkpoint 的其它优化
  4. 挑战 & 将来布局

作者分享原版视频回顾:https://www.bilibili.com/vide…

一、单点复原机制

在字节跳动的实时举荐场景中,咱们应用 Flink 将用户特色与用户行为进行实时拼接,拼接样本作为实时模型的输出。拼接服务的时延和稳定性间接影响了线上产品对用户的举荐成果,而这种拼接服务在 Flink 中是一个相似双流 Join 的实现,Job 中的任何一个 Task 或节点呈现故障,都会导致整个 Job 产生 Failover,影响对应业务的实时举荐成果。

在介绍单点复原之前,咱们回顾一下 Flink 的 Failover 策略。

  • Individual-Failover:

只重启出错的 Task,实用于 Task 间无连贯的状况,利用场景无限。

  • Region-Failover:

该策略会将作业中的所有 Task 划分为数个 Region。当有 Task 产生故障时,它会尝试找出进行故障复原须要重启的最小 Region 汇合。相比于全局重启故障复原策略,这种策略在一些场景下的故障复原须要重启的 Task 会更少。

如果应用 Region-Failover 策略,但因为 Job 是一个全连贯的拓扑,自身就是一个大 region。重启 region 相当于重启整个 Job,所以咱们思考是否能够用 Flink Individual-task-failover 策略去代替 Region-failover 策略?而 Individual-task-failover 的策略在这种拓扑下是齐全不实用的。所以咱们对于以下特色的场景,须要设计开发一个新的 Failover 策略:

  • 多流 Join 拓扑
  • 流量大(30M QPS)、高并发度(16K*16K)
  • 容许短时间内小量局部数据失落
  • 对数据继续输入型要求高

在讲述技术计划之前,看一下 Flink 现有的数据传输机制。

从左往右看(SubTaskA):

  1. 当数据流入时会先被 RecordWriter 接管
  2. RecordWriter 依据数据的信息,例如 key,将数据进行 shuffle 抉择对应的 channel
  3. 将数据装载到 buffer 中,并放到 channel 对应的 buffer 队列里
  4. 通过 Netty Server 向上游发送
  5. 上游 Netty Client 接收数据
  6. 依据 buffer 中的分区信息,转发发到上游对应的 channel 中
  7. 由 InputProcessor 将数据从 buffer 中取出,执行 operator 逻辑

依据下面提出的思路咱们要解决以下几个问题:

  • 如何让上游 Task 感知上游 Failure
  • 上游 Task 失败后,如何让上游 Task 向失常的 Task 发送数据
  • 上游 Task 失败后,如何让上游 Task 持续生产 buffer 中的数据
  • 上下游中不残缺的数据如何解决
  • 如何建设新的连贯

针对上述问题提出解决方案。

如何让上游 Task 感知上游 Failure

上游 SubTask 被动将失败信息传递给上游,或者 TM 被敞开上游 Netty Server 也能够感知到。图中用 X 示意不可用的 SubPartition。

首先将 SubPartition1 和对应的 view(Netty Server 用来取 SubPartition 数据的一个构造)置为不可用。

之后当 Record Writer 接管到新数据须要向 SubPartition1 发送数据,此时须要进行一个可用性判断,当 SubPartition 状态可用则失常发送,不可用间接抛弃数据。

上游 Task 接管到上游 Task 新的连贯

上游 subTask 被从新调度启动后,向上游发送 Partition Request,上游 Netty Server 收到 Partition Request 后从新给上游 SubTask 创立对用的 View,此时上游 Record Writer 就能够失常写数据。

上游 Task 感知上游 Task 失败

同样的上游 Netty Client 能感知到上游有 subTask 失败了,这时找出对应的 channel,在开端插入一个不可用的事件(这里用感叹号来示意事件)。咱们的目标是想要尽可能的少丢数据,此时 channel 中的 buffer 任能够被 InputProcessor 失常生产,直到读取到“不可用事件”。再进行 channel 不可用标记和对应的 buffer 队列清理。

Buffer 中有不残缺的数据

首先要晓得不残缺的数据寄存在哪里,它存在于 input process 的外部,input process 会给每一个 channel 保护一个小的 buffer 队列。当收到一个 buffer,它是不残缺的数据,那么等到接管到下一个 buffer 后再拼接成一条残缺的数据发往 operator。

上游 Task 和上游 Task 从新连贯

当上游有问题的 Task 被从新调度后,通过调用 TaskManager API 来告诉上游。上游 Shuffle Environment 收到告诉后判断对应的 channel 状态,如果是不可,用间接生成新的 channel 并开释掉老的。如果是可用状态,阐明 channel 的 buffer 没有生产完,须要期待 buffer 生产完再进行替换操作。

业务收益

上图是以 4000 并行度的作业为例做了比照测试。业务是将一个用户展示流和一个用户行为流的进行 Join,整个作业共有 12000 个 Task。

上图中 单点复原(预留资源) 是应用调度组做的一个 feature,在申请资源的时,抉择额定多申请一些资源,当产生 failover 时省去了从 YARN 去申请资源的工夫开销。

最初做到了作业的输入缩小千分之一,复原工夫约 5 秒。因为整个复原过程工夫较短,能够根本做到上游无感知。

二、Regional Checkpoint

在一个比拟经典的数据集成场景,数据导入导出。比方从 Kafka 导入到 Hive,满足上面几个特色。

  • 拓扑中没有 All-to-All 的连贯
  • 强依赖 Checkpoint 来实现 Exactly-Once 语义下的数据输入
  • Checkpoint 距离长,对成功率要求高

在这种状况下,数据没有任何的 shuffle。

在数据集成的场景中遇到哪些问题?

  • 单个 Task Checkpoint 失败会影响全局的 Checkpoint 输入
  • 网络抖动、写入超时 / 失败、存储环境抖动对作业的影响过于显著
  • 2000 并行以上的作业成功率显著降落,低于业务预期

在这里,咱们想到作业会依据 region-failover 策略将作业的拓扑划分为多个 region。那么 Checkpoint 是否能够采取相似的思路,将 checkpoint 以 region 的单位来治理?答案是必定的。

在这种状况下不须要等到所有 Task checkpoint 实现后才去做分区归档操作(例如 HDFS 文件 rename)。而是当某个 region 实现后即可进行 region 级别的 checkpoint 归档操作。

介绍计划之前先简略回顾 Flink 现有的 checkpoint 机制。置信大家都比拟相熟。


现有 ckp

上图中是一个 Kafka source 和 Hive sink 算子的拓扑,并行度为 4 的例子。

首先 checkpoint coordinator 触发 triggerCheckpoint 的操作,发送到各个 source task。在 Task 收到申请之后,触发 Task 内的 operator 进行 snapshot 操作。例子中有 8 个 operator 状态。


现有 ckp1

在各 operator 实现 snapshot 后,Task 发送 ACK 音讯给 checkpoint coordinator 示意以后 Task 曾经实现了 Checkpoint。

之后当 coordinator 收到所有 Task 胜利的 ACK 音讯,那么 checkpont 能够认为是胜利了。最初触发 finalize 操作,保留对应的 metadata。告诉所有 Task checkpoint 实现。

当咱们应用 Region 形式去治理 checkpoint 时会遇到什么问题?

  • 如何划分 Checkpoint Region

把彼此没有连贯的 Task 汇合,划分为 1 个 region。不言而喻例子中有四个 Region。

  • 失败 Region 的 Checkpoint 后果如何解决

假如第一次 checkpoint 能失常实现,每个 operator 对应的状态都胜利写入 HDFS checkpoint1 目录中,并通过逻辑映射,将 8 个 operator 映射到 4 个 checkpoint region。留神仅仅是逻辑映射,并没有对物理文件做出任何挪动和批改。


现有 ckp1

第二次进行 checkpoint 时 region-4-data(Kafka-4,Hive-4)checkpoint 失败。checkpoint2(job/chk_2)目录中没有对应 Kafka-4-state 和 Hive-4-state 文件,以后 checkpoint2 是不残缺的。为了保障残缺,从上一次或之前胜利的 checkpoint 文件中寻找 region-4-data 胜利的 state 文件,并进行逻辑映射。这样以后 checkpoint 每个 region 状态文件就残缺了,能够认为 checkpoint 实现。

此时如果产生大部分或所有 region 都失败,如果都援用前一次 checkpoint 那么以后这个 checkpoint 和上一个 checkpoint 雷同也就没有意义了。

通过配置 region 最大失败比例,比方 50%,例子中 4 个 region,最多能承受两个 region 失败。

  • 如何防止在文件系统上存储过多的 Checkpoint 历史数据

如果有某个 region 始终失败(遇到脏数据或代码逻辑问题),以后的机制会导致把所有历史 checkpoint 文件都保留下来,显然这是不合理的。

通过配置反对 region 最大间断失败次数。例如 2 示意 region 最多能援用前两次的 checkpoint 胜利的 region 后果。

工程实现难点

  • 如何解决 Task Fail 和 checkpoint timeout
  • 同一 region 内曾经 snapshot 胜利的 subTask 状态如何解决
  • 如何保障和 checkpoint Coordinator 的兼容性

来看目前 Flink 是如何做的。


现有 coordinator

当产生 Task failure,先会告诉到 JobMaster FailoverStrategy,通过 FailoverStrategy 来告诉 checkpoint coordinator 进行 checkpoint cancel 操作。

那么 checkpoint timeout 状况如何解决?当 coordinator 触发 checkpoint 时,会开启 checkpoint canceller。canceller 内有一个定时器,当超过预设工夫并且 coordinator 还未实现 checkpoint,阐明呈现 timeout,告诉 coordinator cancel 本次 checkpoint。

无论是 Task fail 还是 timeout 最终都会指向 pendding checkpoint,并且以后指向的 checkpoint 就会被抛弃。

在做出相应批改前先梳理 checkpoint 相干的 Message,和 checkpoint coordinator 会做出的反馈。

Global checkpoint 为 Flink 现有机制。

为了放弃和 checkpoint Coordinator 兼容性,增加一个 CheckpointHandle 接口。并增加了两个实现别离是 GlobalCheckpointHandle 和 RegionalCheckpointHandle 通过过滤音讯的形式实现 global checkpoint 和 region checkpoint 相干操作。

region checkpoint 提一点。如果 handler 接管到失败音讯,将这个 region 置为失败,并尝试从之前的 successful checkpoint 进行 region 逻辑映射。同样 coordinator 发送 nofityComplate 音讯也会先通过 handler 的过滤,过滤掉发送给失败 Task 的音讯。


业务收益

测试在 5000 并行度下,假如单个 Task snapshot 的成功率为 99.99%。应用 Global checkpoint 的成功率为 60.65%,而应用 Region checkpoint 任然能放弃 99.99%。

三、Checkpoint 上的其它优化

并行化复原 operator 状态

union state 是一种比拟非凡的状态,在复原时须要找到 job 所有的 Task state 再进行 union 复原到单个 Task 中。如果 Job 并行度十分大,如 10000,那么每个 task 的 union state 进行复原时至多须要读取 10000 个文件。如果串行复原这 10000 个文件里的状态,那么复原的耗时可想而知是十分漫长的。

尽管 OperatorState 对应的数据结构是无奈进行并行操作的,然而咱们读取文件的过程是能够并行化的,在 OperatorStateBackend 的复原过程中,咱们将读取 HDFS 文件的过程并行化,等到所有状态文件解析到内存后,再用单线程去解决,这样咱们能够将几十分钟的状态复原工夫缩小到几分钟。

加强 CheckpointScheduler 并反对 Checkpoint 整点触发

Flink checkpoint 的 interval,timeout 在工作提交之后是无奈批改的。但刚上线时只能依据经验值进行设置。而往往在作业高峰期时会发现 interval,timeout 等参数设置不合理。这时通常一个办法是批改参数重启工作,对业务影响比拟大,显然这种形式是不合理的。

在这里,咱们对 CheckpointCoordinator 外部的 Checkpoint 触发机制做了重构,将已有的 Checkpoint 触发流程给形象进去,使得咱们能够很快地基于抽象类对 Checkpoint 触发机制进行定制化。比方在反对数据导入的场景中,为了更快地造成 Hive 分区,咱们实现了整点触发的机制,不便上游尽快地看到数据。

还有很多优化点就不一一列举了。

四、挑战 & 将来布局

目前字节外部的作业状态最大能达到 200TB 左右的程度,而对于这种大流量和大状态的作业,间接应用 RocksDB StateBackend 是无奈撑持的。所以将来,咱们会之后持续会在 state 和 checkpoint 性能优化和稳定性上做更多的工作,比方强化已有的 StateBackend、解决歪斜和反压下 Checkpoint 的速率问题、加强调试能力等。

正文完
 0