关于后端:Flink-CDC-在京东的探索与实践

9次阅读

共计 7913 个字符,预计需要花费 20 分钟才能阅读完成。

摘要:本文整顿自京东资深技术专家韩飞,在 Flink Forward Asia 2022 数据集成专场的分享。本篇内容次要分为四个局部:

  1. 京东自研 CDC 介绍
  2. 京东场景的 Flink CDC 优化
  3. 业务案例
  4. 将来布局

点击查看直播回放和演讲 PPT

一、京东自研 CDC 介绍

京东自研 CDC 代号为 Fregata,是咱们针对数据实时采集和散发场景自研的底层框架。Fregata 是一种动物,叫做军舰鸟,它是世界上飞行速度最快的鸟,即便在顽劣天气下也能放弃很好的航行能力及机动性,寓意咱们整个实时采集、散发服务的高效稳固。

目前,Fregata 是京东团体数据中台实时采集和散发的对立入口,服务京东批发、物流、科技、衰弱和工业等 BGBU,笼罩订单交易、商智黄金眼、实时风控、京东白条、实时大屏等外围业务。

目前 Fregata 线上稳固运行工作超过两万,大促解决条数峰值为 64.1 亿条 /min,这个是采集和散发的总数据条数,对应的传输数据量峰值为 8.3TB/min。

针对单数据库实例的采集能力超过 500w 条 /min,远超数据库主从同步的速率。

Fregata 工作目前总计应用 CPU 资源超过 6 万核,内存应用超过 18wGB。

咱们基于京东 JDOS 平台实现了 Fregata 工作的容器化部署和运行,并且反对工作的跨机房部署,目前工作次要散布汇天和廊坊两个机房,两个机房互为主备。

容灾方面,反对工作的一键容灾切换,如果呈现机房大面积故障或者断网等状况,能够疾速将工作切换到备机房,从而保障工作的疾速复原和稳固运行。

上图左侧次要展现了 Fregata 的整体架构。

首先,Fregata 依照性能分为实时采集和实时散发两局部,实时采集基于数据库主从复制原理,实时捕捉 Binlog 数据进行解析并依照肯定的格局进行封装,而后发送到京东自研音讯队列 JDQ 中,供上游业务实时生产,目前反对的源端数据库类型有物理 MySQL,京东自研弹性数据库 JED、京东云 RDS、京东数科 CDS 及 Oracle,其中 Oracle 是通过 Logminer 来实现对数据库日志的实时采集。

实时散发局部,次要是将 JDQ 中多种格局的数据实时同步到不同的指标存储中,目前反对的音讯格局有 CSV/JSON/ProtoBuf/Xml/Avro 等,目前反对的指标存储有 HDFS 或者 Hive(对应离线数仓),OLAP 剖析引擎包含 Doris 和 ClickHouse,音讯队列 JDQ,ElasticSearch 及数据湖存储 Iceberg。反对的数据源端、指标端都会依据理论需要一直进行丰盛。

Fregata 做采集和散发的拆分这样的设计次要是基于“一次采集、屡次散发”的思路,这样的益处是既能够缩小对上游数据库的负载,又能够满足业务屡次生产、多种不同类型生产、以及短期内数据回放的需要,这里 JDQ 数据个别保留 7 天。

上图右侧次要展现了 Fregarat 引擎的设计框架,整个引擎次要分为三层,别离是 Source、Parse、Sink 算子,每层算子之间通过 RingBuffer 进行链接(咱们选用的 disruptor)。

  • Source 算子依据数据源类型的不同实现源端数据的拉取并推到 RingBuffer 中。
  • Parse 算子从 RingBuffer 中拉取数据,对数据进行解析组装和一些 ETL 加工,而后将数据发送到上游的 RingBuffer 中。
  • Sink 算子拉取上游 RingBuffer 中的数据并依照指标数据源的要求进行肯定的数据格式的组装,而后发送到不同的指标存储上。

