本文通过实例来演示怎么通过Flink CDC 联合Doris的Flink Connector实现从Mysql数据库中监听数据并实时入库到Doris数仓对应的表中。
1.什么是CDC
CDC 是变更数据捕捉(Change Data Capture)技术的缩写,它能够将源数据库(Source)的增量变动记录,同步到一个或多个数据目标(Sink)。在同步过程中,还能够对数据进行肯定的解决,例如分组(GROUP BY)、多表的关联(JOIN)等。
例如对于电商平台,用户的订单会实时写入到某个源数据库;A 部门须要将每分钟的实时数据简略聚合解决后保留到 Redis 中以供查问,B 部门须要将当天的数据暂存到 Elasticsearch 一份来做报表展现,C 部门也须要一份数据到 ClickHouse 做实时数仓。随着工夫的推移,后续 D 部门、E 部门也会有数据分析的需要,这种场景下,传统的拷贝散发多个正本办法很不灵便,而 CDC 能够实现一份变动记录,实时处理并投递到多个目的地。
1.1 CDC的利用场景
数据同步:用于备份,容灾;
数据散发:一个数据源分发给多个上游零碎;
数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是十分重要的数据源。
CDC 的技术计划十分多,目前业界支流的实现机制能够分为两种:
- 基于查问的 CDC:
离线调度查问作业,批处理。把一张表同步到其余零碎,每次通过查问去获取表中最新的数据;
无奈保障数据一致性,查的过程中有可能数据曾经产生了屡次变更;
不保障实时性,基于离线调度存在人造的提早。 - 基于日志的 CDC:
实时生产日志,流解决,例如 MySQL 的 binlog 日志残缺记录了数据库中的变更,能够把 binlog 文件当作流的数据源;
保障数据一致性,因为 binlog 文件蕴含了所有历史变更明细;
保障实时性,因为相似 binlog 的日志文件是能够流式生产的,提供的是实时数据。
2.Flink CDC
Flink在1.11版本中新增了CDC的个性,简称 扭转数据捕捉。名称来看有点乱,咱们先从之前的数据架构来看CDC的内容。
以上是之前的mysq binlog日志解决流程,例如 canal 监听 binlog 把日志写入到 kafka 中。而 Apache Flink 实时生产 Kakfa 的数据实现 mysql 数据的同步或其余内容等。拆分来说整体上能够分为以下几个阶段。
mysql开启binlog
canal同步binlog数据写入到kafka
flink读取kakfa中的binlog数据进行相干的业务解决。
整体的解决链路较长,须要用到的组件也比拟多。Apache Flink CDC能够间接从数据库获取到binlog供上游进行业务计算剖析
2.1 Flink Connector Mysql CDC 2.0 个性
提供 MySQL CDC 2.0,外围 feature 包含
并发读取,全量数据的读取性能能够程度扩大;
全程无锁,不对线上业务产生锁的危险;
断点续传,反对全量阶段的 checkpoint。
网上有测试文档显示用 TPC-DS 数据集中的 customer 表进行了测试,Flink 版本是 1.13.1,customer 表的数据量是 6500 万条,Source 并发为 8,全量读取阶段:
MySQL CDC 2.0 用时 13 分钟;
MySQL CDC 1.4 用时 89 分钟;
读取性能晋升 6.8 倍。
3.什么是Flink Doris Connector
Apache Doris是一个现代化的MPP剖析型数据库产品。仅需亚秒级响应工夫即可取得查问后果,无效地反对实时数据分析。Apache Doris的分布式架构十分简洁,易于运维,并且能够反对10PB以上的超大数据集。
Apache Doris能够满足多种数据分析需要,例如固定历史报表,实时数据分析,交互式数据分析和摸索式数据分析等。令您的数据分析工作更加简略高效!
Flink Doris Connector 是 doris 社区为了不便用户应用 Flink 读写Doris数据表的一个扩大,
目前 doris 反对 Flink 1.11.x ,1.12.x,1.13.x,Scala版本:2.12.x
目前Flink doris connector目前管制入库通过两个参数:
sink.batch.size :每多少条写入一次,默认100条
sink.batch.interval :每个多少秒写入一下,默认1秒
这两参数同时起作用,那个条件先到就触发写doris表操作,
留神:
这里留神的是要启用 http v2 版本,具体在 fe.conf 中配置 enable_http_server_v2=true,同时因为是通过 fe http rest api 获取 be 列表,这俩须要配置的用户有 admin 权限。
4. 用法示例
Flink Doris Connector 编译
首先咱们要编译Doris的Flink connector,也能够通过上面的地址进行下载:
https://github.com/hf200012/h...
留神:
这里因为Doris 的Flink Connector 是基于Scala 2.12.x版本进行开发的,所有你在应用Flink 的时候请抉择对应scala 2.12的版本,
如果你应用下面地址下载了相应的jar,请疏忽上面的编译内容局部
在 doris 的 docker 编译环境 apache/incubator-doris:build-env-1.2 下进行编译,因为 1.3 上面的JDK 版本是 11,会存在编译问题。
在 extension/flink-doris-connector/ 源码目录下执行:
sh build.sh
编译胜利后,会在 output/ 目录下生成文件 doris-flink-1.0.0-SNAPSHOT.jar。将此文件复制到 Flink 的 ClassPath 中即可应用 Flink-Doris-Connector。例如,Local 模式运行的 Flink,将此文件放入 jars/ 文件夹下。Yarn集群模式运行的Flink,则将此文件放入预部署包中。
针对Flink 1.13.x版本适配问题
<properties> <scala.version>2.12</scala.version> <flink.version>1.11.2</flink.version> <libthrift.version>0.9.3</libthrift.version> <arrow.version>0.15.1</arrow.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <doris.home>${basedir}/../../</doris.home> <doris.thirdparty>${basedir}/../../thirdparty</doris.thirdparty> </properties>
只须要将这里的 flink.version 改成和你 Flink 集群版本统一,从新编辑即可
4.1 配置Flink
这里咱们是通过Flink Sql Client 形式来进行操作。
这里咱们演示应用的软件版本:
Mysql 8.x
Apache Flink : 1.13.3
Apache Doris :0.14.13.1
4.1.1 装置Flink
首先下载和装置 Flink :
https://dlcdn.apache.org/flin...
这里演示应用的是本地单机模式,
# wget https://dlcdn.apache.org/flink/flink-1.12.5/flink-1.12.5-bin-scala_2.12.tgz # tar zxvf flink-1.12.5-bin-scala_2.12.tgz
下载Flink CDC相干Jar包:
https://repo1.maven.org/maven...
这里留神Flink CDC 和Flink 的版本对应关系
# wget https://dlcdn.apache.org/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.12.tgz # tar zxvf flink-1.13.3-bin-scala_2.12.tgz # cd flink-1.13.3 # wget https://repo1.maven.org/maven2/com/ververica/flink-connector-mysql-cdc/2.0.2/flink-connector-mysql-cdc-2.0.2.jar -P ./lib/ # wget https://github.com/hf200012/hf200012.github.io/raw/main/lib/doris-flink-1.0-SNAPSHOT.jar -P ./lib/
4.1.2 启动Flink
这里咱们应用的是本地单机模式
# bin/start-cluster.sh Starting cluster. Starting standalonesession daemon on host doris01. Starting taskexecutor daemon on host doris01.
咱们通过web拜访(默认端口是8081)启动起来Flink 集群,能够看到集群失常启动
4.2 装置Apache Doris
具体装置部署Doris的办法,参照上面的连贯:
https://hf200012.github.io/20...环境装置部署
4.3 装置配置 Mysql
装置Mysql
疾速应用Docker装置配置Mysql,具体参照上面的连贯
https://segmentfault.com/a/11...
开启Mysql binlog
进入 Docker 容器批改/etc/my.cnf 文件,在 [mysqld] 上面增加以下内容,
log_bin=mysql_bin binlog-format=Row server-id=1
而后重启Mysql
systemctl restart mysqld
创立Mysql数据库表
CREATE TABLE `test_cdc` ( `id` int NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB
4.4 创立doris表
CREATE TABLE `doris_test` ( `id` int NULL COMMENT "", `name` varchar(100) NULL COMMENT "" ) ENGINE=OLAP UNIQUE KEY(`id`) COMMENT "OLAP" DISTRIBUTED BY HASH(`id`) BUCKETS 1 PROPERTIES ( "replication_num" = "3", "in_memory" = "false", "storage_format" = "V2" );
4.5 启动 Flink Sql Client
./bin/sql-client.sh embedded > set execution.result-mode=tableau;
4.5.1 创立 Flink CDC Mysql 映射表
CREATE TABLE test_flink_cdc ( id INT, name STRING, primary key(id) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = 'password', 'database-name' = 'demo', 'table-name' = 'test_cdc' );
执行查问创立的Mysql映射表,显示失常
select * from test_flink_cdc;
4.5.2 创立Flink Doris Table 映射表
应用Doris Flink Connector创立 Doris映射表
CREATE TABLE doris_test_sink ( id INT, name STRING) WITH ( 'connector' = 'doris', 'fenodes' = 'localhost:8030', 'table.identifier' = 'db_audit.doris_test', 'sink.batch.size' = '2', 'sink.batch.interval'='1', 'username' = 'root', 'password' = '')
在命令行下执行下面的语句,能够看到创立表胜利,而后执行查问语句,验证是否失常
select * from doris_test_sink;
执行插入操作,将Mysql 里的数据通过 Flink CDC联合Doris Flink Connector形式插入到 Doris中
INSERT INTO doris_test_sink select id,name from test_flink_cdc
提交胜利之后咱们在Flink的Web界面能够看到相干的Job工作信息
4.5.3 向Mysql表中插入数据
INSERT INTO test_cdc VALUES (123, 'this is a update');INSERT INTO test_cdc VALUES (1212, '测试flink CDC');INSERT INTO test_cdc VALUES (1234, '这是测试');INSERT INTO test_cdc VALUES (11233, 'zhangfeng_1');INSERT INTO test_cdc VALUES (21233, 'zhangfeng_2');INSERT INTO test_cdc VALUES (31233, 'zhangfeng_3');INSERT INTO test_cdc VALUES (41233, 'zhangfeng_4');INSERT INTO test_cdc VALUES (51233, 'zhangfeng_5');INSERT INTO test_cdc VALUES (61233, 'zhangfeng_6');INSERT INTO test_cdc VALUES (71233, 'zhangfeng_7');INSERT INTO test_cdc VALUES (81233, 'zhangfeng_8');INSERT INTO test_cdc VALUES (91233, 'zhangfeng_9');
4.5.4 察看Doris表的数据
首先停掉Insert into这个工作,因为我是在本地单机模式,只有一个task工作,所以要停掉,而后在命令行执行查问语句能力看到数据
4.5.5 批改Mysql的数据
重新启动Insert into工作
批改Mysql表里的数据
update test_cdc set name='这个是验证批改的操作' where id =123
再去察看Doris表中的数据,你会发现曾经批改
留神这里如果要想Mysql表里的数据批改,Doris里的数据也同样批改,Doris数据表的模型要是Unique key模型,其余数据模型(Aggregate Key 和 Duplicate Key)不能进行数据的更新操作。
4.5.6 删除数据操作
目前Doris Flink Connector 还不反对删除操作,前面打算会加上这个操作