关于flink:Flink-SQL-CDC-实践以及一致性分析

34次阅读

共计 10991 个字符,预计需要花费 28 分钟才能阅读完成。

本文由民生银行王健、文乔分享,次要介绍民生银行 Flink SQL CDC 实际以及一致性剖析。内容包含:

  1. 背景
  2. 什么是 Flink SQL CDC Connectors
  3. Flink SQL CDC 原理介绍
  4. 三种数据同步计划
  5. Flink SQL CDC + JDBC Connector 同步计划验证
  6. Flink SQL CDC + JDBC Connector 端到端一致性剖析
  7. Flink SQL CDC 目前存在的缺点

一. 背景

数据准实时复制(CDC)是目前行内实时数据需要大量应用的技术,随着国产化的需要,咱们也逐渐思考基于开源产品进行准实时数据同步工具的相干开发,逐渐实现对商业产品的代替。咱们评估了几种开源产品,Canal、Debezium、Flink CDC 等产品。作了如下的比照:

二. 什么是 Flink SQL CDC Connectors

在 Flink 1.11 引入了 CDC 机制,CDC 的全称是 Change Data Capture,用于捕获数据库表的增删改查操作,是目前十分成熟的同步数据库变更计划。

Flink CDC Connectors 是 Apache Flink 的一组源连接器,是能够从 MySQL、PostgreSQL 数据间接读取全量数据和增量数据的 Source Connectors,开源地址:https://github.com/ververica/…。

目前 (1.11 版本) 反对的 Connectors 如下:

另外反对解析 Kafka 中 debezium-json 和 canal-json 格局的 Change Log,通过 Flink 进行计算或者间接写入到其余内部数据存储系统(比方 Elasticsearch),或者将 Changelog Json 格局的 Flink 数据写入到 Kafka:

三. Flink SQL CDC 原理介绍

在公开的 CDC 调研报告中,Debezium 和 Canal 是最风行应用的 CDC 工具,这些 CDC 工具的外围原理是抽取数据库日志获取变更。在通过一系列调研后,我行采纳的是 Debezium (反对全量、增量同步,同时反对 MySQL、PostgreSQL、Oracle 等数据库)。

Flink SQL CDC 内置了 Debezium 引擎,利用其抽取日志获取变更的能力,将 changelog 转换为 Flink SQL 意识的 RowData 数据。(以下右侧是 Debezium 的数据格式,左侧是 Flink 的 RowData 数据格式)。

RowData 代表了一行的数据,在 RowData 下面会有一个元数据的信息 RowKind,RowKind 外面包含了插入 (+I)、更新前(-U)、更新后(+U)、删除(-D),这样和数据库外面的 binlog 概念非常相似。通过 Debezium 采集的数据,蕴含了旧数据(before) 和新数据行 (after) 以及原数据信息(source),op 的 u 示意是 update 更新操作标识符(op 字段的值 c,u,d,r 别离对应 create,update,delete,reade),ts_ms 示意同步的工夫戳。

四. 三种数据同步计划

4.1 计划一:Debezium+Kafka+ 计算程序 + 存储系统

目前我行在生产上采纳的就是这个计划,采纳 Debezium 订阅 MySQL 的 Binlog 传输到 Kafka,后端是由计算程序从 Kafka 里生产,最初将数据写入到其余存储,架构相似如下:

这种计划中利用 Kafka 音讯队列做解耦,Change Log 可供任何其余业务零碎应用,生产端可采纳 Kafka Sink Connector 或者自定义生产程序,然而因为原生 Debezium 中的 Producer 端未采纳幂等个性,因而音讯可能存在反复,另外 Kafka Sink Connector(比方 JDBC Sink Connector 只能保障 At least once)或者自定义生产程序在保证数据的一致性上也有难度。

4.2 计划二:Debezium+Kafka+Flink SQL+ 存储系统

从第二章节咱们晓得 Flink SQL 具备解析 Kafka 中 debezium-json 和 canal-json 格局的 Change Log 能力,咱们能够采纳如下同步架构:

与计划一的区别就是,采纳 Flink 通过创立 Kafka 表,指定 format 格局为 debezium-json,而后通过 Flink 进行计算后或者直接插入到其余内部数据存储系统。计划二和计划一相似,组件多保护繁冗,而前述咱们晓得 Flink 1.11 中 CDC Connectors 内置了 Debezium 引擎,能够替换 Debezium+Kafka 计划,因而有了更简化的计划三。

