乐趣区

关于sql:Flink-CDC-20-正式发布详解核心改进

简介:Flink CDC 2.0.0 版本于 8 月 10 日正式公布,点击理解详情~

本文由社区志愿者陈政羽整顿,内容起源自阿里巴巴高级开发工程师徐榜江 (雪尽) 7 月 10 日在北京站 Flink Meetup 分享的《详解 Flink-CDC》。深刻解说了最新公布的 Flink CDC 2.0.0 版本带来的外围个性,包含:全量数据的并发读取、checkpoint、无锁读取等重大改良。

GitHub 地址:
https://github.com/ververica/flink-cdc-connectors

一、CDC 概述

CDC 的全称是 Change Data Capture,在狭义的概念上,只有是能捕捉数据变更的技术,咱们都能够称之为 CDC。目前通常形容的 CDC 技术次要面向数据库的变更,是一种用于捕捉数据库中数据变更的技术。CDC 技术的利用场景十分宽泛:

  • 数据同步: 用于备份,容灾;
  • 数据散发: 一个数据源分发给多个上游零碎;
  • 数据采集: 面向数据仓库 / 数据湖的 ETL 数据集成,是十分重要的数据源。

CDC 的技术计划十分多,目前业界支流的实现机制能够分为两种:

  • 基于查问的 CDC:

    • 离线调度查问作业,批处理。把一张表同步到其余零碎,每次通过查问去获取表中最新的数据;
    • 无奈保障数据一致性,查的过程中有可能数据曾经产生了屡次变更;
    • 不保障实时性,基于离线调度存在人造的提早。
  • 基于日志的 CDC:

    • 实时生产日志,流解决,例如 MySQL 的 binlog 日志残缺记录了数据库中的变更,能够把 binlog 文件当作流的数据源;
    • 保障数据一致性,因为 binlog 文件蕴含了所有历史变更明细;
    • 保障实时性,因为相似 binlog 的日志文件是能够流式生产的,提供的是实时数据。

比照常见的开源 CDC 计划,咱们能够发现:

  • 比照增量同步能力,

    • 基于日志的形式,能够很好的做到增量同步;
    • 而基于查问的形式是很难做到增量同步的。
  • 比照全量同步能力,基于查问或者日志的 CDC 计划根本都反对,除了 Canal。
  • 而比照全量 + 增量同步的能力,只有 Flink CDC、Debezium、Oracle Goldengate 反对较好。
  • 从架构角度去看,该表将架构分为单机和分布式,这里的分布式架构不单纯体现在数据读取能力的程度扩大上,更重要的是在大数据场景下分布式系统接入能力。例如 Flink CDC 的数据入湖或者入仓的时候,上游通常是分布式的零碎,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接入分布式系统能力上看,Flink CDC 的架构可能很好地接入此类零碎。
  • 在数据转换 / 数据荡涤能力上,当数据进入到 CDC 工具的时候是否能较不便的对数据做一些过滤或者荡涤,甚至聚合?

    • 在 Flink CDC 上操作相当简略,能够通过 Flink SQL 去操作这些数据;
    • 然而像 DataX、Debezium 等则须要通过脚本或者模板去做,所以用户的应用门槛会比拟高。
  • 另外,在生态方面,这里指的是上游的一些数据库或者数据源的反对。Flink CDC 上游有丰盛的 Connector,例如写入到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常见的一些零碎,也反对各种自定义 connector。

二、Flink CDC 我的项目

讲到这里,先带大家回顾下开发 Flink CDC 我的项目的动机。

1. Dynamic Table & ChangeLog Stream

大家都晓得 Flink 有两个根底概念:Dynamic Table 和 Changelog Stream。

  • Dynamic Table 就是 Flink SQL 定义的动静表,动静表和流的概念是对等的。参照上图,流能够转换成动静表,动静表也能够转换成流。
  • 在 Flink SQL 中,数据在从一个算子流向另外一个算子时都是以 Changelog Stream 的模式,任意时刻的 Changelog Stream 能够翻译为一个表,也能够翻译为一个流。

