乐趣区

关于程序员:MRS-CDL架构设计与实现

1 前言

MRS CDL 是 FusionInsight MRS 推出的一种数据实时同步服务,旨在将传统 OLTP 数据库中的事件信息捕获并实时推送到大数据产品中去,本文档会具体为大家介绍 CDL 的整体架构以及关键技术。

2 CDL 的概念

MRS CDL(Change Data Loader)是一款基于 Kafka Connect 的 CDC 数据同步服务,能够从多种 OLTP 数据源捕捉数据,如 Oracle、MySQL、PostgreSQL 等,而后传输给指标存储,该指标存储能够大数据存储如 HDFS,OBS,也能够是实时数据湖 Hudi 等。

2.1 什么是 CDC?

CDC(Change Data Capture)是一种通过监测数据变更(新增、批改、删除等)而对变更的数据进行进一步解决的一种设计模式,通常利用在数据仓库以及和数据库密切相关的一些利用上,比方数据同步、备份、审计、ETL 等。

CDC 技术的诞生曾经有些年头了,二十多年前,CDC 技术就曾经用来捕捉利用数据的变更。CDC 技术可能及时无效的将音讯同步到对应的数仓中,并且简直对以后的生产利用不产生影响。现在,大数据利用越来越广泛,CDC 这项古老的技术从新焕发了活力,对接大数据场景曾经是 CDC 技术的新使命。

以后业界曾经有许多成熟的 CDC to 大数据的产品,如:Oracle GoldenGate(for Kafka)、Ali/Canal、Linkedin/Databus、Debezium/Debezium 等等。

2.2 CDL 反对的场景

MRS CDL 排汇了以上成熟产品的成功经验,采纳 Oracle LogMinner 和开源的 Debezium 来进行 CDC 事件的捕获,借助 Kafka 和 Kafka Connect 的高并发,高吞吐量,高牢靠框架进行工作的部署。

现有的 CDC 产品在对接大数据场景时,根本都会抉择将数据同步到音讯队列 Kafka 中。MRS CDL 在此基础上进一步提供了数据间接入湖的能力,能够间接对接 MRS HDFS 和 Huawei OBS 以及 MRS Hudi、ClickHouse 等,解决数据的最初一公里问题。

场景 数据源 指标存储
实时数据湖剖析 Oracle Huawei OBS, MRS HDFS, MRS Hudi, MRS ClickHouse, MRS Hive
实时数据湖剖析 MySQL Huawei OBS, MRS HDFS, MRS Hudi, MRS ClickHouse, MRS Hive
实时数据湖剖析 PostgreSQL Huawei OBS, MRS HDFS, MRS Hudi, MRS ClickHouse, MRS Hive

表 1 MRS CDL 反对的场景

3 CDL 的架构

作为一个 CDC 零碎,可能从源指标抽取数据并且传输到指标存储中去是根本能力,在此基础上,灵便、高性能、高牢靠、可扩大、可重入、平安是 MRS CDL 着重思考的方向,因而,CDL 的外围设计准则如下:

  • 系统结构必须满足可扩展性准则,反对在不侵害现有零碎性能的前提下增加新的源和指标数据存储。
  • 架构设计该当满足不同角色间的业务侧重点拆散
  • 在正当的状况下缩小复杂性和依赖性,最大限度的升高架构、安全性、韧性方面的危险。
  • 须要满足插件式的客户需要,提供通用的插件能力,使得零碎灵便、易用、可配置。
  • 业务平安,防止横向越权和信息泄露。

3.1 架构图 / 角色介绍


图 1 CDL 架构

MRS CDL 蕴含 CDL Service 和 CDL Connector 两个角色,他们各自的职能如下:

  • CDL Service:负责工作的治理和调度,提供对立的 API 接口,同时监测整个 CDL 服务的衰弱状态。
  • CDL Connector:实质上是 Kafka Connect 的 Worker 过程,负责实在 Task 的运行,在 Kafka Connect 高牢靠、高可用、可扩大的个性根底上减少了心跳机制来帮助 CDL Service 实现集群的衰弱监测。

3.2 为什么抉择 Kafka?

咱们将 Apache Kafka 与 Flume 和 Nifi 等各种其余选项进行了比拟,如下表所示:

Flume Nifi Kafka
长处 基于配置的 Agent 架构;拦截器;Source、Channel、Sink 模型 有许多开箱即用的处理器;背压机制;解决任意大小的音讯;反对 MiNifi Agent 来收集数据;反对边缘层数据流 可扩大、分布式、高容错、高吞吐量的消息传递零碎;背压机制;无数据失落;Kafka Connect 反对 Source、Sink 模型;超过 50 种可用的 Connector;音讯保序;低耦合
毛病 存在数据失落的场景;没有数据备份;数据大小限度;没有背压机制 没有数据复制;软弱的容错机制;不反对音讯保序;可扩展性较差 音讯大小限度

表 1 框架比拟