4.3 计划三:Flink SQL CDC + JDBC Connector

将如下架构虚线局部用 Flink SQL 替换:

咱们失去如下改良的同步计划架构:

从官网的形容中,通过 Flink CDC connectors 替换 Debezium+Kafka 的数据采集模块,实现 Flink SQL 采集 + 计算 + 传输 (ETL) 一体化,长处很多:

  • 开箱即用,简略易上手
  • 缩小保护的组件,简化实时链路,加重部署老本
  • 减小端到端提早
  • Flink 本身反对 Exactly Once 的读取和计算
  • 数据不落地,缩小存储老本
  • 反对全量和增量流式读取
  • binlog 采集位点可回溯

五. Flink SQL CDC + JDBC Connector 同步计划验证

5.1 测试环境和脚本

测试环境测试场景 应用 Flink SQL CDC 从 MySQL 数据库同步数据到指标 MySQL,Kafka。

CREATE TABLE sbtest1 (
  id INT,
  k INT,
  c STRING,
  pad STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '197.XXX.XXX.XXX',
  'port' = '3306',
  'username' = 'debezium',
  'password' = 'PASSWORD',
  'database-name' = 'cdcdb',
  'table-name' = 'sbtest1',
  'debezium.snapshot.mode' = 'initial'
);

到 DB:create table printSinkTable (
  id INT,
  k INT,
  c STRING,
  pad STRING,
  primary key (id) NOT ENFORCED
) with (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://197.XXX.XXX.XXX:3306/mydb?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&serverTimezone=UTC',
 'username' = 'debezium',
 'password' = 'PASSWORD',
 'table-name' = 'sbtest',
 'driver' = 'com.mysql.cj.jdbc.Driver',
 'sink.buffer-flush.interval' = '3s',
 'sink.buffer-flush.max-rows' = '1',
 'sink.max-retries' = '5');
 INSERT INTO printSinkTable SELECT * FROM sbtest1;

 到 KAFKA:CREATE TABLE kafka_gmv (
  id INT,
  k INT,
  c STRING,
  pad STRING
) WITH (
    'connector' = 'kafka-0.11',
    'topic' = 'kafka_gmv',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '197.XXX.XXX.XXX:9092',
    'format' = 'changelog-json'
);

INSERT INTO kafka_gmv SELECT * FROM sbtest1;

5.2 测试论断

■ 5.2.1 功能测试

■ 5.2.2 异样测试

  • 惯例功能测试

  • 基于 DNS 的数据库切换测试

测试示意图:

Flink 须要配置的参数:工作失败后提早 5 秒重启,重试 10 次。restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 10 restart-strategy.fixed-delay.delay: 5s。
MySQL 环境信息:一主库两个从库。
DNS 配置:DNS 申请了一个域名:XX.XX.cmbc.cn 策略:以后域名指向其中一个从库,探测数据库服务端口,每个 2 分钟主动探测一次。以后数据库异样后 DNS 批改指向到第二个从库。
○ 操作系统和 JVM 缓存配置:JVM 缓存配置 30 秒,操作系统缓存 30 秒。

测试后果:

  1. 当 Flink 参数未设置上述参数的状况下,kill 以后拜访数据库,Flink 工作报错退出,查看 DNS 没有拜访记录。
  2. Flink 配置上述参数后,Flink 后盾尝试拜访上述数据库,本地 DNS 缓存在拜访失败的状况下生效,从新申请 DNS 域名服务器获取新数据库访问信息,工作持续复制。
  • Flink 高可用测试

在 Flink 高可用测试中,咱们应用 Standalone 集群高可用性计划进行测试,一个主 JobManager,一个从 JobManager,当主节点异样之后,备选节点成为新的 leader,并接管 Flink 集群。新 JobManager 成为新的 leader 后,集群恢复正常,并能够进行工作的调度,异样的工作复原运行。

这里备选和主节点是一样的,也就是说每个 JobManager 都能够充当备选和主节点。官网的下图展现了这一过程:

异样测试步骤和后果如下:

■ 5.2.3 性能测试

性能测试进行了累计测试,用以检测 Flink cdc 的极限性能,别离测试了 kafka 和 MySQL 作为指标的场景。

  • 测试形容

应用 sysbench 进行压测,插入 200 余万数据,表构造如下:

CREATE TABLE `sbtest1` (`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `k` int(10) unsigned NOT NULL DEFAULT '0',
  `c` char(120) NOT NULL DEFAULT '',
  `pad` char(60) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`),
  KEY `k_1` (`k`)
);

