关于后端:Flink-Batch-SQL-Improvements-on-Lakehouse

6次阅读

共计 7943 个字符,预计需要花费 20 分钟才能阅读完成。

本文整顿自阿里云研发工程师刘大龙(风离),在 Streaming Lakehouse Meetup 的分享。内容次要分为三个局部:

  1. Flink Batch on Paimon 挑战
  2. Flink Batch 外围优化
  3. 后续布局

点击查看原文视频 & 演讲 PPT

一、Flink Batch on Paimon 挑战

家喻户晓,Paimon 在创建之初就是为了解决流式数仓场景的问题。从上面的架构图里咱们能够看到,这里有 Flink CDC 的高效入湖,Flink SQL 进行流式、批式的 ETL、Ad-hoc 剖析,用一套引擎实现数据的入湖、剖析与查问,整个架构上十分简洁,语义对立,解决了传统 lambda 架构下实时离线的数据一致性问题。

Flink 作为一个流批一体的计算引擎,天生就具备上述能力。

作为流计算引擎,Flink 曾经是业界的事实标准;在批处理畛域,Spark 是业界的事实标准,Flink Batch 尽管整体能力上不差,然而如果想在数据湖剖析场景占据一席之地,的确还面临一些挑战。针对 Flink + Paimon 构建流式数仓场景的需要,咱们对 Flink Batch 可能存在的问题进行了总结,次要有如下几个方面:

  • Flink 如果作为一个流计算引擎,对于 Schema 变更是没有强需要的,然而和存储联合之后,就须要不断完善这类 API,补齐相干能力;
  • 在数据湖场景,会存在行级的数据更新、删除等需要;
  • 个别湖上的数据都是有版本的,每次的写入与更新,可能都会产生一个新的 Snapshot,因而咱们也须要对 Sanpshot 进行治理,比方通过 CALL 命令清理过期 Snapshot,工夫旅行查问;
  • 除了数据管理方面的需要,另外一个重要的挑战就是如何通过 Flink Batch 对湖上数据进行高效的 ETL&Ad-hoc 剖析,同时还要保障稳定性。

针对这些挑战,从 1.16 版本社区开始投入大量精力对 Flink Batch 做了诸多方面的优化。

二、Flink Batch 外围优化

在介绍 Flink Batch 外围优化之前,首先咱们按版本主线回顾一下过来三个版本别离做了哪些重要的性能。

  • Flink 1.16 从易用性、稳定性、性能方面做了诸多改良,包含 SQL Gateway、统计信息加强、Join Hint、动静分区裁剪等,该版本是 Flink 批处理里程碑式的版本,也是走向成熟的重要一步。
  • Flink 1.17 继续在批处理方面对性能、稳定性和可用性方面做了显著的改良,包含 Update&Delete API 欠缺,引入新的 Join Reorder 算法改良 Join 性能,自适应的批处理调度器成为默认的调度器,用户不再须要关怀作业并行度等。
  • Flink 1.18 社区持续投入大量精力优化 Batch,残缺的反对了湖存储上须要的各类 API,Gateway 反对 JCBC Driver,在执行层面做了 Runtime Filter,多算子交融 CodeGen 等继续优化算子性能。

2.1 Lakehouse API 加强

Flink SQL 在 API 设计这块之前更多的是面向流场景,批场景或数据分析场景的很多 API 都没有思考也没有去做。从 1.16 版本开始加强这方面的 API,以提供更好的数据管理能力。

在数据湖场景中,Schema Evolution 是一个比拟常见的需要。于是,咱们在 API 层面加强了这些方面的能力,包含反对了各类变更 Schema 的 DDL。在建表方面,咱们也反对了 Create/Replace Table As Select 语法,能够间接基于源表简略高效的创立指标表,同时把数据导入到指标表里。

其次,加强了数据湖的数据管理相干 API,在工夫旅行查问、数据更新和删除、CALL 命令等方面都有所增强。综上所述,Lakehouse API 次要是围绕着数据湖剖析场景做了欠缺了诸多性能。

