乐趣区

关于Flink:使用-Flink-CDC-实现-MySQL-数据实时入-Apache-Doris

本文通过实例来演示怎么通过 Flink CDC 联合 Doris 的 Flink Connector 实现从 Mysql 数据库中监听数据并实时入库到 Doris 数仓对应的表中。

1. 什么是 CDC

CDC 是变更数据捕捉(Change Data Capture)技术的缩写,它能够将源数据库(Source)的增量变动记录,同步到一个或多个数据目标(Sink)。在同步过程中,还能够对数据进行肯定的解决,例如分组(GROUP BY)、多表的关联(JOIN)等。

例如对于电商平台,用户的订单会实时写入到某个源数据库;A 部门须要将每分钟的实时数据简略聚合解决后保留到 Redis 中以供查问,B 部门须要将当天的数据暂存到 Elasticsearch 一份来做报表展现,C 部门也须要一份数据到 ClickHouse 做实时数仓。随着工夫的推移,后续 D 部门、E 部门也会有数据分析的需要,这种场景下,传统的拷贝散发多个正本办法很不灵便,而 CDC 能够实现一份变动记录,实时处理并投递到多个目的地。

1.1 CDC 的利用场景

数据同步:用于备份,容灾;
数据散发:一个数据源分发给多个上游零碎;
数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是十分重要的数据源。
CDC 的技术计划十分多,目前业界支流的实现机制能够分为两种:

  • 基于查问的 CDC:
    离线调度查问作业,批处理。把一张表同步到其余零碎,每次通过查问去获取表中最新的数据;
    无奈保障数据一致性,查的过程中有可能数据曾经产生了屡次变更;
    不保障实时性,基于离线调度存在人造的提早。
  • 基于日志的 CDC:
    实时生产日志,流解决,例如 MySQL 的 binlog 日志残缺记录了数据库中的变更,能够把 binlog 文件当作流的数据源;
    保障数据一致性,因为 binlog 文件蕴含了所有历史变更明细;
    保障实时性,因为相似 binlog 的日志文件是能够流式生产的,提供的是实时数据。

2.Flink CDC

Flink 在 1.11 版本中新增了 CDC 的个性,简称 扭转数据捕捉。名称来看有点乱,咱们先从之前的数据架构来看 CDC 的内容。

以上是之前的 mysq binlog 日志解决流程,例如 canal 监听 binlog 把日志写入到 kafka 中。而 Apache Flink 实时生产 Kakfa 的数据实现 mysql 数据的同步或其余内容等。拆分来说整体上能够分为以下几个阶段。

mysql 开启 binlog
canal 同步 binlog 数据写入到 kafka
flink 读取 kakfa 中的 binlog 数据进行相干的业务解决。
整体的解决链路较长,须要用到的组件也比拟多。Apache Flink CDC 能够间接从数据库获取到 binlog 供上游进行业务计算剖析

2.1 Flink Connector Mysql CDC 2.0 个性

提供 MySQL CDC 2.0,外围 feature 包含

并发读取,全量数据的读取性能能够程度扩大;
全程无锁,不对线上业务产生锁的危险;
断点续传,反对全量阶段的 checkpoint。
网上有测试文档显示用 TPC-DS 数据集中的 customer 表进行了测试,Flink 版本是 1.13.1,customer 表的数据量是 6500 万条,Source 并发为 8,全量读取阶段:

MySQL CDC 2.0 用时 13 分钟;
MySQL CDC 1.4 用时 89 分钟;
读取性能晋升 6.8 倍。

3. 什么是 Flink Doris Connector

Apache Doris 是一个现代化的 MPP 剖析型数据库产品。仅需亚秒级响应工夫即可取得查问后果,无效地反对实时数据分析。Apache Doris 的分布式架构十分简洁,易于运维,并且能够反对 10PB 以上的超大数据集。

Apache Doris 能够满足多种数据分析需要,例如固定历史报表,实时数据分析,交互式数据分析和摸索式数据分析等。令您的数据分析工作更加简略高效!

Flink Doris Connector 是 doris 社区为了不便用户应用 Flink 读写 Doris 数据表的一个扩大,

