乐趣区

关于flink:Flink111中的CDC-Connectors操作实践

Flink1.11 引入了 CDC 的 connector,通过这种形式能够很不便地捕捉变动的数据,大大简化了数据处理的流程。Flink1.11 的 CDC connector 次要包含:MySQL CDCPostgres CDC, 同时对 Kafka 的Connector 反对 canal-jsondebezium-json以及 changelog-json 的 format。本文次要分享以下内容:

  • CDC 简介
  • Flink 提供的 table format
  • 应用过程中的留神点
  • mysql-cdc 的操作实际
  • canal-json 的操作实际
  • changelog-json 的操作实际

简介

Flink CDC Connector 是 ApacheFlink 的一组数据源连接器,应用 变动数据捕捉 change data capture (CDC))从不同的数据库中提取变更数据。Flink CDC 连接器将 Debezium 集成为引擎来捕捉数据变更。因而,它能够充分利用 Debezium 的性能。

特点

  • 反对读取数据库快照,并且可能继续读取数据库的变更日志,即便产生故障,也反对exactly-once 的解决语义
  • 对于 DataStream API 的 CDC connector,用户无需部署 Debezium 和 Kafka,即可在单个作业中应用多个数据库和表上的变更数据。
  • 对于 Table/SQL API 的 CDC connector,用户能够应用 SQL DDL 创立 CDC 数据源,来监督单个表上的数据变更。

应用场景

  • 数据库之间的增量数据同步
  • 审计日志
  • 数据库之上的实时物化视图
  • 基于 CDC 的维表 join

Flink 提供的 table format

Flink 提供了一系列能够用于 table connector 的 table format,具体如下:

Formats Supported Connectors
CSV Apache Kafka, Filesystem
JSON Apache Kafka, Filesystem, Elasticsearch
Apache Avro Apache Kafka, Filesystem
Debezium CDC Apache Kafka
Canal CDC Apache Kafka
Apache Parquet Filesystem
Apache ORC Filesystem

应用过程中的留神点

应用 MySQL CDC 的留神点

如果要应用 MySQL CDC connector,对于程序而言,须要增加如下依赖:

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>1.0.0</version>
</dependency>

如果要应用 Flink SQL Client,须要增加如下 jar 包:flink-sql-connector-mysql-cdc-1.0.0.jar,将该 jar 包放在 Flink 装置目录的 lib 文件夹下即可。

应用 canal-json 的留神点

如果要应用 Kafka 的 canal-json,对于程序而言,须要增加如下依赖:

<!-- universal -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.0</version>
</dependency>

如果要应用 Flink SQL Client,须要增加如下 jar 包:flink-sql-connector-kafka_2.11-1.11.0.jar,将该 jar 包放在 Flink 装置目录的 lib 文件夹下即可。因为 Flink1.11 的安装包 的 lib 目录下并没有提供该 jar 包,所以必须要手动增加依赖包,否则会报如下谬误:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

Available factory identifiers are:

datagen
mysql-cdc

应用 changelog-json 的留神点

如果要应用 Kafka 的 changelog-json Format,对于程序而言,须要增加如下依赖:

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-format-changelog-json</artifactId>
  <version>1.0.0</version>
</dependency>

如果要应用 Flink SQL Client,须要增加如下 jar 包:flink-format-changelog-json-1.0.0.jar,将该 jar 包放在 Flink 装置目录的 lib 文件夹下即可。

mysql-cdc 的操作实际

创立 MySQL 数据源表

在创立 MySQL CDC 表之前,须要先创立 MySQL 的数据表,如下:

-- MySQL
/*Table structure for table `order_info` */
DROP TABLE IF EXISTS `order_info`;
CREATE TABLE `order_info` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `consignee` varchar(100) DEFAULT NULL COMMENT '收货人',
  `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',
  `order_status` varchar(20) DEFAULT NULL COMMENT '订单状态,1 示意下单,2 示意领取',
  `user_id` bigint(20) DEFAULT NULL COMMENT '用户 id',
  `payment_way` varchar(20) DEFAULT NULL COMMENT '付款形式',
  `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',
  `order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',
  `out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方领取用)',
  `trade_body` varchar(200) DEFAULT NULL COMMENT '订单形容(第三方领取用)',
  `create_time` datetime DEFAULT NULL COMMENT '创立工夫',
  `operate_time` datetime DEFAULT NULL COMMENT '操作工夫',
  `expire_time` datetime DEFAULT NULL COMMENT '生效工夫',
  `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',
  `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',
  `img_url` varchar(200) DEFAULT NULL COMMENT '图片门路',
  `province_id` int(20) DEFAULT NULL COMMENT '地区',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';
-- ----------------------------
-- Records of order_info
-- ----------------------------
INSERT INTO `order_info` 
VALUES (476, 'lAXjcL', '13408115089', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '','2020-06-18 02:21:38', NULL, NULL, NULL, NULL, NULL, 9);
INSERT INTO `order_info`
VALUES (477, 'QLiFDb', '13415139984', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '','2020-06-18 09:12:25', NULL, NULL, NULL, NULL, NULL, 3);
INSERT INTO `order_info`
VALUES (478, 'iwKjQD', '13320383859', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '','2020-06-18 15:56:34', NULL, NULL, NULL, NULL, NULL, 7);

/*Table structure for table `order_detail` */
CREATE TABLE `order_detail` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',
  `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku 名称(冗余)',
  `img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时 sku 价格)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',
  `create_time` datetime DEFAULT NULL COMMENT '创立工夫',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单明细表';

