乐趣区

关于sql:实时数仓入门训练营实时计算-Flink-版-SQL-实践

简介:《实时数仓入门训练营》由阿里云研究员王峰、阿里云高级产品专家刘一鸣等实时计算 Flink 版和 Hologres 的多名技术 / 产品一线专家齐上阵,合力搭建此次训练营的课程体系,精心打磨课程内容,直击当下同学们所遇到的痛点问题。由浅入深全方位解析实时数仓的架构、场景、以及实操利用,7 门精品课程帮忙你 5 天工夫从小白成长为大牛!

本文整顿自直播《实时计算 Flink 版 SQL 实际 - 李麟(海豹)》
视频链接:https://developer.aliyun.com/learning/course/807/detail/13887

内容简要:
一、实时计算 Flink 版 SQL 简介
二、实时计算 Flink 版 SQL 上手示例
三、开发常见问题和解法

实时计算 Flink 版 SQL 简介

(一)对于实时计算 Flink 版 SQL

实时计算 Flink 版抉择了 SQL 这种申明式语言作为顶层 API,比较稳定,也不便用户应用。Flink SQL 具备流批对立的个性,给用户对立的开发体验,并且语义统一。另外,Flink SQL 可能主动优化,包含屏蔽流计算外面 State 的复杂性,也提供了主动优化的 Plan,并且还集成了 AutoPilot 主动调优的性能。Flink SQL 的利用场景也比拟宽泛,包含数据集成、实时报表、实时风控,还有在线机器学习等场景。

(二)基本操作

在基本操作上,能够看到 SQL 的语法和规范 SQL 十分相似。示例中包含了根本的 SELECT、FILTER 操作。,能够应用内置函数,如日期的格式化,也能够应用自定义函数,比方示例中的汇率转换就是一个用户自定义函数,在平台上注册后就能够间接应用。

(三)维表 Lookup Join

在理论的数据处理过程中,维表的 Lookup Join 也是一个比拟常见的例子。

这里展现的是一个维表 INNER JOIN 示例。

例子中显示的 SOURCE 表是一个实时变动的订单信息表,它通过 INNER JOIN 去关联维表信息,这里标黄高亮的就是维表 JOIN 的语法,能够看到它和传统的批处理有一个写法上的差别,多了 FOR SYSTEM\_TIME AS OF 这个子句来表明它是一个维表 JOIN 的操作。SOURCE 表每来一条订单音讯,它都会触发维表算子,去做一次对维表信息的查问,所以把它叫做一个 Lookup Join。

(四)Window Aggregation

Window Aggregation(窗口聚合)操作也是常见的操作,Flink SQL 中内置反对了几种罕用的 Window 类型,比方 Tumble Window,Session Window,Hop Window,还有新引入的 Cumulate Window。

Tumble

Tumble Window 能够了解成固定大小的工夫窗口,也叫滚窗,比如说 5 分钟、10 分钟或者 1 个小时的固定距离的窗口,窗口之间没有重叠。

Session

Session Window(会话窗口)定义了一个间断事件的范畴,窗口定义中的一个参数叫做 Session Gap,示意两条数据的距离如果超过定义的时长,那么前一个 Window 就完结了,同时生成了一个新的窗口。

Hop

Hop Window 不同于滚动窗口的窗口不重叠,滑动窗口的窗口之间能够重叠。滑动窗口有两个参数:size 和 slide。size 为窗口的大小,slide 为每次滑动的步长。如果 slide < size,则窗口会重叠,同一条数据可能会被调配到多个窗口;如果 slide = size,则等同于 Tumble Window。如果 slide > size,窗口之间没有重叠且有间隙。

Cumulate

Cumulate Window(累积窗口),是 Flink 社区 1.13 版本里新引入的,能够比照 Hop Window 来了解,区别是从 Window Start 开始一直去累积。示例中 Window 1、Window 2、Window 3 是在一直地增长的。它有一个最大的窗口长度,比方咱们定义 Window Size 是一天,而后 Step 步长是 1 个小时,那么它会在一天中的每个小时产生累积到以后小时的聚合后果。

看一个具体的 Window 聚合解决示例。

如上图所示,比如说须要进行每 5 分钟单个用户的点击数统计。

