关于flink:Flink-SQL-在快手的扩展和实践

42次阅读

共计 7535 个字符,预计需要花费 19 分钟才能阅读完成。

摘要:本文整顿自快手实时计算团队技术专家张静、张芒在 Flink Forward Asia 2021 的分享。次要内容包含:

  1. Flink SQL 在快手
  2. 性能扩大
  3. 性能优化
  4. 稳定性晋升
  5. 将来瞻望

FFA 2021 直播回放 & 演讲 PDF 下载

一、Flink SQL 在快手

通过一年多的推广,快手外部用户对 Flink SQL 的认可度逐步进步,往年新增的 Flink 作业中,SQL 作业达到了 60%,与去年相比有了一倍的晋升,峰值吞吐达到了 6 亿条 / 秒。

二、性能扩大

为了反对外部的业务需要,快手做了很多性能扩大,本文重点分享其中的两个围绕窗口的扩大,一个是 Group Window Aggregate 扩大,一个是在 Flip-145 里提出的 Window Table-valued Function 扩大。

解释一下以上两者的区别和分割:

  • Group Window Aggregate 是在 Flink 1.12 和更早的版本里用来做窗口聚合的,它有两个局限性,第一个是它的语法不合乎 SQL 规范,要借助非凡的窗口函数,还要配合窗口辅助函数来实现作业聚合。另外它还限度了窗口函数只能呈现在 group by 的子句外面,所以只能用于聚合;
  • 因而 Flink 在 Flip-145 里提出了 Window TVF,它是基于 2017 年的 SQL 规范里提出的多态表函数的语法,另外它除了能够在窗口上做聚合,还能够做窗口关联,TopN 和去重等操作。

2.1 Group Window Aggregate 扩大

大家可能会问,既然曾经有 Window TVF 了,为什么还要在 Group Window Aggregate 上做性能扩大呢?因为快手是在往年下半年才开始进行版本 1.10 到 1.13 的降级,大部分业务还是在应用 1.10 版本。

在 Group Window Aggregate 上快手做了两个扩大,一个是反对多维聚合,一个是引入高阶窗口函数。

2.1.1 反对多维分析

Flink SQL 很早就反对有限流上的多维聚合,快手在 Group Window Aggregate 上也减少了多维分析的性能,反对规范的 Grouping Sets、Rollup 和 CUBE 子句,另外还反对各种窗口类型,比方滚动、滑动、会话窗口等。

比方上图实例,须要统计主题维度和总维度下的累计 UV,SQL 的 group by 子句里蕴含两局部:一个是 CUMULATE 窗口函数,一个是 Grouping Sets 子句。括号里有两个元素:一个示意总维度,一个示意主题维度。

2.1.2 引入高阶窗口函数

数据分析的开发者常常会遇到这样的需要,绘制一条曲线,每个点的含意是当天 0 点到以后工夫点的累计指标,横坐标示意工夫,纵坐标是示意累计的指标。对这样的需要能够有两个解决方案:

  • 第一个计划是应用有限流聚合,把工夫归一到分钟粒度当前作为 group key 的一列,然而业务上要求输入到屏幕上的曲线不再变动,而有限流聚合的输入后果是一个更新流,所以不符合要求。
  • 第二个计划是应用一天的滚动窗口函数。为了提前输入后果,还是要设置提前触发,工夫点选用以后机器的工夫或者是历史输出数据里最大的工夫戳。这个计划的毛病,首先是每个点的纵坐标的值并不是这个工夫点上的累计值。这会导致几个异常现象,比方作业失败重启或者是业务被动回溯历史的时候,不能齐全还原过后的历史曲线。而且各个点上分维度的累计值加起来也不等于总维度的值。还有一个毛病,统计 UV 的时候,咱们常常会应用两阶段的聚合来防止 distinct key 的歪斜,然而应用这个计划的时候,本来本身的曲线上可能会呈现凹坑。

上图是计划二导致的一些异样曲线:

  • 第一个曲线是进行历史回溯,lag 打消当前曲线才开始失常,在没有齐全打消 lag 的时候,曲线是不平滑的,而且不能还原历史曲线。
  • 第二个曲线是自增曲线上呈现凹坑。

因为第一级聚合的输入流是一个更新流,Flink 目前的更新机制是发送撤回和更新两条独立的音讯,而不是一个原子音讯,所以第二个聚合可能会先收到上游多个并发上发下来的撤回音讯,这就会导致累计值先降落再回升,所以造成了凹坑。

咱们引入 CUMULATE 窗口来解决这些问题。

这个窗口函数和 Flip-145 里提出的 CUMULATE 窗口是不约而同的,只是语法上在 Group Window Aggregate 上引入这个窗口类型。它有三个必选参数:工夫属性列、窗口的步长和 max size,还有一个可选参数,用来指定窗口开始的偏移量。