-- ----------------------------
-- Records of order_detail
-- ----------------------------
INSERT INTO `order_detail` 
VALUES (1329, 476, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 挪动联通电信 4G 手机 双卡双待', 'http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz', 8900.00, '3', '2020-06-18 02:21:38');
INSERT INTO `order_detail` 
VALUES (1330, 477, 9, '光荣 10 GT 游戏减速 AIS 手持夜景 6GB+64GB 幻影蓝全网通 挪动联通电信', 'http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne', 2452.00, '4', '2020-06-18 09:12:25');
INSERT INTO `order_detail`
VALUES (1331, 478, 4, '小米 Play 流光突变 AI 双摄 4GB+64GB 梦幻蓝 全网通 4G 双卡双待 小水滴全面屏拍照游戏智能手机', 'http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv', 1442.00, '1', '2020-06-18 15:56:34');
INSERT INTO `order_detail` 
VALUES (1332, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 挪动联通电信 4G 手机 双卡双待', 'http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV', 8900.00, '3', '2020-06-18 15:56:34');
INSERT INTO `order_detail` 
VALUES (1333, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 挪动联通电信 4G 手机 双卡双待', 'http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP', 8900.00, '1', '2020-06-18 15:56:34');

Flink SQL Cli 创立 CDC 数据源

启动 Flink 集群,再启动 SQL CLI, 执行上面命令:

-- 创立订单信息表
CREATE TABLE order_info(
    id BIGINT,
    user_id BIGINT,
    create_time TIMESTAMP(0),
    operate_time TIMESTAMP(0),
    province_id INT,
    order_status STRING,
    total_amount DECIMAL(10, 5)
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'kms-1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123qwe',
    'database-name' = 'mydw',
    'table-name' = 'order_info'
);

在 Flink SQL Cli 中查问该表的数据:result-mode: tableau,+ 示意数据的 insert

在 SQL CLI 中创立订单详情表:

CREATE TABLE order_detail(
    id BIGINT,
    order_id BIGINT,
    sku_id BIGINT,
    sku_name STRING,
    sku_num BIGINT,
    order_price DECIMAL(10, 5),
    create_time TIMESTAMP(0)
 ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'kms-1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123qwe',
    'database-name' = 'mydw',
    'table-name' = 'order_detail'
);

查问后果如下:

执行 JOIN 操作:

SELECT
    od.id,
    oi.id order_id,
    oi.user_id,
    oi.province_id,
    od.sku_id,
    od.sku_name,
    od.sku_num,
    od.order_price,
    oi.create_time,
    oi.operate_time
FROM
   (
    SELECT * 
    FROM order_info
    WHERE 
         order_status = '2'-- 已领取
   ) oi
   JOIN
  (
    SELECT *
    FROM order_detail
  ) od 
  ON oi.id = od.order_id;

canal-json 的操作实际

对于 cannal 的应用形式,能够参考我的另一篇文章:基于 Canal 与 Flink 实现数据实时增量同步(一)。我曾经将上面的表通过 canal 同步到了 kafka,具体格局为:

{
    "data":[
        {
            "id":"1",
            "region_name":"华北"
        },
        {
            "id":"2",
            "region_name":"华东"
        },
        {
            "id":"3",
            "region_name":"西南"
        },
        {
            "id":"4",
            "region_name":"华中"
        },
        {
            "id":"5",
            "region_name":"华南"
        },
        {
            "id":"6",
            "region_name":"东北"
        },
        {
            "id":"7",
            "region_name":"东南"
        }
    ],
    "database":"mydw",
    "es":1597128441000,
    "id":102,
    "isDdl":false,
    "mysqlType":{"id":"varchar(20)",
        "region_name":"varchar(20)"
    },
    "old":null,
    "pkNames":null,
    "sql":"","sqlType":{"id":12,"region_name":12},
    "table":"base_region",
    "ts":1597128441424,
    "type":"INSERT"
}

在 SQL CLI 中创立该 canal-json 格局的表:

CREATE TABLE region (
  id BIGINT,
  region_name STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.base_region',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
);

查问后果如下:

changelog-json 的操作实际

创立 MySQL 数据源

参见下面的order_info

Flink SQL Cli 创立 changelog-json 表

CREATE TABLE order_gmv2kafka (
  day_str STRING,
  gmv DECIMAL(10, 5)
) WITH (
    'connector' = 'kafka',
    'topic' = 'order_gmv_kafka',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

INSERT INTO order_gmv2kafka
SELECT DATE_FORMAT(create_time, 'yyyy-MM-dd') as day_str, SUM(total_amount) as gmv
FROM order_info
WHERE order_status = '2' -- 订单已领取
GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd'); 

查问表看一下后果:

再查一下 kafka 的数据:

{"data":{"day_str":"2020-06-18","gmv":433},"op":"+I"}

当将另外两个订单的状态 order_status 更新为 2 时,总金额 =443+772+88=1293再察看数据:

再看 kafka 中的数据:

总结

本文基于 Flink1.11 的 SQL,对新增加的 CDC connector 的应用形式进行了论述。次要包含 MySQL CDC connector、canal-json 及 changelog-json 的 format,并指出了应用过程中的留神点。另外本文给出了残缺的应用示例,如果你有现成的环境,那么能够间接进行测试应用。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

退出移动版