目前 doris 反对 Flink 1.11.x,1.12.x,1.13.x,Scala 版本:2.12.x

目前 Flink doris connector 目前管制入库通过两个参数:

sink.batch.size:每多少条写入一次,默认 100 条
sink.batch.interval:每个多少秒写入一下,默认 1 秒
这两参数同时起作用,那个条件先到就触发写 doris 表操作,

留神:

这里留神的是要启用 http v2 版本,具体在 fe.conf 中配置 enable_http_server_v2=true,同时因为是通过 fe http rest api 获取 be 列表,这俩须要配置的用户有 admin 权限。

4. 用法示例

Flink Doris Connector 编译

首先咱们要编译 Doris 的 Flink connector,也能够通过上面的地址进行下载:

https://github.com/hf200012/h…

留神:
这里因为 Doris 的 Flink Connector 是基于 Scala 2.12.x 版本进行开发的,所有你在应用 Flink 的时候请抉择对应 scala 2.12 的版本,
如果你应用下面地址下载了相应的 jar,请疏忽上面的编译内容局部
在 doris 的 docker 编译环境 apache/incubator-doris:build-env-1.2 下进行编译,因为 1.3 上面的 JDK 版本是 11,会存在编译问题。

在 extension/flink-doris-connector/ 源码目录下执行:

sh build.sh

编译胜利后,会在 output/ 目录下生成文件 doris-flink-1.0.0-SNAPSHOT.jar。将此文件复制到 Flink 的 ClassPath 中即可应用 Flink-Doris-Connector。例如,Local 模式运行的 Flink,将此文件放入 jars/ 文件夹下。Yarn 集群模式运行的 Flink,则将此文件放入预部署包中。

针对 Flink 1.13.x 版本适配问题


<properties>
 <scala.version>2.12</scala.version>
 <flink.version>1.11.2</flink.version> 
 <libthrift.version>0.9.3</libthrift.version> 
 <arrow.version>0.15.1</arrow.version>
 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
 <doris.home>${basedir}/../../</doris.home> 
 <doris.thirdparty>${basedir}/../../thirdparty</doris.thirdparty> 
</properties> 

只须要将这里的 flink.version 改成和你 Flink 集群版本统一,从新编辑即可

4.1 配置 Flink

这里咱们是通过 Flink Sql Client 形式来进行操作。

这里咱们演示应用的软件版本:

Mysql 8.x
Apache Flink:1.13.3
Apache Doris:0.14.13.1

4.1.1 装置 Flink

首先下载和装置 Flink:

https://dlcdn.apache.org/flin…

这里演示应用的是本地单机模式,

# wget https://dlcdn.apache.org/flink/flink-1.12.5/flink-1.12.5-bin-scala_2.12.tgz # tar zxvf flink-1.12.5-bin-scala_2.12.tgz

下载 Flink CDC 相干 Jar 包:

https://repo1.maven.org/maven…

这里留神 Flink CDC 和 Flink 的版本对应关系

# wget https://dlcdn.apache.org/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.12.tgz 
# tar zxvf flink-1.13.3-bin-scala_2.12.tgz  
# cd flink-1.13.3 
# wget https://repo1.maven.org/maven2/com/ververica/flink-connector-mysql-cdc/2.0.2/flink-connector-mysql-cdc-2.0.2.jar -P ./lib/ 
# wget https://github.com/hf200012/hf200012.github.io/raw/main/lib/doris-flink-1.0-SNAPSHOT.jar -P ./lib/ 

4.1.2 启动 Flink

这里咱们应用的是本地单机模式

# bin/start-cluster.sh 
Starting cluster. 
Starting standalonesession daemon on host doris01. 
Starting taskexecutor daemon on host doris01. 

咱们通过 web 拜访(默认端口是 8081)启动起来 Flink 集群,能够看到集群失常启动

4.2 装置 Apache Doris

具体装置部署 Doris 的办法,参照上面的连贯:

https://hf200012.github.io/20… 环境装置部署

4.3 装置配置 Mysql

装置 Mysql
疾速应用 Docker 装置配置 Mysql,具体参照上面的连贯
https://segmentfault.com/a/11…
开启 Mysql binlog
进入 Docker 容器批改 /etc/my.cnf 文件,在 [mysqld] 上面增加以下内容,

