共计 8034 个字符,预计需要花费 21 分钟才能阅读完成。
Flink1.11 引入了 CDC 的 connector,通过这种形式能够很不便地捕捉变动的数据,大大简化了数据处理的流程。Flink1.11 的 CDC connector 次要包含:MySQL CDC
和 Postgres CDC
, 同时对 Kafka 的Connector 反对 canal-json
和debezium-json
以及 changelog-json
的 format。本文次要分享以下内容:
- CDC 简介
- Flink 提供的 table format
- 应用过程中的留神点
- mysql-cdc 的操作实际
- canal-json 的操作实际
- changelog-json 的操作实际
简介
Flink CDC Connector 是 ApacheFlink 的一组数据源连接器,应用 变动数据捕捉 change data capture (CDC))从不同的数据库中提取变更数据。Flink CDC 连接器将 Debezium 集成为引擎来捕捉数据变更。因而,它能够充分利用 Debezium 的性能。
特点
- 反对读取数据库快照,并且可能继续读取数据库的变更日志,即便产生故障,也反对exactly-once 的解决语义
- 对于 DataStream API 的 CDC connector,用户无需部署 Debezium 和 Kafka,即可在单个作业中应用多个数据库和表上的变更数据。
- 对于 Table/SQL API 的 CDC connector,用户能够应用 SQL DDL 创立 CDC 数据源,来监督单个表上的数据变更。
应用场景
- 数据库之间的增量数据同步
- 审计日志
- 数据库之上的实时物化视图
- 基于 CDC 的维表 join
- …
Flink 提供的 table format
Flink 提供了一系列能够用于 table connector 的 table format,具体如下:
Formats | Supported Connectors |
---|---|
CSV | Apache Kafka, Filesystem |
JSON | Apache Kafka, Filesystem, Elasticsearch |
Apache Avro | Apache Kafka, Filesystem |
Debezium CDC | Apache Kafka |
Canal CDC | Apache Kafka |
Apache Parquet | Filesystem |
Apache ORC | Filesystem |
应用过程中的留神点
应用 MySQL CDC 的留神点
如果要应用 MySQL CDC connector,对于程序而言,须要增加如下依赖:
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.0.0</version>
</dependency>
如果要应用 Flink SQL Client,须要增加如下 jar 包:flink-sql-connector-mysql-cdc-1.0.0.jar,将该 jar 包放在 Flink 装置目录的 lib 文件夹下即可。
应用 canal-json 的留神点
如果要应用 Kafka 的 canal-json,对于程序而言,须要增加如下依赖:
<!-- universal -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.11.0</version>
</dependency>
如果要应用 Flink SQL Client,须要增加如下 jar 包:flink-sql-connector-kafka_2.11-1.11.0.jar,将该 jar 包放在 Flink 装置目录的 lib 文件夹下即可。因为 Flink1.11 的安装包 的 lib 目录下并没有提供该 jar 包,所以必须要手动增加依赖包,否则会报如下谬误:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
Available factory identifiers are:
datagen
mysql-cdc
应用 changelog-json 的留神点
如果要应用 Kafka 的 changelog-json Format,对于程序而言,须要增加如下依赖:
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-format-changelog-json</artifactId>
<version>1.0.0</version>
</dependency>
如果要应用 Flink SQL Client,须要增加如下 jar 包:flink-format-changelog-json-1.0.0.jar,将该 jar 包放在 Flink 装置目录的 lib 文件夹下即可。
mysql-cdc 的操作实际
创立 MySQL 数据源表
在创立 MySQL CDC 表之前,须要先创立 MySQL 的数据表,如下:
-- MySQL
/*Table structure for table `order_info` */
DROP TABLE IF EXISTS `order_info`;
CREATE TABLE `order_info` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`consignee` varchar(100) DEFAULT NULL COMMENT '收货人',
`consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',
`total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',
`order_status` varchar(20) DEFAULT NULL COMMENT '订单状态,1 示意下单,2 示意领取',
`user_id` bigint(20) DEFAULT NULL COMMENT '用户 id',
`payment_way` varchar(20) DEFAULT NULL COMMENT '付款形式',
`delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',
`order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',
`out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方领取用)',
`trade_body` varchar(200) DEFAULT NULL COMMENT '订单形容(第三方领取用)',
`create_time` datetime DEFAULT NULL COMMENT '创立工夫',
`operate_time` datetime DEFAULT NULL COMMENT '操作工夫',
`expire_time` datetime DEFAULT NULL COMMENT '生效工夫',
`tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',
`parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',
`img_url` varchar(200) DEFAULT NULL COMMENT '图片门路',
`province_id` int(20) DEFAULT NULL COMMENT '地区',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';
-- ----------------------------
-- Records of order_info
-- ----------------------------
INSERT INTO `order_info`
VALUES (476, 'lAXjcL', '13408115089', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '','2020-06-18 02:21:38', NULL, NULL, NULL, NULL, NULL, 9);
INSERT INTO `order_info`
VALUES (477, 'QLiFDb', '13415139984', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '','2020-06-18 09:12:25', NULL, NULL, NULL, NULL, NULL, 3);
INSERT INTO `order_info`
VALUES (478, 'iwKjQD', '13320383859', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '','2020-06-18 15:56:34', NULL, NULL, NULL, NULL, NULL, 7);
/*Table structure for table `order_detail` */
CREATE TABLE `order_detail` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',
`sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
`sku_name` varchar(200) DEFAULT NULL COMMENT 'sku 名称(冗余)',
`img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)',
`order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时 sku 价格)',
`sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',
`create_time` datetime DEFAULT NULL COMMENT '创立工夫',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单明细表';
-- ----------------------------
-- Records of order_detail
-- ----------------------------
INSERT INTO `order_detail`
VALUES (1329, 476, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 挪动联通电信 4G 手机 双卡双待', 'http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz', 8900.00, '3', '2020-06-18 02:21:38');
INSERT INTO `order_detail`
VALUES (1330, 477, 9, '光荣 10 GT 游戏减速 AIS 手持夜景 6GB+64GB 幻影蓝全网通 挪动联通电信', 'http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne', 2452.00, '4', '2020-06-18 09:12:25');
INSERT INTO `order_detail`
VALUES (1331, 478, 4, '小米 Play 流光突变 AI 双摄 4GB+64GB 梦幻蓝 全网通 4G 双卡双待 小水滴全面屏拍照游戏智能手机', 'http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv', 1442.00, '1', '2020-06-18 15:56:34');
INSERT INTO `order_detail`
VALUES (1332, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 挪动联通电信 4G 手机 双卡双待', 'http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV', 8900.00, '3', '2020-06-18 15:56:34');
INSERT INTO `order_detail`
VALUES (1333, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 挪动联通电信 4G 手机 双卡双待', 'http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP', 8900.00, '1', '2020-06-18 15:56:34');
Flink SQL Cli 创立 CDC 数据源
启动 Flink 集群,再启动 SQL CLI, 执行上面命令:
-- 创立订单信息表
CREATE TABLE order_info(
id BIGINT,
user_id BIGINT,
create_time TIMESTAMP(0),
operate_time TIMESTAMP(0),
province_id INT,
order_status STRING,
total_amount DECIMAL(10, 5)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'kms-1',
'port' = '3306',
'username' = 'root',
'password' = '123qwe',
'database-name' = 'mydw',
'table-name' = 'order_info'
);
在 Flink SQL Cli 中查问该表的数据:result-mode: tableau,+ 示意数据的 insert
在 SQL CLI 中创立订单详情表:
CREATE TABLE order_detail(
id BIGINT,
order_id BIGINT,
sku_id BIGINT,
sku_name STRING,
sku_num BIGINT,
order_price DECIMAL(10, 5),
create_time TIMESTAMP(0)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'kms-1',
'port' = '3306',
'username' = 'root',
'password' = '123qwe',
'database-name' = 'mydw',
'table-name' = 'order_detail'
);
查问后果如下:
执行 JOIN 操作:
SELECT
od.id,
oi.id order_id,
oi.user_id,
oi.province_id,
od.sku_id,
od.sku_name,
od.sku_num,
od.order_price,
oi.create_time,
oi.operate_time
FROM
(
SELECT *
FROM order_info
WHERE
order_status = '2'-- 已领取
) oi
JOIN
(
SELECT *
FROM order_detail
) od
ON oi.id = od.order_id;
canal-json 的操作实际
对于 cannal 的应用形式,能够参考我的另一篇文章:基于 Canal 与 Flink 实现数据实时增量同步(一)。我曾经将上面的表通过 canal 同步到了 kafka,具体格局为:
{
"data":[
{
"id":"1",
"region_name":"华北"
},
{
"id":"2",
"region_name":"华东"
},
{
"id":"3",
"region_name":"西南"
},
{
"id":"4",
"region_name":"华中"
},
{
"id":"5",
"region_name":"华南"
},
{
"id":"6",
"region_name":"东北"
},
{
"id":"7",
"region_name":"东南"
}
],
"database":"mydw",
"es":1597128441000,
"id":102,
"isDdl":false,
"mysqlType":{"id":"varchar(20)",
"region_name":"varchar(20)"
},
"old":null,
"pkNames":null,
"sql":"","sqlType":{"id":12,"region_name":12},
"table":"base_region",
"ts":1597128441424,
"type":"INSERT"
}
在 SQL CLI 中创立该 canal-json 格局的表:
CREATE TABLE region (
id BIGINT,
region_name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'mydw.base_region',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
);
查问后果如下:
changelog-json 的操作实际
创立 MySQL 数据源
参见下面的order_info
Flink SQL Cli 创立 changelog-json 表
CREATE TABLE order_gmv2kafka (
day_str STRING,
gmv DECIMAL(10, 5)
) WITH (
'connector' = 'kafka',
'topic' = 'order_gmv_kafka',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
INSERT INTO order_gmv2kafka
SELECT DATE_FORMAT(create_time, 'yyyy-MM-dd') as day_str, SUM(total_amount) as gmv
FROM order_info
WHERE order_status = '2' -- 订单已领取
GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd');
查问表看一下后果:
再查一下 kafka 的数据:
{"data":{"day_str":"2020-06-18","gmv":433},"op":"+I"}
当将另外两个订单的状态 order_status 更新为 2 时,总金额 =443+772+88=1293
再察看数据:
再看 kafka 中的数据:
总结
本文基于 Flink1.11 的 SQL,对新增加的 CDC connector 的应用形式进行了论述。次要包含 MySQL CDC connector、canal-json 及 changelog-json 的 format,并指出了应用过程中的留神点。另外本文给出了残缺的应用示例,如果你有现成的环境,那么能够间接进行测试应用。
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包