联想下 MySQL 中的表和 binlog 日志,就会发现:MySQL 数据库的一张表所有的变更都记录在 binlog 日志中,如果始终对表进行更新,binlog 日志流也始终会追加,数据库中的表就相当于 binlog 日志流在某个时刻点物化的后果;日志流就是将表的变更数据继续捕捉的后果。这阐明 Flink SQL 的 Dynamic Table 是能够十分天然地示意一张一直变动的 MySQL 数据库表。

在此基础上,咱们调研了一些 CDC 技术,最终抉择了 Debezium 作为 Flink CDC 的底层采集工具。Debezium 反对全量同步,也反对增量同步,也反对全量 + 增量的同步,非常灵活,同时基于日志的 CDC 技术使得提供 Exactly-Once 成为可能。

将 Flink SQL 的外部数据结构 RowData 和 Debezium 的数据结构进行比照,能够发现两者是十分类似的。

  • 每条 RowData 都有一个元数据 RowKind,包含 4 种类型,别离是插入 (INSERT)、更新前镜像 (UPDATE\_BEFORE)、更新后镜像 (UPDATE\_AFTER)、删除 (DELETE),这四种类型和数据库外面的 binlog 概念保持一致。
  • 而 Debezium 的数据结构,也有一个相似的元数据 op 字段,op 字段的取值也有四种,别离是 c、u、d、r,各自对应 create、update、delete、read。对于代表更新操作的 u,其数据局部同时蕴含了前镜像 (before) 和后镜像 (after)。

通过剖析两种数据结构,Flink 和 Debezium 两者的底层数据是能够十分不便地对接起来的,大家能够发现 Flink 做 CDC 从技术上是十分适合的。

2. 传统 CDC ETL 剖析

咱们来看下传统 CDC 的 ETL 剖析链路,如下图所示:

传统的基于 CDC 的 ETL 剖析中,数据采集工具是必须的,国外用户罕用 Debezium,国内用户罕用阿里开源的 Canal,采集工具负责采集数据库的增量数据,一些采集工具也反对同步全量数据。采集到的数据个别输入到消息中间件如 Kafka,而后 Flink 计算引擎再去生产这一部分数据写入到目标端,目标端能够是各种 DB,数据湖,实时数仓和离线数仓。

留神,Flink 提供了 changelog-json format,能够将 changelog 数据写入离线数仓如 Hive / HDFS;对于实时数仓,Flink 反对将 changelog 通过 upsert-kafka connector 间接写入 Kafka。

咱们始终在思考是否能够应用 Flink CDC 去替换上图中虚线框内的采集组件和音讯队列,从而简化剖析链路,升高保护老本。同时更少的组件也意味着数据时效性可能进一步提高。答案是能够的,于是就有了咱们基于 Flink CDC 的 ETL 剖析流程。

3. 基于 Flink CDC 的 ETL 剖析

在应用了 Flink CDC 之后,除了组件更少,保护更不便外,另一个劣势是通过 Flink SQL 极大地升高了用户应用门槛,能够看上面的例子:

该例子是通过 Flink CDC 去同步数据库数据并写入到 TiDB,用户间接应用 Flink SQL 创立了产品和订单的 MySQL-CDC 表,而后对数据流进行 JOIN 加工,加工后间接写入到上游数据库。通过一个 Flink SQL 作业就实现了 CDC 的数据分析,加工和同步。

大家会发现这是一个纯 SQL 作业,这意味着只有会 SQL 的 BI,业务线同学都能够实现此类工作。与此同时,用户也能够利用 Flink SQL 提供的丰盛语法进行数据荡涤、剖析、聚合。

而这些能力,对于现有的 CDC 计划来说,进行数据的荡涤,剖析和聚合是十分艰难的。

此外,利用 Flink SQL 双流 JOIN、维表 JOIN、UDTF 语法能够非常容易地实现数据打宽,以及各种业务逻辑加工。

4. Flink CDC 我的项目倒退

  • 2020 年 7 月由云邪提交了第一个 commit,这是基于个人兴趣孵化的我的项目;
  • 2020 年 7 中旬反对了 MySQL-CDC;
  • 2020 年 7 月末反对了 Postgres-CDC;
  • 一年的工夫,该我的项目在 GitHub 上的 star 数曾经超过 800。

三、Flink CDC 2.0 详解