上面是 Flink 和 Spark 在 API 方面的比照图,这里把其分为 DDL、DML 和 Auxlilary Statements 三类,从图中能够看出,Flink Batch 曾经根本追平 Spark。API 上曾经比较完善,用户能够纵情的把 Flink Batch 用起来。

2.2 Join 优化

当 Flink Batch 易用性方面比较完善之后,那么用的好不好、稳不稳、快不快,就是下一个须要关注的点。

Join 是咱们在业务中遇到的最多的场景,尤其是数仓大宽表的场景,可能存在几十个表进行 Join。依据笔者之前做业务的教训来看,很多时候报表出不来,都是因为上游的 Join 太慢。因而对于很多的计算引擎,Join 算子是重点优化的局部。

如果想让 Join 执行的快,前提是须要有丰盛的统计信息。统计信息对优化器来说十分重要,只有领有足够丰盛的统计信息,它能力优化出高效的执行打算来。所以,咱们首先要做的是针对统计信息的优化,这能够通过两步实现。

  • 第一,通过 Analyze Table 语法来做,用户写一个剖析统计信息的 SQL,其会转化成 Flink Batch 作业,查问物理存储生成所需的统计信息,而后写回到 Catalog 中。
  • 第二,通过 SuppotReportStatistics 接口来获取,由 Format 或是湖存储来实现这个接口,优化器优化的过程中如果从 Catalog 中拿不到统计信息,那么就能够通过这个接口实时收集统计信息,而后再基于收集到的统计信息进行执行打算的优化。在 Flink 内置的 Format 中,Parquet 和 ORC 等列存格局已反对,另外 Paimon 也已反对。

如果遇到无论用以上哪种形式都拿不到统计信息时,该如何让优化器给出一个正当的 Join 策略呢?这种状况下,在业界也有成熟的解决方案,那就是 Join Hint。

在介绍 Join Hint 之前,先介绍下 Flink Batch Join 底层的算子实现策略。

如上图所示,一共有四种 Join 策略,也是业界支流计算引擎采纳的 Join 策略。

当统计信息精确的时候,基于 CBO 的优化器是能够给出比拟优的执行打算。然而在无统计信息或者统计信息不准时,优化器可能抉择出谬误的 Join 策略,导致 Join 执行的很慢,或者无奈执行胜利。针对这种状况,一种解决办法是 Join Hint。在 FLIP-229 中 Flink 提供了 4 种 Join Hint,与上文提到的四种 Join 策略一一对应,每种 Join 策略都有对应的 Hint 策略。具体如下:

  • BROADCAST
  • SHUFFLE_HASH
  • SHUFFLE_MERGE
  • NEST_LOOP

介绍完 Join Hint 之后,来介绍下 Join 可能存在的另外一个问题,即多表 Join。针对多表 Join,须要优化器能抉择出一个比拟优的执行打算,这样能力进步执行效率。

以上图 TPC-DS q18 为例,它波及到了 6 张表的 Join,大表是 catalog_sales 和 customer 相干的一些表,除此之外还有一些小维表等。咱们的指标是针对这几张表,通过 Join Reorder 算法,让 Join 程序绝对正当,从而使得 Query 的执行效率最高。

目前,Flink 默认采纳了基于左深树的 Join Reorder 算法。针对 q18 这个例子,其优化进去的执行打算如下图所示。

什么是左深树呢?左深树如上图所示是二杈树,它的右侧是叶子节点。也就是说先用 catalog_sales 和 date_dim 两个小表进行 Join,输入的 Row Count 是最小的,而后再和残余的表一一进行比拟,而后再选出输入 Row Count 最小的表进行 Join,以此类推。

这是一种贪婪的算法,也就是每次 Join 都只看到以后最优的 Join 程序,而后再持续往后 Join。左深树算法尽管搜寻空间小,优化速度很快,然而存在几个问题:首先是很容易陷入部分最优,二是只能按程序执行,肯定只能从左侧最小面的 Join 开始执行。所以当资源比拟短缺的时候,导致资源利用率不高。另外,左深树算法还会对动静分区裁剪有肯定影响,导致某些表无奈利用上动静分区裁剪。