累计性能测试后果:

六. Flink SQL CDC + JDBC Connector 端到端一致性剖析

Flink SQL CDC + JDBC Connector 实质上是一个 Source 和 Sink 并行度为 1 的 Flink Stream Application,Source 和 Sink 之间无 Operator,上面咱们逐渐剖析 Flink SQL CDC + JDBC Connector 端到端如何保障一致性。

6.1 端到端一致性实现条件

一致性就是业务正确性,在“流零碎中间件”这个业务畛域,端到端一致性就代表 Exacly Once Msg Processing(简称 EOMP),即一个音讯只被解决一次,造成一次成果。即便机器或软件呈现故障,既没有反复数据,也不会丢数据。

幂等就是一个雷同的操作,无论反复多少次,造成的成果和只操作一次相等。
流零碎端到端链路较长,波及到上游 Source 层、两头计算层和上游 Sink 层三局部,要实现端到端的一致性,须要实现以下条件:

  • 上游能够 replay,否则两头计算层收到音讯后未计算,却产生 failure 而重启,音讯就会失落。
  • 记录音讯解决进度,并保障存储计算结果不呈现反复,二者是一个原子操作,或者存储计算结果是个幂等操作,否则若先记录解决进度,再存储计算结果时产生 failure,计算结果会失落,或者是记录完计算结果再产生 failure,就会 replay 生成多个计算结果。
  • 两头计算结果高可用,应答上游在接到计算结果后产生 failure,并未胜利解决该后果的场景,能够思考将两头计算结果放在高可用的 DataStore 里。
  • 上游去重,应答上游解决完音讯后产生 failure,反复接管音讯的场景,这种可通过给音讯设置 SequcenceId 实现去重,或者上游实现幂等。

在 Flink SQL CDC + JDBC Connector 计划中,上游是数据库系统的日志,是能够 replay 的,满足条件 1“上游可 replay”,接下来咱们别离剖析 Flink SQL CDC 如何实现条件 2 和 3,JDBCConnector 如何实现条件 4,最终实现端到端的一致性。以 MySQL->MySQL 为例,架构图如下(目前 Flink SQL 是不反对 Source/Sink 并行度配置的,Flink SQL 中各算子并行度默认是依据 Source 的 Partition 数或文件数来决定的,而 DebeziumSource 的并行度是 1,因而整个 Flink Task 的并行度为 1):

6.2 Flink SQL CDC 的一致性保障

Flink SQL CDC 用于获取数据库变更日志的 Source 函数是 DebeziumSourceFunction,且最终返回的类型是 RowData,该函数实现了 CheckpointedFunction,即通过 Checkpoint 机制来保障产生 failure 时不会丢数,实现 exactly once 语义,这部分在函数的正文中有明确的解释。

/**
 * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
 * from databases into Flink.
 * 通过 Checkpoint 机制来保障产生 failure 时不会丢数,实现 exactly once 语义
 * <p>The source function participates in checkpointing and guarantees that no data is lost
 * during a failure, and that the computation processes elements "exactly once".
 * 留神:这个 Source Function 不能同时运行多个实例
 * <p>Note: currently, the source function can't run in multiple parallel instances.
 *
 * <p>Please refer to Debezium's documentation for the available configuration properties:
 * https://debezium.io/documentation/reference/1.2/development/engine.html#engine-properties</p>
 */
