关于flink:Flink-SQL-实战双流-join-场景应用

25次阅读

共计 7560 个字符,预计需要花费 19 分钟才能阅读完成。

作者:余敖

本文次要介绍在流式场景中 join 的实战。大家都晓得在应用 SQL 进行数据分析的过程中,join 是常常要应用的操作。在离线场景中,join 的数据集是有边界的,能够缓存数据有边界的数据集进行查问,有 Nested Loop/Hash Join/Sort Merge Join 等多表 join;而在实时场景中,join 两侧的数据都是无边界的数据流,所以缓存数据集对长时间 job 来说,存储和查问压力很大,另外双流的达到工夫可能不统一,造成 join 计算结果准确度不够;因而,Flink SQL 提供了多种 join 办法,来帮忙用户应答各种 join 场景。

本文次要介绍 regular join/interval join/temproal table join 这种 3 种 join 的实战利用,次要蕴含如下几个局部:

  • 数据筹备
  • Flink SQL join 之 regular join
  • Flink SQL join 之 interval join
  • Flink SQL join 之 temproal table join
  • 总结

01 数据筹备

一般来说大部分公司的实时的数据是保留在 kafka,物料数据保留在 MySQL 等相似的关系型数据库中,依据 Flink SQL 提供的 Kafka/JDBC connector,咱们先注册两张 Flink Kafka Table 以及注册一张 Flink MySQL Table,明细建表语句如下所示:

  • 注册 Flink Kafka Table, 作为两条须要 join 的数据流;对于点击流,咱们定义 Process time 工夫属性,用来做 temproal table join,同时也定义 Event Time 和 watermark,用来做双流 join;对于曝光流,咱们定义 Event Time 和 watermark,用来做双流 join。
DROP TABLE IF EXISTS flink_rtdw.demo.adsdw_dwd_max_click_mobileapp;
CREATE TABLE flink_rtdw.demo.adsdw_dwd_max_click_mobileapp (
  ...   
  publisher_adspace_adspaceId INT COMMENT '广告位惟一 ID',
  ...
  audience_behavior_click_creative_impressionId BIGINT COMMENT '受众用户点击的广告创意的 ImpressionId',
  audience_behavior_click_timestamp BIGINT COMMENT '受众用户点击广告的工夫戳 (毫秒)',
  ...
  procTime AS PROCTIME(), 
  ets AS TO_TIMESTAMP(FROM_UNIXTIME(audience_behavior_click_timestamp / 1000)),
  WATERMARK FOR ets AS ets - INTERVAL '5' MINUTE
) WITH (
  'connector' = 'kafka',
  'topic' = 'adsdw.dwd.max.click.mobileapp',
  'properties.group.id' = 'adsdw.dwd.max.click.mobileapp_group',
  'properties.bootstrap.servers' = 'broker1:9092,broker2:9092,broker3:9092',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-administrator"password="kafka-administrator-password";',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  'avro-confluent.schema-registry.url' = 'http://schema.registry.url:8081',
  'avro-confluent.schema-registry.subject' = 'adsdw.dwd.max.click.mobileapp-value',
  'format' = 'avro-confluent'
);
  • 注册 Flink Mysql Table, 作为维度表
DROP TABLE IF EXISTS flink_rtdw.demo.adsdw_dwd_max_show_mobileapp;
CREATE TABLE flink_rtdw.demo.adsdw_dwd_max_show_mobileapp (
     ...
     audience_behavior_watch_creative_impressionId BIGINT COMMENT '受众用户观看到的广告创意的 ImpressionId',
     audience_behavior_watch_timestamp BIGINT COMMENT '受众用户观看到广告的工夫 (毫秒)',
     ...
     ets AS TO_TIMESTAMP(FROM_UNIXTIME(audience_behavior_watch_timestamp / 1000)),
     WATERMARK FOR ets AS ets - INTERVAL '5' MINUTE
) WITH (
  'connector' = 'kafka',
  'topic' = 'adsdw.dwd.max.show.mobileapp',
  'properties.group.id' = 'adsdw.dwd.max.show.mobileapp_group',
  'properties.bootstrap.servers' = 'broker1:9092,broker2:9092,broker3:9092',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-administrator"password="kafka-administrator-password";',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  'avro-confluent.schema-registry.url' = 'http://schema.registry.url:8081',
  'avro-confluent.schema-registry.subject' = 'adsdw.dwd.max.show.mobileapp-value',
  'format' = 'avro-confluent'
);

