关于后端:基于-Flink-CDC-的实时同步系统

48次阅读

共计 4950 个字符,预计需要花费 13 分钟才能阅读完成。

摘要:本文整顿自科杰科技大数据架构师张军,在 FFA 数据集成专场的分享。本篇内容次要分为四个局部:

  1. 性能概述
  2. 架构设计
  3. 技术挑战
  4. 生产实践

点击查看直播回放和演讲 PPT

科杰科技是专门做大数据服务的供应商,目前的客户包含能源、金融、证券等各个行业。科杰科技产品的底层是基于湖仓一体的根底数据平台,在数据平台之上有离线、实时、机器学习等各种零碎。我次要负责基于 Flink、Iceberg、K8s 的底层基础设施建设。明天将次要和大家分享,上图中框出来的子系统,即基于 Flink CDC 的实时数据同步零碎。

一、性能概述

咱们零碎的次要的性能有如下几个:

  • 可视化操作。咱们做了后盾的管理系统,是心愿用户在不懂任何代码的状况下,通过点击鼠标就能配置出同步工作做数据同步。
  • 反对整库同步、多表同步。
  • DDL 反对:源端的 Schema 的变更也要同步到指标端。
  • 数据库、表、字段映射。
  • 丰盛的数据源反对。目前输出端反对四种常见的关系型数据库,MySQL、Postgre、SQL Server、Oracle,输入端除了这四个数据库之外,还蕴含 Kafka 和 Iceberg。
  • 丰盛的数据类型反对。对输出端的四种关系型数据库,咱们罕用的所有数据类型都会反对,包含二进制类型。
  • UDF 函数、过滤条件。UDF 函数是指咱们在同步过程中,做一些数据转换。过滤条件是指咱们会在同步的过程中,加一些过滤条件,只同步想要的数据。
  • 选取字段、增加变量字段。选取字段是指用户能够抉择想要的字段进行同步。增加变量是指在同步的过程中,能够手工增加一些字段,比方工夫戳或者表名等。

市面上有很多实时同步的零碎,最终咱们选用了 Flink CDC 做实时同步零碎的底层技术架构。次要是因为 Flink CDC 有一些独有的劣势,包含全量同步、增量同步、全量 + 增量同步,还有底层基于 Flink 做的分布式计算引擎。

通过 Flink CDC 这套架构,想实现咱们现有产品的需要,目前来看还有一些有余。

  • DDL 的反对:PostgreSQL、Oracle 数据库无奈获取 Schema 变更的事件,无奈捕捉相应的 DDL 操作。
  • 整库同步:通过 Flink CDC 的 API 能够捕捉表构造的变更信息,然而现有的 Flink Connector 无奈将新增的表、字段写入指标端。
  • 须要预知 Schema:Flink 工作须要提前晓得表构造的 Schema,而后构建工作,无奈实现不重启的状况下动静解决新增表或者字段。

二、架构设计

接下来从技术角度给大家分享一下咱们零碎的设计架构,从上图中能够看到,一共分为三层。

最下面一层是输出端。基于 Flink CDC API 的形式读数据库进行数据抽取,而后把这些数据和 Schema 的信息发到两头的 Kafka,Kafka 是咱们的两头缓冲层。最上面一层是输入端,会从 Kafka 读取输出端输出的数据。

在输入端这一层能够看到,首先进行过滤,罕用的 SQL 表达式都能够做过滤条件。过滤后对字段利用一些 UDF,比方数据脱敏、加密等等。接下来依据 DB 和 Table 对数据进行 Keyby 分组,而后应用 KeyedProcessFunction 函数对每个表的数据进行一些解决,比方创立表、增加或者批改字段、插入数据等等。

当配置完工作之后,最初咱们别离把 Source 和 Sink 的工作提交到运维核心,运维核心会对工作进行启动、进行、查看统计指标、查看工作状态等一系列操作。最初咱们的工作反对在 Yarn 和 K8s 上运行,用户能够依据本人的状况进行抉择。

在后盾管理系统,用户能够通过配置输出端和输入端,配置须要同步的工作。工作会生成两个配置文件,别离是输出端的配置文件和输入端的配置文件,而后这两个配置文件会别离作为输出端和输入端的启动参数传给两个 Flink 工作。

