Flink1.11引入了CDC的connector,通过这种形式能够很不便地捕捉变动的数据,大大简化了数据处理的流程。Flink1.11的CDC connector次要包含:MySQL CDCPostgres CDC,同时对Kafka的Connector反对canal-jsondebezium-json以及changelog-json的format。本文次要分享以下内容:

  • CDC简介
  • Flink提供的 table format
  • 应用过程中的留神点
  • mysql-cdc的操作实际
  • canal-json的操作实际
  • changelog-json的操作实际

简介

Flink CDC Connector 是ApacheFlink的一组数据源连接器,应用变动数据捕捉change data capture (CDC))从不同的数据库中提取变更数据。Flink CDC连接器将Debezium集成为引擎来捕捉数据变更。因而,它能够充分利用Debezium的性能。

特点

  • 反对读取数据库快照,并且可能继续读取数据库的变更日志,即便产生故障,也反对exactly-once 的解决语义
  • 对于DataStream API的CDC connector,用户无需部署Debezium和Kafka,即可在单个作业中应用多个数据库和表上的变更数据。
  • 对于Table/SQL API 的CDC connector,用户能够应用SQL DDL创立CDC数据源,来监督单个表上的数据变更。

应用场景

  • 数据库之间的增量数据同步
  • 审计日志
  • 数据库之上的实时物化视图
  • 基于CDC的维表join

Flink提供的 table format

Flink提供了一系列能够用于table connector的table format,具体如下:

FormatsSupported Connectors
CSVApache Kafka, Filesystem
JSONApache Kafka, Filesystem, Elasticsearch
Apache AvroApache Kafka, Filesystem
Debezium CDCApache Kafka
Canal CDCApache Kafka
Apache ParquetFilesystem
Apache ORCFilesystem

应用过程中的留神点

应用MySQL CDC的留神点

如果要应用MySQL CDC connector,对于程序而言,须要增加如下依赖:

<dependency>  <groupId>com.alibaba.ververica</groupId>  <artifactId>flink-connector-mysql-cdc</artifactId>  <version>1.0.0</version></dependency>

如果要应用Flink SQL Client,须要增加如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,将该jar包放在Flink装置目录的lib文件夹下即可。

应用canal-json的留神点

如果要应用Kafka的canal-json,对于程序而言,须要增加如下依赖:

<!-- universal --><dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-connector-kafka_2.11</artifactId>    <version>1.11.0</version></dependency>

如果要应用Flink SQL Client,须要增加如下jar包:flink-sql-connector-kafka_2.11-1.11.0.jar,将该jar包放在Flink装置目录的lib文件夹下即可。因为Flink1.11的安装包 的lib目录下并没有提供该jar包,所以必须要手动增加依赖包,否则会报如下谬误:

[ERROR] Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.Available factory identifiers are:datagenmysql-cdc

应用changelog-json的留神点

如果要应用Kafka的changelog-json Format,对于程序而言,须要增加如下依赖:

<dependency>  <groupId>com.alibaba.ververica</groupId>  <artifactId>flink-format-changelog-json</artifactId>  <version>1.0.0</version></dependency>

如果要应用Flink SQL Client,须要增加如下jar包:flink-format-changelog-json-1.0.0.jar,将该jar包放在Flink装置目录的lib文件夹下即可。

mysql-cdc的操作实际

创立MySQL数据源表

在创立MySQL CDC表之前,须要先创立MySQL的数据表,如下:

-- MySQL/*Table structure for table `order_info` */DROP TABLE IF EXISTS `order_info`;CREATE TABLE `order_info` (  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',  `consignee` varchar(100) DEFAULT NULL COMMENT '收货人',  `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',  `total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',  `order_status` varchar(20) DEFAULT NULL COMMENT '订单状态,1示意下单,2示意领取',  `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',  `payment_way` varchar(20) DEFAULT NULL COMMENT '付款形式',  `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',  `order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',  `out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方领取用)',  `trade_body` varchar(200) DEFAULT NULL COMMENT '订单形容(第三方领取用)',  `create_time` datetime DEFAULT NULL COMMENT '创立工夫',  `operate_time` datetime DEFAULT NULL COMMENT '操作工夫',  `expire_time` datetime DEFAULT NULL COMMENT '生效工夫',  `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',  `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',  `img_url` varchar(200) DEFAULT NULL COMMENT '图片门路',  `province_id` int(20) DEFAULT NULL COMMENT '地区',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';-- ------------------------------ Records of order_info-- ----------------------------INSERT INTO `order_info` VALUES (476, 'lAXjcL', '13408115089', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, NULL, NULL, NULL, NULL, 9);INSERT INTO `order_info`VALUES (477, 'QLiFDb', '13415139984', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NULL, NULL, NULL, NULL, NULL, 3);INSERT INTO `order_info`VALUES (478, 'iwKjQD', '13320383859', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NULL, NULL, NULL, NULL, NULL, 7);/*Table structure for table `order_detail` */CREATE TABLE `order_detail` (  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',  `order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',  `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称(冗余)',  `img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)',  `order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',  `sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',  `create_time` datetime DEFAULT NULL COMMENT '创立工夫',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单明细表';-- ------------------------------ Records of order_detail-- ----------------------------INSERT INTO `order_detail` VALUES (1329, 476, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 挪动联通电信4G手机 双卡双待', 'http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz', 8900.00, '3', '2020-06-18 02:21:38');INSERT INTO `order_detail` VALUES (1330, 477, 9, '光荣10 GT游戏减速 AIS手持夜景 6GB+64GB 幻影蓝全网通 挪动联通电信', 'http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne', 2452.00, '4', '2020-06-18 09:12:25');INSERT INTO `order_detail`VALUES (1331, 478, 4, '小米Play 流光突变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机', 'http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv', 1442.00, '1', '2020-06-18 15:56:34');INSERT INTO `order_detail` VALUES (1332, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 挪动联通电信4G手机 双卡双待', 'http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV', 8900.00, '3', '2020-06-18 15:56:34');INSERT INTO `order_detail` VALUES (1333, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 挪动联通电信4G手机 双卡双待', 'http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP', 8900.00, '1', '2020-06-18 15:56:34');