对于 CDC 零碎,Kafka 有足够的劣势来撑持咱们做出抉择。同时,Kafka Connect 的架构完满符合 CDC 零碎:

  • 并行 – 对于一个数据复制工作,能够通过拆解成多个子工作并且并行运行来进步吞吐率。
  • 保序 – Kafka 的 partition 机制能够保障在一个 partition 内数据严格有序,这样有助于咱们实现数据完整性。
  • 可扩大 – Kafka Connect 在集群中分布式的运行 Connector。
  • 易用 – 对 Kafka 的接口进行了形象,晋升了易用性。
  • 平衡 – Kafka Connect 自动检测故障,并在残余过程上依据各自负载从新进行平衡调度。
  • 生命周期治理 – 提供欠缺的 Connector 的生命周期治理能力。

4 MRS CDL 关键技术


图 2 CDL 关键技术

4.1 CDL Job

MRS CDL 对业务进行了下层的形象,通过引入 CDL Job 的概念来定义一个残缺的业务流程。在一个 Job 中,用户能够抉择数据源和指标存储类型,并且能够筛选要复制的数据表。

在 Job 构造的根底上,MRS CDL 提供执行 CDL Job 的机制,在运行时,应用 Kafka Connect Source Connector 联合日志复制技术将 CDC 事件从源数据存储捕捉到 Kafka,而后应用 Kafka Connect Sink Connector 从 Kafka 提取数据,在利用各种转换规则后将最终后果推送到指标存储。

提供定义表级和列级映射转换的机制,在定义 CDL Job 的过程中能够指定转换规则。

4.2 Data Comparison

MRS CDL 提供一种非凡的 Job,用于进行数据一致性比照。用户能够抉择源和指标数据存储架构,从源和指标架构中抉择各种比拟对进行数据比拟,以确保数据在源和指标数据存储中统一。


图 3 Data Comparison 形象视图

MRS CDL 提供了专用的 Rest API 来运行 Data Compare Job,并且提供如下能力:

  • 提供多样的数据比拟算法,如行哈希算法,非主键列比拟等。
  • 提供专门的查问接口,能够查问同步报表,展现以后 Compare 工作的执行明细。
  • 提供实时的基于源和指标存储的修复脚本,一键修复不同步数据。

如下是 Data Compare Job 执行流程:


图 4 Data Compare Job 执行和查看流程

4.3 Source Connectors

MRS CDL 通过 Kafka Connect SDK 创立各种源连接器,这些连接器从各种数据源捕捉 CDC 事件并推送到 Kafka。CDL 提供专门的 Rest API 来治理这些数据源连接器的生命周期。

4.3.1 Oracle Source Connector

Oracle Source Connector 应用 Oracle RDBMS 提供的 Log Miner 接口从 Oracle 数据库捕捉 DDL 和 DML 事件。


图 5 Log Miner 抓取数据示意图

在解决 DML 事件时,如果表中存在 BOLB/CLOB 列,CDL 同样能够提供反对。对于 BOLB 列的解决,关键点解决如下:

  • 当 insert/update 操作产生时,会触发一系列的 LOB_WRITE 操作。
  • LOB_WRITE 用于将文件加载到 BLOB 字段中。
  • 每个 LOB_WRITE 只能写入 1KB 数据。
  • 对于一个 1GB 的图片文件,咱们会整顿全副的 100 万个 LOB_WRITE 操作中的二进制数据,而后合并成一个对象。咱们会把这个对象存储到 Huawei OBS 中,最终在写入 Kafka 的 message 中给出该对象在 OBS 中的地位。

对于 DDL 事件的捕捉,咱们创立独自的会话来继续跟踪。以后反对的 DDL 语句如下:

No DDL 语句 示例
1 CREATE TABLE CREATE TABLE TEST (EMPID INT PRIMARY KEY, ENAME VARCHAR2(10))
2 ALTER TABLE … ADD (<name> <data type>) ALTER TABLE TEST ADD (SALARY NUMBER)
3 ALTER TABLE … DROP COLUMN … ALTER TABLE TEST DROP (SALARY)
4 ALTER TABLE … MODIFY (<column> … ALTER TABLE TEST MODIFY SALARY INT
5 ALTER … RENAME… ALTER TABLE TEST RENAME TO CUSTOMER
6 DROP … DROP TABLE TEST
7 CREATE UNIQUE INDEX … CREATE UNIQUE INDEX TESTINDEX ON TEST (EMPID, ENAME)
8 DELETE INDEX … Delete existing index

表 2 反对的 DDL 语句

4.3.2 MYSQL Source Connector

MYSQL 的 Binary Log(Bin Log)文件程序记录了所有提交到数据库的操作,包含了对表构造的变更和对表数据的变更。MYSQL Source Connector 通过读取 Bin Log 文件,生产 CDC 事件并提交到 Kafka 的 Topic 中。

MYSQL Source Connector 次要反对的性能场景有:

  • 捕捉 DML 事件,并且反对并行处理所捕捉的 DML 事件,晋升整体性能
  • 反对表过滤
  • 反对配置表和 Topic 的映射关系
  • 为了保障 CDC 事件的相对程序,咱们个别要求一张表只对应一个 Partition,然而,MYSQL Source Connector 依然提供了写入多 Partition 的能力,来满足某些须要就义音讯保序性来晋升性能的场景
  • 提供基于指定 Bin Log 文件、指定地位或 GTID 来重启工作的能力,保障异样场景下数据不失落
  • 反对多种简单数据类型
  • 反对捕捉 DDL 事件

4.3.3 PostgreSQL Source Connector

PostgreSQL 的逻辑解码个性容许咱们解析提交到事务日志的变更事件,这须要通过输入插件来解决这些变更。PostgreSQL Source Connector 应用 pgoutput 插件来实现这项工作。pgoutput 插件是 PostgreSQL 10+ 提供的规范逻辑解码插件,无需装置额定的依赖包。

PostgreSQL Source Connector 和 MYSQL Source Connector 除了局部数据类型的区别外其余性能基本一致。

4.4 Sink Connectors

MRS 提供多种 Sink Connector,能够从 Kafka 中拉取数据并推送到不同的指标存储中。当初反对的 Sink Connector 有:

  • HDFS Sink Connector
  • OBS Sink Connector
  • Hudi Sink Connector
  • ClickHouse Sink Connector
  • Hive Sink Connector
    其中 Hudi Sink Connector 和 ClickHouse Sink Connector 也反对通过 Flink/Spark 利用来调度运行。

4.5 表过滤

当咱们想在一个 CDL Job 中同时捕捉多张表的变更时,咱们能够应用通配符(正则表达式)来代替表名,即容许同时捕捉名称满足规定的表的 CDC 事件。当通配符(正则表达式)不能严格匹配指标时,就会呈现多余的表被捕捉。为此,CDL 提供表过滤性能,来辅助通配符含糊匹配的场景。以后 CDL 同时反对白名单和黑名单两种过滤形式。

4.6 对立数据格式

MRS CDL 对于不同的数据源类型如 Oracle、MYSQL、PostgreSQL 采纳了对立的音讯格局存储在 Kafka 中,后端消费者只需解析一种数据格式来进行后续的数据处理和传输,防止了数据格式多样导致后端开发成本减少的问题。

4.7 工作级的日志浏览

通常境况下,一个 CDL Connector 会运行多个 Task 线程来进行 CDC 事件的抓取,当其中一个 Task 失败时,很难从海量的日志中抽取出强相干的日志信息,来进行进一步的剖析。

为了解决如上问题,CDL 标准了 CDL Connector 的日志打印,并且提供了专用的 REST API,用户能够通过该 API 一键获取指定 Connector 或者 Task 的日志文件。甚至能够指定起止工夫来进一步放大日志查问的范畴。

4.8 监控

MRS CDL 提供 REST API 来查问 CDL 服务所有核心部件的 Metric 信息,包含服务级、角色级、实例级以及工作级。

4.9 应用程序错误处理

在业务运行过程中,经常会呈现某些音讯无奈发送到指标数据源的状况,咱们把这种音讯叫做谬误记录。在 CDL 中,呈现谬误记录的场景有很多种,比方:

  • Topic 中的音讯体与特定的序列化形式不匹配,导致无奈失常读取
  • 指标存储中并不存在音讯中所存储的表名称,导致音讯无奈发送到指标端

为了解决这种问题,CDL 定义了一种“dead letter queue”,专门用于存储运行过程中呈现的谬误记录。实质上“dead letter queue”是由 Sink Connector 创立的特定的 Topic,当呈现谬误记录时,由 Sink Connector 将其发往“dead letter queue”进行存储。

同时,CDL 提供了 REST API 来供用户随时查问这些谬误记录进行进一步剖析,并且提供 Rest API 能够容许用户对这些谬误记录进行编辑和重发。


图 6 CDL Application Error Handling

5 性能

CDL 应用了多种性能优化计划来进步吞吐量:

  • Task 并发
    咱们利用 Kafka Connect 提供的工作并行化性能,其中 Connect 能够将作业拆分为多个工作来并行复制数据,如下所示:


图 7 Task 并发

  • 应用 Executor 线程并行化执行工作
    因为 Log Miner,Bin Log 等数据复制技术的限度,咱们的 Source Connector 只能程序的捕捉 CDC 事件,因而,为了进步性能,咱们将这些 CDC 事件先缓存到内存队列中,而后应用 Executor 线程并行的解决它们。这些线程会先从外部队列中读取数据,而后解决并且推送到 Kafka 中。


图 8 Executor 线程并发

6 总结

MRS CDL 是数据实时入湖场景下重要的一块拼图,咱们依然须要在数据一致性、易用性、多组件对接以及性能晋升等场景须要进一步扩大和欠缺,在将来可能更好的为客户发明价值。

本文由华为云公布

退出移动版