共计 6939 个字符,预计需要花费 18 分钟才能阅读完成。
Flink 1.11 最重要的 Feature —— Hive Streaming 之前曾经和大家分享过了,明天就和大家来聊一聊另一个特地重要的性能 —— CDC。
CDC 概述
何为 CDC?Change Data Capture,将数据库中的’增’、’改’、’删’操作记录下来。在很早之前是通过触发器来实现记录,当初通过 binlog+ 同步中间件来实现。罕用的 binlog 同步中间件有很多,比方 Alibaba 开源的 canal[1],Red Hat 开源的 debezium[2],Zendesk 开源的 Maxwell[3] 等等。
这些中间件会负责 binlog 的解析,并同步到消息中间件中,咱们只须要生产对应的 Topic 即可。
回到 Flink 上,CDC 仿佛和咱们没有太大的关联?其实不然,让咱们更加抽象地来看这个世界。
当咱们用 Flink 去生产数据比方 Kafka 时,咱们就好像在读一张表,什么表?一张一直有记录被插入的表,咱们将每一条被插入的数据取出来,实现咱们的逻辑。
当插入的每条数据都没有问题时,所有都很美妙。关联、聚合、输入。
但当咱们发现,某条曾经被计算过的数据有问题时,麻烦大了。咱们间接改最初的输入值其实是没有用的,这次改了,当再来数据触发计算时,后果还是会被谬误的数据笼罩,因为两头计算结果没有被批改,它依然是一个谬误的值。怎么办?撤回流仿佛能解决这个问题,这也的确是解决这个问题的伎俩,然而问题来了,撤回流怎么确定读取的数据是要被撤回的?另外,怎么去触发一次撤回?
CDC 解决了这些:将消息中间件的数据反序列化后,依据 Type 来辨认数据是 Insert 还是 Delete;另外,如果大家看过 Flink 源码,会发现反序列化后的数据类型变了,从 Row 降级为 RowData,RowData 可能将数据标记为撤回还是插入,这就意味着每个算子可能判断出数据到底是须要下发还是撤回。
CDC 的重要性就先说这么多,之后有机会的话,出一篇实时 DQC 的视频,通知大家 CDC 的呈现,对于实时 DQC 的帮忙有多大。上面让咱们回到正题。
既然有那么多 CDC 同步中间件,那么肯定会有各种各样的格局寄存在消息中间件中,咱们必然须要去解析它们。于是 Flink 1.11 提供了 canal-json 和 debezium-json,但咱们用的是 Maxwell 怎么办?只能等官网出或者说是等有人向社区奉献吗?那如果咱们用的是自研的同步中间件怎么办?
所以就有了明天的分享:如何去自定义实现一个 Maxwell format。大家也能够基于此文的思路去实现其余 CDC format,比方 OGG, 或是自研 CDC 工具产生的数据格式。
如何实现
当咱们提交工作之后,Flink 会通过 SPI 机制将 classpath 下注册的所有工厂类加载进来,包含 DynamicTableFactory、DeserializationFormatFactory 等等。而对于 Format 来说,到底应用哪个 DeserializationFormatFactory,是依据 DDL 语句中的 Format 来决定的。通过将 Format 的值与工厂类的 factoryIdentifier() 办法的返回值进行匹配 来确定。
再通过 DeserializationFormatFactory 中的 createDecodingFormat(…) 办法,将反序列化对象提供给 DynamicTableSource。
通过图来理解整个过程 (仅从反序列化数据并生产的角度来看):
想要实现 CDC Format 去解析某种 CDC 工具产生的数据其实很简略,外围组件其实就三个:
- 工厂类(DeserializationFormatFactory):负责编译时依据‘format’=‘maxwell-json’创立对应的反序列化器。即 MaxwellJsonFormatFactory。
- 反序列化类(DeserializationSchema):负责运行时的解析,依据固定格局将 CDC 数据转换成 Flink 零碎能意识的 INSERT/DELETE/UPDATE 音讯,如 RowData。即 MaxwellJsonDeserializationSchema。
- Service 注册文件:须要增加 Service 文件 META-INF/services/org.apache.flink.table.factories.Factory,并在其中减少一行咱们实现的 MaxwellJsonFormatFactory 类门路。
再通过代码,来看看反序列化中的细节:
public void deserialize(byte[] message, Collectorout) throws IOException {
try {RowData row = jsonDeserializer.deserialize(message);
String type = row.getString(2).toString(); // "type" field
if (OP_INSERT.equals(type)) {RowData insert = row.getRow(0, fieldCount);
insert.setRowKind(RowKind.INSERT);
out.collect(insert);
} else if (OP_UPDATE.equals(type)) {GenericRowData after = (GenericRowData) row.getRow(0, fieldCount); // "data" field
GenericRowData before = (GenericRowData) row.getRow(1, fieldCount); // "old" field
for (int f = 0; f < fieldCount; f++) {if (before.isNullAt(f)) {before.setField(f, after.getField(f));
}
}
before.setRowKind(RowKind.UPDATE_BEFORE);
after.setRowKind(RowKind.UPDATE_AFTER);
out.collect(before);
out.collect(after);
} else if (OP_DELETE.equals(type)) {RowData delete = row.getRow(0, fieldCount);
delete.setRowKind(RowKind.DELETE);
out.collect(delete);
} else {if (!ignoreParseErrors) {
throw new IOException(format("Unknown"type"value"%s". The Maxwell JSON message is'%s'", type, new String(message)));
}
}
} catch (Throwable t) {if (!ignoreParseErrors) {
throw new IOException(format("Corrupt Maxwell JSON message'%s'.", new String(message)), t);
}
}
}
其实并不简单:先通过 jsonDeserializer 将字节数组依据 [data: ROW, old: ROW, type: String] 的 schema 反序列化成 RowData,而后依据“type”列的值来判断数据是什么类型:增、改、删;再依据数据类型取出“data”或者“old”区的数据,来组装成 Flink 意识的 INSERT/DELETE/UPDATE 数据并下发。
对象 jsonDeserializer 即 JSON 格局的反序列化器,它能够通过指定的 RowType 类型,读取 JSON 的字节数组中指定的字段并反序列化成 RowData。在咱们的场景中,咱们须要去读取如下 Maxwell 数据的“data”,“old”和“type”局部的数据。
{"database":"test","table":"product","type":"update","ts":1596684928,"xid":7291,"commit":true,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":5.17},"old":{"weight":8.1}}
因而 MaxwellJsonDeserializationSchema 中定义的 JSON 的 RowType 如下所示。
private RowType createJsonRowType(DataType databaseSchema) {
_// Maxwell JSON contains other information, e.g. "database", "ts"_
_// but we don't need them_
return (RowType) DataTypes.ROW(DataTypes.FIELD("data", databaseSchema),
DataTypes.FIELD("old", databaseSchema),
DataTypes.FIELD("type", DataTypes.STRING())).getLogicalType();}
databaseSchema 是用户通过 DDL 定义的 schema 信息,也对应着数据库中表的 schema。联合下面的 JSON 和代码,咱们可能得悉 jsonDeserializer 只会取走 byte[] 中 data、old、type 这三个字段对应的值,其中 data 和 old 还是个嵌套 JSON,它们的 schema 信息和 databaseSchema 统一。因为 Maxwell 在同步数据时,“old”区不蕴含未被更新的字段,所以 jsonDeserializer 返回后,咱们会通过“data”区的 RowData 将 old 区的缺失字段补齐。
失去 RowData 之后,会取出 type 字段,而后依据对应的值,会有三种分支:
- insert:取出 data 中的值,也就是咱们通过 DDL 定义的字段对应的值,再将其标记为 RowKind.INSERT 类型数据,最初下发。
- update:别离取出 data 和 old 的值,而后循环 old 中每个字段,字段值如果为空阐明是未修改的字段,那就用 data 中对应地位字段的值代替;之后将 old 标记为 RowKind.UPDATE_BEFORE 也就意味着 Flink 引擎须要将之前对应的值撤回,data 标记为 RowKind.UPDATE_AFTER 失常下发。
- delete:取出 data 中的值,标记为 RowKind.DELETE,代表须要撤回。
解决的过程中,如果抛出异样,会依据 DDL 中 maxwell-json.ignore-parse-errors 的值来确定是漠视这条数据持续解决下一条数据,还是让工作报错。
笔者在 maxwell-json 反序列化性能的根底之上,还实现了序列化的性能,即能将 Flink 产生的 changelog 以 Maxwell 的 JSON 格局输入到内部零碎中。其实现思路与反序列化器的思路正好相同,更多细节能够参考 Pull Request 中的实现。
PR 实现详情链接:
https://github.com/apache/flink/pull/13090
性能演示
给大家演示一下从 Kafka 中读取 Maxwell 推送来的 maxwell json 格局数据,并将聚合后的数据再次写入 Kafka 后,从新读出来验证数据是否正确。
Kafka 数据源表
CREATE TABLE topic_products (
_-- schema is totally the same to the MySQL "products" table_
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'maxwell',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'maxwell-json');
Kafka 数据后果表 & 数据源表
CREATE TABLE topic_sink (
name STRING,
sum_weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'maxwell-sink',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'maxwell-json'
);
MySQL 表
_-- 留神,这部分 SQL 在 MySQL 中执行,不是 Flink 中的表_
CREATE TABLE product (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255),
description VARCHAR(512),
weight FLOAT
);
truncate product ;
ALTER TABLE product AUTO_INCREMENT = 101;
INSERT INTO product
VALUES (default,"scooter","Small 2-wheel scooter",3.14),
(default,"car battery","12V car battery",8.1),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
(default,"hammer","12oz carpenter's hammer",0.75),
(default,"hammer","14oz carpenter's hammer",0.875),
(default,"hammer","16oz carpenter's hammer",1.0),
(default,"rocks","box of assorted rocks",5.3),
(default,"jacket","water resistent black wind breaker",0.1),
(default,"spare tire","24 inch spare tire",22.2);
UPDATE product SET description='18oz carpenter hammer' WHERE id=106;
UPDATE product SET weight='5.1' WHERE id=107;
INSERT INTO product VALUES (default,"jacket","water resistent white wind breaker",0.2);
INSERT INTO product VALUES (default,"scooter","Big 2-wheel scooter",5.18);
UPDATE product SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;
UPDATE product SET weight='5.17' WHERE id=111;
DELETE FROM product WHERE id=111;
UPDATE product SET weight='5.17' WHERE id=102 or id = 101;
DELETE FROM product WHERE id=102 or id = 103;
先看看能不能失常读取 Kafka 中的 maxwell json 数据。
select * from topic_products;
能够看到,所有字段值都变成了 Update 之后的值,同时,被 Delete 的数据也没有呈现。
接着让咱们再将聚合数据写入 Kafka。
insert into topic_sink select name,sum(weight) as sum_weight from topic_products group by name;
在 Flink 集群的 Web 页面也可能看到工作正确提交,接下来再让咱们把聚合数据查出来。
select * from topic_sink
最初,让咱们查问一下 MySQL 中的表,来验证数据是否统一;因为在 Flink 中,咱们将 weight 字段定义成 Decimal(10,2),所以咱们在查问 MySQL 的时候,须要将 weight 字段进行类型转换。
没有问题,咱们的 maxwell json 解析很胜利。
写在最初
依据笔者实现 maxwell-json format 的教训,Flink 对于接口的定义、对于模块职责的划分还是很清晰的,所以实现一个自定义 CDC format 非常简单(外围代码只有 200 多行)。因而,如果你是用的 OGG,或是自研的同步中间件,能够通过本文的思路疾速实现一个 CDC format,一起解放你的 CDC 数据!
原文链接
本文为阿里云原创内容,未经容许不得转载。