关于数据采集:Flink-CDC-Hudi-海量数据入湖在顺丰的实践

42次阅读

共计 5203 个字符,预计需要花费 14 分钟才能阅读完成。

简介: 覃立辉在 5.21 Flink CDC Meetup 的分享。

本文整顿自顺丰大数据研发工程师覃立辉在 5 月 21 日 Flink CDC Meetup 的演讲。次要内容包含:

  1. 顺丰数据集成背景
  2. Flink CDC 实际问题与优化
  3. 将来布局

点击查看直播回放 & 演讲 PDF

一、顺丰数据集成背景

顺丰是快递物流服务提供商,主营业务蕴含了时效快递、经济快递、同城配送以及冷链运输等。

运输流程背地须要一系列零碎的反对,比方订单管理系统、智慧物业零碎、以及很多中转场、汽车或飞机上的很多传感器,都会产生大量数据。如果须要对这些数据进行数据分析,那么数据集成是其中很重要的一步。

顺丰的数据集成经验了几年的倒退,次要分为两块,一块是离线数据集成,一块是实时数据集成。离线数据集成以 DataX 为主,本文次要介绍实时数据集成计划。

2017 年,基于 Jstorm + Canal 的形式实现了第一个版本的实时数据集成计划。然而此计划存在诸多问题,比方无奈保证数据的一致性、吞吐率较低、难以保护。2019 年,随着 Flink 社区的一直倒退,它补齐了很多重要个性,因而基于 Flink + Canal 的形式实现了第二个版本的实时数据集成计划。然而此计划仍然不够完满,经验了外部调研与实际,2022 年初,咱们全面转向 Flink CDC。

上图为 Flink + Canal 的实时数据入湖架构。

Flink 启动之后,首先读取以后的 Binlog 信息,标记为 StartOffset,通过 select 形式将全量数据采集上来,发往上游 Kafka。全量采集结束之后,再从 startOffset 采集增量的日志信息,发往 Kafka。最终 Kafka 的数据由 Spark 生产后写往 Hudi。

然而此架构存在以下三个问题:

  • 全量与增量数据存在反复:因为采集过程中不会进行锁表,如果在全量采集过程中有数据变更,并且采集到了这些数据,那么这些数据会与 Binlog 中的数据存在反复;
  • 须要上游进行 Upsert 或 Merge 写入能力剔除反复的数据,确保数据的最终一致性;
  • 须要两套计算引擎,再加上音讯队列 Kafka 能力将数据写入到数据湖 Hudi 中,过程波及组件多、链路长,且耗费资源大。

基于以上问题,咱们整顿出了数据集成的外围需要:

  1. 全量增量主动切换,并保证数据的准确性。Flink + Canal 的架构能实现全量和增量主动切换,但无奈保证数据的准确性;
  2. 最大限度地缩小对源数据库的影响,比方同步过程中尽量不应用锁、能流控等;
  3. 能在已存在的工作中增加新表的数据采集,这是十分外围的需要,因为在简单的生产环境中,等所有表都筹备好之后再进行数据集成会导致效率低下。此外,如果不能做到工作的合并,须要起很屡次工作,采集很屡次 Binlog 的数据,可能会导致 DB 机器带宽被打满;
  4. 能同时进行全量和增量日志采集,新增表不能暂停日志采集来确保数据的准确性,这种形式会给其余表日志采集带来提早;
  5. 能确保数据在同一主键 ID 下按历史程序产生,不会呈现后产生的事件先发送到上游。

Flink CDC 很好地解决了业务痛点,并且在可扩展性、稳定性、社区活跃度方面都十分优良。

  • 首先,它能无缝对接 Flink 生态,复用 Flink 泛滥 sink 能力,应用 Flink 数据清理转换的能力;
  • 其次,它能进行全量与增量主动切换,并且保证数据的准确性;
  • 第三,它能反对无锁读取、断点续传、程度扩大,特地是在程度扩大方面,实践上来说,给的资源足够多时,性能瓶颈个别不会呈现在 CDC 侧,而是在于数据源 / 指标源是否能反对读 / 写这么多数据。

二、Flink CDC 实际问题与优化

上图为 Flink CDC 2.0 的架构原理。它基于 FLIP-27 实现,外围步骤如下:

  1. Enumerator 先将全量数据拆分成多个 SnapshotSplit,而后依照上图中第一步将 SnapshotSplit 发送给 SourceReader 执行。执行过程中会对数据进行修改来保证数据的一致性;
  2. SnapshotSplit 读取实现后向 Enumerator 汇报已读取实现的块信息;
  3. 反复执行 (1) (2) 两个步骤,直到将全量数据读取结束;
  4. 全量数据读取结束之后,Enumerator 会依据之前全量实现的 split 信息,结构一个 BinlogSplit。发送给 SourceRead 执行,读取增量日志数据。