@PublicEvolving
public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
  CheckpointedFunction,
  ResultTypeQueryable<T> {

为实现 CheckpointedFunction,须要实现以下两个办法:

public interface CheckpointedFunction {
  // 做快照,把内存中的数据保留在 checkpoint 状态中
  void snapshotState(FunctionSnapshotContext var1) throws Exception;

  // 程序异样复原后从 checkpoint 状态中复原数据
  void initializeState(FunctionInitializationContext var1) throws Exception;
}

接下来咱们看看 DebeziumSourceFunction 中都记录了哪些状态。

/** Accessor for state in the operator state backend. 
    offsetState 中记录了读取的 binlog 文件和位移信息等,对应 Debezium 中的
*/
 private transient ListState<byte[]> offsetState;

/**
 * State to store the history records, i.e. schema changes.
 * historyRecordsState 记录了 schema 的变动等信息
 * @see FlinkDatabaseHistory
*/
 private transient ListState<String> historyRecordsState;

再回到端到端一致性的条件 2 和 3

2. 记录音讯解决进度,并保障存储计算结果不呈现反复,二者是一个原子操作,或者存储计算结果是个幂等操作,否则若先记录解决进度,再存储计算结果时产生 failure,计算结果会失落,或者是记录完计算结果再产生 failure,就会 replay 生成多个计算结果。

3. 两头计算结果高可用,应答上游在接到计算结果后产生 failure,并未胜利解决该后果的场景,能够思考将两头计算结果放在高可用的 DataStore 里。

咱们发现在 Flink SQL CDC 是一个绝对繁难的场景,没有两头算子,是通过 Checkpoint 长久化 binglog 生产位移和 schema 变动信息的快照,来实现 Exactly Once。接下来咱们剖析 Sink 端。

■ 6.2.1 JDBC Sink Connector 如何保障一致性

咱们在官网上发现对于 JDBC Sink Connector 的幂等性中有如下解释:

如果定义了主键,JDBC 写入时是可能保障 Upsert 语义的,如果 DB 不反对 Upsert 语法,则会进化成 DELETE + INSERT 语义。Upsert query 是原子执行的,能够保障幂等性。

这个在官网文档中也详细描述了更新失败或者存在故障时候如何做出的解决,上面的表格是不同的 DB 对应不同的 Upsert 语法:

Database    Upsert Grammar
MySQL    INSERT .. ON DUPLICATE KEY UPDATE ..
PostgreSQL    INSERT .. ON CONFLICT .. DO UPDATE SET ..

因而咱们能够通过写入时保障 Upsert 语义,从而保障上游 Sink 端的幂等性,再 Review 一次到端到端一致性实现条件 4,上游去重也能够通过实现幂等从而实现上游的 Exactly Once 语义。

4. 上游去重,应答上游解决完音讯后产生 failure,反复接管音讯的场景,这种可通过给音讯设置 SequcenceId 实现去重,或者上游实现幂等。

■ 6.2.2 Flink SQL CDC + JDBC Sink Connector 组合后如何保障一致性

在前两大节咱们剖析了组件各自如何保障一致性,接下来,咱们剖析组合后在源库异样、Flink 作业异样、指标库异样三种异样场景下如何保障端到端一致性。

  • Debezium Source 对 MySQL 进行 Snapshot 时产生异样

在 Flink Task 启动后,首先会进行 MySQL 全表扫描,也就是做 Snapshot,这里有个须要留神的中央就是,在 Snapshot 阶段,在扫描全表数据时,没有可用于复原的位点,所以无奈在全表扫描阶段去执行 Checkpoint。为了不执行 Checkpoint,MySQL 的 CDC 源表会让执行中的 Checkpoint 始终期待(通过持有 checkpoint 锁实现),甚至 Checkpoint 超时(如果表超级大,扫描耗时十分长)。这块能够从 DebeziumChangeConsumer 的代码中看到:

@Override
 public void handleBatch(
   List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents,
   DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer) throws InterruptedException {
  try {for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {SourceRecord record = event.value();
    deserialization.deserialize(record, debeziumCollector);

    if (isInDbSnapshotPhase) {if (!lockHold) {MemoryUtils.UNSAFE.monitorEnter(checkpointLock);
      lockHold = true;
            // 在 snapshot 阶段不做 checkpoint
      LOG.info("Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");
     }
     if (!isSnapshotRecord(record)) {MemoryUtils.UNSAFE.monitorExit(checkpointLock);
      isInDbSnapshotPhase = false;
      LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
     }
    }

    // emit the actual records. this also updates offset state atomically
    emitRecordsUnderCheckpointLock(debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
   }
      ...

在做 Snapshot 阶段,可能会碰到源库 MySQL 异样或者 Flink 工作自身异样,那咱们别离剖析下异样后如何复原:

  1. 若遇到源库 MySQL 异样,Flink Task 发现无奈连贯数据库异样退出,重新启动 Flink Task(或者 retry),因为没有做 snapshot 没做 checkpoint,那么会从新再做一次 Snapshot,这些全量数据最初发送到目标 MySQL,因为上游 MySQL 实现了写幂等,因而最终放弃一致性。
  2. 若遇到 Flink 工作异样,重新启动(或者 retry),同下面状况一样,从新做一次 Snapshot,最终也能放弃一致性。
  3. 若遇到指标库 MySQL 异样,同场景一统一,Flink Task 无奈往指标数据库写入异样退出,在须要重新启动或 retry 后,从新做一次 Snapshot,全量数据最初发送到目标 MySQL,因为目标上游 M 有 SQL 实现了写幂等,最终放弃一致性。
  • Snapshot 实现后读取 binlog 时产生异样

在全量数据实现同步后,开始进行增量获取,此时 Flink 会进行定时 Checkpoint,将读取 binlog 的位移信息和 schema 信息存入到 StateBackend,若此时产生异样,那咱们剖析下异样后如何复原:

  1. 若源 MySQL 异样,Flink Task 发现无奈连贯数据库异样退出,重新启动 Flink Task(或者 retry),将会从最近一次 Checkpoint 的数据进行复原,因为能够读取到 mysql binlog 位移信息,实现持续同步,不会失落数据,最终也能放弃一致性。
  2. 若 Flink 工作异样,重新启动或 retry 后,同场景 1 统一,持续读取 binlog,能放弃一致性。
  3. 若目标 MySQL 异样,jdbc connector 无奈往指标数据库写入,cdc connector 读取到的 binlog 位移信息也不再更新,两个操作是一个原子性操作,在 Flink Task 复原后,从最近一次 Checkpoint 进行复原,最终放弃一致性。

6.3 总结

分布式系统中端到端一致性须要各个组件参加实现,Flink SQL CDC + JDBC Connector 能够通过如下办法保障端到端的一致性:

  • 源端是数据库的 binlog 日志,全量同步做 Snapshot 异样后能够再次做 Snapshot,增量同步时,Flink SQL CDC 中会记录读取的日志位移信息,也能够 replay
  • Flink SQL CDC 作为 Source 组件,是通过 Flink Checkpoint 机制,周期性长久化存储数据库日志文件生产位移和状态等信息(StateBackend 将 checkpoint 长久化),记录生产位移和写入指标库是一个原子操作,保障产生 failure 时不丢数据,实现 Exactly Once
  • JDBC Sink Connecotr 是通过写入时保障 Upsert 语义,从而保障上游的写入幂等性,实现 Exactly Once

再来回顾一下端到端放弃一致性的条件,发现全都能满足。

1.上游能够 replay,否则两头计算层收到音讯后未计算,却产生 failure 而重启,音讯就会失落。

2.记录音讯解决进度,并保障存储计算结果不呈现反复,二者是一个原子操作,或者存储计算结果是个幂等操作,否则若先记录解决进度,再存储计算结果时产生 failure,计算结果会失落,或者是记录完计算结果再产生 failure,就会 replay 生成多个计算结果。

3.两头计算结果高可用,应答上游在接到计算结果后产生 failure,并未胜利解决该后果的场景,能够思考将两头计算结果放在高可用的 DataStore 里。

4.上游去重,应答上游解决完音讯后产生 failure,反复接管音讯的场景,这种可通过给音讯设置 SequcenceId 实现去重,或者上游实现幂等。

七. Flink SQL CDC 目前存在的缺点

  • 应用正则匹配原表后(多个源端表),到指标表无奈进行一对一的映射。须要一一匹配。
  • CDC source 端定义时,须要指定所有字段,目前不反对省略字段定义。
  • CDC 到 KAFKA 时无奈依照主键进行主动分区散发、无奈指定分区键散发数据。到 KAFKA 的数据格式指定(JSON,AVRO JSON 等)。
  • 指标端反对需要:DB2、ADB/GreenPlum、Oracle 暂不反对。不反对 DDL 同步,不反对表的创立。
  • 工作治理和监控的 REST API 不欠缺。

参考资料:

端到端一致性,流零碎 Spark/Flink/Kafka/DataFlow 比照总结 https://zhuanlan.zhihu.com/p/…
基于 Flink SQL CDC 的实时数据同步计划 https://developer.aliyun.com/…
Flink SQL 1.11 新性能与最佳实际 https://developer.aliyun.com/…
分布式快照算法
https://zhuanlan.zhihu.com/p/…

作者介绍:

文乔:2012 年硕士毕业后退出民生银行生产运营部零碎管理中心,天眼日志平台次要参加人,目前在开源工具组负责 Flume、Kafka 的源码钻研和工具开发等相干工作。

王健:2011 年退出民生银行科技部,数据库管理员(负责 DB2,Oracle,MySQL 等运维工作,对 MPP 等数据库有很长的保护和施行教训,善于数据迁徙等等),同时负责行内 KAFKA 集群运维和施行工作,负责行内数据库实时复制等工作。

正文完
 0