基于以上存在的问题,咱们在 Flink Batch 1.17 中引入了基于浓密树的 Join Reorder 算法,如下图所示。

针对 Q18 这个例子,基于浓密树算法优化进去的执行打算树曾经比拟均衡了。在 Join 的过程中,catalog_sales 表并不是先和 date_dim 这个维表做 Join,因为它采纳的是动静布局的算法,也就是全局枚举,不会只思考以后的最优,而是全局最优,这就大大提高了 Join 的效率,Q18 的性能晋升了一倍以上。

总结来讲,左深树算法是贪婪的,不必枚举所有的执行打算,所以速度比拟快;浓密树算法基于动静布局,搜寻的空间更大,也更费时间。这两种算法别离实用于不同复杂度的业务场景,也各有优缺点,目前优化器会综合思考 Query 复杂度和优化耗时,在两个算法之间做自适应的切换。

对于 Hive 数仓或是数据湖剖析场景,很多时候咱们会遇到分区表 Join。对于该类场景,咱们是能够在执行层进行优化,比方少读一些不必要的分区,这样就能够大大减少 Join 所需的数据量。

上面介绍如何缩小有效分区数据读取。

对于分区表裁剪,有两种办法,别离是动态分区裁剪和动静分区裁剪。动态分区裁剪要求在 Query 外面肯定要指定分区的过滤条件,这样能力在优化阶段裁剪掉。另外一种状况是,没有具体指定哪个分区,然而在 Join 过程中,能够依据维表里数据动静确定要读取的分区,从而动静的进行分区裁剪。

在 FLIP-248 中,提出并引入了动静分区裁剪优化。如下图所示,sold_date 是分区表的分区字段,然而并没有指定读哪些分区。然而在 date_dim 维表中,有“year=2000”这个过滤条件,通过这个条件,在运行时就能够确定须要读取的分区信息,对于不须要读取的分区能够间接过滤掉,缩小有效数据量。

后面是针对分区表 Join 的优化,那么如果 Join 条件中没有带分区字段,是否也能够缩小 Join 须要解决的数据量?事实上针对这个问题也有成熟的解决方案 Runtime Filter,其外围思路是缩小 Join 大表侧须要 Shuffle 的数据量。

如上图所示,sales 表 Join date_dim 表,item_id 字段是 Join 的等值条件,然而其并不是分区字段,那咱们是没法做动静分区裁剪的。尽管没法做动静分区裁剪,但在运行时读取完维表的数据后,咱们是能够拿到 item_id 的汇合,通过其来缩小大表侧须要 Shuffle 的数据量。

在 Flink 1.18 版本中,该性能默认是敞开的,大家能够通过 table.optimizer.runtime-filter.enabled 参数关上。

2.3 Runtime 优化

除了上文介绍的 Join 层面的优化,在执行层面也进行了一些优化。这里介绍两个外围的优化,第一个是 Operator Fusion CodeGen,第二个是 Adaptive Batch Scheduler。

(1)Operator Fusion CodeGen

首先咱们从一个具体问题切入,来看一下为什么要做 Operator Fusion CodeGen。在跑 TPC-DS 的过程中,对相干 Query 进行 profie 剖析的时候,咱们从中发现了一些问题。下图是 Q99 的火焰图,通过该图可知,在屡次 Join 的过程中,CPU 工夫耗费的很扩散,有很多有效的 CPU 计算,次要包含上面几个方面:

  • 虚函数调用开销;
  • 两头数据物化到内存的开销;
  • Source Iterator 开销;
  • 多个算子之间的通过 collector 传递数据的开销;
  • 其余有效计算开销。

