关于flink:Flink111中的CDC-Connectors操作实践

Flink1.11引入了CDC的connector,通过这种形式能够很不便地捕捉变动的数据,大大简化了数据处理的流程。Flink1.11的CDC connector次要包含:MySQL CDCPostgres CDC,同时对Kafka的Connector反对canal-jsondebezium-json以及changelog-json的format。本文次要分享以下内容:

  • CDC简介
  • Flink提供的 table format
  • 应用过程中的留神点
  • mysql-cdc的操作实际
  • canal-json的操作实际
  • changelog-json的操作实际

简介

Flink CDC Connector 是ApacheFlink的一组数据源连接器,应用变动数据捕捉change data capture (CDC))从不同的数据库中提取变更数据。Flink CDC连接器将Debezium集成为引擎来捕捉数据变更。因而,它能够充分利用Debezium的性能。

特点

  • 反对读取数据库快照,并且可能继续读取数据库的变更日志,即便产生故障,也反对exactly-once 的解决语义
  • 对于DataStream API的CDC connector,用户无需部署Debezium和Kafka,即可在单个作业中应用多个数据库和表上的变更数据。
  • 对于Table/SQL API 的CDC connector,用户能够应用SQL DDL创立CDC数据源,来监督单个表上的数据变更。

应用场景

  • 数据库之间的增量数据同步
  • 审计日志
  • 数据库之上的实时物化视图
  • 基于CDC的维表join

Flink提供的 table format

Flink提供了一系列能够用于table connector的table format,具体如下:

Formats Supported Connectors
CSV Apache Kafka, Filesystem
JSON Apache Kafka, Filesystem, Elasticsearch
Apache Avro Apache Kafka, Filesystem
Debezium CDC Apache Kafka
Canal CDC Apache Kafka
Apache Parquet Filesystem
Apache ORC Filesystem

应用过程中的留神点

应用MySQL CDC的留神点

如果要应用MySQL CDC connector,对于程序而言,须要增加如下依赖:

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>1.0.0</version>
</dependency>

如果要应用Flink SQL Client,须要增加如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,将该jar包放在Flink装置目录的lib文件夹下即可。

应用canal-json的留神点

如果要应用Kafka的canal-json,对于程序而言,须要增加如下依赖:

<!-- universal -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.0</version>
</dependency>

如果要应用Flink SQL Client,须要增加如下jar包:flink-sql-connector-kafka_2.11-1.11.0.jar,将该jar包放在Flink装置目录的lib文件夹下即可。因为Flink1.11的安装包 的lib目录下并没有提供该jar包,所以必须要手动增加依赖包,否则会报如下谬误:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

Available factory identifiers are:

datagen
mysql-cdc

应用changelog-json的留神点

如果要应用Kafka的changelog-json Format,对于程序而言,须要增加如下依赖:

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-format-changelog-json</artifactId>
  <version>1.0.0</version>
</dependency>

如果要应用Flink SQL Client,须要增加如下jar包:flink-format-changelog-json-1.0.0.jar,将该jar包放在Flink装置目录的lib文件夹下即可。

mysql-cdc的操作实际

创立MySQL数据源表

在创立MySQL CDC表之前,须要先创立MySQL的数据表,如下:

-- MySQL
/*Table structure for table `order_info` */
DROP TABLE IF EXISTS `order_info`;
CREATE TABLE `order_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `consignee` varchar(100) DEFAULT NULL COMMENT '收货人',
  `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',
  `order_status` varchar(20) DEFAULT NULL COMMENT '订单状态,1示意下单,2示意领取',
  `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
  `payment_way` varchar(20) DEFAULT NULL COMMENT '付款形式',
  `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',
  `order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',
  `out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方领取用)',
  `trade_body` varchar(200) DEFAULT NULL COMMENT '订单形容(第三方领取用)',
  `create_time` datetime DEFAULT NULL COMMENT '创立工夫',
  `operate_time` datetime DEFAULT NULL COMMENT '操作工夫',
  `expire_time` datetime DEFAULT NULL COMMENT '生效工夫',
  `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',
  `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',
  `img_url` varchar(200) DEFAULT NULL COMMENT '图片门路',
  `province_id` int(20) DEFAULT NULL COMMENT '地区',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';
-- ----------------------------
-- Records of order_info
-- ----------------------------
INSERT INTO `order_info` 
VALUES (476, 'lAXjcL', '13408115089', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, NULL, NULL, NULL, NULL, 9);
INSERT INTO `order_info`
VALUES (477, 'QLiFDb', '13415139984', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NULL, NULL, NULL, NULL, NULL, 3);
INSERT INTO `order_info`
VALUES (478, 'iwKjQD', '13320383859', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NULL, NULL, NULL, NULL, NULL, 7);

/*Table structure for table `order_detail` */
CREATE TABLE `order_detail` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',
  `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称(冗余)',
  `img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',
  `create_time` datetime DEFAULT NULL COMMENT '创立工夫',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单明细表';