问题一:新增表会进行 Binlog 日志流

在已存在的工作中增加新表是十分重要的需要,Flink CDC 2.0 也反对了这一性能。然而为了确保数据的一致性,Flink CDC 2.0 在新增表的流程中,须要进行 Binlog 日志流的读取,再进行新增表的全量数据读取。等新增表的全量数据读取结束之后,再将之前进行的 Binlog 工作重新启动。这也意味着新增表会影响其余表的日志采集进度。然而咱们心愿全量和增量两个工作可能同时进行,为了解决这一问题,咱们对 Flink CDC 进行了拓展,反对了全量和增量日志流并行读取,步骤如下:

  1. 程序启动后,在 Enumerator 中创立 BinlogSplit,放在调配列表的第一位,调配给 SourceReader 执行增量数据采集;
  2. 与原有的全量数据采集一样,Enumerator 将全量采集切分成多个 split 块,而后将切分好的块调配给 SourceReader 去执行全量数据的采集;
  3. 全量数据采集实现之后,SourceReader 向 Enumerator 汇报曾经实现的全量数据采集块的信息;
  4. 反复 (2) (3) 步,将全量的表采集结束。

以上就是第一次启动工作,全量与增量日志并行读取的流程。新增表后,并行读取实现步骤如下:

  1. 复原工作时,Flink CDC 会从 state 中获取用户新表的配置信息;
  2. 通过比照用户配置信息与状态信息,捕捉到要新增的表。对于 BinlogSplit 工作,会减少新表 binlog 数据的采集;对于 Enumerator 工作,会对新表进行全量切分;
  3. Enumerator 将切分好的 SnapshotSplit 调配给 SourceReader 执行全量数据采集;
  4. 反复步骤 (3),直到所有全量数据读取结束。

然而,实现全量和增量日志并行读取后,又呈现了数据抵触问题。

如上图所示,Flink CDC 在读取全量数据之前,会先读取以后 Binlog 的地位信息,将其标记为 LW,接着通过 select 的形式读取全量数据,读取到上图中 s1、s2、s3、s4 四条数据。再读取以后的 Binlog 地位,标记为 HW,而后将 LW 和 HW 中变更的数据 merge 到之前全量采集上来的数据中。通过一系列操作后,最终全量采集到的数据是 s1、s2、s3、s4 和 s5。

而增量采集的过程也会读取 Binlog 中的日志信息,会将 LW 和 HW 中的 s2、s2、s4、s5 四条数据发往上游。

上述整个流程中存在两个问题:首先,数据多取,存在数据反复,上图中红色标识即存在反复的数据;其次,全量和增量在两个不同的线程中,也有可能是在两个不同的 JVM 中,因而先发往上游的数据可能是全量数据,也有可能是增量数据,意味着同一主键 ID 达到上游的先后顺序不是按历史程序,与外围需要不符。

针对数据抵触问题,咱们提供了基于 GTID 实现的解决计划。

首先,为全量数据打上 Snapshot 标签,增量数据打上 Binlog 标签;其次,为全量数据补充一个高水位 GTID 信息,而增量数据自身携带有 GTID 信息,因而不须要补充。将数据下发,上游会接上一个 KeyBy 算子,再接上数据抵触解决算子,数据抵触的外围是保障发往上游的数据不反复,并且按历史程序产生。

如果下发的是全量采集到的数据,且此前没有 Binlog 数据下发,则将这条数据的 GTID 存储到 state 并把这条数据下发;如果 state 不为空且此条记录的 GTID 大于等于状态中的 GTID,也将这条数据的 GTID 存储到 state 并把这条数据下发;

通过这种形式,很好地解决了数据抵触的问题,最终输入到上游的数据是不反复且按历史程序产生的。

然而,新的问题又产生了。在解决算法中能够看出,为了确保数据的不反复并且按历史程序下发,会将所有记录对应的 GTID 信息存储在状态中,导致状态始终递增。

清理状态个别首选 TTL,但 TTL 难以管制工夫,且无奈将数据齐全清理掉。第二种形式是手动清理,全量表实现之后,能够下发一条记录通知上游清理 state 中的数据。

解决了以上所有问题,并行读取的最终计划如下图所示。

首先,给数据打上四种标签,别离代表不同的状态:

  • SNAPSHOT:全量采集到的数据信息。
  • STATE\_BINLOG:还未实现全量采集,Binlog 已采集到这张表的变更数据。
  • BINLOG:全量数据采集结束之后,Binlog 再采集到这张表的变更数据。
  • TABLE\_FINISHED:全量数据采集实现之后告诉上游,能够清理 state。