这些框架和算子层面的开销其实是能够省掉的,从而晋升 Query 的执行效率。因而咱们能够从上面几点进行优化:

  • 尽量避免内存拜访,数据驻留在寄存器中;
  • 打消虚函数调用;
  • 编译执行,面向每个 Query 生成最优的代码,执行效率会更高;
  • 从 Source 读完数据之后,用一个 For 循环去解决;
  • 提早计算。

针对上述优化点,一种成熟的解决方案是 Operator Fusion CodeGen。那什么是 Operator Fusion CodeGen 呢,简称多算子交融 CodeGen,来自于托马斯 - 诺依曼的论文,其外围思路是把多个算子的代码通过 produce-consume 接口交融到一起,每个算子只生成一部分代码片段,最初把所有算子的代码片段拼接组装在一起,最终只生成一个算子,该算子蕴含了原来所有算子解决数据逻辑的代码。

Operator Fusion CodeGen 外围指标就是尽量把整个 Query 的多个算子通过 CodeGen 的形式编译组装到一个函数或是一个算子里,这样就能尽可能的打消一些冗余的代码和计算,让整个 Pipeline 的数据处理更加高效。

在 FLIP-315 中,咱们为 Flink 引入了这个优化,目前反对了 Calc、HashJoin、HashAgg 算子。以后还是个试验性质的性能,默认是敞开的,用户能够通过参数 table.exec.operator.fusion.codegen 关上。

(2)Adaptive Batch Scheduler

Adaptive Batch Scheduler 是另外一个优化,这个在 Flink 1.15 版本中就曾经引入了,它解决的是批作业并行度设置的问题。

在流场景中,设置并行度是很失常的事件。然而对于批作业来讲,每天的数据量都可能在变动,而且批作业的数量相比流作业会多很多,这就导致咱们无奈 case by case 的对每个批作业进行并行度调优,因而也就须要具备主动设置并行度的能力。

在 Flink 1.17 版本中,咱们将 Adaptive Batch Scheduler 设置为批作业默认的 Scheduler,也就是说用户不须要额定去关上这个性能。除了动静设置并发,后续可能会做更深刻的 Adaptive Query Execution,能够动静的调整 Join 策略,以及解歪斜等。

基于上述咱们在 QO 和 QE 层面做的优化,Flink Batch 过来几个版本在 TPC-DS 10T 数据集场景下,性能始终在晋升,整体工夫也已追平支流批计算引擎。

2.4 稳定性优化

除了性能方面的优化,咱们也在继续优化 Flink Batch 的稳定性。在稳定性方面,做的第一个优化是 Adaptive Hash Join。

因为一些历史起因,Flink 的 HashJoin 没有抉择纯内存版的 Join 算法,而是抉择了 Hybird HashJoin 算法。算法的基本思路是依据 Join key 对 build 端先划分 Partition,当内存不够时,会抉择以后占用内存最大的 Partition 进行落盘,开释一部分内存;接着构建新的分区。当 probe 端数据来的时候,会判断以后数据对应的 build 端的 Partition 是否在内存中,如果在内存中,则间接 Join。如果不在内存中,则会把 probe 端数据同样落盘,等后面所有内存中的 Partition 解决完之后再来解决,顺次递归解决上来。该算法尽管能够防止 OOM,但也存在一个问题,当 Join key 数据歪斜重大,也就是当某个 Partition 数据量特地大的话,递归会有限循环上来,导致 Join 永远都无奈完结。

针对这个问题,在 Flink 1.16 版本对歪斜分区进行了优化,即当它的递归次数大于 3 次之后,阐明该 Partition 歪斜曾经十分重大了,这个时候能够自适应的 fallback 到 Sort Merge Join 策略来解决,咱们把这个过程称作 Adaptive Hash Join。Adaptive Hash Join 的外围思路是把内存中能够放的下的 Partition 用 Hash Join 来解决,内存中放不下的 Partition 用 Sort Merge Join 来解决,这样既保证了性能,又晋升了稳定性。

Speculative Execution 揣测执行