此外,还有一个 BarrierService 定时产生 Barrier,整个工作通过 Barrier 服务来实现状态的提交和记录,其原理跟 Flink 中 Checkponit 机制相似。BarrierService 定时产生 Barrier 并传递给 Source 算子,Source 算子在拿到 Barrier 之后以播送的模式传递给上游的 Parse,上游的 Parse 拿到 Barrier 之后再以播送的模式传递给所有的 Sink 算子,当每个 Sink 算子收到所有 Barrier 之后会向 BarrierService 进行 ack 操作,这时 BarrierService 会进行一系列的状态提交,例如提交生产位点、记录的 Binlog 地位等。

咱们接着看 Fregata 的技术个性,首先是对于 Binlog 的位点追踪。

上图右侧次要介绍了实时采集工作启动运行的整个流程。其中位点服务中记录工作上次曾经生产的 Binlog 位点信息,次要包含 Binlog 文件名称,该 Binlog 文件曾经生产到的地位,数据库实例的 serverid,该 Binlog 地位对应的事务产生工夫以及 GTID 信息。

采集工作启动时会向位点服务获取上次记录的 Binlog 位点信息,而后将记录的 BinlogPosition 或者 GTID 信息传递给 Binlog Connector,Binlog Connector 依据 BinlogPostion 或者 GTID 信息生成 dump 命令并发送给数据库实例,而后数据库实例将 Binlog 日志推送给 Binlog Connector,Binlog Connector 将承受的 Binlog 日志进行反序列化并封装成 Binlog Event 传递给 Fregata,Fregata 对 Binlog Event 进行相干解决后发送给 JDQ。

因为 MySQL 在 5.6.5 版本之后才有 GTID,并且京东线上业务库存在数据库版本较低的景象,因而 Fregata 对 BinlogPosition 和 GTID 两种形式都进行了反对,并且反对从指定工夫点、最新位点、起始位点以及指定 Binlog 地位,多种生产模式灵便配置。

此外,当上游数据库版本升级至高版本并开启了 GTID 后,就存在采集工作须要从 BinlogPosition 模式切换成 GTID 模式的场景,所以 Fregata 也反对了工作的位点模式在 BinlogPosition 和 GTID 之间主动切换的性能,并且在切换的过程中保证数据不丢不重。

切换过程如上图中左下角所示,首先工作从 BinlogPosition 模式重启,而后查问并缓存在这个重启过程中曾经执行的 GTID 事务。接着工作会先以 BinlogPosition 模式持续解决 Binlog 中的 GTID EVENT,并判断前边缓存的 GTID 中是否蕴含以后已生产的 GTID,如果不蕴含,则阐明生产进度曾经追上,此时工作将位点记录模式间接切换成 GTID 模式。

接着介绍 Fregata 动静感知相干的性能。Fregata 实时采集工作配置是数据库域名,如果线上数据库存在故障或者要下线,则会有数据库实例须要产生变更的场景,Fregata 是能够感知到变更并主动进行切换的。

因为切换前后两个数据库实例 Binlog 文件个别都是不统一的,如果此时工作位点记录形式是 BinlogPosition 模式,则在切换之后工作须要主动进行 Binlog 对齐操作,进而保证数据的完整性。(GTID 模式是不须要思考这个问题的)

整个切换过程如上图右侧所示,BinlogPosition 模式下,工作会查问出新数据实例上全副的 Binlog 文件,并依照倒序对 Binlog 文件进行遍历,而后依据位点服务中记录的工夫戳查问出对应的 Position,而后工作从查问出的该 Position 持续生产。这种倒序查找的形式次要是针对线上切库的场景,这种状况下采纳顺叙的查问效率比拟高,个别查找 1-2 分钟前的 Binlog 即可。

Fregata 动静感知能力还体现在 DDL 变更的感知上,Fregata 可能辨认数据库中的 DDL 操作并主动进行适配,目前反对的 DDL 变更类型包含,比方新增、删减字段,批改字段类型、字段程序调整等。

