在某些场景中,比方GROUP BY聚合之后的后果,须要去更新之前的后果值。这个时候,须要将 Kafka 音讯记录的 key 当成主键解决,用来确定一条数据是应该作为插入、删除还是更新记录来解决。在Flink1.11中,能够通过 flink-cdc-connectors 我的项目提供的 changelog-json format 来实现该性能。对于该性能的应用,见之前的分享Flink1.11中的CDC Connectors操作实际。
在Flink1.12版本中, 新增了一个 upsert connector(upsert-kafka),该 connector 扩大自现有的 Kafka connector,工作在 upsert 模式(FLIP-149)下。新的 upsert-kafka connector 既能够作为 source 应用,也能够作为 sink 应用,并且提供了与现有的 kafka connector 雷同的基本功能和持久性保障,因为两者之间复用了大部分代码。本文将以Flink1.12为例,介绍该性能的根本应用步骤,以下是全文,心愿对你有所帮忙。
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包
Upsert Kafka connector简介
Upsert Kafka Connector容许用户以upsert的形式从Kafka主题读取数据或将数据写入Kafka主题。
当作为数据源时,upsert-kafka Connector会生产一个changelog流,其中每条数据记录都示意一个更新或删除事件。更精确地说,如果不存在对应的key,则视为INSERT操作。如果曾经存在了绝对应的key,则该key对应的value值为最初一次更新的值。
用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具备雷同 key 的现有行都被笼罩。另外,value 为空的音讯将会被视作为 DELETE 音讯。
当作为数据汇时,upsert-kafka Connector会生产一个changelog流。它将INSERT / UPDATE_AFTER数据作为失常的Kafka音讯值写入(即INSERT和UPDATE操作,都会进行失常写入,如果是更新,则同一个key会存储多条数据,但在读取该表数据时,只保留最初一次更新的值),并将 DELETE 数据以 value 为空的 Kafka 音讯写入(key被打上墓碑标记,示意对应 key 的音讯被删除)。Flink 将依据主键列的值对数据进行分区,从而保障主键上的音讯有序,因而同一主键上的更新/删除音讯将落在同一分区中
依赖
为了应用Upsert Kafka连接器,须要增加上面的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.12.0</version>
</dependency>
如果应用SQL Client,须要下载flink-sql-connector-kafka_2.11-1.12.0.jar,并将其搁置在Flink装置目录的lib文件夹下。
应用形式
应用样例
-- 创立一张kafka表,用户存储sink的数据
CREATE TABLE pageviews_per_region (
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'pageviews_per_region',
'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
'key.format' = 'avro',
'value.format' = 'avro'
);
尖叫提醒:要应用 upsert-kafka connector,必须在创立表时应用PRIMARY KEY定义主键,并为键(key.format)和值(value.format)指定序列化反序列化格局。
upsert-kafka connector参数
- connector
必选。指定要应用的连接器,Upsert Kafka 连接器应用:'upsert-kafka'
。
- topic
必选。用于读取和写入的 Kafka topic 名称。
- properties.bootstrap.servers
必选。以逗号分隔的 Kafka brokers 列表。
- key.format
必选。用于对 Kafka 音讯中 key 局部序列化和反序列化的格局。key 字段由 PRIMARY KEY 语法指定。反对的格局包含 'csv'
、'json'
、'avro'
。
- value.format
必选。用于对 Kafka 音讯中 value 局部序列化和反序列化的格局。反对的格局包含 'csv'
、'json'
、'avro'
。
- properties.*
可选。 该选项能够传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。 Flink 会主动移除 选项名中的 "properties." 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你能够通过 'properties.allow.auto.create.topics' = 'false'
来禁止主动创立 topic。 然而,某些选项,例如'key.deserializer'
和 'value.deserializer'
是不容许通过该形式传递参数,因为 Flink 会重写这些参数的值。
- value.fields-include
可选,默认为ALL。管制key字段是否呈现在 value 中。当取ALL时,示意音讯的 value 局部将蕴含 schema 中所有的字段,包含定义为主键的字段。
当取EXCEPT_KEY时,示意记录的 value 局部蕴含 schema 的所有字段,定义为主键的字段除外。
- key.fields-prefix
可选。为了防止与value字段命名抵触,为key字段增加一个自定义前缀。默认前缀为空。一旦指定了key字段的前缀,必须在DDL中指明前缀的名称,然而在构建key的序列化数据类型时,将移除该前缀。见上面的示例。在须要留神的是:应用该配置属性,value.fields-include的值必须为EXCEPT_KEY。
-- 创立一张upsert表,当指定了qwe前缀,波及的key必须指定qwe前缀
CREATE TABLE result_total_pvuv_min_prefix (
qwedo_date STRING, -- 统计日期,必须蕴含qwe前缀
qwedo_min STRING, -- 统计分钟,必须蕴含qwe前缀
pv BIGINT, -- 点击量
uv BIGINT, -- 一天内同个访客屡次拜访仅计算一个UV
currenttime TIMESTAMP, -- 以后工夫
PRIMARY KEY (qwedo_date, qwedo_min) NOT ENFORCED -- 必须蕴含qwe前缀
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'result_total_pvuv_min_prefix',
'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
'key.json.ignore-parse-errors' = 'true',
'value.json.fail-on-missing-field' = 'false',
'key.format' = 'json',
'value.format' = 'json',
'key.fields-prefix'='qwe', -- 指定前缀qwe
'value.fields-include' = 'EXCEPT_KEY' -- key不呈现kafka音讯的value中
);
-- 向该表中写入数据
INSERT INTO result_total_pvuv_min_prefix
SELECT
do_date, -- 工夫分区
cast(DATE_FORMAT (access_time,'HH:mm') AS STRING) AS do_min,-- 分钟级别的工夫
pv,
uv,
CURRENT_TIMESTAMP AS currenttime -- 以后工夫
from
view_total_pvuv_min;
尖叫提醒:如果指定了key字段前缀,但在DDL中并没有增加该前缀字符串,那么在向该表写入数时,会抛出上面异样:
[ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: All fields in 'key.fields' must be prefixed with 'qwe' when option 'key.fields-prefix' is set but field 'do_date' is not prefixed.
- sink.parallelism
可选。定义 upsert-kafka sink 算子的并行度。默认状况下,由框架确定并行度,与上游链接算子的并行度保持一致。
其余注意事项
Key和Value的序列化格局
对于Key、value的序列化能够参考Kafka connector。值得注意的是,必须指定Key和Value的序列化格局,其中Key是通过PRIMARY KEY指定的。
Primary Key束缚
Upsert Kafka 工作在 upsert 模式(FLIP-149)下。当咱们创立表时,须要在 DDL 中定义主键。具备雷同key的数据,会存在雷同的分区中。在 changlog source 上定义主键意味着在物化后的 changelog 上主键具备唯一性。定义的主键将决定哪些字段呈现在 Kafka 音讯的 key 中。
一致性保障
默认状况下,如果启用 checkpoint,Upsert Kafka sink 会保障至多一次将数据插入 Kafka topic。
这意味着,Flink 能够将具备雷同 key 的重复记录写入 Kafka topic。但因为该连接器以 upsert 的模式工作,该连接器作为 source 读入时,能够确保具备雷同主键值下仅最初一条音讯会失效。因而,upsert-kafka 连接器能够像 HBase sink 一样实现幂等写入。
分区水位线
Flink 反对依据 Upsert Kafka 的 每个分区的数据个性发送相应的 watermark。当应用这个个性的时候,watermark 是在 Kafka consumer 外部生成的。 合并每个分区生成的 watermark 的形式和 streaming shuffle 的形式是统一的(单个分区的输出取最大值,多个分区的输出取最小值)。 数据源产生的 watermark 是取决于该 consumer 负责的所有分区中以后最小的 watermark。如果该 consumer 负责的局部分区是闲暇的,那么整体的 watermark 并不会后退。在这种状况下,能够通过设置适合的 table.exec.source.idle-timeout 来缓解这个问题。
数据类型
Upsert Kafka 用字节bytes存储音讯的 key 和 value,因而没有 schema 或数据类型。音讯按格局进行序列化和反序列化,例如:csv、json、avro。不同的序列化格局所提供的数据类型有所不同,因而须要依据应用的序列化格局进行确定表字段的数据类型是否与该序列化类型提供的数据类型兼容。
应用案例
本文以实时地统计网页PV和UV的总量为例,介绍upsert-kafka根本应用形式:
- Kafka 数据源
用户的ippv信息,一个用户在一天内能够有很屡次pv
CREATE TABLE source_ods_fact_user_ippv (
user_id STRING, -- 用户ID
client_ip STRING, -- 客户端IP
client_info STRING, -- 设施机型信息
pagecode STRING, -- 页面代码
access_time TIMESTAMP, -- 申请工夫
dt STRING, -- 工夫分区天
WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND -- 定义watermark
) WITH (
'connector' = 'kafka', -- 应用 kafka connector
'topic' = 'user_ippv', -- kafka主题
'scan.startup.mode' = 'earliest-offset', -- 偏移量
'properties.group.id' = 'group1', -- 消费者组
'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
'format' = 'json', -- 数据源格局为json
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
- Kafka Sink表
统计每分钟的PV、UV,并将后果存储在Kafka中
CREATE TABLE result_total_pvuv_min (
do_date STRING, -- 统计日期
do_min STRING, -- 统计分钟
pv BIGINT, -- 点击量
uv BIGINT, -- 一天内同个访客屡次拜访仅计算一个UV
currenttime TIMESTAMP, -- 以后工夫
PRIMARY KEY (do_date, do_min) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'result_total_pvuv_min',
'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
'key.json.ignore-parse-errors' = 'true',
'value.json.fail-on-missing-field' = 'false',
'key.format' = 'json',
'value.format' = 'json',
'value.fields-include' = 'EXCEPT_KEY' -- key不呈现kafka音讯的value中
);
- 计算逻辑
-- 创立视图
CREATE VIEW view_total_pvuv_min AS
SELECT
dt AS do_date, -- 工夫分区
count (client_ip) AS pv, -- 客户端的IP
count (DISTINCT client_ip) AS uv, -- 客户端去重
max(access_time) AS access_time -- 申请的工夫
FROM
source_ods_fact_user_ippv
GROUP BY dt;
-- 写入数据
INSERT INTO result_total_pvuv_min
SELECT
do_date, -- 工夫分区
cast(DATE_FORMAT (access_time,'HH:mm') AS STRING) AS do_min,-- 分钟级别的工夫
pv,
uv,
CURRENT_TIMESTAMP AS currenttime -- 以后工夫
from
view_total_pvuv_min;
- 生产用户拜访数据到kafka,向kafka中的user_ippv插入数据:
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-08 11:32:24","dt":"2021-01-08"}
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1201","access_time":"2021-01-08 11:32:55","dt":"2021-01-08"}
{"user_id":"2","client_ip":"192.165.12.1","client_info":"pc","pagecode":"1031","access_time":"2021-01-08 11:32:59","dt":"2021-01-08"}
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1101","access_time":"2021-01-08 11:33:24","dt":"2021-01-08"}
{"user_id":"3","client_ip":"192.168.10.3","client_info":"pc","pagecode":"1001","access_time":"2021-01-08 11:33:30","dt":"2021-01-08"}
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-08 11:34:24","dt":"2021-01-08"}
- 查问后果表:
select * from result_total_pvuv_min;
能够看出:每分钟的pv、uv只显示一条数据,即代表着截止到以后工夫点的pv和uv
查看Kafka中result_total_pvuv_min主题的数据,如下:
能够看出:针对每一条拜访数据,触发计算了一次PV、UV,每一条数据都是截止到以后工夫的累计PV和UV。
尖叫提醒:默认状况下,如果在启用了检查点的状况下执行查问,Upsert Kafka接收器会将具备至多一次保障的数据提取到Kafka主题中。
这意味着,Flink可能会将具备雷同键的重复记录写入Kafka主题。然而,因为连接器在upsert模式下工作,因而作为源读回时,同一键上的最初一条记录将失效。因而,upsert-kafka连接器就像HBase接收器一样实现幂等写入。
总结
本文以Flink1.12为例,介绍了upsert connector(upsert-kafka)的根本应用,该形式容许用户以upsert 的形式读写Kafka中的表,应用起来十分不便。另外本文也给出了一个具体的应用案例,能够进一步加深对该性能的应用。
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包