乐趣区

关于后端:Flink-CDC-在易车的应用实践

摘要:本文整顿自易车数据平台负责人王林红,在 Flink Forward Asia 2022 数据集成专场的分享。本篇内容次要分为四个局部:

  1. Flink 利用场景
  2. DTS 平台建设
  3. Flink CDC + Hudi 利用实际
  4. 将来布局

点击查看直播回放和演讲 PPT

一、Flink 利用场景

Flink 在易车有丰盛的利用场景,次要蕴含实时数仓建设和数据集成。

对于实时数仓建设,次要是数仓实时指标的开发,将离线指标逐渐向实时指标适度;同时承接了公司各种实时大屏需要,并屡次反对了公司外部的 818 购车节流动。

在实时监控方面,首先是日志监控,次要用来监控埋点状况,另外对于服务器日志,咱们也进行对立收集和监控,监控服务响应和异样日志等,同时联合机器学习算法,做日志的聚类;在前端层面,监控前端接口超时、白屏等异常情况。最初也利用在一些业务实时监控、风控等场景。

在数据集成方面,应用 Flink 实现关系型数据库的实时接入,将 MySQL、SQL Server 等的数据实时接入到 Kafka 中;同时将 Kafka 的数据实时同步到 HDFS/Hive 中,实现数据的实时入仓、入湖;对于数据传输通道,咱们应用 Flink 将 Kafka 的数据同步到上游存储引擎中,比方 TiDB、MySQL、ClickHouse、HDFS、Doris 等存储中,也实现一些异构数据源数据的实时同步。

二、DTS 平台建设

易车的数据集成次要分为两条线,一条离线数据集成,另一条是实时数据集成。本次次要介绍的是实时数据集成的演进过程,对于实时数据集成,最开始也是处在离线阶段,随着业务的倒退,对数据的时效性越来越高,开始应用 Canal 同步 MySQL 的数据,而后应用 Spark 做微批计算,再之后引入 Flink,还是应用 Canal 同步接入 MySQL 数据,应用 Flink 进行数据的实时计算,再之后引入了 Flink CDC,基于 Flink CDC 做全增量一体化实时计算。

在应用 Canal+Flink 的晚期阶段,整体流程如下,对于 MySQL 的数据,应用 Canal 通过解析 Binlog 的形式将数据同步到 Kafka 中,对于 SQL Server 的数据,通过 CDC 的形式同步到 Kafka 中,而后通过 Flink 进行加工计算,同步到上游零碎如 HDFS 或 Kafka 中,在这个阶段,根本能够满足业务需要,也能够疾速实现数据接入及后续开发。

然而也存在一些痛点问题,次要是,整个数据链路比拟长,依赖的组件多,运维老本也比拟高,另外 Canal 不反对全量数据的同步,全量和增量是割裂的两个阶段,并且对于不同数据源的接入,须要思考不同的实时接入计划,保护也比拟艰难。

基于以上痛点问题,和咱们的历史经验总结和评估,咱们对数据集成工具提出了新的诉求。

  • 心愿能够分布式地去撑持大数据场景,工具可能线性扩大,能够不便的对接更多数据源。
  • 心愿用一个框架撑持流批一体的传输。
  • 心愿基于一个开源框架来开发,这个框架须要和 Hadoop 的整个生态有比拟好地集成。并且咱们的终极目标,是用一套对立的技术架构来笼罩离线和实时的所有数据集成场景。

基于以上诉求,咱们把计划锁定在 Flink 技术栈中,决定基于 Flink CDC 自研实现流批一体的数据集成服务。为什么抉择 Flink CDC?

  • Flink CDC 引入了无锁算法,读取阶段全程无锁,升高了因加锁而带来的对线上数据库的影响危险,同时升高了对数据库的压力。反对并发读取,在全量数据同步阶段,能够更快地实现海量数据同步,能够通过程度扩大节点数或减少并行度的形式来放慢数据处理速度、减速海量数据的解决。

    反对断点续传,全量阶段反对 Checkpoint,即便工作因某种原因退出了,也可通过保留的 Checkpoint 对工作进行复原实现数据的断点续传。比方同步数据须要 1 天工夫,然而同步工作运行 12 小时后失败了,不须要重跑整个数据同步工作,只须要从产生谬误的地位重跑即可。

  • 反对丰盛的数据源,目前反对 MySQL、SQL Server、Oracle、TiDB、MongoDB 等,也不便的实现异构数据源的数据同步和数据交融。
  • 端到端的一致性,反对 Exactly-Once 语义,保障全链路数据的准确性。
  • 无缝对接 Flink 生态,复用 Flink 泛滥 Sink 能力。

