乐趣区

关于Flink:Flink-CDC-系列-同步-MySQL-分库分表构建-Iceberg-实时数据湖

作者:罗宇侠

本篇教程将展现如何应用 Flink CDC 构建实时数据湖,并解决分库分表合并同步的场景。
Flink-CDC 我的项目地址:

https://github.com/ververica/…

Flink 中文学习网站
https://flink-learning.org.cn

在 OLTP 零碎中,为了解决单表数据量大的问题,通常采纳分库分表的形式将单个大表进行拆分以进步零碎的吞吐量。

然而为了不便数据分析,通常须要将分库分表拆分出的表在同步到数据仓库、数据湖时,再合并成一个大表。

这篇教程将展现如何应用 Flink CDC 构建实时数据湖来应答这种场景,本教程的演示基于 Docker,只波及 SQL,无需一行 Java/Scala 代码,也无需装置 IDE,你能够很不便地在本人的电脑上实现本教程的全部内容。

接下来将以数据从 MySQL 同步到 Iceberg [1] 为例展现整个流程,架构图如下所示:

一、筹备阶段

筹备一台曾经装置了 Docker 的 Linux 或者 MacOS 电脑。

1.1 筹备教程所须要的组件

接下来的教程将以 docker-compose 的形式筹备所须要的组件。

应用上面的内容创立一个 docker-compose.yml 文件:

version: '2.1'
services:
  sql-client:
    user: flink:flink
    image: yuxialuo/flink-sql-client:1.13.2.v1 
    depends_on:
      - jobmanager
      - mysql
    environment:
      FLINK_JOBMANAGER_HOST: jobmanager
      MYSQL_HOST: mysql
    volumes:
      - shared-tmpfs:/tmp/iceberg
  jobmanager:
    user: flink:flink
    image: flink:1.13.2-scala_2.11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
    volumes:
      - shared-tmpfs:/tmp/iceberg
  taskmanager:
    user: flink:flink
    image: flink:1.13.2-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
    volumes:
      - shared-tmpfs:/tmp/iceberg
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw

volumes:
  shared-tmpfs:
    driver: local
    driver_opts:
      type: "tmpfs"
      device: "tmpfs"

该 Docker Compose 中蕴含的容器有:

  • SQL-Client:Flink SQL Client, 用来提交 SQL 查问和查看 SQL 的执行后果;
  • Flink Cluster:蕴含 Flink JobManager 和 Flink TaskManager,用来执行 Flink SQL;
  • MySQL:作为分库分表的数据源,存储本教程的 user 表。

docker-compose.yml 所在目录下执行上面的命令来启动本教程须要的组件:

docker-compose up -d

该命令将以 detached 模式主动启动 Docker Compose 配置中定义的所有容器。你能够通过 docker ps 来察看上述的容器是否失常启动了,也能够通过拜访 http://localhost:8081/ 来查看 Flink 是否运行失常。

留神:

  1. 本教程接下来用到的容器相干的命令都须要在 docker-compose.yml 所在目录下执行。
  2. 为了简化整个教程,本教程须要的 jar 包都曾经被打包进 SQL-Client 容器中了,镜像的构建脚本能够在 GitHub [2] 上找到。

如果你想要在本人的 Flink 环境运行本教程,须要下载上面列出的包并且把它们放在 Flink 所在目录的 lib 目录下,即 FLINK_HOME/lib/

  • flink-sql-connector-mysql-cdc-2.1.0.jar
  • flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
  • iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar

截止目前反对 Flink 1.13 的 iceberg-flink-runtime jar 包还没有公布,所以咱们在这里提供了一个反对 Flink 1.13 的 iceberg-flink-runtime jar 包,这个 jar 包是基于 Iceberg 的 master 分支打包的。

当 Iceberg 0.13.0 版本公布后,你也能够在 apache official repository [3] 下载到反对 Flink 1.13 的 iceberg-flink-runtime jar 包。

1.2 筹备数据

  1. 进入 MySQL 容器中:

    docker-compose exec mysql mysql -uroot -p123456
  1. 创立数据和表,并填充数据。

创立两个不同的数据库,并在每个数据库中创立两个表,作为 user 表分库分表下拆分出的表。

 CREATE DATABASE db_1;
 USE db_1;
 CREATE TABLE user_1 (
   id INTEGER NOT NULL PRIMARY KEY,
   name VARCHAR(255) NOT NULL DEFAULT 'flink',
   address VARCHAR(1024),
   phone_number VARCHAR(512),
   email VARCHAR(255)
 );
 INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234","user_110@foo.com");

 CREATE TABLE user_2 (
   id INTEGER NOT NULL PRIMARY KEY,
   name VARCHAR(255) NOT NULL DEFAULT 'flink',
   address VARCHAR(1024),
   phone_number VARCHAR(512),
   email VARCHAR(255)
 );
INSERT INTO user_2 VALUES (120,"user_120","Shanghai","123567891234","user_120@foo.com");
CREATE DATABASE db_2;
USE db_2;
CREATE TABLE user_1 (
  id INTEGER NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone_number VARCHAR(512),
  email VARCHAR(255)
);
INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234", NULL);

CREATE TABLE user_2 (
  id INTEGER NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone_number VARCHAR(512),
  email VARCHAR(255)
);
INSERT INTO user_2 VALUES (220,"user_220","Shanghai","123567891234","user_220@foo.com");

