关于flink:字节跳动流式数据集成基于Flink-Checkpoint两阶段提交的实践和优化

44次阅读

共计 6614 个字符,预计需要花费 17 分钟才能阅读完成。

背景

字节跳动开发套件数据集成团队(DTS,Data Transmission Service)在字节跳动内基于 Flink 实现了流批一体的数据集成服务。其中一个典型场景是 Kafka/ByteMQ/RocketMQ -> HDFS/Hive。Kafka/ByteMQ/RocketMQ -> HDFS/Hive(上面均称之为 MQ dump,具体介绍可见 字节跳动基于 Flink 的 MQ-Hive 实时数据集成)在数仓建设第一层,对数据的准确性和实时性要求比拟高。​

目前字节跳动中国区 MQ dump 例行工作数微小,日均解决流量在 PB 量级。微小的任务量和数据量对 MQ dump 的稳定性以及准确性带来了极大的挑战。​

本文次要介绍 DTS MQ dump 在极其场景中遇到的数据失落问题的排查与优化,最初介绍了上线成果。

线上问题

HDFS 集群某个元数据节点因为硬件故障宕机。在该元数据节点终止半小时后,HDFS 手动运维操作将 HDFS 切主到 backup 节点后,HDFS 复原服务。故障复原后用户反馈 MQ dump 在故障期间有数据失落,产出的数据与 MQ 中的数据不统一。

收到反馈后咱们立刻进行故障的排查。上面先简要介绍一下 Flink Checkpoint 以及 MQ dump 写入流程,而后再介绍一下故障的排查过程以及解决方案,最初是上线成果以及总结。​

Flink Checkpoint 简介​

Flink 基于 Chandy-Lamport 分布式快照算法实现了 Checkpoint 机制,可能提供 Exactly Once 或者 At Least Once 语义。​

Flink 通过在数据流中注入 barriers 将数据拆分为一段一段的数据,在不终止数据流解决的前提下,让每个节点能够独立创立 Checkpoint 保留本人的快照。每个 barrier 都有一个快照 ID,在该快照 ID 之前的数据都会进入这个快照,而之后的数据会进入下一个快照。​

Checkpoint 对 Operator state 进行快照的流程可分为两个阶段:

  • Snapshot state 阶段:对应 2PC 筹备阶段。Checkpoint Coordinator 将 barries 注入到 Source Operator 中。Operator 接管到输出 Operator 所有并发的 barries 后将以后的状态写入到 state 中,并将 barries 传递到下一个 Operator。​
  • Notify Checkpoint 实现阶段:对应 2PC 的 commit 阶段。Checkpoint Coordinator 收到 Sink Operator 的所有 Checkpoint 的实现信号后,会给 Operator 发送 Notify 信号。Operator 收到信号当前会调用相应的函数进行 Notify 的操作。​

    而在工作失败后,工作会从上一个 Checkpoint state 中进行复原,进而实现 Exactly Once 或者 At Least Once 语义。​

MQ dump 写入流程梳理​

MQ dump 利用 Flink Checkpoint 机制和 2PC(Two-phase Commit)机制实现了 Exactly Once 语义,数据能够做到不重不丢。​

依据 Flink Checkpoint 的流程,MQ dump 整个写入过程能够分为四个不同的流程:​

  • 数据写入阶段​
  • SnapshotState 阶段​
  • Notify Checkpoint 实现阶段​
  • Checkpoint 复原阶段​

整个流程能够用上面的流程图示意:

上面具体介绍下面各个阶段的次要操作。假如 Flink 工作以后 Checkpoint id 为 n,当前任务的 task id 为 x。​

数据写入阶段​

写入阶段就次要有以下两个操作:​

  • 如果是以后 Checkpoint 第一次写入(transaction),先清理要写入长期文件夹 /tmp/cp-n/task-x​
  • 在长期文件夹中建设文件并写入数据​

留神在写入数据之前咱们会先清理长期目录。执行这个操作的起因是咱们须要保障最终数据的准确性:​

假如工作 x 在 Checkpoint n 写入阶段失败了(将局部数据写入到长期文件夹 /tmp/cp-n/task-x),那么工作会从上一个 Checkpoint n-1 复原,下一个写入的 Checkpoint id 依然为 n。如果写入前不清理长期目录,失败前遗留的局部脏文件就会保留,在 Checkpoint 阶段就会将脏文件移到正式目录中。​