02 Flink SQL join 之 regular join

首先介绍 regular join, 因为 regular join 是最通用的 join 类型,不反对工夫窗口以及工夫属性,任何一侧数据流有更改都是可见的,间接影响整个 join 后果。如果有一侧数据流减少一个新纪录,那么它将会把另一侧的所有的过来和未来的数据合并在一起,因为 regular join 没有剔除策略,这就影响最新输入的后果; 正因为历史数据不会被清理,所以 regular join 反对数据流的任何更新操作。对于 regular join 来说,更适宜用于离线场景和小数据量场景。

  • 应用语法
SELECT columns
FROM t1  [AS <alias1>]
[LEFT/INNER/FULL OUTER] JOIN t2
ON t1.column1 = t2.key-name1
  • 应用场景:离线场景和小数据量场景
  • 依据大节 1 中的数据,咱们来做一个简略的 regular join,将 click 流和曝光流依据 impressionId 进行 regualr join,输入广告位和 impressionId,具体 SQL 语句如下所示:
select adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as publisher_adspace_adspaceId,
       adsdw_dwd_max_click_mobileapp.audience_behavior_click_creative_impressionId as click_impressionId,
       adsdw_dwd_max_show_mobileapp.audience_behavior_watch_creative_impressionId as show_impressionId
from adsdw_dwd_max_click_mobileapp  
inner join adsdw_dwd_max_show_mobileapp 
on adsdw_dwd_max_click_mobileapp.audience_behavior_click_creative_impressionId = adsdw_dwd_max_show_mobileapp.audience_behavior_watch_creative_impressionId;
  • 提交到 Flink 集群的 job 以及输入的后果如下所示:

03 Flink SQL join 之 interval join

绝对于 regular join,interval Join 则利用窗口的给两个输出表设定一个 Join 的工夫界线,超出工夫范畴的数据则对 join 不可见并能够被清理掉,这样就能修改 regular join 因为没有剔除数据策略带来 join 后果的误差以及须要大量的资源。然而应用 interval join,须要定义好工夫属性字段,能够是计算产生的 Processing Time,也能够是依据数据自身提取的 Event Time;如果是定义的是 Processing Time,则 Flink 框架自身依据零碎划分的工夫窗口定时清理数据;如果定义的是 Event Time,Flink 框架调配 Event Time 窗口并依据设置的 watermark 来清理数据。而在后面的数据筹备中,咱们依据点击流和曝光流提取实际工夫属性字段,并且设置了容许 5 分钟乱序的 watermark。目前 Interval join 曾经反对 inner ,left outer, right outer , full outer 等类型的 join。因而,interval join 只须要缓存工夫边界内的数据,存储空间占用小,计算更为精确的实时 join 后果。

  • 应用语法
-- 写法 1
SELECT columns
FROM t1  [AS <alias1>]
[LEFT/INNER/FULL OUTER] JOIN t2
ON t1.column1 = t2.key-name1 AND t1.timestamp BETWEEN t2.timestamp  AND  BETWEEN t2.timestamp + + INTERVAL '10' MINUTE;
-- 写法 2
SELECT columns
FROM t1  [AS <alias1>]
[LEFT/INNER/FULL OUTER] JOIN t2
ON t1.column1 = t2.key-name1 AND t2.timestamp <= t1.timestamp and t1.timestamp <=  t2.timestamp + + INTERVAL’10' MINUTE ;
  • 如何设置边界条件
right.timestamp ∈ [left.timestamp + lowerBound, left.timestamp + upperBound]
  • 应用场景:双流 join 场景
  • 依据大节 1 中的数据,咱们来做一个 inertval join(用 between and 的形式),将 click 流和曝光流依据 impressionId 进行 interval join, 边界条件是点击流介于曝光流产生到曝光流产生后的 10 分钟,输入广告位和 impressionId,具体 SQL 语句如下所示:
select adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as publisher_adspace_adspaceId,
       adsdw_dwd_max_click_mobileapp.audience_behavior_click_creative_impressionId as click_impressionId,
       adsdw_dwd_max_show_mobileapp.audience_behavior_watch_creative_impressionId as show_impressionId
from adsdw_dwd_max_click_mobileapp  
inner join adsdw_dwd_max_show_mobileapp
on adsdw_dwd_max_click_mobileapp.audience_behavior_click_creative_impressionId = adsdw_dwd_max_show_mobileapp.audience_behavior_watch_creative_impressionId and
   adsdw_dwd_max_click_mobileapp.ets between adsdw_dwd_max_show_mobileapp.ets and adsdw_dwd_max_show_mobileapp.ets + INTERVAL '10' MINUTE;

提交到 Flink 集群的 job 以及输入的后果如下所示:

  • Interval join 有多种写法来实现 interval join,依据大节 1 中的数据咱们用 <= 的形式来实现,还是做同样的逻辑,将 click 流和曝光流依据 impressionId 进行 interval join, 边界条件是点击流介于曝光流产生到曝光流产生后的 10 分钟,输入广告位和 impressionId,具体 SQL 语句如下所示:
select adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as publisher_adspace_adspaceId,
       adsdw_dwd_max_click_mobileapp.audience_behavior_click_creative_impressionId as click_impressionId,
       adsdw_dwd_max_show_mobileapp.audience_behavior_watch_creative_impressionId as show_impressionId
from adsdw_dwd_max_click_mobileapp  
inner join adsdw_dwd_max_show_mobileapp
on adsdw_dwd_max_click_mobileapp.audience_behavior_click_creative_impressionId = adsdw_dwd_max_show_mobileapp.audience_behavior_watch_creative_impressionId and 
   adsdw_dwd_max_show_mobileapp.ets <= adsdw_dwd_max_click_mobileapp.ets and adsdw_dwd_max_click_mobileapp.ets <= adsdw_dwd_max_show_mobileapp.ets + INTERVAL '10' MINUTE;
  • 提交到 Flink 集群的 job 以及输入的后果如下所示:


04 Flink SQL join 之 temproal table join

上节中 interval Join 提供了剔除数据的策略,解决资源问题以及计算更加精确,这是有个前提:join 的两个流须要工夫属性,须要明确工夫的下界,来不便剔除数据;显然,这种场景不适宜维度表的 join,因为维度表没有工夫界线,对于这种场景,Flink 提供了 temproal table join 来笼罩此类场景。

在 regular join 和 interval join 中,join 两侧的表是平等的,任意的一个表的更新,都会去和另外的历史纪录进行匹配,temproal table 的更新对另一表在该工夫节点以前的记录是不可见的。而在 temproal table join 中,比拟显著的应用场景之一就是点击流去 join 广告位的维度表,引入广告位的中文名称。

  • 应用语法
SELECT columns
FROM t1  [AS <alias1>]
[LEFT] JOIN t2 FOR SYSTEM_TIME AS OF t1.proctime [AS <alias2>]
ON t1.column1 = t2.key-name1
  • 应用场景:维度表 join 场景

依据大节 1 中的数据,咱们来做一个 temproal table join,将 click 流和广告位维度表依据广告位 Id 进行 temproal rable join,输入广告位和广告位中文名字,具体 SQL 语句如下所示:

select adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as publisher_adspace_adspaceId,
       mysql_dim_table.name as publisher_adspace_name
from adsdw_dwd_max_click_mobileapp
join mysql_dim_table FOR SYSTEM_TIME AS OF adsdw_dwd_max_click_mobileapp.procTime
on adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId = mysql_dim_table.ID;
  • 提交到 Flink 集群的 job 以及输入的后果如下所示:


05 总结

下面简略介绍 Flink SQL 三种 join 形式的应用,个别对于流式 join 来说,对于双流 join 的场景,举荐应用 interval join,对于流和维度表 join 的场景举荐应用 temproal table join。

作者简介

余敖,360 数据开发高级工程师,目前专一于基于 Flink 的实时数仓建设与平台化工作。对 Flink、Kafka、Hive、Spark 等进行数据 ETL 和数仓开发有丰盛的教训。

正文完
 0