源数据是用户的点击日志,咱们冀望算出每 5 分钟单个用户的点击总数,SQL 中应用的是社区最新的 WindowTVF 语法,先对源表开窗,再 GROUP BY 窗口对应的属性 window\_start 和 window\_end,COUNT(*) 就是点击数统计。

能够看到,当解决 12:00 到 12:04 的数据,有 2 个用户产生了 4 次点击,别离能统计进去用户 Mary 是 3 次,Bob 是 1 次。在接下来一批数据外面,又来了 3 条数据,对应地更新到下一个窗口中,别离是 1 次和 2 次。

(五)Group Aggregation

绝对于 Window Aggregation 来说,Group Aggregation 间接触发计算,并不需要等到窗口完结,实用的一个场景是计算累积值。

上图的例子是单个用户累积到以后的点击数统计。从 Query 上看,写法绝对简略一点,间接 GROUP BY user 去计算 COUNT(*),就是累积计数。

能够看到,在后果上和 Window 的输入是有差别的,在与 Window 雷同的前 4 条输出数据,Group Aggregation 输入的后果是 Mary 的点击数已更新到 3 次,具体的计算过程可能是从 1 变成 2 再变成 3,Bob 是 1 次,随着前面 3 条数据的输出,Bob 对应的点击数又会更新成 2 次,对后果是继续更新的过程,这和 Window 的计算场景是有一些区别的。

之前 Window 窗口外面输入的数据,在窗口完结后后果就不会再扭转,而在 Group Aggregation 里,同一个 Group Key 的后果是会产生继续更新的。

(六)Window Aggregation Vs Group Aggregation

更全面地比照一下 Window 和 Group Aggregation 的一些区别。

Window Aggregation 在输入模式上是按时输入,是在定义的数据到期之后它才会输入。比方定义 5 分钟的窗口,后果是提早输入的,比方 00:00~00:05 这个时间段,它会等整个窗口数据都到齐之后,才残缺输入进去,并且后果只输入一次,不会再扭转。

Group Aggregation 是数据触发,比方第一条数据来它就会输入后果,同一个 Key 的第二条数据来后果会更新,所以在输入流的性质上两者也是不一样的。Window Aggregation 个别状况下输入的是 Append Stream,而在 Group Aggregation 输入的是 Update Stream。

在状态 State 解决上两者的差别也比拟大。Window Aggregation 会主动清理过期数据,用户就不须要额定再去关注 State 的收缩状况。Group Aggregation 是基于有限的状态去做累积,所以须要用户依据本人的计算场景来定义 State 的 TTL,就是 State 保留多久。

比方统计一天内累计的 PV 和 UV,不思考数据提早的状况,也至多要保障 State 的 TTL 要大于等于一天,这样能力保障计算的精确性。如果 State 的 TTL 定义成半天,统计值就可能不精确了。

对输入的存储要求也是由输入流的性质来决定的。在 Window 的输入上,因为它是 Append 流,所有的类型都是能够对接输入的。而 Group Aggregatio 输入了更新流,所以要求指标存储反对更新,能够用 Hologres、MySQL 或者 HBase 这些反对更新的存储。

实时计算 Flink 版 SQL 上手示例

上面通过具体的例子来看每一种 SQL 操作在实在的业务场景中会怎么应用,比方 SQL 根本的语法操作,包含一些常见的 Aggregation 的应用。

(一)示例场景阐明:电商交易数据 – 实时数仓场景

这里的例子是电商交易数据场景,模仿了实时数仓里分层数据处理的状况。

在数据接入层,咱们模仿了电商的交易订单数据,它包含了订单 ID,商品 ID,用户 ID,交易金额,商品的叶子类目,交易工夫等根本信息,这是一个简化的表。

示例 1 会从接入层到数据明细层,实现一个数据荡涤工作,此外还会做类目信息的关联,而后数据的汇总层咱们会演示怎么实现分钟级的成交统计、小时级口径怎么做实时成交统计,最初会介绍下在天级累积的成交场景上,怎么去做准实时统计。

– 示例环境:内测版

演示环境是目前内测版的实时计算 Flink 产品,在这个平台能够间接做一站式的作业开发,包含调试,还有线上的运维工作。

– 接入层数据

应用 SQL DataGen Connector 生成模仿电商交易数据。