因为上游业务方也会关注数据库的 DDL 操作,因而 Fregata 在辨认到 DDL 操作时,还会主动以邮件或者语音的形式告诉管理员及用户进行关注。

Fregata 也具备一些数据加工及丰盛的能力。

Fregata 在采集 Binlog 的过程中,会对每一条记录减少一个惟一的版本号 Mid(也就是 message id),上游用户能够依据这个版本号进行去重或者确定最新的变更记录,比方当将增量数据散发到 Hive 或者其余无主键束缚的存储中时,用户能够依据 Mid 来确定对于同一个主键的多条操作记录,哪条是最新的变更操作。

此外,Fregata 还会将数据库、表及数据库实例等信息作为元数据封装到每条音讯体中,不便上游有相干需要的业务用于判断数据的起源。

在数据加工方面,采集过程中还反对应用多种函数对数据进行加工解决,如敏感字段加解密、类型转换、工夫转换等。

在部署方面,如果上游业务库是分库分表模式并笼罩多个实例,Fregata 将会依据数据库实例个数启动多个采集工作,采集工作和数据库实例一一对应。

这样的益处是工作互相独立并且资源隔离,繁多数据库实例的变更不影响其余数据库实例的采集工作,劣势是如果实例数量较多,配置和保护老本会略高;配置方面,咱们通过产品化流程解决这个问题,实现一次配置。

告警方面,Fregata 反对工作存活告警,在工作存活异样的状况下,运维人员会收到语音或者邮件报警信息。同时,采集工作会按分钟粒度上报采集提早、数据库主从提早和抽取零值的这些监控指标信息,供用户观测工作运行状况。

全增量数据反对方面,Fregata 目前只反对增量数据的抽取,全量数据的抽取依赖 Binlog 保留工夫。

换句话说,如果 Binlog 数据全量保留,则能够抽取全副数据,否则,只能抽取保留的 Binlog 数据,其余更早的历史数据须要离线抽取来弥补。

二、京东场景的 Flink CDC 优化

上边是对于 Fregata 的内容,整体来讲,目前咱们对于 Flink CDC 的应用还处在一个多方面验证和绝对高级的阶段。针对京东外部的场景,咱们在 Flink CDC 中适当补充了一些个性来满足咱们的理论需要。所以接下来一起看下京东场景下的 Flink CDC 优化。

在实践中,会有业务方提出心愿依照指定工夫来进行历史数据的回溯,这是一类需要;还有一种场景是当原来的 Binlog 文件被全副清理,这时须要重置到新产生的 Binlog 文件上。

针对上述场景,咱们通过复用 scan.startup.mode 参数,扩大 earliest-offset\timestamp\specific-offset 三种 Binlog 阶段的启动模式。

其中 specific-offset 模式下,须要设置 scan.startup.specific-offset.file 参数指定 Binlog 文件名称、scan.startup.specific-offset.pos 指定该文件的某一个地位,依据这两个参数来确定增量阶段要生产的起始地位;earliest-offset 模式下默认会读取最早的 Binlog 文件;timestamp 模式,须要设置一个工夫参数 scan.startup.timestamp-millis。

如上图右侧所示,在 timestamp 启动模式下,会依据用户指定的工夫依照倒序的形式去查找相应的 Binlog 文件以及 Position,最终底层模式齐全复用 specific-offset 的形式。

不论应用哪种模式,最终都会依据不同的启动模式构建正确的 Start Binlog Offset,并进一步构建 MySQLBinlogSplit。

在低版本 MySQL 的生产中,会存在数据库实例下线,或者从库存在显著的主从提早(需迁徙至其余从库);在这两种场景下,个别会进行切库操作。如何实现主动切库呢?或者说如何实现在低版本 MySQL 的 Binlogposition 模式下的主动切库呢?

如上图右侧所示,咱们减少了一步 切库查看的操作:

首先,在 MySQLBinlogsplit 中减少了对 MySQL 层面的 serverid 信息的保留,并批改了 state 保留 & 复原过程中对 MySQLSplitBinlog 对象的解决逻辑。

而后,查问 MySQL 实例获取 serveid,并与 MySQLBinlogsplit 对象中存储的 serverid 进行比照。

如果不统一,则认为产生切库操作,此时须要依据 Binlogoffset 保留的生产位点的工夫信息,也就是 timestamp,在新库中倒序查找并从新构建 start Binlogoffset 以及进一步构建 MySQLBinlogsplit。

以后 Flink MySQL CDC 反对采集时延、发送时延、闲暇时长的监控指标,在理论生产中,用户反馈有须要关注上游数据库主从提早的需要。同时,所有监控指标都存在可视化及异样报警需要。

基于上述情况,首先咱们新增了数据库主从提早的监控指标,并将所有这些监控指标对接到监控零碎 Byzer。如上图所示,整体流程是这样,Flink JobManager 和 TaskManager 启动时会携带 agent,会通过 agent 将监控数据发送到 Byzer 零碎。

用户能够在 JRC 平台 (实时计算平台) 配置监控报警规定,这些规定会被同步到 Byzer 零碎。另一方面,JRC 平台会拉取 Byzer 监控零碎数据并进行可视化展现。

最初来看一个偏利用层面的革新,在理论的业务中大量存在分库分表的场景,并且线上分库分表根本会散布在多个 MySQL 实例中。

社区版本 Flink MySQL CDC 如果要在一个作业中反对多实例,须要用户屡次复制 DDL 定义语句并批改 hostname 配置,如果实例数量多的话是比拟影响用户体验及 SQL 的可读性。

对此,咱们联合平台实现了多实例的反对。通过 calcite 解析用户的 SQL 语句,找到 MySQL-cdc 的 DDL 定义,并解析其中 hostname 字段来判断是否蕴含多实例,也就是配置了多个 host。如果蕴含多个实例,则主动按实例宰割,创立不同实例对应的表,最初再 union 为一个视图。如图中蓝色卷轴示例所示,此时只须要做一次 DDL 的定义。

此外,在采集多实例,写带 Primary Key 的 Sink 场景中,咱们做了一个优化。因为 Flink MySQL CDC 进入 Binlog 阶段后只会在 Source 算子的第一个 subtask 中执行工作,而 Primary Key Sink 会触发 Flink 引擎优化 Sink 算子减少 NotNullEnforcer 算子来检查数据相干的 not null 的字段,而后再进行 hash 散发到 SinkMaterializer 算子以及前面的 Sink 算子。

因为 Source 与 NotNullEnforcer 之间是 forward 关系,因而 NotNullEnforcer 也只有一个 task 解决数据,这在 Source 较多的场景下解决性能可能是不够的。

为充分利用 NotNullEnforcer 算子的并行度,咱们减少了 table.exec.sink.not-null-enforcer.hash 参数,而后在 commonExecSink 中减少 通过该参数来判断是否要减速 NotNullEnforcer 算子 这样的逻辑。如果开启减速,则提前应用 Primary Key 进行 hash,而后再散发到 NotNullEnforcer 算子,从而实现对 NotNullEnforcer 算子的优化。

来看下优化前后的比照。

第一个图中能够看到,如红框所示,NotNullEnforcer 算子中只有第一个 Task 在解决数据。

优化后,在第二个图中,能够看到 NotNullEnforcer 算子的所有 10 个并行度都被利用了起来,并且 Source 算子和 NotNullEnforcer 算子之间是 hash 关系。

三、业务案例

在这个案例中,咱们联合 Flink CDC、Flink 外围计算能力以及数据湖 Hudi,对咱们平台的一个业务方,京东物流的一个业务数据系统进行了技术架构的试点革新。

这个零碎是物流经营数据中心 LDC 中的中小件实时经营监控零碎。它在整个京东物流外部被高频应用,不论是管理者用于决策,还是一线人员用于精细化进度治理。