对于 CUMULATE 窗口的划分逻辑,假如 CUMULATE 窗口的步长是一分钟,max size 是三分钟,window1 的区间是 0~1 分,window2 是 0~2 分,window3 是 0~3 分,window4 开始是 3~4 分,window5 是 3~5 分,以此类推,而一条工夫戳是 0 分 30 秒的数据,会被划分到 window1、window2 和 window3 三个窗口里。

比方须要绘制一条数据曲线,一分钟打一个点,每个点示意各个子页面当天累计 UV。查问语句采纳事件工夫,CUMULATE 窗口函数的步长是一分钟,max size 是一天,业务的 group key 是子页面的 ID,工夫戳是窗口的完结工夫。

上图能够看到,应用 CUMULATE 计划绘制进去的曲线不论是失常生产还是历史回溯都很平滑。

CUMULATE 窗口的长处

  • 第一个长处是应用窗口的完结工夫作为每个点的横坐标,曲线上每个点的纵坐标就是在横坐标对应工夫点上的累计值,所以无论在回溯历史或者是作业产生 failover 的状况下,曲线都能够齐全还原,而且各个工夫点上分维度的值加起来总是等于总维度的值。
  • 第二个长处是应用两阶段聚合,可能避免 distinct key 歪斜。因为数据是在窗口完结的时候才发送,所以不存在撤回,输入的是 append 流,因而自增曲线上也不会有凹坑。

Dynamic cumulate window

Dynamic cumulate window 也是为了解决曲线类的需要,比方计算直播间自开播以来的累计指标,与后面需要的不同点是每个直播间的开播关播能继续多久都是不确定的,但它也是一种计算累计指标。它有两个必选参数:工夫属性列和窗口的步长,还有一个可选参数窗口的 gap,用来定义窗口多久没有输出数据就认为它曾经完结了。这里须要留神,一个窗口完结会触发窗口的后果输入,而且会清理掉状态,如果又来了一条雷同 key 的数据,早退的数据会被抛弃,没有早退的数据会被划分到新的窗口去,累计值也会从零开始。

如上图案例,须要绘制一个曲线,每个点示意每个直播间开播以来的累计 UV,如果一个直播间间断一个小时没有数据流入,则认为关播。窗口函数应用 Dynamic cumulate window,步长是 1 分钟,gap 是 60 分钟,Group key 是直播间 ID,工夫戳仍然应用窗口的完结工夫。

2.2 Window Table-valued Function 扩大

2.2.1 丰盛 Window TVF 算子

社区在 Flip-145 中提出的 Window Table-valued Function (window tvf) 语法,并且实现了窗口聚合。在这个根底上咱们丰盛了窗口算子,包含 TopN、关联和去重,还反对了一个独自的 window Table-valued Function 的查问语句,这些性能都曾经陆续推到社区的各个版本里。有了这些窗口的算子,用户就能够用 Flink SQL 实现更简单的业务逻辑。

如上图,须要统计当天最热销的 100 件商品的销售额和买家数,以及这些爆品所属主播的销售状况。首先做一个窗口聚合,失去 0 点以来每个商品的销售额和买家数,再做一个窗口聚合,失去每个主播所有宝贝的买家数,两个后果做窗口关联,最初做窗口上的 TopN,失去前 100 名爆品以及它的各项累计指标。

2.2.2 反对 Window Offset

window offset 次要用来调整窗口的划分逻辑,它是个可选参数,默认值是 0,示意 unix 工夫的零点作为窗口划分的起始工夫,它的值能够是一个负数,示意向右偏移,也能够是一个正数,示意向左偏移。然而它只会影响如何划分窗口,不会影响 watermark。另外,雷同的窗口,不同的 offset 可能会产生雷同的偏移成果,比方对一个 10 分钟的滚动窗口,把终点向左偏移 4 分钟或者向右偏移 6 分钟,对窗口划分产生的影响是一样的。

如上图实例,须要绘制一个数据曲线,每分钟打一个点示意每个页面本周以来的累积 UV。能够应用 CUMULATE 窗口函数,采纳事件工夫,步长是 1 分钟,max size 是 7 天。因为 unix time 零点那天是在周四,假如用默认的 offset,窗口划分就是从本周四到下周四,所以要设置 offset 为 4 天,示意向右偏移 4 天,这样就是从本周一到下周一。

2.2.3 反对 Batch Mode

另外咱们还减少了对批模式的反对。原理是引入一个 windows 算子,给输出数据附上所属的窗口属性后发给上游,而上游的算子复用批上曾经存在的算子,比如说聚合上是用 HashAgg 或者 SortAgg,关联上是 HashJoin 或者 SortMergeJoin,这些批上的算子和流模式下的算子相比,不须要状态,所以吞吐上也有更好的体现。