1. Flink CDC 痛点

MySQL CDC 是 Flink CDC 中应用最多也是最重要的 Connector,本文下述章节形容 Flink CDC Connector 均为 MySQL CDC Connector。

随着 Flink CDC 我的项目的倒退,失去了很多用户在社区的反馈,次要演绎为三个:

  • 全量 + 增量读取的过程须要保障所有数据的一致性,因而须要通过加锁保障,然而加锁在数据库层面上是一个非常高危的操作。底层 Debezium 在保证数据一致性时,须要对读取的库或表加锁,全局锁可能导致数据库锁住,表级锁会锁住表的读,DBA 个别不给锁权限。
  • 不反对程度扩大,因为 Flink CDC 底层是基于 Debezium,起架构是单节点,所以 Flink CDC 只反对单并发。在全量阶段读取阶段,如果表十分大 (亿级别),读取工夫在小时甚至天级别,用户不能通过减少资源去晋升作业速度。
  • 全量读取阶段不反对 checkpoint:CDC 读取分为两个阶段,全量读取和增量读取,目前全量读取阶段是不反对 checkpoint 的,因而会存在一个问题:当咱们同步全量数据时,假如须要 5 个小时,当咱们同步了 4 小时的时候作业失败,这时候就须要从新开始,再读取 5 个小时。

2. Debezium 锁剖析

Flink CDC 底层封装了 Debezium,Debezium 同步一张表分为两个阶段:

  • 全量阶段: 查问以后表中所有记录;
  • 增量阶段: 从 binlog 生产变更数据。

大部分用户应用的场景都是全量 + 增量同步,加锁是产生在全量阶段,目标是为了确定全量阶段的初始位点,保障增量 + 全量实现一条不多,一条不少,从而保证数据一致性。从下图中咱们能够剖析全局锁和表锁的一些加锁流程,右边红色线条是锁的生命周期,左边是 MySQL 开启可反复读事务的生命周期。

以全局锁为例,首先是获取一个锁,而后再去开启可反复读的事务。这里锁住操作是读取 binlog 的起始地位和以后表的 schema。这样做的目标是保障 binlog 的起始地位和读取到的以后 schema 是能够对应上的,因为表的 schema 是会扭转的,比方如删除列或者减少列。在读取这两个信息后,SnapshotReader 会在可反复读事务里读取全量数据,在全量数据读取实现后,会启动 BinlogReader 从读取的 binlog 起始地位开始增量读取,从而保障全量数据 + 增量数据的无缝连接。

表锁是全局锁的进化版,因为全局锁的权限会比拟高,因而在某些场景,用户只有表锁。表锁锁的工夫会更长,因为表锁有个特色:锁提前开释了可反复读的事务默认会提交,所以锁须要等到全量数据读完后能力开释。

通过下面剖析,接下来看看这些锁到底会造成怎么重大的结果:

Flink CDC 1.x 能够不加锁,可能满足大部分场景,但就义了肯定的数据准确性。Flink CDC 1.x 默认加全局锁,尽管能保证数据一致性,但存在上述 hang 住数据的危险。

3. Flink CDC 2.0 设计 (以 MySQL 为例)

通过下面的剖析,能够晓得 2.0 的设计方案,外围要解决上述的三个问题,即反对无锁、程度扩大、checkpoint。

DBlog 这篇论文里形容的无锁算法如下图所示:

右边是 Chunk 的切分算法形容,Chunk 的切分算法其实和很多数据库的分库分表原理相似,通过表的主键对表中的数据进行分片。假如每个 Chunk 的步长为 10,依照这个规定进行切分,只须要把这些 Chunk 的区间做成左开右闭或者左闭右开的区间,保障连接后的区间可能等于表的主键区间即可。

左边是每个 Chunk 的无锁读算法形容,该算法的核心思想是在划分了 Chunk 后,对于每个 Chunk 的全量读取和增量读取,在不必锁的条件下实现一致性的合并。Chunk 的切分如下图所示:

因为每个 chunk 只负责本人主键范畴内的数据,不难推导,只有可能保障每个 Chunk 读取的一致性,就能保障整张表读取的一致性,这便是无锁算法的基本原理。

