关于sql:实时数仓以upsert的方式读写Kafka数据以Flink112为例

7次阅读

共计 7704 个字符,预计需要花费 20 分钟才能阅读完成。

在某些场景中,比方 GROUP BY 聚合之后的后果,须要去更新之前的后果值。这个时候,须要将 Kafka 音讯记录的 key 当成主键解决,用来确定一条数据是应该作为插入、删除还是更新记录来解决。在 Flink1.11 中,能够通过 flink-cdc-connectors 我的项目提供的 changelog-json format 来实现该性能。对于该性能的应用,见之前的分享 Flink1.11 中的 CDC Connectors 操作实际。

在 Flink1.12 版本中,新增了一个 upsert connector(upsert-kafka),该 connector 扩大自现有的 Kafka connector,工作在 upsert 模式(FLIP-149)下。新的 upsert-kafka connector 既能够作为 source 应用,也能够作为 sink 应用,并且提供了与现有的 kafka connector 雷同的基本功能和持久性保障,因为两者之间复用了大部分代码。本文将以 Flink1.12 为例,介绍该性能的根本应用步骤,以下是全文,心愿对你有所帮忙。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

Upsert Kafka connector 简介

Upsert Kafka Connector容许用户以 upsert 的形式从 Kafka 主题读取数据或将数据写入 Kafka 主题。

当作为数据源时 ,upsert-kafka Connector 会生产一个 changelog 流,其中每条数据记录都示意一个更新或删除事件。更精确地说,如果不存在对应的 key,则视为INSERT 操作。如果曾经存在了绝对应的 key,则该 key 对应的 value 值为最初一次更新的值。

用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具备雷同 key 的现有行都被笼罩。另外,value 为空的音讯将会被视作为 DELETE 音讯。

当作为数据汇时 ,upsert-kafka Connector 会生产一个 changelog 流。它将INSERT / UPDATE_AFTER 数据作为失常的 Kafka 音讯值写入(即 INSERT 和 UPDATE 操作,都会进行失常写入,如果是更新,则同一个 key 会存储多条数据,但在读取该表数据时,只保留最初一次更新的值),并将 DELETE 数据以 value 为空的 Kafka 音讯写入(key 被打上墓碑标记,示意对应 key 的音讯被删除)。Flink 将依据主键列的值对数据进行分区,从而保障主键上的音讯有序,因而同一主键上的更新 / 删除音讯将落在同一分区中

依赖

为了应用 Upsert Kafka 连接器,须要增加上面的依赖

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.12.0</version>
</dependency>

如果应用 SQL Client,须要下载flink-sql-connector-kafka_2.11-1.12.0.jar,并将其搁置在 Flink 装置目录的 lib 文件夹下。

应用形式

应用样例

— 创立一张 kafka 表,用户存储 sink 的数据
CREATE TABLE pageviews_per_region (
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
‘connector’ = ‘upsert-kafka’,
‘topic’ = ‘pageviews_per_region’,
‘properties.bootstrap.servers’ = ‘kms-2:9092,kms-3:9092,kms-4:9092’,
‘key.format’ = ‘avro’,
‘value.format’ = ‘avro’
);

尖叫提醒:

要应用 upsert-kafka connector,必须在创立表时应用 PRIMARY KEY 定义主键,并为键(key.format)和值(value.format)指定序列化反序列化格局。

upsert-kafka connector 参数

  • connector

必选。指定要应用的连接器,Upsert Kafka 连接器应用:'upsert-kafka'

  • topic

必选。用于读取和写入的 Kafka topic 名称。

  • properties.bootstrap.servers

必选。以逗号分隔的 Kafka brokers 列表。

  • key.format

必选。用于对 Kafka 音讯中 key 局部序列化和反序列化的格局。key 字段由 PRIMARY KEY 语法指定。反对的格局包含 'csv''json''avro'

  • value.format

必选。用于对 Kafka 音讯中 value 局部序列化和反序列化的格局。反对的格局包含 'csv''json''avro'

  • properties.*

可选。该选项能够传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。Flink 会主动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。例如,你能够通过 'properties.allow.auto.create.topics' = 'false' 来禁止主动创立 topic。然而,某些选项,例如'key.deserializer''value.deserializer' 是不容许通过该形式传递参数,因为 Flink 会重写这些参数的值。

  • value.fields-include

可选 ,默认为ALL。管制 key 字段是否呈现在 value 中。当取ALL 时,示意 音讯的 value 局部将蕴含 schema 中所有的字段,包含定义为主键的字段。当取 EXCEPT_KEY 时,示意记录的 value 局部蕴含 schema 的所有字段,定义为主键的字段除外。

  • key.fields-prefix

可选 。为了防止与 value 字段命名抵触,为 key 字段增加一个自定义前缀。默认前缀为空。一旦指定了 key 字段的前缀,必须在 DDL 中指明前缀的名称,然而在构建 key 的序列化数据类型时,将移除该前缀。见上面的示例。在须要留神的是:应用该配置属性,value.fields-include 的值必须为EXCEPT_KEY