SnapshotState 阶段​

SnapshotState 阶段对应 2PC 的两个阶段中的第一个阶段。次要操作是敞开正在写入的文件,并将工作的 state(次要是以后的 Checkpoint id 和 task id)存储起来。​

Notify Checkpoint 实现阶段​

该阶段对应 2PC 两个阶段中的第二个阶段。次要操作如下:​

  • List 长期目录文件夹 /tmp/cp-n/task-x​
  • 将长期目录文件夹下的所有文件 rename 到正式目录​
  • 删除长期目录文件夹 /tmp/cp-n/task-x​

Checkpoint 复原阶段​

Checkpoint 复原阶段是工作在异样场景下,从轻量级的分布式快照复原阶段。次要操作如下:​

  • 从 Flink state 中复原出工作的 Checkpoint id n 和 工作的 task id x​
  • 依据 Checkpoint id 和 工作的 task id x 获取到长期目录文件夹 /tmp/cp-n/task-x​
  • 将长期目录文件夹下的所有文件 rename 到正式目录​
  • 删除长期目录文件夹 /tmp/cp-n/task-x​

故障排查过程​

理解完相干写入流程后,咱们回到故障的排查。用户工作配置的并发为 8,也就是说执行过程中有 8 个 task 在同时执行。​

Flink 日志查看​

排查过程中,咱们首先查看 Flink Job manager 和 Task manager 在 HDFS 故障期间的日志,发现在 Checkpoint id 为 4608 时,task 2/3/6/7 都产出了若干个文件。而 task 0/1/4/5 在 Checkpoint id 为 4608 时,都因为某个文件被删除造成写入数据或者敞开文件时失败,如 task 0 失败是因为文件 /xx/_DUMP_TEMPORARY/cp-4608/task-0/date=20211031/18_xx_0_4608.1635674819911.zstd 被删除而失败。​

然而查看正式目录下相干文件的信息,咱们发现 task 2、3 两个 task 并没有 Checkpoint 4608 的文件(文件名含有 task id 和 Checkpoint id 信息,所以能够依据正式目录下的文件名晓得其是哪个 task 在哪个 Checkpoint 期间创立的)。故初步确定的起因是某些文件被误删造成数据失落。Task 2/3/6/7 在文件删除后因为没有文件的写入和敞开操作,task 失常运行;而 task 0/1/4/5 在文件删除后还有文件的写入和敞开操作,造成 task 失败。​

HDFS 元数据查看​

下一步就要去排查文件失落的起因。咱们通过 HDFS trace 记录表(HDFS trace 记录表记录着用户和零碎调用行为,以达到剖析和运维的目标)查看 task 2 Checkpoint 4608 长期目录操作记录,对应的门路为 /xx/_DUMP_TEMPORARY/cp-4608/task-2。​

从 HDFS trace 操作记录中能够发现文件夹的删除操作执行了很屡次。​

而后再查问 task 2 Checkpoint 4608 长期目录下的文件操作记录。能够看出在 2021-10-31 18:08:58 左右理论有创立两个文件,然而因为删除操作的反复执行造成创立的两个文件被删除。​

问题的初步起因曾经找到:删除操作的反复执行造成数据失落。​

根本原因​

咱们对以下两点感觉比拟困惑:一是为啥删除操作会反复执行;二是在写入流程中,删除操作要不是产生在数据写入之前,要不产生在数据曾经挪动到正式目录之后,怎么会造成数据失落。带着纳闷,咱们进一步剖析。​

疏忽 Flink Checkpoint 的复原流程以及 Flink 状态的操作流程,只保留与 HDFS 交互的相干步骤,DTS MQ dump 与 HDFS 的操作流程能够简化为如下流程图:​

在整个写入流程中波及到 delete 的操作有两个中央:一个是在写入文件之前;一个是在将临时文件重命名到正式目录之后。在第二个删除操作中,即便删除操作反复执行,也不影响最终数据的准确性。因为在之前的重命名过程中曾经将所有数据从长期文件夹挪动到正式目录。​