Netflix 的 DBLog 论文中 Chunk 读取算法是通过在 DB 保护一张信号表,再通过信号表在 binlog 文件中打点,记录每个 chunk 读取前的 Low Position (低位点) 和读取完结之后 High Position (高位点),在低位点和高位点之间去查问该 Chunk 的全量数据。在读取出这一部分 Chunk 的数据之后,再将这 2 个位点之间的 binlog 增量数据合并到 chunk 所属的全量数据,从而失去高位点时刻,该 chunk 对应的全量数据。

Flink CDC 联合本身的状况,在 Chunk 读取算法上做了去信号表的改良,不须要额定保护信号表,通过间接读取 binlog 位点代替在 binlog 中做标记的性能,整体的 chunk 读算法形容如下图所示:

比方正在读取 Chunk-1,Chunk 的区间是 [K1, K10],首先间接将该区间内的数据 select 进去并把它存在 buffer 中,在 select 之前记录 binlog 的一个位点 (低位点),select 实现后记录 binlog 的一个位点 (高位点)。而后开始增量局部,生产从低位点到高位点的 binlog。

  • 图中的 – (k2,100) + (k2,108) 记录示意这条数据的值从 100 更新到 108;
  • 第二条记录是删除 k3;
  • 第三条记录是更新 k2 为 119;
  • 第四条记录是 k5 的数据由原来的 77 变更为 100。

察看图片中右下角最终的输入,会发现在生产该 chunk 的 binlog 时,呈现的 key 是 k2、k3、k5,咱们返回 buffer 将这些 key 做标记。

  • 对于 k1、k4、k6、k7 来说,在高位点读取结束之后,这些记录没有变动过,所以这些数据是能够间接输入的;
  • 对于扭转过的数据,则须要将增量的数据合并到全量的数据中,只保留合并后的最终数据。例如,k2 最终的后果是 119,那么只须要输入 +(k2,119),而不须要两头产生过扭转的数据。

通过这种形式,Chunk 最终的输入就是在高位点是 chunk 中最新的数据。

上图形容的是单个 Chunk 的一致性读,然而如果有多个表分了很多不同的 Chunk,且这些 Chunk 散发到了不同的 task 中,那么如何散发 Chunk 并保障全局一致性读呢?

这个就是基于 FLIP-27 来优雅地实现的,通过下图能够看到有 SourceEnumerator 的组件,这个组件次要用于 Chunk 的划分,划分好的 Chunk 会提供给上游的 SourceReader 去读取,通过把 chunk 分发给不同的 SourceReader 便实现了并发读取 Snapshot Chunk 的过程,同时基于 FLIP-27 咱们能较为不便地做到 chunk 粒度的 checkpoint。

当 Snapshot Chunk 读取实现之后,须要有一个汇报的流程,如下图中橘色的汇报信息,将 Snapshot Chunk 实现信息汇报给 SourceEnumerator。

汇报的次要目标是为了后续散发 binlog chunk (如下图)。因为 Flink CDC 反对全量 + 增量同步,所以当所有 Snapshot Chunk 读取实现之后,还须要生产增量的 binlog,这是通过下发一个 binlog chunk 给任意一个 Source Reader 进行单并发读取实现的。

对于大部分用户来讲,其实无需过于关注如何无锁算法和分片的细节,理解整体的流程就好。

整体流程能够概括为,首先通过主键对表进行 Snapshot Chunk 划分,再将 Snapshot Chunk 分发给多个 SourceReader,每个 Snapshot Chunk 读取时通过算法实现无锁条件下的一致性读,SourceReader 读取时反对 chunk 粒度的 checkpoint,在所有 Snapshot Chunk 读取实现后,下发一个 binlog chunk 进行增量局部的 binlog 读取,这便是 Flink CDC 2.0 的整体流程,如下图所示:

Flink CDC 是一个齐全开源的我的项目,我的项目所有设计和源码目前都已奉献到开源社区,Flink CDC 2.0 也曾经正式公布,此次的外围改良和晋升包含:

  • 提供 MySQL CDC 2.0,外围 feature 包含

    • 并发读取,全量数据的读取性能能够程度扩大;
    • 全程无锁,不对线上业务产生锁的危险;
    • 断点续传,反对全量阶段的 checkpoint。
  • 搭建文档网站,提供多版本文档反对,文档反对关键词搜寻