Flink CDC 反对了丰盛的数据源,源头反对 MySQL、Mongo、TiDB、Oracle、SQL Server 等,指标端反对 kakfa、Hudi、TiDB、Hive、Doris、ClickHouse 等。

同时 Flink CDC 作为新一代的数据集成框架,不仅能够代替传统的 DataX 和 Canal 做实时数据同步,将数据库的全量和增量数据一体化的同步到音讯队列或上游零碎中;也能够用于实时数据集成,将数据库数据实时入湖入仓;同时还反对弱小的数据加工能力,能够通过 SQL API 或 DataStrean API 对数据库数据进行实时关联、打宽、聚合等。

在 2.0 版本中,Flink CDC 对于 MySQL CDC 反对了无锁读取、并发读取、全程断点续传等高级性能,实现 MySQL 数据的增量快照读取,在最新的 2.2 版本中,将增量快照读取算法形象成了公共框架,也不便其余 Connector 的接入,其余 Connector 只须要接入这个框架就能够提供无锁算法,并发读取和断点续传的能力,非常不便其余连接器的扩大。

所以咱们基于 Flink CDC 构建了 DTS 数据传输平台,在源端,目前曾经集成反对了 MySQL、SQL Server、TiDB、Mongo、kafka 等数据源,在指标端,也集成了 Hudi、kafka、Doris、ClickHouse、HDFS、Hive 等数据源,不便业务进行数据实时入湖入仓,和异构数据源的传输、同步。

然而,在 DTS 平台建设过程中,咱们也遇到了一些问题,比方元信息的字段映射,如何不便平安的将源库的字段类型映射成 Flink 的字段类型;在工作运行过程中,如何动静的减少新的同步表,包含如果业务源库字段变更了,上游零碎如何解决?另外随着工作的增多,如何更好的对数据源信息进行保护,如一个业务库迁徙,如何优雅的对工作中的数据源信息进行变更?

首先说元信息主动映射的问题,Flink CDC 反对丰盛的数据源,这些数据源都需通过手工的形式映射成 Flink 的 DDL。手工映射表构造是比拟繁琐的,尤其是数据源头多、映射关系比较复杂,每种数据源都有本人的映射关系,当表和字段数十分多的时候,手工映射也非常容易出错,对用户不敌对,开发效率也不高。

为了解决上述问题,咱们开发了对立数据源服务,咱们将平台中应用到的数据源对立注册到数据源零碎中,实现数据源的对立保护治理,同时实现表构造变更告诉,影响剖析等。

用户在实时计算平台创立表和创立同步工作时,抉择对应的数据源,主动获取表的 Schema,通过模版化的形式创立表和数据同步工作,同时应用数据源 ID 对用户屏蔽连贯串和账号密码信息,晋升账号安全性。

另外数据源信息与工作信息关联,数据源变更或迁徙,只须要批改数据源信息,升高批改老本。最初离线层的数据接入也依赖于对立数据源,离线和实时应用同一套元数据,便于流和批模型的对立。

上图是数据源革新前后的一个比照图,后面是原生的 MySQL 的流表,须要连贯串、账号、明码信息,对立数据源之后,在链接串中只须要关注数据源 ID。

同时咱们对 Connector 进行了革新,工作在执行时,会将具体的数据源 ID 替换为实在的链接串、账号和明码,对于 Kafka 流表也一样,在表创立时,只须要数据源 ID,工作执行时,会替换为 Kafka 的 Server 地址,对于 groupID,在工作中,通过 set 的形式进行设置。这种形式也不便咱们进行后续 Kafka 集群的主备切换。对于其余的数据源,比方 Hbase、HDFS 等也做了相似的革新。

上图是咱们主动建表的页面,抉择对应的数据源,须要映射的源表,通过数据源服务主动获取源表的 Schema,主动做字段类型映射,通过模版化的形式,一键生成 Flink 的建表语句。

上图是数据同步的配置界面,用户次要抉择对应的源和指标数据源、数据表,主动做字段映射,如果指标表不存在,会主动创立指标表。

得益于 MySQL CDC 动静加表性能,也能够在已有工作中,间接减少须要同步的表,增加的表会主动先同步该表的全量数据,而后再无缝切换到同步增量数据。遇到新增监控表时不必新起作业。同时也反对通过正则的形式配置分库分表的同步,另外对于源表字段变更,类型变更等,也做了一些适配。