这部分次要是想分享下,对于无奈获取 DDL 事件的状况咱们该如何解决呢?

其实有一些数据库,比方 MySQL,是能够通过 Flink CDC 来获取 Schema 的变更信息的,然而为了代码的逻辑对立,同时适配 Flink CDC 拿不到 Schema 变更的数据库。咱们做了代码对立的解决,用一套架构实现数据和 Schema 的抽取和封装。

咱们通过 JDBC 的形式,从源数据库把 Schema 的信息查出来,放到 Flink 的 State 里。当下一条数据来的时候,跟 State 外面的 Schema 数据进行比照。雷同就不做任何解决,不同就再次查问一下 Schema 的信息,更新到 Flink State 里。同时将从 Flink CDC 拿到的数据和这条数据对应的 Schema 信息,封装成音讯体,发送给中间层的 Kafka。从 Schema 读取的信息蕴含数据的类型、长度、精度,是否是主键等等,格局和 debezium-json 差不多。

Kafka 缓冲层能够用来实现以下几个性能。

在解耦方面:

  • 将 Source 和 Sink 解耦。
  • 多个输入端防止反复抽取。比方我想从 MySQL 抽取一些数据,把它同步到 Iceberg 做一些离线的剖析。同时又同步到 Kafka,做一些实时的数据处理。这种状况就能够从源端只抽取一次,缩小对源端数据库的压力。
  • Sink 呈现故障防止 Source 阻塞,相似 flume 的 channel 的性能。

在 DB 对应 Topic 方面:

  • 一个数据库外面的数据抽取到一个 Topic。
  • 每个 Topic 一个 Partition。
  • 单表重放程序有保障。

输入端和输出端一样,读取后端生成的配置文件作为它的参数,而后应用一些过滤条件,UDF 转换条件等等,从 Kafka 读取数据,进行数据处理。

在数据处理的时候,因为每个输入源的解决逻辑不一样,所以分成以下三类。

  • 写入 RDBMS。通过 JDBC 来操作数据库,蕴含 DDL、DML。
  • 写入 Iceberg。重写 Flink 写入 Icebrg 逻辑,应用原始 API 写入数据,Commit Snapshot。
  • 写入 Kafka。应用 Flink Kafka Connector 写入 Kafka。

运维核心能够对数据进行如下解决:

  • 工作的治理:蕴含工作的启动、进行、暂停等等。
  • 查看指标:监控一些数据,蕴含同步工作的数据条数和数据大小。
  • 配置监控报警:同步工作产生故障时,发送报警,包含邮件、短信等等。
  • 查看日志:查看工作启动的日志、工作运行过程中的日志。

三、技术挑战

上面列举一些次要的技术挑战。

  • 读取增量 Schema:获取源端新增的表、字段以及数据信息(比方 Flink CDC 无奈获取 PostgreSQL 数据库的 Schema 变更事件)。
  • 降级 debezium:批改 Flink CDC 源码,降级 debezium 至最新版,获取 Oracle 新增表、字段事件。
  • SQL 模式过滤条件:反对 SQL 模式的过滤条件,and、or、in、>、<、between 等罕用的表达式。
  • 不重启反对动静 Schema:不重启 Flink 工作,反对动静 Schema 及各种 DML 将数据写入指标端。
  • 重构 Flink 写入 Iceberg:没有应用现有的 Flink Datastream API 写入 Iceberg,从新应用 Iceberg 最底层 API 创立、批改表,插入、批改、删除数据。
  • 简单业务逻辑:反对简单的业务场景,要保证数据的正确性。

这是咱们在开发过程中,输入端遇到的第一个问题,也就是 SQL 条件的过滤。大家可能乍一听感觉很简略,加一个 where 条件就行了,但 Flink 工作在做数据同步时,它要求输出端和输入端的 Schema 须要事后提前晓得,且它是固定不变的,然而咱们的状况有一些不同,比方对于整库同步的过程中,用户新增了一些表,或者在表同步的过程中,新增了一些字段,Flink 现有的 collector 无奈辨认这些新增的信息,无奈在未知的字段上增加 where 条件。那么咱们要如何解决这个问题呢?