笔者用 TPC-DS 数据集中的 customer 表进行了测试,Flink 版本是 1.13.1,customer 表的数据量是 6500 万条,Source 并发为 8,全量读取阶段:

  • MySQL CDC 2.0 用时 13 分钟;
  • MySQL CDC 1.4 用时 89 分钟;
  • 读取性能晋升 6.8 倍。

为了提供更好的文档反对,Flink CDC 社区搭建了文档网站,网站反对对文档的版本治理:

文档网站反对关键字搜寻性能,十分实用:

四、将来布局

对于 CDC 我的项目的将来布局,咱们心愿围绕稳定性,进阶 feature 和生态集成三个方面开展。

  • 稳定性

    • 通过社区的形式吸引更多的开发者,公司的开源力量晋升 Flink CDC 的成熟度;
    • 反对 Lazy Assigning。Lazy Assigning 的思路是将 chunk 先划分一批,而不是一次性进行全副划分。以后 Source Reader 对数据读取进行分片是一次性全副划分好所有 chunk,例如有 1 万个 chunk,能够先划分 1 千个 chunk,而不是一次性全副划分,在 SourceReader 读取完 1 千 chunk 后再持续划分,节约划分 chunk 的工夫。
  • 进阶 Feature

    • 反对 Schema Evolution。这个场景是:当同步数据库的过程中,忽然在表中增加了一个字段,并且心愿后续同步上游零碎的时候可能主动退出这个字段;
    • 反对 Watermark Pushdown 通过 CDC 的 binlog 获取到一些心跳信息,这些心跳的信息能够作为一个 Watermark,通过这个心跳信息能够晓得到这个流以后生产的一些进度;
    • 反对 META 数据,分库分表的场景下,有可能须要元数据晓得这条数据起源哪个库哪个表,在上游零碎入湖入仓能够有更多的灵便操作;
    • 整库同步:用户要同步整个数据库只需一行 SQL 语法即可实现,而不必每张表定义一个 DDL 和 query。
  • 生态集成

    • 集成更多上游数据库,如 Oracle,MS SqlServer。Cloudera 目前正在踊跃奉献 oracle-cdc connector;
    • 在入湖层面,Hudi 和 Iceberg 写入上有肯定的优化空间,例如在高 QPS 入湖的时候,数据分布有比拟大的性能影响,这一点能够通过与生态买通和集成持续优化。

最初,欢送大家退出 Flink CDC 用户群一起交换。

附录

[1] Flink-CDC 我的项目地址

[2] Flink-CDC 文档网站

[3] Percona – MySQL 全局锁工夫剖析

[4] DBLog – 无锁算法论文

[5] Flink FLIP-27 设计文档


实时数仓 Meetup 议题征集

8 月 29 日左右 (工夫暂定),Flink 社区打算举办 Meetup 实时数仓专场,现征集议题中!
对于实时数仓,大家的关注度始终很高,目前业界也有许多落地的公司。在 Meetup 实时数仓专场,咱们将更加重视“交换”,心愿将大家汇集在一起互相探讨对于实时数仓的话题,重点在踩过的坑、碰到的痛点都是怎么解决的~
现征集实时数仓 Meetup 的议题,围绕“实时数仓踩坑痛点和避坑教训”,欢送各位老师和同学带上贵公司的介绍,以及议题的初步纲要来找小松鼠。
公司不议大小,教训才论足缺。咱们会选取其中最具代表性的议题,邀请您加入实时数仓 Meetup 专场~ 你们的教训对于其余技术开发者和 Flink 社区都很重要!

版权申明: 本文内容由阿里云实名注册用户自发奉献,版权归原作者所有,阿里云开发者社区不领有其著作权,亦不承当相应法律责任。具体规定请查看《阿里云开发者社区用户服务协定》和《阿里云开发者社区知识产权爱护指引》。如果您发现本社区中有涉嫌剽窃的内容,填写侵权投诉表单进行举报,一经查实,本社区将立即删除涉嫌侵权内容。

退出移动版