接下来介绍下平台的整体架构,咱们对 DTS 平台、调度平台、实时平台进行了深度的整合和集成,任务调度层集成到对立调度平台中,实现工作的对立治理,次要蕴含工作的运维治理、权限治理、资源管控、监控告警、和变量治理等。

对于实时平台,次要关注工作的开发、运维、和治理。

  • 对于工作开发,提供 WebIDE 给用户进行工作的开发调试,同时提供语法智能校验、检测性能,便于用户发现代码语法问题。
  • 对于工作运维,提供工作诊断、评分、健康检查、日志收集、作业快照(Checkpoint、Savapoint)、主动拉起、批量重启等性能,同时反对工作的容灾复原。
  • 对于工作治理,次要蕴含全链路的工作血统和表血统,不便的理解工作和表的血缘关系和对工作进行影响剖析,还有数据层面品质监控,包含断流、数据量异样稳定、数据比对等。

实时计算平台上次要反对 SQL 工作、Jar 包工作和 DTS 工作。对于 SQL 工作和 Jar 包工作,提供版本治理、资源管理:资源管理次要是将表、UDF、Connector 等资源对立治理,并通过模版化的形式和配置化的形式实现 Source、Sink 表的创立,升高用户开发成本;对于 TDS 工作,提供数据源治理、工作配置和数据校验等一些模块和服务。

通过服务平台化,打造一站式的工作开发治理平台,在平台上实现工作从开发到测试、公布、监控的全流程解决操作,升高用户应用平台的门槛。

对于外围的 DTS 数据传输架构如上,整个架构次要是基于 Flink 1.14 的 DataStream API 和 Flink CDC 2.2 构建,笼罩流批的场景,实现各种同步需要。

整个架构次要蕴含 Source 端、数据传输层、Sink 端, Source 和 Sink 端形象出 SourceFactory 和 SinkFactory,不便实现对接各种类型数据源,在数据传输层,提供对立的根底服务框架,反对类型转换、自定义监控指标、数据校验等性能,如类型转换,DTS 中也反对了对 Canal 数据格式的适配。

目前 DTS 反对了公司内大部分数据传输管道,涵盖数据库,如 MySQL、SQL Server 和 TiDB 等;音讯队列,如 Kafka、RocketMQ 等;以及大数据生态系统的各种组件,例如 HDFS、Hive、ClickHouse、Doris 等,笼罩了易车大部分实时流场景和多数离线场景。

这套数据集成架构现在在易车外部已稳固运行近一年工夫,服务于泛滥产品线,整套架构对数据集成,有很大的收益。

  • 对立了技术栈,通过 Flink 能够实现数据异构数据源的实时集成,同时反对流批一体。
  • 通过平台化的操作,升高了数据接入、工作运维等的复杂度,也无需额定部署 Canal 等组件,升高运维老本,链路稳定性也失去了晋升。
  • Flink CDC 全增量一体化的框架,解决了在数据集成方面全量、增量隔裂的痛点问题,实现了全增量一体化的数据集成。

三、Flink CDC + Hudi 利用实际

Flink CDC 的一个次要利用场景就是数据实时入湖,对于数据湖咱们次要应用的 Hudi,Hudi 的次要特点如下:

  • Hudi 的 upsert 性能反对的比拟成熟。
  • Hudi 的表文件能够存储在 HDFS 上,兼容 Hadoop 生态圈,能够应用 Hive、Spark、Presto 等引擎查问 Hudi 表。
  • Hudi 表的组织模式也很灵便,能够依据不同场景抉择不同的表模式。
  • Hudi 曾经集成了 Flink,便于咱们计算引擎的对立。最初 Hudi 也有绝对比拟沉闷的社区。

应用 Hudi 后,在没有引入 Flink CDC 之前,咱们的数据入湖架构如下:

首先应用 Canal 通过解析 Binlog 的形式将增量数据同步到 Kafka 中,而后通过 DataX,将 MySQL 的全量数据同步到 HDFS 上,而后应用 bulk_insert 的形式初始化数据到 Hudi 中,实现全量数据的初始化,最初,应用 Flink 生产 Kafka 的数据,将数据写入到 Hudi 中,同时通过主键解决数据抵触问题。