Flink SQL Cli创立CDC数据源

启动 Flink 集群,再启动 SQL CLI,执行上面命令:

-- 创立订单信息表CREATE TABLE order_info(    id BIGINT,    user_id BIGINT,    create_time TIMESTAMP(0),    operate_time TIMESTAMP(0),    province_id INT,    order_status STRING,    total_amount DECIMAL(10, 5)  ) WITH (    'connector' = 'mysql-cdc',    'hostname' = 'kms-1',    'port' = '3306',    'username' = 'root',    'password' = '123qwe',    'database-name' = 'mydw',    'table-name' = 'order_info');

在Flink SQL Cli中查问该表的数据:result-mode: tableau,+示意数据的insert

在SQL CLI中创立订单详情表:

CREATE TABLE order_detail(    id BIGINT,    order_id BIGINT,    sku_id BIGINT,    sku_name STRING,    sku_num BIGINT,    order_price DECIMAL(10, 5),    create_time TIMESTAMP(0) ) WITH (    'connector' = 'mysql-cdc',    'hostname' = 'kms-1',    'port' = '3306',    'username' = 'root',    'password' = '123qwe',    'database-name' = 'mydw',    'table-name' = 'order_detail');

查问后果如下:

执行JOIN操作:

SELECT    od.id,    oi.id order_id,    oi.user_id,    oi.province_id,    od.sku_id,    od.sku_name,    od.sku_num,    od.order_price,    oi.create_time,    oi.operate_timeFROM   (    SELECT *     FROM order_info    WHERE          order_status = '2'-- 已领取   ) oi   JOIN  (    SELECT *    FROM order_detail  ) od   ON oi.id = od.order_id;

canal-json的操作实际

对于cannal的应用形式,能够参考我的另一篇文章:基于Canal与Flink实现数据实时增量同步(一)。我曾经将上面的表通过canal同步到了kafka,具体格局为:

{    "data":[        {            "id":"1",            "region_name":"华北"        },        {            "id":"2",            "region_name":"华东"        },        {            "id":"3",            "region_name":"西南"        },        {            "id":"4",            "region_name":"华中"        },        {            "id":"5",            "region_name":"华南"        },        {            "id":"6",            "region_name":"东北"        },        {            "id":"7",            "region_name":"东南"        }    ],    "database":"mydw",    "es":1597128441000,    "id":102,    "isDdl":false,    "mysqlType":{        "id":"varchar(20)",        "region_name":"varchar(20)"    },    "old":null,    "pkNames":null,    "sql":"",    "sqlType":{        "id":12,        "region_name":12    },    "table":"base_region",    "ts":1597128441424,    "type":"INSERT"}

在SQL CLI中创立该canal-json格局的表:

CREATE TABLE region (  id BIGINT,  region_name STRING) WITH ( 'connector' = 'kafka', 'topic' = 'mydw.base_region', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' );

查问后果如下:

changelog-json的操作实际

创立MySQL数据源

参见下面的order_info

Flink SQL Cli创立changelog-json表

CREATE TABLE order_gmv2kafka (  day_str STRING,  gmv DECIMAL(10, 5)) WITH (    'connector' = 'kafka',    'topic' = 'order_gmv_kafka',    'scan.startup.mode' = 'earliest-offset',    'properties.bootstrap.servers' = 'kms-3:9092',    'format' = 'changelog-json');INSERT INTO order_gmv2kafkaSELECT DATE_FORMAT(create_time, 'yyyy-MM-dd') as day_str, SUM(total_amount) as gmvFROM order_infoWHERE order_status = '2' -- 订单已领取GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd'); 

查问表看一下后果:

再查一下kafka的数据:

{"data":{"day_str":"2020-06-18","gmv":433},"op":"+I"}

当将另外两个订单的状态order_status更新为2时,总金额=443+772+88=1293再察看数据:

再看kafka中的数据:

总结

本文基于Flink1.11的SQL,对新增加的CDC connector的应用形式进行了论述。次要包含MySQL CDC connector、canal-json及changelog-json的format,并指出了应用过程中的留神点。另外本文给出了残缺的应用示例,如果你有现成的环境,那么能够间接进行测试应用。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包