log_bin=mysql_bin 
binlog-format=Row 
server-id=1

而后重启 Mysql

systemctl restart mysqld
创立 Mysql 数据库表

 CREATE TABLE `test_cdc` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
 ) ENGINE=InnoDB 

4.4 创立 doris 表

CREATE TABLE `doris_test` (
  `id` int NULL COMMENT "",
  `name` varchar(100) NULL COMMENT ""
 ) ENGINE=OLAP
 UNIQUE KEY(`id`)
 COMMENT "OLAP"
 DISTRIBUTED BY HASH(`id`) BUCKETS 1
 PROPERTIES (
 "replication_num" = "3",
 "in_memory" = "false",
 "storage_format" = "V2"
 );

4.5 启动 Flink Sql Client

./bin/sql-client.sh embedded 
> set execution.result-mode=tableau; 

4.5.1 创立 Flink CDC Mysql 映射表

CREATE TABLE test_flink_cdc ( 
  id INT, 
  name STRING,
  primary key(id)  NOT ENFORCED
) WITH ( 
  'connector' = 'mysql-cdc', 
  'hostname' = 'localhost', 
  'port' = '3306', 
  'username' = 'root', 
  'password' = 'password', 
  'database-name' = 'demo', 
  'table-name' = 'test_cdc' 
);

执行查问创立的 Mysql 映射表,显示失常

select * from test_flink_cdc;

4.5.2 创立 Flink Doris Table 映射表

应用 Doris Flink Connector 创立 Doris 映射表

CREATE TABLE doris_test_sink (
   id INT,
   name STRING
) 
WITH (
  'connector' = 'doris',
  'fenodes' = 'localhost:8030',
  'table.identifier' = 'db_audit.doris_test',
  'sink.batch.size' = '2',
  'sink.batch.interval'='1',
  'username' = 'root',
  'password' = ''
)

在命令行下执行下面的语句,能够看到创立表胜利,而后执行查问语句,验证是否失常

select * from doris_test_sink;

执行插入操作,将 Mysql 里的数据通过 Flink CDC 联合 Doris Flink Connector 形式插入到 Doris 中

INSERT INTO doris_test_sink select id,name from test_flink_cdc 

提交胜利之后咱们在 Flink 的 Web 界面能够看到相干的 Job 工作信息

4.5.3 向 Mysql 表中插入数据

INSERT INTO test_cdc VALUES (123, 'this is a update');
INSERT INTO test_cdc VALUES (1212, '测试 flink CDC');
INSERT INTO test_cdc VALUES (1234, '这是测试');
INSERT INTO test_cdc VALUES (11233, 'zhangfeng_1');
INSERT INTO test_cdc VALUES (21233, 'zhangfeng_2');
INSERT INTO test_cdc VALUES (31233, 'zhangfeng_3');
INSERT INTO test_cdc VALUES (41233, 'zhangfeng_4');
INSERT INTO test_cdc VALUES (51233, 'zhangfeng_5');
INSERT INTO test_cdc VALUES (61233, 'zhangfeng_6');
INSERT INTO test_cdc VALUES (71233, 'zhangfeng_7');
INSERT INTO test_cdc VALUES (81233, 'zhangfeng_8');
INSERT INTO test_cdc VALUES (91233, 'zhangfeng_9');

4.5.4 察看 Doris 表的数据

首先停掉 Insert into 这个工作,因为我是在本地单机模式,只有一个 task 工作,所以要停掉,而后在命令行执行查问语句能力看到数据

4.5.5 批改 Mysql 的数据

重新启动 Insert into 工作

批改 Mysql 表里的数据

update test_cdc set name='这个是验证批改的操作' where id =123 

再去察看 Doris 表中的数据,你会发现曾经批改

留神这里如果要想 Mysql 表里的数据批改,Doris 里的数据也同样批改,Doris 数据表的模型要是 Unique key 模型,其余数据模型(Aggregate Key 和 Duplicate Key)不能进行数据的更新操作。

4.5.6 删除数据操作

目前 Doris Flink Connector 还不反对删除操作,前面打算会加上这个操作

退出移动版