接入层数据:为了不便演示,简化了链路,用内置的 SQL DataGen Connector 来模仿电商数据的产生。

这外面 order\_id 是设计了一个自增序列,Connector 的参数没有残缺贴出来。DataGen Connector 反对几种生成模式,比方能够用 Sequence 产生自增序列,Random 模式能够模仿随机值,这里依据不同的字段业务含意,抉择了不同的生成策略。

比方 order\_id 是自增的,商品 ID 是随机选取了 1~10 万,用户 ID 是 1~1000 万,交易金额用分做单位,cate\_id 是叶子类目 ID,这里共模仿 100 个叶子类目,间接通过计算列对商品 ID 取余来生成,订单创立工夫应用以后工夫模仿,这样就能够在开发平台上调试,而不须要去创立 Kafka 或者 DataHub 做接入层的模仿。

(二)示例 1 -1 数据荡涤

– 电商交易数据 - 订单过滤

这是一个数据荡涤的场景,比方须要实现业务上的订单过滤,业务方可能会对交易金额有最大最小的异样过滤,比方要大于 1 元,小于 1 万才保留为无效数据。

交易的创立工夫是选取某个时刻之后的,通过 WHERE 条件组合过滤,就能够实现这个逻辑。

实在的业务场景可能会简单很多,上面来看下 SQL 如何运行。

这是应用调试模式,在平台上点击运行按钮进行本地调试,能够看到金额这一列被过滤,订单创立工夫也都是大于要求的工夫值。

从这个简略的荡涤场景能够看到,实时和传统的批处理相比,在写法上包含输入后果差别并不大,流作业次要的差别是运行起来之后是长周期放弃运行的,而不像传统批处理,解决完数据之后就完结了。

(三)示例 1 -2 类目信息关联

接下来看一下怎么做维表关联。

依据方才接入层的订单数据,因为原始数据外面是叶子类目信息,在业务上须要关联类目标维度表,维度表外面记录了叶子类目到一级类目标关联关系,ID 和名称,荡涤过程须要实现的指标是用原始表外面叶子类目 ID 去关联维表,补齐一级类目标 ID 和 Name。这里通过 INNER JOIN 维表的写法,关联之后把维表对应的字段选出来。

和批处理的写法差别仅仅在于维表的非凡语法 FOR SYSTEM\_TIME AS OF。

如上所示,平台上能够上传本人的数据用于调试,比方这里应用了 1 个 CSV 的测试数据,把 100 个叶子类目映射到 10 个一级类目上。

对应叶子类目 ID 的个位数就是它一级类目标 ID,会关联到对应的一级类目信息,返回它的名称。本地调试运行长处是速度比拟快,能够即时看到后果。在本地调试模式中,终端收到 1000 条数据之后,会主动暂停,避免后果过大而影响应用。

(四)示例 2 -1 分钟级成交统计

接下来咱们来看一下基于 Window 的统计。

第一个场景是分钟级成交统计,这是在汇总层比拟罕用的计算逻辑。

分钟级统计很容易想到 Tumble Window,每一分钟都是各算各的,须要计算几个指标,包含总订单数、总金额、成交商品数、成交用户数等。成交的商品数和用户数要做去重,所以在写法上做了一个 Distinct 解决。
窗口是刚刚介绍过的 Tumble Window,依照订单创立工夫去整齐分钟的窗口,而后按一级类目标维度统计每一分钟的成交状况。

– 运行模式

上图和方才的调试模式有点区别,上线之后就真正提交到集群里去运行一个作业,它的输入采纳了调试输入,间接 Print 到 Log 里。开展作业拓扑,能够看到主动开启了 Local-Global 的两阶段优化。

– 运行日志 – 查看调试输入后果

在运行一段时间之后,通过 Task 外面的日志能够看到最终的输入后果。

用的是 Print Sink,会间接打到 Log 外面。在实在场景的输入上,比方写到 Hologres/MySQL,那就须要去对应存储的数据库上查看。

能够看到,输入的数据绝对于数据的原始工夫是存在肯定滞后的。

在 19:46:05 的时候,输入了 19:45:00 这一个窗口的数据,提早了 5 秒钟左右输入前 1 分钟的聚合后果。