咱们发送到两头 Kafka 缓冲层的数据格式和 debezium-json 的格局差不多,数据次要存储在 payload.after 和 payload.before 外面,这外面的数据的格局是 map 类型,它的 key 是字符串,value 是 object 类型的数据,然而这个格局咱们无奈把它映射成 Flink SQL,因为 object 类型在 Flink CDC 外面没有对应的类型,所以咱们把 object 类型映射成了 string 类型,并对 SQL 进行了一些转换。应用 Flink SQL 解析器把 where 条件进行解析,而后从新生成新的过滤条件。

比方咱们原始的过滤 SQL 是这样的:

id between 1 and 3.5

通过咱们的重构,变成了上面这个模式:

cast(payload.after[‘id’] as DECIMAL(2,1)) BETWEEN ASYMMETRIC 1 AND 3.5

数据通过 where 条件的过滤之后,并且通过 UDF 函数转换进入 KeyedProcessFunction 函数进行解决。第一步先判断输入端的指标库和指标表是否曾经存在。在没有存在的状况下,用纯 JDBC 的形式拼接 SQL 执行 DDL,创立数据库和表。而后进行数据处理,为了进步性能。咱们把数据放到队列里,当队列达到肯定的阈值后,进行 flush 操作,把数据批量写入数据库。

在这个同步过程中,对于 Schema 的解决和 Source 端一样,把获取的 Schema 信息放到 State 里,每来一条数据进行一次 Schema 比照。如果产生了变更,就能证实数据产生了 DDL 的操作。这个时候要刷数据,把队列里的数据 flush 到数据库,而后执行 DDL,执行完 DDL 之后从新拼接一个 INSERT INTO 的 SQL 执行新插入的数据。通过这种形式实现不重启 Flink 工作的状况下,同时反对 DDL(create、alter)和 DML(insert、update、delete)等一系列操作。

因为 Iceberg 无奈用纯 JDBC 的形式写入,所以它无奈跟关系型数据联合到一起。因而 Flink 写入 Iceberg 会遇到以下的一些问题。

  • Flink SQL 不反对 DDL。比方 Flink SQL 无奈反对 Alter Table 的 DDL 语法。
  • Flink SQL 需预知 Schema。应用 Flink SQL 写入 Iceberg 表,须要提前晓得表的 Schema 信息,且无奈解决新增字段。
  • DataStream 需预知 Schema。如果应用 API 写入,也会和 Flink SQL 一样遇到同样的问题,写入也是须要提前预知表的 Schema 信息。
  • 提交 Snapshot。Flink 写入 Iceberg 是每次 Checkpoint 提交快照,然而咱们须要本人管制,须要在产生 DDL 的时候触发提交。

咱们发现 Flink 不论用 SQL 还是 API 的形式,都无奈实现咱们的需要,所以咱们从更底层的角度来思考实现办法,最初应用 Iceberg 很底层的 API 来实现咱们所须要的性能。

比方 Create Table 就是应用 Iceberg 里的 Catalog 来创立 Table 的,蕴含一些主键和 Schema。其余的操作,包含批改表的 Schema、写入数据、提交快照等都是用纯 Iceberg 的底层 API 来实现,没有应用现有的 Flink Iceberg API 来做,这样实现起来更加灵便。

在业务上,咱们也会面临很多简单的业务场景,比方对同一字段,咱们会有很多种操作。比方须要反对 UDF;对字段加过滤条件;字段的映射;增加常量字段;开启字段同步等等。所以咱们在写逻辑的时候,要思考各种各样简单的条件。因为可能改了其中某一个性能进而就影响了其余性能。

四、生产实践

咱们零碎上线后,目前曾经服务于十几个客户,波及到金融、能源等各个行业。反对的数据源包含 MySQL、PostgreSQL、Oracle、SQL Server 等。数据规模方面,目前客户用于同步的工作从几个到几十个库不等,每秒同步数千条数据。

将来咱们将在以下三方面进行晋升:

  • 第一,做一些性能晋升。做一些压测,从各个角度进步零碎的吞吐率和性能。
  • 第二,心愿有更多参数配置。比方 Kafka Sink 的各种 Topic 配置、Iceberg 的分区配置等等。
  • 第三,心愿有更多数据源的反对。

点击查看直播回放和演讲 PPT


更多内容


流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/product/bigdata/sc

正文完
 0