-- ----------------------------
-- Records of order_detail
-- ----------------------------
INSERT INTO `order_detail` 
VALUES (1329, 476, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 挪动联通电信4G手机 双卡双待', 'http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz', 8900.00, '3', '2020-06-18 02:21:38');
INSERT INTO `order_detail` 
VALUES (1330, 477, 9, '光荣10 GT游戏减速 AIS手持夜景 6GB+64GB 幻影蓝全网通 挪动联通电信', 'http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne', 2452.00, '4', '2020-06-18 09:12:25');
INSERT INTO `order_detail`
VALUES (1331, 478, 4, '小米Play 流光突变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机', 'http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv', 1442.00, '1', '2020-06-18 15:56:34');
INSERT INTO `order_detail` 
VALUES (1332, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 挪动联通电信4G手机 双卡双待', 'http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV', 8900.00, '3', '2020-06-18 15:56:34');
INSERT INTO `order_detail` 
VALUES (1333, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 挪动联通电信4G手机 双卡双待', 'http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP', 8900.00, '1', '2020-06-18 15:56:34');

Flink SQL Cli创立CDC数据源

启动 Flink 集群,再启动 SQL CLI,执行上面命令:

-- 创立订单信息表
CREATE TABLE order_info(
    id BIGINT,
    user_id BIGINT,
    create_time TIMESTAMP(0),
    operate_time TIMESTAMP(0),
    province_id INT,
    order_status STRING,
    total_amount DECIMAL(10, 5)
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'kms-1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123qwe',
    'database-name' = 'mydw',
    'table-name' = 'order_info'
);

在Flink SQL Cli中查问该表的数据:result-mode: tableau,+示意数据的insert

在SQL CLI中创立订单详情表:

CREATE TABLE order_detail(
    id BIGINT,
    order_id BIGINT,
    sku_id BIGINT,
    sku_name STRING,
    sku_num BIGINT,
    order_price DECIMAL(10, 5),
    create_time TIMESTAMP(0)
 ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'kms-1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123qwe',
    'database-name' = 'mydw',
    'table-name' = 'order_detail'
);

查问后果如下:

执行JOIN操作:

SELECT
    od.id,
    oi.id order_id,
    oi.user_id,
    oi.province_id,
    od.sku_id,
    od.sku_name,
    od.sku_num,
    od.order_price,
    oi.create_time,
    oi.operate_time
FROM
   (
    SELECT * 
    FROM order_info
    WHERE 
         order_status = '2'-- 已领取
   ) oi
   JOIN
  (
    SELECT *
    FROM order_detail
  ) od 
  ON oi.id = od.order_id;

canal-json的操作实际

对于cannal的应用形式,能够参考我的另一篇文章:基于Canal与Flink实现数据实时增量同步(一)。我曾经将上面的表通过canal同步到了kafka,具体格局为:

{
    "data":[
        {
            "id":"1",
            "region_name":"华北"
        },
        {
            "id":"2",
            "region_name":"华东"
        },
        {
            "id":"3",
            "region_name":"西南"
        },
        {
            "id":"4",
            "region_name":"华中"
        },
        {
            "id":"5",
            "region_name":"华南"
        },
        {
            "id":"6",
            "region_name":"东北"
        },
        {
            "id":"7",
            "region_name":"东南"
        }
    ],
    "database":"mydw",
    "es":1597128441000,
    "id":102,
    "isDdl":false,
    "mysqlType":{
        "id":"varchar(20)",
        "region_name":"varchar(20)"
    },
    "old":null,
    "pkNames":null,
    "sql":"",
    "sqlType":{
        "id":12,
        "region_name":12
    },
    "table":"base_region",
    "ts":1597128441424,
    "type":"INSERT"
}

在SQL CLI中创立该canal-json格局的表:

CREATE TABLE region (
  id BIGINT,
  region_name STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.base_region',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
);

查问后果如下:

changelog-json的操作实际

创立MySQL数据源

参见下面的order_info

Flink SQL Cli创立changelog-json表

CREATE TABLE order_gmv2kafka (
  day_str STRING,
  gmv DECIMAL(10, 5)
) WITH (
    'connector' = 'kafka',
    'topic' = 'order_gmv_kafka',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

INSERT INTO order_gmv2kafka
SELECT DATE_FORMAT(create_time, 'yyyy-MM-dd') as day_str, SUM(total_amount) as gmv
FROM order_info
WHERE order_status = '2' -- 订单已领取
GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd'); 

查问表看一下后果:

再查一下kafka的数据:

{"data":{"day_str":"2020-06-18","gmv":433},"op":"+I"}

当将另外两个订单的状态order_status更新为2时,总金额=443+772+88=1293再察看数据:

再看kafka中的数据:

总结

本文基于Flink1.11的SQL,对新增加的CDC connector的应用形式进行了论述。次要包含MySQL CDC connector、canal-json及changelog-json的format,并指出了应用过程中的留神点。另外本文给出了残缺的应用示例,如果你有现成的环境,那么能够间接进行测试应用。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理