这 5 秒钟实际上和定义源表时 WATERMARK 的设定是有关系的,在申明 WATERMARK 时是绝对 gmt\_create 字段加了 5 秒的 offset。这样起到的成果是,当达到的最早数据是 19:46:00 时,咱们认为水位线是到了 19:45:55,这就是 5 秒的提早成果,来实现对乱序数据的宽容解决。

(五)示例 2 -2 小时级实时成交统计

第二个例子是做小时级实时成交统计。

如上图所示,当要求实时统计,间接把 Tumble Window 开成 1 小时 Size 的 Tumble Window,这样能满足实时性吗?依照方才展现的输入后果,具备肯定的提早成果。因而开一个小时的窗口,必须等到这一个小时的数据都收到之后,在下一个小时的开始,能力输入上一个小时的后果,提早在小时级别的,满足不了实时性的要求。回顾之前介绍的 Group Aggregation 是能够满足实时要求的。

具体来看,比方须要实现小时 + 类目以及只算小时的两个口径统计,两个统计一起做,在传统批处理中罕用的 GROUPING SETS 性能,在实时 Flink 上也是反对的。

咱们能够间接 GROUP BY GROUPING SETS,第一个是小时全口径,第二个是类目 + 小时的统计口径,而后计算它的订单数,包含总金额,去重的商品数和用户数。

这种写法对后果加了空值转换解决便于查看数据,就是对小时全口径的统计,输入的一级类目是空的,须要对它做一个空值转换解决。

上方为调试模式的运行过程,能够看到 Datagen 生成的数据实时更新到一级类目和它对应的小时上。

这里能够看到,两个不同 GROUP BY 的后果在一起输入,两头有一列 ALL 是通过空值转换来的,这就是全口径的统计值。本地调试相对来说比拟直观和不便,有趣味的话也能够到阿里云官网申请或购买进行体验。

(六)示例 2 -3 天级累积成交准实时统计

第三个示例是天级累计成交统计,业务要求是准实时,比如说可能承受分钟级的更新提早。

依照方才 Group Aggregation 小时的实时统计,容易联想到间接把 Query 改成天维度,就能够实现这个需要,而且实时性比拟高,数据触发之后能够达到秒级的更新。

回顾下之前提到的 Window 和 Group Aggregation 对于内置状态解决上的区别,Window Aggregation 能够实现 State 的主动清理,Group Aggregation 须要用户本人去调整 TTL。因为业务上是准实时的要求,在这里能够有一个代替的计划,比方用新引入的 Cumulate Window 做累积的 Window 计算,天级的累积而后应用分钟级的步长,能够实现每分钟更新的准实时要求。

回顾一下 Cumulate Window,如上所示。天级累积的话,Window 的最大 Size 是到天,它的 Window Step 就是一分钟,这样就能够表白天级的累积统计。

具体的 Query 如上,这里应用新的 TVF 语法,通过一个 TABLE 关键字把 Windows 的定义蕴含在两头,而后 Cumulate Window 援用输出表,接着定义它的工夫属性,步长和 size 参数。GROUP BY 就是一般写法,因为它有提前输入,所以咱们把窗口的开始工夫和完结工夫一起打印进去。

这个例子也通过线上运行的形式去看 Log 输入。

– 运行模式

能够看到,它和之前 Tumble Window 运行的构造相似,也是预聚合加上全局聚合,它和 Tumble Window 的区别就是并不需要等到这一天数据都到齐了才输入后果。

– 运行日志 – 察看调试后果

从上方示例能够看到,在 20:47:00 的时候,曾经有 00:00:00 到 20:47:00 的后果累积,还有对应的 4 列统计值。下一个输入就是接下来的累计窗口,能够看到 20:47:00 到 20:48:00 就是一个累计的步长,这样既满足了天级别的累计统计需要,也可能满足准实时的要求。

(七)示例小结:电商交易数据 - 实时数仓场景

而后咱们来整体总结一下以上的示例。

在接入层到明细层的荡涤解决特点是绝对简略,也比拟明确,比方业务逻辑上须要做固定的过滤条件,包含维度的扩大,这都是十分明确和间接的。

从明细层到汇总层,例子中的分钟级统计,咱们是用了 Tumble Window,而小时级因为实时性的要求,换成了 Group Aggregation,而后到天级累积别离展现 Group Aggregation 和新引入的 Cumulate Window。