三、性能优化

本文次要介绍两个方面的优化,一个是聚合上的状态优化,一个是维表关联上的攒批优化。

3.1 聚合上的状态优化

先通过一个例子来理解一下聚合场景下 distinct states 的状态复用。须要统计利用下每个子频道的 UV,该用例有两个特点,频道是可枚举的以及每个频道访客的重合度很高的。

最原始的查问语句如上图,group key 是一个频道,用一个 count distinct 来计算各个频道的 UV。设施汇合在状态中首先是存在一个 map state,假如频道的枚举只有三个,A、B 和 other,group key 是频道 ID, map state 的 key 设施 ID,value 是一个 64 bit 的 long 类型的值,每个 bit 示意这个频道下该设施是否呈现,在简略的场景下这个 Value 值就是 1。

上图 A 频道下有两个设施,ID 别离是 1 和 2,ID 为 1 的设施同时拜访了 B 频道,id 为 2 的设施同时拜访了 other 频道。能够发现,不同频道的 map 能够有大量的重合,想要复用这些 key,能够用社区提供的办法来手动改写 SQL。

首先做个行转列的操作,把三个频道值拍到三个 count distinct 聚合函数的 filter 条件,在输入之前再用一个自定义的表函数来做列转行。

改写后的查问语句、设施汇合的状态和存储如上图。Group key 是 empty,map state key 是设施 ID,map state value 是一个 64bit 的 long 类型,每个 bit 示意各频道下此设施是否呈现,比方 ID 为 1 的设施 value 是 110,示意这个设施拜访了频道 A 和 B,ID 为 3 的设施拜访了频道 B 和 other。

这个计划大大减少了状态,但也存在两个毛病。第一是须要手动改写 SQL,如果一个维度有多个值或有多个可枚举的维度,那么手动改写的 SQL 会很长,另外一个毛病是须要用自定义的表函数进行列转行转换。

咱们提出一种简化的 SQL 表达方式,既能达到状态上的收益,又能加重数据开发人员的累赘。用户只须要在查问语句里,通过一个形式通知优化器 group key 的枚举值,优化器就会主动改写,进行转列和列转行,改写后就能够复用 distinct map state。改写后等价下的查问语句,只须要在过滤条件里指定枚举值就能够,用 in 或 or 的表达方式都能够。

上述性能优化能够用在有限流聚合和窗口聚合,并且一个可枚举维度或多个可枚举维度都是能够的,能够用在简略的聚合查问,也能够用在多维聚合。

但它的限度条件是 group key 外面至多有一个 key 是可枚举的,而且枚举值必须是动态的,可能明确写在过滤条件里。另外每个维度下的 distinct key 得有重合能力达到节约状态的成果。如果须要统计每个省份的 UV,基本上能够认为不同省份的访客是没有交加的,这个时候复用 distinct key 是没有收益的。另外在窗口聚合的时候,窗口函数必须具备行语义,不能够是汇合语义。对于行语义的窗口,以后这个数据属于哪个窗口取决于数据自身;然而对于汇合语义的窗口,以后这条数据属于哪个窗口,不仅取决于数据自身,还取决于这个窗口收到过的历史数据汇合。这个优化调整聚合算子的 group key,会影响每个窗口收到的数据汇合,所以不适用于汇合语义的窗口。

最初可能有用户会问,为什么语法上不采纳 Calcite 提供 pivot/unpivot 来显式地表白行转列和列转行。首先是条件不具备,因为 calcite 的 1.26 版本才引入 pivot,1.27 才引入 unpivot,而 Flink 从 1.12 版本至今都是依赖 Calcite 1.26。第二个起因是如果用 pivot/unpivot 的语法,SQL 会比当初表达方式长很多。

3.2 维表关联的攒批优化

维表关联的攒批优化是为了缩小 RPC 的调用次数。原理是攒一批数据当前,调用维表的批量查问接口,语法上咱们引入通用的 Mini-Batch hint,它有两个参数:一个示意多长时间攒一批,一个示意多少条数据攒一批。一个非法的 Mini-Batch hint 的至多蕴含一个参数。咱们将 hint 设计得很通用,心愿它不仅能够用于维表关联,还能够用于聚合的攒批优化。

再看一个例子,须要打宽订单表,关联订单的客户信息。查问语句在 customers 维表前面跟一个 hint 示意 5 秒攒一批或 1 万条数据攒一批,这个优化在底层算子和设计的实现上,远比 SQL 语法的表白要简单得多。

四、稳定性晋升

稳定性方面次要介绍对 Group Window Aggregate 解决数据歪斜和 Flink SQL 聚合指标调整之后的状态兼容这两局部快手做的一些优化和改良。

4.1 Group Window Aggregate 的数据歪斜