二、在 Flink SQL CLI 中应用 Flink DDL 创立表

首先,应用如下的命令进入 Flink SQL CLI 容器中:

docker-compose exec sql-client ./sql-client

咱们能够看到如下界面:

而后,进行如下步骤:

  1. 开启 checkpoint

Checkpoint 默认是不开启的,咱们须要开启 Checkpoint 来让 Iceberg 能够提交事务。
并且,mysql-cdc 在 binlog 读取阶段开始前,须要期待一个残缺的 checkpoint 来防止 binlog 记录乱序的状况。

-- Flink SQL
-- 每隔 3 秒做一次 checkpoint                 
Flink SQL> SET execution.checkpointing.interval = 3s;
  1. 创立 MySQL 分库分表 source 表

创立 source 表 user_source 来捕捉 MySQL 中所有 user 表的数据,在表的配置项 database-name , table-name 应用正则表达式来匹配这些表。
并且,user_source 表也定义了 metadata 列来辨别数据是来自哪个数据库和表。

-- Flink SQL
Flink SQL> CREATE TABLE user_source (
    database_name STRING METADATA VIRTUAL,
    table_name STRING METADATA VIRTUAL,
    `id` DECIMAL(20, 0) NOT NULL,
    name STRING,
    address STRING,
    phone_number STRING,
    email STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'mysql',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'db_[0-9]+',
    'table-name' = 'user_[0-9]+'
  );
  1. 创立 Iceberg sink 表

创立 sink 表 all_users_sink,用来将数据加载至 Iceberg 中。
在这个 sink 表,思考到不同的 MySQL 数据库表的 id 字段的值可能雷同,咱们定义了复合主键 (database_name, table_name, id)。

-- Flink SQL
Flink SQL> CREATE TABLE all_users_sink (
    database_name STRING,
    table_name    STRING,
    `id`          DECIMAL(20, 0) NOT NULL,
    name          STRING,
    address       STRING,
    phone_number  STRING,
    email         STRING,
    PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
  ) WITH (
    'connector'='iceberg',
    'catalog-name'='iceberg_catalog',
    'catalog-type'='hadoop',  
    'warehouse'='file:///tmp/iceberg/warehouse',
    'format-version'='2'
  );

三、流式写入 Iceberg

  1. 应用上面的 Flink SQL 语句将数据从 MySQL 写入 Iceberg 中:

    -- Flink SQL
    Flink SQL> INSERT INTO all_users_sink select * from user_source;

上述命令将会启动一个流式作业,源源不断将 MySQL 数据库中的全量和增量数据同步到 Iceberg 中。
在 Flink UI [4] 上能够看到这个运行的作业:

而后咱们就能够应用如下的命令看到 Iceberg 中的写入的文件:

docker-compose exec sql-client tree /tmp/iceberg/warehouse/default_database/

如下所示:

在你的运行环境中,理论的文件可能与下面的截图不雷同,然而整体的目录构造应该类似。

  1. 应用上面的 Flink SQL 语句查问表 all_users_sink 中的数据:

    -- Flink SQL
    Flink SQL> SELECT * FROM all_users_sink;

在 Flink SQL CLI 中咱们能够看到如下查问后果:

批改 MySQL 中表的数据,Iceberg 中的表 all_users_sink 中的数据也将实时更新:

(3.1) 在 db_1.user_1 表中插入新的一行

--- db_1
INSERT INTO db_1.user_1 VALUES (111,"user_111","Shanghai","123567891234","user_111@foo.com");

(3.2) 更新 db_1.user_2 表的数据

--- db_1
UPDATE db_1.user_2 SET address='Beijing' WHERE id=120;

(3.3) 在 db_2.user_2 表中删除一行

--- db_2
DELETE FROM db_2.user_2 WHERE id=220;

每执行一步,咱们就能够在 Flink Client CLI 中应用 SELECT * FROM all_users_sink 查问表 all_users_sink 来看到数据的变动。

最初的查问后果如下所示:

从 Iceberg 的最新后果中能够看到新增了 (db_1, user_1, 111) 的记录,(db_1, user_2, 120)的地址更新成了 Beijing,且 (db_2, user_2, 220) 的记录被删除了,与咱们在 MySQL 做的数据更新完全一致。

四、环境清理

本教程完结后,在 docker-compose.yml 文件所在的目录下执行如下命令进行所有容器:

docker-compose down

五、总结

在本文中,咱们展现了如何应用 Flink CDC 同步 MySQL 分库分表的数据,疾速构建 Icberg 实时数据湖。用户也能够同步其余数据库(Postgres/Oracle)的数据到 Hudi 等数据湖中。最初心愿通过本文,可能帮忙读者疾速上手 Flink CDC。

更多 Flink CDC 相干技术问题,可扫码退出社区钉钉交换群~

正文:

[1] https://iceberg.apache.org/

[2] https://github.com/luoyuxia/f…

[3] https://repo.maven.apache.org…


Flink Forward Asia 2021

2022 年 1 月 8-9 日,FFA 2021 重磅开启,寰球 40+ 多行业一线厂商,80+ 干货议题,带来专属于开发者的技术盛宴。

大会官网:
https://flink-forward.org.cn

大会线上观看地址 (记得预约哦):
https://developer.aliyun.com/…

更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~

退出移动版