大家能够看到,这个架构整体的链路比拟长,操作频繁、保护老本比拟高,波及的组件比拟多,对于实现数据接入工作量比拟大,并且稳定性不好保障,如果一旦有数据问题,数据恢复、重导也是一件比拟苦楚的事件。

在应用 Flink CDC 之后,联合 DTS 平台,架构如上。在 DTS 平台中,很不便的能够实现 MySQL 数据一键入湖,并且得益于 Flink CDC 全增量一体化框架,不必思考全增量问题,同时也反对动静减少表性能,操作非常简单。

随着接入表的增多,对于同一个数据源下的数据同步工作,建设了过多数据库连贯,导致 Binlog 反复读取,会对数据源库造成微小的压力。另外有些 Task 同步的数据量很小,也会造成肯定的资源节约。

为了解决这个问题,咱们应用 API 的形式读取数据,通过侧输入流的形式对 DataStream 进行分流,实现合并 Source 的性能。对于读取的同一数据源,同一工作只会建设一个数据库连贯,Binlog 也只会读取一次,升高了对数据库的压力,不便的实现了单任务多表的数据实时入湖。

在数据实时同步写入 Hudi 时,Flink Hudi 的写入 Pipeline 算子如下。

第一个算子负责将快照数据 + 增量数据加载到 Flink 状态。接着通过一个 Bucket Assigner,它次要负责将曾经转好的 HudiRecord 调配到特定的 File Group 中,接着分好 File Group 的 Record 会流入 Writer 算子执行真正的文件写入。再之后会接一个 Compaction 的算子,次要用来解决 MOR 表读放大的问题。

这个架构在理论的生产环境会遇到如下问题:

  • 当数据量比拟大的时候,Flink State 的收缩会比拟厉害,相应地会影响 Task 的写入速度以及 Checkpoint 的成功率。
  • 对于 Compaction 算子,当在执行 Compaction 阶段时,会和数据读写算子进行资源的抢占,也会导致工作的背压、Checkpoint 超时等。

为了解决这个问题,咱们把 Compaction 进行独自拆分,拆分为一个独立的调度工作,同时为了合并的合理性,对相干的合并打算也做了一些优化。

除此之外,咱们还做了一些其余的优化和 bug 修复。

  • 第一,在 Hudi 同步 Hive 分区时,会对 Hive 表面和 Hudi 表以后表构造、分区做比拟,会获取 Hive 的所有分区,而咱们在 Hive 层面对分区拜访做了限度,超过分区数量限度,禁止拜访,所以触发了该问题,咱们通过批改源码,如果是分区表,对拜访 Hive 的分区做了过滤,只拜访最近一段时间的分区。
  • 第二,在业务 MySQL 降级时,呈现了混合模式的 Binlog,导致工作失败,也是批改了源码,疏忽了一些 DML 的 Binlog 操作,具体 patch 能够参考 3319;
  • 第三,解决了一些 Flink CDC 分片的 bug,如 Flink CDC 分片字段是 String 时,比拟逻辑没有疏忽大小写,导致抽取全量数据到内存,导致工作失败。Flink CDC 分片字段是 bigint 时,ID 差值较大,触发了 Flink CDC 的分片优化逻辑,但在优化逻辑后加载了大量数据到内存中,所以优化参数,升高数据分布因子等。

除此之外,还有一些其余优化实际,大部分能够查阅材料或在社区的帮忙下解决,在这里再次感激社区。

四、将来布局

简略介绍一下咱们的将来布局:

  • 第一,全量阶段的异步切片逻辑优化,目前数据全量读取阶段读取流程为首先通过主键对表进行 Snapshot Chunk 划分,再将 Snapshot Chunk 分发给多个 SourceReader。在所有 Snapshot Chunk 读取实现后,下发一个 Binlog chunk 进行增量局部的 Binlog 读取。Snapshot Chunk 划分及读取是程序的,影响整个读取阶段的性能,导致大表全量接入阶段周期长。所以优化 Snapshot Chunk 划分切片的逻辑,减少异步读取策略,晋升全量读取阶段性能
  • 第二,目前咱们的数据集成工具只笼罩了大量的离线场景,后续筹备笼罩更多的离线数据集成场景。
  • 第三,目前咱们的实时数据集成的品质绝对还比拟单薄,须要进一步增强和打磨。

点击查看直播回放和演讲 PPT

更多内容


流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
0 元试用 实时计算 Flink 版(5000CU* 小时,3 个月内)
理解流动详情:https://free.aliyun.com/?pipCode=sc

退出移动版