— 创立一张 upsert 表,当指定了 qwe 前缀,波及的 key 必须指定 qwe 前缀
CREATE TABLE result_total_pvuv_min_prefix (
qwedo_date     STRING,     — 统计日期,必须蕴含 qwe 前缀
qwedo_min     STRING,      — 统计分钟,必须蕴含 qwe 前缀
pv          BIGINT,     — 点击量
uv          BIGINT,     — 一天内同个访客屡次拜访仅计算一个 UV
currenttime TIMESTAMP,  — 以后工夫
PRIMARY KEY (qwedo_date, qwedo_min) NOT ENFORCED — 必须蕴含 qwe 前缀
) WITH (
‘connector’ = ‘upsert-kafka’,
‘topic’ = ‘result_total_pvuv_min_prefix’,
‘properties.bootstrap.servers’ = ‘kms-2:9092,kms-3:9092,kms-4:9092’,
‘key.json.ignore-parse-errors’ = ‘true’,
‘value.json.fail-on-missing-field’ = ‘false’,
‘key.format’ = ‘json’,
‘value.format’ = ‘json’,
‘key.fields-prefix’=’qwe’, — 指定前缀 qwe
‘value.fields-include’ = ‘EXCEPT_KEY’ — key 不呈现 kafka 音讯的 value 中
);
— 向该表中写入数据
INSERT INTO result_total_pvuv_min_prefix
SELECT
do_date,    — 工夫分区
cast(DATE_FORMAT (access_time,’HH:mm’) AS STRING) AS do_min,– 分钟级别的工夫
pv,
uv,
CURRENT_TIMESTAMP AS currenttime — 以后工夫
from
view_total_pvuv_min;

尖叫提醒:

如果指定了 key 字段前缀,但在 DDL 中并没有增加该前缀字符串,那么在向该表写入数时,会抛出上面异样:

[ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: All fields in ‘key.fields’ must be prefixed with ‘qwe’ when option ‘key.fields-prefix’ is set but field ‘do_date’ is not prefixed.

  • sink.parallelism

可选。定义 upsert-kafka sink 算子的并行度。默认状况下,由框架确定并行度,与上游链接算子的并行度保持一致。

其余注意事项

Key 和 Value 的序列化格局

对于 Key、value 的序列化能够参考 Kafka connector。值得注意的是,必须指定 Key 和 Value 的序列化格局,其中 Key 是通过 PRIMARY KEY 指定的。

Primary Key 束缚

Upsert Kafka 工作在 upsert 模式(FLIP-149)下。当咱们创立表时,须要在 DDL 中定义主键。具备雷同 key 的数据,会存在雷同的分区中。在 changlog source 上定义主键意味着在物化后的 changelog 上主键具备唯一性。定义的主键将决定哪些字段呈现在 Kafka 音讯的 key 中。

一致性保障

默认状况下,如果启用 checkpoint,Upsert Kafka sink 会保障至多一次将数据插入 Kafka topic。

这意味着,Flink 能够将具备雷同 key 的重复记录写入 Kafka topic。但因为该连接器以 upsert 的模式工作,该连接器作为 source 读入时,能够确保具备雷同主键值下仅最初一条音讯会失效。因而,upsert-kafka 连接器能够像 HBase sink 一样实现幂等写入。

分区水位线

Flink 反对依据 Upsert Kafka 的 每个分区的数据个性发送相应的 watermark。当应用这个个性的时候,watermark 是在 Kafka consumer 外部生成的。合并每个分区生成的 watermark 的形式和 streaming shuffle 的形式是统一的 ( 单个分区的输出取最大值,多个分区的输出取最小值)。数据源产生的 watermark 是取决于该 consumer 负责的所有分区中以后最小的 watermark。如果该 consumer 负责的局部分区是闲暇的,那么整体的 watermark 并不会后退。在这种状况下,能够通过设置适合的 table.exec.source.idle-timeout 来缓解这个问题。

数据类型

Upsert Kafka 用字节 bytes 存储音讯的 key 和 value,因而没有 schema 或数据类型。音讯按格局进行序列化和反序列化,例如:csv、json、avro。不同的序列化格局所提供的数据类型有所不同,因而须要依据应用的序列化格局进行确定表字段的数据类型是否与该序列化类型提供的数据类型兼容。

应用案例

本文以实时地统计网页 PV 和 UV 的总量为例,介绍 upsert-kafka 根本应用形式:

  • Kafka 数据源

用户的 ippv 信息,一个用户在一天内能够有很屡次 pv

CREATE TABLE source_ods_fact_user_ippv (
user_id     STRING,       — 用户 ID
client_ip   STRING,       — 客户端 IP
client_info STRING,       — 设施机型信息
pagecode     STRING,       — 页面代码
access_time  TIMESTAMP,    — 申请工夫
dt           STRING,       — 工夫分区天
WATERMARK FOR access_time AS access_time – INTERVAL ‘5’ SECOND  — 定义 watermark
) WITH (
‘connector’ = ‘kafka’, — 应用 kafka connector
‘topic’ = ‘user_ippv’, — kafka 主题
‘scan.startup.mode’ = ‘earliest-offset’, — 偏移量
‘properties.group.id’ = ‘group1’, — 消费者组
‘properties.bootstrap.servers’ = ‘kms-2:9092,kms-3:9092,kms-4:9092’,
‘format’ = ‘json’, — 数据源格局为 json
‘json.fail-on-missing-field’ = ‘false’,
‘json.ignore-parse-errors’ = ‘true’
);

  • Kafka Sink 表

统计每分钟的 PV、UV,并将后果存储在 Kafka 中

CREATE TABLE result_total_pvuv_min (
do_date     STRING,     — 统计日期
do_min     STRING,      — 统计分钟
pv          BIGINT,     — 点击量
uv          BIGINT,     — 一天内同个访客屡次拜访仅计算一个 UV
currenttime TIMESTAMP,  — 以后工夫
PRIMARY KEY (do_date, do_min) NOT ENFORCED
) WITH (
‘connector’ = ‘upsert-kafka’,
‘topic’ = ‘result_total_pvuv_min’,
‘properties.bootstrap.servers’ = ‘kms-2:9092,kms-3:9092,kms-4:9092’,
‘key.json.ignore-parse-errors’ = ‘true’,
‘value.json.fail-on-missing-field’ = ‘false’,
‘key.format’ = ‘json’,
‘value.format’ = ‘json’,
‘value.fields-include’ = ‘EXCEPT_KEY’ — key 不呈现 kafka 音讯的 value 中
);

  • 计算逻辑

— 创立视图
CREATE VIEW view_total_pvuv_min AS
SELECT
dt AS do_date,                    — 工夫分区
count (client_ip) AS pv,          — 客户端的 IP
count (DISTINCT client_ip) AS uv, — 客户端去重
max(access_time) AS access_time   — 申请的工夫
FROM
source_ods_fact_user_ippv
GROUP BY dt;

— 写入数据
INSERT INTO result_total_pvuv_min
SELECT
do_date,    — 工夫分区
cast(DATE_FORMAT (access_time,’HH:mm’) AS STRING) AS do_min,– 分钟级别的工夫
pv,
uv,
CURRENT_TIMESTAMP AS currenttime — 以后工夫
from
view_total_pvuv_min;

  • 生产用户拜访数据到 kafka,向 kafka 中的 user_ippv 插入数据:


{“user_id”:”1″,”client_ip”:”192.168.12.1″,”client_info”:”phone”,”pagecode”:”1001″,”access_time”:”2021-01-08 11:32:24″,”dt”:”2021-01-08″}

{“user_id”:”1″,”client_ip”:”192.168.12.1″,”client_info”:”phone”,”pagecode”:”1201″,”access_time”:”2021-01-08 11:32:55″,”dt”:”2021-01-08″}

{“user_id”:”2″,”client_ip”:”192.165.12.1″,”client_info”:”pc”,”pagecode”:”1031″,”access_time”:”2021-01-08 11:32:59″,”dt”:”2021-01-08″}

{“user_id”:”1″,”client_ip”:”192.168.12.1″,”client_info”:”phone”,”pagecode”:”1101″,”access_time”:”2021-01-08 11:33:24″,”dt”:”2021-01-08″}

{“user_id”:”3″,”client_ip”:”192.168.10.3″,”client_info”:”pc”,”pagecode”:”1001″,”access_time”:”2021-01-08 11:33:30″,”dt”:”2021-01-08″}

{“user_id”:”1″,”client_ip”:”192.168.12.1″,”client_info”:”phone”,”pagecode”:”1001″,”access_time”:”2021-01-08 11:34:24″,”dt”:”2021-01-08″}

  • 查问后果表:

select * from result_total_pvuv_min;

能够看出:每分钟的 pv、uv 只显示一条数据,即代表着截止到以后工夫点的 pv 和 uv

查看 Kafka 中 result_total_pvuv_min 主题的数据,如下:

能够看出:针对每一条拜访数据,触发计算了一次 PV、UV,每一条数据都是截止到以后工夫的累计 PV 和 UV。

尖叫提醒:

默认状况下,如果在启用了检查点的状况下执行查问,Upsert Kafka 接收器会将具备至多一次保障的数据提取到 Kafka 主题中。

这意味着,Flink 可能会将具备雷同键的重复记录写入 Kafka 主题。然而,因为连接器在 upsert 模式下工作,因而作为源读回时,同一键上的最初一条记录将失效。因而,upsert-kafka 连接器就像 HBase 接收器一样实现幂等写入。

总结

本文以 Flink1.12 为例,介绍了 upsert connector(upsert-kafka) 的根本应用,该形式容许用户以 upsert 的形式读写 Kafka 中的表,应用起来十分不便。另外本文也给出了一个具体的应用案例,能够进一步加深对该性能的应用。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

正文完
 0