它笼罩物流的三大外围操作环节,揽收、分拣、配送,并在不同的维度进行下钻,来提供物流各环节操作单量的监控以及可视化。

上游是弹性数据库 JED,分库分表并且散布在多个实例上。

在上边的离线链路中,首先通过 plumber 将数据抽取到离线数仓的 BDM 层,plumber 是京东离线异构数据交换的根底服务,负责将不同数据源的数据抽取至数仓或者将数仓计算结果推送到不同的数据源中。

在数据抽取到 BDM 层后,数据会通过 FDM 层的拉链以及后边几层的数据加工,最初业务数据的后果汇总至 APP 层,再通过 plumber 将后果推送至 ES 中,LDC 的用户应用的产品底层查问 ES。还存在另外一种形式,OLAP 引擎 StarRocks 会导入 app 层的数据,而后供用户查问。

下边实时链路中,Fregata 采集数据库 Binlog 发送至 JDQ,Flink 生产 JDQ 数据持续写入 JDQ,以此往返,对应于离线数仓的分层逻辑,构建了基于 JDQ 的实时数仓,最终的后果通过一个叫 syncer 的同步工具,将数据从 JDQ 同步到 ES 和 StarRocks 中。

同时,还存在另一条链路,最上游的 JDQ 通过 Fregata 间接散发数据到离线的 BDM 层,构建准实时的 BDM 表。整体来看,属于典型的 Lambda 数据架构。

以后的架构存在几个痛点:

  • 离线链路存在 SLA 撞线的问题,当上游链路计算资源拥挤或者出现异常重试的状况时数据的时效性有可能不如按时达成。
  • ES 服务器的存储老本比拟高,一年在 100 万左右。
  • 典型 Lambda 架构的一些问题,因为流批割裂导致的服务器资源无奈复用,技术栈不同,开发效率低,数据口径不统一等问题。

因为这个业务的实时数据承受端到端分钟级别的时延,因而对这个数据架构做了些革新。

首先基于咱们革新后的 Flink CDC 能力,实现了一个 Flink 作业,对上游多实例的 JED 分库分表数据,进行全增量一体化采集。

在数据加工层面,联合 FlinkSQL,为用户提供了低代码的开发方式,也就是拖拽 +SQL,计算的后果写入数据湖 Hudi。

而后再基于 Hudi 的增量读取能力,进一步加工,实现 FDM、GDM、APP 等不同层的加工逻辑,后果通过 StarRocks 挂载 Hudi 内部表,提供给终端 LDC 用户查问。通过这样的革新,最终构建了一条端到端准实时的数据链路。

总结:首先,联合 Flink CDC、Flink 外围计算能力及 Hudi 首次实现端到端流批一体。能够看到,笼罩采集、存储、计算三个环节。最终这个链路是端到端分钟级别数据时延(2-3min),数据时效的晋升无效驱动了新的业务价值,例如对于物流履约达成以及用户体验的晋升。数据时效老本方面,解决离线撞线问题,一条准实时链路,不存在离线撞线;Hudi+StarRocks 的组合老本相较于 ES 显著升高(经评估,约为原来的 1/3)。相较于 Lambda 架构,在服务器老本、开发效率及数据品质方面都有显著的晋升。

四、将来布局

将来布局蕴含以下几个方面:

  • 尝试实现工作不进行的 Schema Evolution。例如针对 Hudi、针对 JDQ。
  • 持续基于京东场景的 Flink CDC 革新。比方数据加密、全面对接实时计算平台 JRC 等。
  • 尝试将局部 Fregata 生产工作切换 Flink CDC。益处是技术栈对立,合乎整体技术收敛的趋势。
  • 联合流批一体的存储来晋升端到端的整体时效性。例如联合 Table Store 去尝试实现端到端更低的,例如秒级别的时延。

点击查看直播回放和演讲 PPT

更多内容


流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/product/bigdata/sc

正文完
 0