在生产中,很多时候作业是共享集群的,不同的业务跑在同一个集群上,这都可能导致机器热点,使得下面运行的 Flink 工作异样迟缓。这些迟缓的工作会影响整个作业的执行工夫,使得作业的产出基线无奈失去保障,成为了局部用户应用 Flink 来进行批处理的妨碍。

Speculative Execution,也叫揣测执行,是一种曾经失去广泛的认可、用来解决这类问题的办法,因而咱们也在 Flink 1.16 中引入了这套机制。

揣测执行计划的整体架构如下图所示,在 FLIP-168 中框架层面对外部算子反对了揣测执行,在 FLIP-245 里对 Source 实现了反对;在 FLIP-281 中,对 Sink 实现了反对。总而言之,通过多个版本的迭代,揣测执行在 Flink Batch 中的反对曾经比较完善,用户能够释怀的用起来。

2.5 SQL 服务化

Flink 是一个流批一体计算引擎,Flink SQL 能够满足 Streaming/Batch/OLAP 三类场景的需要。面对这么多不同类的需要,如果有一个对立的 SQL 服务化组件,能够很不便咱们提交 SQL、治理作业,不必每个用户都本人去搭一套服务化组件,防止反复开发。另外,其实最早社区在 Github Ververica 组织下提供了 SQL Gateway,起初因为一些起因就没有再继续开发,但社区始终有用户在用。思考到 Flink Batch 的需要,以及社区用户的呼声,因而在 1.16 版本社区在 Flink 主仓库引入了 SQL Gateway 组件。

SQL Gateway 是一种反对近程多个客户端并发执行 SQL 的服务化网关,其在架构设计上由可插拔的 Endpoint 和 SQLGatewayService 组成,能够反对不同协定,包含 HTTPS REST 协定、HiveServer2 协定,能够兼容不同 SQL 的语法。

基于前置的工作,在 FLIP-275 中,社区对 SQL Client 也进行了重构,把底座对立架在 SQL Gateway 上,这样两者的能力是共享和对齐的,SQL Client 也就更加灵便。

另外社区提供了 Flink JDBC Driver,其兼容老的 SQL Gateway(底层基于 RestClient),并且能够反对用户以 JDBC 的形式提交 Flink 流作业和批作业,这对已有的批作业迁徙到 Flink 上也很不便,客户端只须要把 JDBC Driver 换成 Flink 的即可,无需大的改变。

三、后续布局

上文介绍了 Flink Batch 过来三个版本做的一些外围优化,后续咱们也会在 Flink Batch 上继续投入,次要会从两个方面去做。首先在引擎层面会继续进行优化,比方持续欠缺 OFCG、Runtime Filter 等,同时也会思考去反对 Adaptive Query Execution。

其次,咱们会花更多精力在用户落地方面,比方去对接数据湖存储 Paimon、Hudi、Iceberg 等,在新兴数据湖剖析场景继续打磨 Flink Batch 引擎。

Q&A

Q:请问 SQL Gateway 在 Flink 社区是否有长线布局?将来是否会跟 Kyuubi 竞争?
A:目前没有这样的打算,因为 SQL Gateway 更多思考还是面向 Flink 自身。Flink 是流批一体的计算引擎,Gateway 在设计上,更多的兼容了 Flink 流批两种模式。Kyuubi 是一个独立社区,它集成了 Flink SQL,然而次要面向 Hive/Spark 等 Batch 生态,在流模式的服务化上反对的不太欠缺。从久远来看,社区目前没有将 SQL Gateway 变成独立的内部 Service 组件的打算。

Q:请问通过优化器优化掉的数据,将来是否能够再复原?
A:如果是基于 CBO 的优化,优化器在优化的过程中产生的 Plan 都是会保留的,只是它最终会依据不同的 Plan 计算 cost,哪个最小就会选哪个。

点击查看原文视频 & 演讲 PPT


Flink Forward Asia 2023 正式启动

点击查看流动详情


更多内容

流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
首购 99 元包月试用,有机会赢取定制周边礼品!
产品官网:https://www.aliyun.com/product/bigdata/sc

正文完
 0