window 计算在快手外部的利用十分宽泛,快手的业务场景比拟容易遇到数据歪斜,比方大主播的直播、一些重大流动等。实时计算如果遇到数据歪斜,轻则指标提早,重则数据事变,所以咱们在 Tumble window 上反对了 Mini-Batch、Local-Global、Split Distinct 的优化,其余罕用的 window 上也反对了相似的优化。这些优化上线之后,不仅可能帮忙业务躲避数据歪斜,同时还能够带来不错的性能收益。

4.2 Aggregate State 兼容

首先来看 Flink SQL 上 Aggregate state 兼容的业务场景。随着业务的倒退,日常运行的工作可能要新增指标或者删除不须要的指标。重大流动过程中,业务要新增天级累计的指标或者流动周期继续累积的指标。

如果是天级指标的变更,开发者只能抛弃状态,在 0 点之后降级工作,而后再指定工作从零点的数据开始生产,从而保障指标的间断。如果是流动继续累积的指标,为了防止对原有指标的影响,只能抉择新增一个工作来独自计算新增指标,然而这样会导致资源的冗余。

之所以须要这么简单的操作,是因为 Flink SQL 判断 state 是否兼容的策略比较简单,只看引擎须要的 state 和 Savepoint 里保留的 state 的数据类型是否完全一致,完全一致就是兼容,否则不兼容。这种判断形式存在一个破绽,State 的类型没变然而 SQL 中的聚合函数变了,这种状况 Flink 也会认为状态是兼容的。

基于这个背景,咱们提出了 aggregate state 的兼容,指标是使用户学习应用 state 兼容计划的老本极低 (或 0 老本),用户能够随时降级工作,不须要再卡零点操作,反对对聚合函数的新增和删除操作。

aggregate state 兼容的操作场景只能在聚合函数尾部新增聚合函数,容许删除任意地位的聚合函数,不容许批改聚合函数的程序,也不容许一次降级同时有新增和删除两种操作,须要分为两次降级实现。

上图右表格是指标标识和 state 类型的映射关系。为了不便判断 state 是否兼容,咱们把指标标识和 state 类型的映射关系保留到 state 的 meta 中,因为有的聚合函数可能有不止一个 state,比方 avg 函数,它就须要通过 sum 和 count 两个 state 来辅助计算,所以这个映射关系很重要。

在新增聚合函数的时候,须要对新增的 state 做初始值填充,不同函数对应的初始值是不一样的,比方 count 的初始值是 0,然而 sum 的初始值必须是 null。

window 的 early-fire 和 late-fire 场景会引入 Retract 音讯,这样就多一个 state 来记录曾经下发给上游的数据。它比 window 原有的 state 多了工夫字段,在做判断和状态迁徙的时候须要做一下解决。

后面提到了咱们把指标标识和 state 类型的映射关系保留到了 state 的 meta 信息里,这会带来 state 向前兼容的问题,新版本的程序不能正确读取之前版本的 Savepoint。为了解决这个问题,须要批改 meta 的 version 信息,利用 version 信息来辨别新老版本的 state,从而做到 state 的向前兼容。

在 aggregate 的场景下,用户可能会通过设置 state TTL 来管制有效 state 的清理,aggregate state 兼容也要对这个场景做解决,保障迁徙之后的状态,TTL 的工夫戳要和原来的数据保持一致,不能做任何扭转。

Aggregate state 兼容的计划,长处是用户学习应用的老本很低,简直无感知、不依赖任何内部的服务架构,有余是对 Flink 源码有侵入,减少了将来降级 Flink 版本的老本,而且目前只能反对聚合类计算场景。

最初介绍一下快手正在做的状态兼容的终极计划。用户能够在 Savepoint 的任意地位减少、删除 state、甚至是批改 state 中的内容;还能够自定义一份残缺的 state,比方 Flink on hudi 工作的 state 初始化。

终极计划的长处是不侵入 Flink 的源码,不便 Flink 版本升级,用户能够在平台界面操作,不须要开发代码以及反对全场景的 state 兼容,不再局限于具体的场景。有余是对于用户来说学习老本比拟高,须要理解 Operator State 和 keyedState 这些比拟业余的知识点,而且还要晓得 Operator 外面是否蕴含 state。

五、将来瞻望

将来,快手会在 Stream SQL 方向继续扩大性能,晋升性能,达到降本增效的目标,以及摸索更多场景下的状态兼容;流批一提方面,快手将会欠缺 Flink Batch SQL 的能力,减少揣测执行、自适应查问等优化,晋升 Batch SQL 的稳定性和性能,持续拓宽业务利用场景;在数据湖和实时数仓方面,会持续推动它们在更多业务场景下的落地。


FFA 2021 直播回放 & 演讲 PDF 下载

更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~

正文完
 0