从汇总层的计算特点来说,咱们须要去关注业务上的实时性要求和数据准确性要求,而后依据理论状况抉择 Window 聚合或者 Group 聚合。

这里为什么要提到数据准确性?

在一开始比拟 Window Aggregation 和 Group Aggregation 的时候,提到 Group Aggregation 的实时性十分好,然而它的数据准确性是依赖于 State 的 TTL,当统计的周期大于 TTL,那么 TTL 的数据可能会失真。

相同,在 Window Aggregation 上,对乱序的容忍度有一个下限,比方最多承受等一分钟,但在理论的业务数据中,可能 99% 的数据能满足这样的要求,还有 1% 的数据可能须要一个小时后才来。基于 WATERMARK 的解决,默认它就是一个抛弃策略,超过了最大的 offset 的这些数据就会被抛弃,不纳入统计,此时数据也会失去它的准确性,所以这是一个绝对的指标,须要依据具体的业务场景做抉择。

开发常见问题和解法

(一)开发中的常见问题

上方是实时计算实在业务接触过程中比拟高频的问题。

首先是实时计算不晓得该如何下手,怎么开始做实时计算,比方有些同学有批处理的背景,而后刚开始接触 Flink SQL,不晓得从哪开始。

另外一类问题是 SQL 写完了,也分明输出解决的数据量大略是什么级别,然而不晓得实时作业运行起来之后须要设定多大的资源

还有一类是 SQL 写得比较复杂,这个时候要去做调试,比方要查为什么计算出的数据不合乎预期等相似问题,许多同学反映无从下手。

作业跑起来之后如何调优,这也是一个十分高频的问题。

(二)开发常见问题解法

1. 实时计算如何下手?

对于上手的问题,社区有很多官网的文档,也提供了一些示例,大家能够从简略的例子上手,缓缓理解 SQL 外面不同的算子,在流式计算的时候会有一些什么样的个性。

此外,还能够关注开发者社区实时计算 Flink 版、ververica.cn 网站、B 站的 Apache Flink 公众号等分享内容。

逐步相熟了 SQL 之后,如果想利用到生产环境中去解决实在的业务问题,阿里云的行业解决方案里也提供了一些典型的架构设计,能够作为参考。

2. 简单作业如何调试?

如果遇到千行级别的简单 SQL,即便对于 Flink 的开发同学来也不能高深莫测地把问题定位进去,其实还是须要遵循由简到繁的过程,可能须要借助一些调试的工具,比方后面演示的平台调试性能,而后做分段的验证,把小段 SQL 部分的后果正确性调试完之后,再一步一步组装起来,最终让这个简单作业能达到正确性的要求。

另外,能够利用 SQL 语法上的个性,把 SQL 组织得更加清晰一点。实时计算 Flink 产品上有一个代码构造性能,能够比拟不便地定位长 SQL 里具体的语句,这都是一些辅助工具。

3. 作业初始资源设置,如何调优?

咱们有一个教训是依据输出的数据,初始做小并发测试一下,看它的性能如何,而后再去估算。在大并发压测的时候,依照需要的吞吐量,逐渐迫近,而后拿到预期的性能配置,这个是比拟间接但也比拟牢靠的形式。

调优这一块次要是借助于作业的运行是状况,咱们会去关注一些重点指标,比如说有没有产生数据的歪斜,维表的 Lookup Join 须要拜访内部存储,有没有产生 IO 的瓶颈,这都是影响作业性能的常见瓶颈点,须要加以关注。

在实时计算 Flink 产品上集成了一个叫 AutoPilot 的性能,能够了解为相似于主动驾驶,在这种性能下,初始资源设多少就不是一个麻烦问题了。

在产品上,设定作业最大的资源限度后,依据理论的数据处理量,该用多少资源能够由引擎主动帮咱们去调到最优状态,依据负载状况来做伸缩。

版权申明: 本文内容由阿里云实名注册用户自发奉献,版权归原作者所有,阿里云开发者社区不领有其著作权,亦不承当相应法律责任。具体规定请查看《阿里云开发者社区用户服务协定》和《阿里云开发者社区知识产权爱护指引》。如果您发现本社区中有涉嫌剽窃的内容,填写侵权投诉表单进行举报,一经查实,本社区将立即删除涉嫌侵权内容。

退出移动版