所以咱们能够确定是在写入文件之前的删除操作的反复执行造成最终的数据失落。​

在 task-2 的日志中咱们发现 HDFS client 在 18:03:37-18:08:58 始终在尝试调用 HDFS 删除接口删除长期目录,然而因为 java.net.SocketTimeoutException 始终删除失败。在工夫点 18:08:58 删除操作执行胜利。而这个工夫点也根本与咱们在 HDFS trace 数据中发现删除操作的执行记录时间是对应的。通过日志咱们发现建设文件以及敞开文件操作根本都是在 18:08:58 这个工夫点实现的,这个工夫点与 HDFS trace 中的记录也是对应上的。​

征询 HDFS 后,HDFS 示意 HDFS 删除操作不会保障幂等性。进而咱们判断问题产生的本源为:在故障期间,写入数据前的删除操作的多次重试在 HDFS NameNode 上反复执行,将咱们写入的数据删除造成最终数据的失落。如果反复执行的删除操作产生在文件敞开之前,那么 task 会因为写入的文件不存在而失败;如果反复删除命令是在敞开文件之后,那么就会造成数据的失落。​

解决方案​

MQ dump 在异样场景中失落数据的实质起因是咱们依赖删除操作和写入操作的程序性。然而 HDFS NameNode 在异样场景下是无奈保障两个操作的程序性。​

计划一:HDFS 保障操作的幂等性​

为了解决这个问题,咱们首先想到的是 HDFS 保障删除操作的幂等性,这样即便删除操作反复执行也不会影响后续写入的问题,进而能够保证数据的准确性。然而征询 HDFS 后,HDFS 示意 HDFS 在现有架构下无奈保障删除的幂等性。​

参考 DDIA (Designing Data-Intensive Applications) 第 9 章中对于因果关系的定义:因果关系对事件施加了一种程序——因在果之前。对应于 MQ dump 流程中删除操作是因,产生在写入数据之前。咱们须要保障这两个关系的因果关系。而依据其解决因果问题的办法,一种解决思路是 HDFS 在每个 client 申请中都带上序列号程序,进而在 HDFS NameNode 上能够保障单个 client 的申请因果性。跟 HDFS 探讨后发现这个计划的实现老本会比拟大。​

计划二:应用文件 state​

理解 HDFS 难以保障操作的幂等性后,咱们想是否能够将写入前的删除操作去除,也就是说在写入 HDFS 之前不清理文件夹而是间接写入数据到文件,这样就不须要有因果性的保障。​

如果咱们晓得长期文件夹中哪些文件是咱们须要的,在重命名阶段就能够间接将须要的文件重命名到正式目录而疏忽长期文件夹中的脏文件,这样在写入之前就不须要删除文件夹。故咱们的解决方案是将写入的文件门路存储到 Flink state 中,从而确保在 commit 阶段以及复原阶段能够将须要的文件挪动到正式目录。​

最终,咱们抉择了计划二解决该问题,应用文件 state 前后解决流程对比方下图所示:​

目前文件 state 曾经在线上应用了,上面先介绍一下实现中碰到的相干问题,而后再形容一下上线后成果。​

文件 state 实现细节​

文件挪动幂等性​
通过文件 state 咱们能够解析出以后文件所在的长期目录以及将要写入的正式目录。通过以下流程咱们保障了挪动的幂等性。​

通过以上的流程即便文件挪动失败,再次重试时也可能保障文件挪动的幂等性。​

可观测性​

实现文件 state 后,咱们减少了 metric 记录创立的文件数量以及胜利挪动到正式目录的文件数量,进步了零碎可观测性。如果文件在长期目录和正式目录都不存在时,咱们减少了挪动失败的 metric,并减少了报警,在文件挪动失败后能够及时感知到,而不是等用户报告数据失落后再排查。​

上线后线上 metric 成果如下:

总共有四个指标,别离为创立文件的数量、重命名胜利文件的数量、疏忽重命名文件的数量、重命名失败的文件数量,别离代表的意义如下:​

  • 创立文件的数量:state 中所有文件的数量,也就是以后 Checkpoint 解决数据阶段创立的所有文件数量。​
  • 重命名胜利文件的数量:NotifyCheckpointComplete 阶段将临时文件胜利挪动到正式目录下的文件数量。
  • 疏忽重命名文件的数量:NotifyCheckpointComplete 阶段疏忽挪动到正式目录下下的文件数量。也就是长期文件夹中不存在然而正式目录存在的文件。这种状况通常产生在工作有 Failover 的状况。Failover 后工作从 Checkpoint 中复原,失败前曾经重命名胜利的文件在以后阶段会疏忽重命名。
  • 重命名失败的文件数量:长期目录以及正式目录下都不存在文件的数量。这种状况通常是因为工作产生了异样造成数据的失落。目前线上比拟常见的一个 case 是工作在敞开一段时间后再开启。因为 HDFS TTL 的设置小于工作敞开的时长,长期目录中写入的文件被 HDFS TTL 策略革除。这个后果理论是合乎预期的。

前向兼容性

预期中上线文件 state 后写入数据前不须要删除要写入的临时文件,然而为了保障降级后的前向兼容性,咱们分两期上线了文件 state:

  • 第一期写入数据前保留了删除操作
  • 第二期删除了写入数据前的删除操作

第一期保留删除操作的起因如果文件 state 上线后有异样的话,回滚到之前的版本须要保证数据的准确性。而只有保留删除操作能力保障回滚后数据的准确性。否则如果之前的 Checkpoint 文件夹中有脏文件存在,回滚到文件 state 之前的版本的话,因为没有文件 state 存在,会将脏文件也挪动到正式目录中,影响最终数据的准确性。

上线成果

切主演练

上线后与 HDFS 进行了 HDFS 集群切主演练。演练了以下两个场景:

  • HDFS 集群失常切主
  • HDFS 集群主节点失败超过 10 分钟
    而测试过程是建设两组不同的工作生产雷同的 Kafka topic,写入不同的 Hive 表。而后建设数据校验工作校验两组工作数据的一致性。一组工作应用 HDFS 测试集群,另一组工作应用失常集群。

将测试集群进行屡次 HDFS 失常切主和异样切主,校验工作显示演练完结前后两组工作写入数据的一致性。后果验证了该计划可无效解决 HDFS 操作非幂等的丢数问题。

性能成果

应用文件 state 后,在 Notify Checkpoint 实现阶段不须要调用 HDFS list 接口,能够缩小一次 HDFS 调用,实践上能够缩小 Notify Checkpoint 阶段与 HDFS 交互工夫。下图展现了上线(18:26 左右)前后 Notify 阶段与 HDFS 交互的 metrics。能够看出上线前的均匀解决工夫在 300ms 左右,而上线后均匀解决工夫在 150 ms 左右,缩小了一半的解决工夫。

总结

随着字节跳动产品业务的疾速倒退,字节跳动一站式大数据开发平台性能也越来越丰盛了,提供了离线、实时、增量等场景下全域数据集成解决方案。而业务数据量的增大以及业务的多样化给数据集成带来了很大的挑战。比方咱们扩大了增加 Hive 分区的策略,以反对实时数仓近实时 append 场景,使数据的应用提早降落了 75%。

字节跳动流式数据集成仍在一直倒退中,将来次要关注以下几方面:

  1. 性能加强,减少简略的数据转换逻辑,缩短流式数据处理链路,进而缩小解决时延
  2. 架构降级,离线集成和实时数据集成架构对立
  3. 反对 auto scaling 性能,在业务顶峰和低峰主动扩缩容,进步资源利用率,缩小资源节约

本文中介绍的《字节跳动流式数据集成基于 Flink Checkpoint 两阶段提交的实际和优化》,目前已通过火山引擎数据产品大数据研发治理套件 DataLeap 向内部企业输入。

大数据研发治理套件 DataLeap 作为一站式大数据中台解决方案,能够实现全场景数据整合、全链路数据研发、全周期数据治理、全方位数据安全。

参考文献

  • 字节跳动基于 Flink 的 MQ-Hive 实时数据集成
  • 字节跳动单点复原性能及 Regional CheckPoint 优化实际
  • Designing Data-Intensive Applications
  • Stateful Stream Processing

欢送关注字节跳动数据平台同名公众号

正文完
 0