共计 6030 个字符,预计需要花费 16 分钟才能阅读完成。
本文通过实例来演示怎么通过 Flink CDC 联合 Doris 的 Flink Connector 实现从 Mysql 数据库中监听数据并实时入库到 Doris 数仓对应的表中。次要内容包含:
- 什么是 CDC
- Flink CDC
- 什么是 Flink Doris Connector
- 用法示例
Flink 中文学习网站
https://flink-learning.org.cn
一、什么是 CDC
CDC 是变更数据捕捉 (Change Data Capture) 技术的缩写,它能够将源数据库 (Source) 的增量变动记录,同步到一个或多个数据目标 (Sink)。在同步过程中,还能够对数据进行肯定的解决,例如分组 (GROUP BY)、多表的关联 (JOIN) 等。
例如对于电商平台,用户的订单会实时写入到某个源数据库;A 部门须要将每分钟的实时数据简略聚合解决后保留到 Redis 中以供查问,B 部门须要将当天的数据暂存到 Elasticsearch 一份来做报表展现,C 部门也须要一份数据到 ClickHouse 做实时数仓。随着工夫的推移,后续 D 部门、E 部门也会有数据分析的需要,这种场景下,传统的拷贝散发多个正本办法很不灵便,而 CDC 能够实现一份变动记录,实时处理并投递到多个目的地。
CDC 的利用场景
- 数据同步:用于备份,容灾;
- 数据散发:一个数据源分发给多个上游零碎;
- 数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是十分重要的数据源。
CDC 的技术计划十分多,目前业界支流的实现机制能够分为两种:
-
基于查问的 CDC:
- 离线调度查问作业,批处理。把一张表同步到其余零碎,每次通过查问去获取表中最新的数据;
- 无奈保障数据一致性,查的过程中有可能数据曾经产生了屡次变更;
- 不保障实时性,基于离线调度存在人造的提早。
-
基于日志的 CDC:
- 实时生产日志,流解决,例如 MySQL 的 binlog 日志残缺记录了数据库中的变更,能够把 binlog 文件当作流的数据源;
- 保障数据一致性,因为 binlog 文件蕴含了所有历史变更明细;
- 保障实时性,因为相似 binlog 的日志文件是能够流式生产的,提供的是实时数据。
二、Flink CDC
Flink 在 1.11 版本中新增了 CDC 的个性,简称扭转数据捕捉。名称来看有点乱,咱们先从之前的数据架构来看 CDC 的内容。
以上是之前的 mysq binlog
日志解决流程,例如 canal 监听 binlog 把日志写入到 kafka 中。而 Apache Flink 实时生产 Kakfa 的数据实现 mysql 数据的同步或其余内容等。拆分来说整体上能够分为以下几个阶段:
- Mysql 开启 binlog;
- Canal 同步 binlog 数据写入到 Kafka;
- Flink 读取 Kakfa 中的 binlog 数据进行相干的业务解决。
整体的解决链路较长,须要用到的组件也比拟多。Apache Flink CDC 能够间接从数据库获取到 binlog 供上游进行业务计算剖析
Flink Connector Mysql CDC 2.0 个性
提供 MySQL CDC 2.0,外围 feature 包含:
- 并发读取,全量数据的读取性能能够程度扩大;
- 全程无锁,不对线上业务产生锁的危险;
- 断点续传,反对全量阶段的 checkpoint。
网上有测试文档显示用 TPC-DS 数据集中的 customer 表进行了测试,Flink 版本是 1.13.1,customer 表的数据量是 6500 万条,Source 并发为 8,全量读取阶段:
- MySQL CDC 2.0 用时 13 分钟;
- MySQL CDC 1.4 用时 89 分钟;
- 读取性能晋升 6.8 倍。
三、什么是 Flink Doris Connector
Flink Doris Connector 是 Doris 社区为了不便用户应用 Flink 读写 Doris 数据表的一个扩大,目前 Doris 反对 Flink 1.11.x,1.12.x,1.13.x;Scala 版本:2.12.x。
目前 Flink Doris connector 目前管制入库通过两个参数:
- sink.batch.size:每多少条写入一次,默认 100 条;
- sink.batch.interval:每个多少秒写入一下,默认 1 秒。
这两参数同时起作用,哪个条件先到就触发写 Doris 表操作,
留神:
这里留神的是要启用 http v2 版本,具体在 fe.conf 中配置 enable_http_server_v2=true
,同时因为是通过 fe http rest api 获取 be 列表,这俩须要配置的用户有 admin 权限。
四、用法示例
4.1 Flink Doris Connector 编译
首先咱们要编译 Doris 的 Flink connector,也能够通过上面的地址进行下载:
https://github.com/hf200012/h…
留神:
这里因为 Doris 的 Flink Connector 是基于 Scala 2.12.x 版本进行开发的,所以你在应用 Flink 的时候请抉择对应 Scala 2.12 的版本,如果你应用下面地址下载了相应的 jar,请疏忽上面的编译内容局部。
在 Doris 的 docker 编译环境 apache/incubator-doris:build-env-1.2
下进行编译,因为 1.3 上面的 JDK 版本是 11,会存在编译问题。
在 extension/flink-doris-connector/ 源码目录下执行:
sh build.sh
编译胜利后,会在 output/
目录下生成文件 doris-flink-1.0.0-SNAPSHOT.jar
。将此文件复制到 Flink
的 ClassPath
中即可应用 Flink-Doris-Connector
。例如,Local
模式运行的 Flink
,将此文件放入 jars/
文件夹下。Yarn
集群模式运行的 Flink
,则将此文件放入预部署包中。
针对 Flink 1.13.x 版本适配问题
<properties>
<scala.version>2.12</scala.version>
<flink.version>1.11.2</flink.version>
<libthrift.version>0.9.3</libthrift.version>
<arrow.version>0.15.1</arrow.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<doris.home>${basedir}/../../</doris.home>
<doris.thirdparty>${basedir}/../../thirdparty</doris.thirdparty>
</properties>
只须要将这里的 flink.version
改成和你 Flink 集群版本统一,从新编辑即可。
4.2 配置 Flink
这里咱们是通过 Flink Sql Client 形式来进行操作。
这里咱们演示应用的软件版本:
- Mysql 8.x
- Apache Flink:1.13.3
- Apache Doris:0.14.13.1
4.2.1 装置 Flink
首先下载和装置 Flink:
https://dlcdn.apache.org/flin…
这里演示应用的是本地单机模式:
# wget https://dlcdn.apache.org/flink/flink-1.12.5/flink-1.12.5-bin-scala_2.12.tgz
# tar zxvf flink-1.12.5-bin-scala_2.12.tgz
下载 Flink CDC 相干 Jar 包:
https://repo1.maven.org/maven…
这里留神 Flink CDC 和 Flink 的版本对应关系。
- 将下面下载或者编译好的 Flink Doris Connector jar 包复制到 Flink 根目录下的 lib 目录下;
- Flink CDC 的 jar 包也复制到 Flink 根目录下的 lib 目录下。
4.2.2 启动 Flink
这里咱们应用的是本地单机模式。
# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host doris01.
Starting taskexecutor daemon on host doris01.
咱们通过 web 拜访 (默认端口是 8081) 启动起来 Flink 集群,能够看到集群失常启动。
4.3 装置 Apache Doris
具体装置部署 Doris 的办法,参照上面的连贯:
https://hf200012.github.io/20… 环境装置部署。
4.4 装置配置 Mysql
-
装置 Mysql,疾速应用 Docker 装置配置 Mysql,具体参照上面的连贯:
https://segmentfault.com/a/11…
-
开启 Mysql binlog,进入 Docker 容器批改 /etc/my.cnf 文件,在 [mysqld] 上面增加以下内容,
log_bin=mysql_bin binlog-format=Row server-id=1
而后重启 Mysql。
systemctl restart mysqld
- 创立 Mysql 数据库表。
CREATE TABLE `test_cdc` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB
4.5 创立 Doris 表
CREATE TABLE `doris_test` (
`id` int NULL COMMENT "",
`name` varchar(100) NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "V2"
);
4.6 启动 Flink Sql Client
./bin/sql-client.sh embedded
> set execution.result-mode=tableau;
4.6.1 创立 Flink CDC Mysql 映射表
CREATE TABLE test_flink_cdc (
id INT,
name STRING,
primary key(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'demo',
'table-name' = 'test_cdc'
);
执行查问创立的 Mysql 映射表,显示失常。
select * from test_flink_cdc;
4.6.2 创立 Flink Doris Table 映射表
应用 Doris Flink Connector 创立 Doris 映射表。
CREATE TABLE doris_test_sink (
id INT,
name STRING
)
WITH (
'connector' = 'doris',
'fenodes' = 'localhost:8030',
'table.identifier' = 'db_audit.doris_test',
'sink.batch.size' = '2',
'sink.batch.interval'='1',
'username' = 'root',
'password' = ''
)
在命令行下执行下面的语句,能够看到创立表胜利,而后执行查问语句,验证是否失常。
select * from doris_test_sink;
执行插入操作,将 Mysql 里的数据通过 Flink CDC 联合 Doris Flink Connector 形式插入到 Doris 中。
INSERT INTO doris_test_sink select id,name from test_flink_cdc
提交胜利之后咱们在 Flink 的 Web 界面能够看到相干的 Job 工作信息。
4.6.3 向 Mysql 表中插入数据
INSERT INTO test_cdc VALUES (123, 'this is a update');
INSERT INTO test_cdc VALUES (1212, '测试 flink CDC');
INSERT INTO test_cdc VALUES (1234, '这是测试');
INSERT INTO test_cdc VALUES (11233, 'zhangfeng_1');
INSERT INTO test_cdc VALUES (21233, 'zhangfeng_2');
INSERT INTO test_cdc VALUES (31233, 'zhangfeng_3');
INSERT INTO test_cdc VALUES (41233, 'zhangfeng_4');
INSERT INTO test_cdc VALUES (51233, 'zhangfeng_5');
INSERT INTO test_cdc VALUES (61233, 'zhangfeng_6');
INSERT INTO test_cdc VALUES (71233, 'zhangfeng_7');
INSERT INTO test_cdc VALUES (81233, 'zhangfeng_8');
INSERT INTO test_cdc VALUES (91233, 'zhangfeng_9');
4.6.4 察看 Doris 表的数据
首先停掉 Insert into 这个工作,因为我是在本地单机模式,只有一个 task 工作,所以要停掉,而后在命令行执行查问语句能力看到数据。
4.6.5 批改 Mysql 的数据
重新启动 Insert into 工作:
批改 Mysql 表里的数据:
update test_cdc set name='这个是验证批改的操作' where id =123
再去察看 Doris 表中的数据,你会发现曾经批改。
留神这里如果要想 Mysql 表里的数据批改,Doris 里的数据也同样批改,Doris 数据表的模型要是 Unique key 模型,其余数据模型 (Aggregate Key 和 Duplicate Key) 不能进行数据的更新操作。
4.6.6 删除数据操作
目前 Doris Flink Connector 还不反对删除操作,前面打算会加上这个操作。
更多 Flink CDC 相干技术问题,可扫码退出社区钉钉交换群~
相干文章
- Flink CDC 系列 – 构建 MySQL 和 Postgres 上的 Streaming ETL
- Flink CDC 2.1 正式公布,稳定性大幅晋升,新增 Oracle,MongoDB 反对
近期热点
- Flink Forward Asia 2021 延期,线上相见
- 奖金翻倍!Flink Forward Asia Hackathon 最新参赛指南请查收
更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~