具体实现步骤如下:

  1. 调配 Binlog,此时 Binlog 采集到的数据都为 STATE\_BINLOG 标签;
  2. 调配 SnapshotSplit 工作,此时全量采集到的数据都为 SNAPSHOT 标签;
  3. Enumerator 实时监控表的状态,某一张表执行实现并实现 checkpoint 后,告诉 Binlog 工作。Binlog 工作收到告诉后,将此表后续采集到的 Binlog 信息都打上 BINLOG 标签;此外,它还会结构一条 TABLE\_FINISHED 记录发往上游做解决;
  4. 数据采集实现后,除了接上数据抵触解决算子,此处还新增了一个步骤:从支流中筛选进去的 TABLE\_FINISHED 事件记录,通过播送的形式将其发往上游,上游依据具体信息清理对应表的状态信息。

问题二:写 Hudi 时存在数据歪斜

如上图,Flink CDC 采集三张表数据的时候,会先读取完 tableA 的全量数据,再读取 tableB 的全量数据。读取 tableA 的过程中,上游只有 tableA 的 sink 有数据流入。

咱们通过多表混合读取的形式来解决数据歪斜的问题。

引入多表混合之前,Flink CDC 读取完 tableA 的所有 chunk,再读取 tableB 的所有 chunk。实现了多表混合读取后,读取的程序变为读取 tableA 的 chunk1、tableB 的 chunk1、tableC 的 chunk1,再读取 tableA 的 chunk2,以此类推,最终很好地解决了上游 sink 数据歪斜的问题,保障每个 sink 都有数据流入。

咱们对多表混合读取的性能进行了测试,由 TPCC 工具结构的测试数据,读取了 4。张表,总并行度为 8,每个 sink 的并行度为 2,写入工夫由原来的 46 分钟降至 20 分钟,性能晋升 2.3 倍。

须要留神的是,如果 sink 的并行度和总并行度相等,则性能不会有显著晋升,多表混合读取次要的作用是更快地获取到每张表下发的数据。

问题三:须要用户手动指定 schema 信息

用户手动执行 DB schema 与 sink 之间 schema 映射关系,开发效率低,耗时长且容易出错。

为了升高用户的应用门槛,晋升开发效率,咱们实现了 Oracle catalog,让用户能以低代码的形式、无需指定 DB schema 信息与 sink schema 信息的映射关系,即可通过 Flink CDC 将数据写入到 Hudi。

三、将来布局

第一,反对 schema 信息变更同步。比方数据源产生了 schema 信息变更,可能将其同步到 Kafka 和 Hudi 中;反对平台接入更多数据源类型,加强稳定性,实现更多利用场景的落地。

第二,反对 SQL 化的形式,应用 Flink CDC 将数据同步到 Hudi 中,升高用户的应用门槛。

第三,心愿技术更凋谢,与社区独特成长,为社区奉献出本人的一份力量。

问答

Q:断点续传采集如何解决?

A:断点续传有两种,分为全量和 Binlog。但它们都是基于 Flink state 的能力,同步的过程中会将进度存储到 state 中。如果失败了,下一次再从 state 中复原即可。

Q:MySQL 在监控多表应用 SQL 写入 Hudi 表中的时候,存在多个 job,保护很麻烦,如何通过单 job 同步整库?

A:咱们基于 GTID 的形式对 Flink CDC 进行了拓展, 反对工作中新增表,且不影响其余表的采集进度。不思考新增表影响到其余表进度的状况下,也能够基于 Flink CDC 2.2 做新增表的能力。

Q:顺丰这些个性会在 CDC 开源版本中实现吗?

A:目前咱们的计划还存在一些局限性,比方必须用 MySQL 的 GTID,须要上游有数据抵触解决的算子,因而较难实现在社区中开源。

Q:Flink CDC 2.0 新增表反对全量 + 增量吗?

A:是的。

Q:GTID 去重算子会不会成为性能瓶颈?

A:通过实际,不存在性能瓶颈,它只是做了一些数据的判断和过滤。

点击查看直播回放 & 演讲 PDF

版权申明: 本文内容由阿里云实名注册用户自发奉献,版权归原作者所有,阿里云开发者社区不领有其著作权,亦不承当相应法律责任。具体规定请查看《阿里云开发者社区用户服务协定》和《阿里云开发者社区知识产权爱护指引》。如果您发现本社区中有涉嫌剽窃的内容,填写侵权投诉表单进行举报,一经查实,本社区将立即删除涉嫌侵权内容。

正文完
 0