关于flink:Flink-SQL-性能优化multiple-input-详解

42次阅读

共计 4769 个字符,预计需要花费 12 分钟才能阅读完成。

作者|贺小令、翁才智

执行效率的优化始终是 Flink 追寻的指标。在大多数作业,特地是批作业中,数据通过网络在 task 之间传递(称为数据 shuffle)的代价较大。失常状况下一条数据通过网络须要通过序列化、磁盘读写、socket 读写与反序列化等艰难险阻,能力从上游 task 传输到上游;而雷同数据在内存中的传输,仅须要消耗几个 CPU 周期传输一个八字节指针即可。

Flink 在晚期版本中曾经通过 operator chaining 机制,将并发雷同的相邻单输出算子整合进同一个 task 中,打消了单输出算子之间不必要的网络传输。然而,join 等多输出算子之间同样存在额定的数据 shuffle 问题,shuffle 数据量最大的 source 节点与多输出算子之间的数据传输也无奈利用 operator chaining 机制进行优化。

在 Flink 1.12 中,咱们针对目前 operator chaining 无奈笼罩的场景,推出了 multiple input operator 与 source chaining 优化。该优化将打消 Flink 作业中大多数冗余 shuffle,进一步提高作业的执行效率。本文将以一个 SQL 作业为例介绍上述优化,并展现 Flink 1.12 在 TPC-DS 测试集上获得的成绩。

优化案例解析:订单量统计

咱们将以 TPC-DS q96 为例子具体介绍如何打消冗余 shuffle,该 SQL 意在通过多路 join 筛选并统计合乎特定条件的订单量。

select count(*) 
from store_sales
    ,household_demographics 
    ,time_dim, store
where ss_sold_time_sk = time_dim.t_time_sk   
    and ss_hdemo_sk = household_demographics.hd_demo_sk 
    and ss_store_sk = s_store_sk
    and time_dim.t_hour = 8
    and time_dim.t_minute >= 30
    and household_demographics.hd_dep_count = 5
    and store.s_store_name = 'ese'


图 1 – 初始执行打算

冗余 Shuffle 是如何产生的?

因为局部算子对输出数据的散布有要求(如 hash join 算子要求同一并发内数据 join key 的 hash 值雷同),数据在算子之间传递时可能须要通过从新排布与整顿。与 map-reduce 的 shuffle 过程相似,Flink shuffle 将上游 task 产生的两头后果进行整顿,并按需发送给须要这些两头后果的上游 task。但在一部分状况下,上游产出的数据曾经满足了数据分布要求(如间断多个 join key 雷同的 hash join 算子),此时对数据的整顿便不再必要,由此产生的 shuffle 也就成为了冗余 shuffle,在执行打算中以 forward shuffle 示意。

图 1 中的 hash join 算子是一种称为 broadcast hash join 的非凡算子。以 store_sales join time_dim 为例,因为 time_dim 表数据量很小,此时通过 broadcast shuffle 将该表的全量数据发送给 hash join 的每个并发,就能让任何并发承受 store_sales 表的任意数据而不影响 join 后果的正确性,同时进步 hash join 的执行效率。此时 store_sales 表向 join 算子的网络传输也成为了冗余 shuffle。同理几个 join 之间的 shuffle 也是不必要的。

图 2 – 冗余的 shuffle(红框标记)

除 hash join 与 broadcast hash join 外,产生冗余 shuffle 的场景还有很多,例如 group key 与 join key 雷同的 hash aggregate + hash join、group key 具备蕴含关系的多个 hash aggregate 等等,这里不再开展形容。

Operator Chaining 能解决吗?

对 Flink 优化过程有肯定理解的读者可能会晓得,为了打消不必要的 forward shuffle,Flink 在晚期就曾经引入了 operator chaining 机制。该机制将并发雷同的相邻单输出算子整合进同一个 task 中,并在同一个线程中一起运算。Operator chaining 机制在图 1 中其实曾经在发挥作用,如果没有它,做 broadcast shuffle 的三个 Source 节点名称中被“->”分隔的算子将会被拆分至多个不同的 task,产生冗余的数据 shuffle。图 3 为 Operator chaining 敞开是的执行打算。

图 3 – Operator chaining 敞开后的执行打算

缩小数据在 TM 之间通过网络和文件传输并将算子链接合并入 task 是十分无效的优化:它能缩小线程之间的切换,缩小音讯的序列化与反序列化,缩小数据在缓冲区的替换,并缩小提早的同时进步整体吞吐量。然而,operator chaining 对算子的整合有十分严格的限度,其中一条就是“上游算子的入度为 1”,也就是说上游算子只能有一路输出。这就将多路输出的算子(如 join)排除在外。

多输出算子的解决方案:Multiple Input Operator

如果咱们能仿照 operator chaining 的优化思路,引入新的优化机制并满足以下条件:

  1. 该机制能够组合多输出的算子;
  2. 该机制反对多路输出(为被组合的算子提供输出)

咱们就能够将用 forward shuffle 连贯的的多输出算子放到一个 task 里执行,从而打消不必要的 shuffle。Flink 社区很早就关注到了 operator chaining 的有余,在 Flink 1.11 中引入了 streaming api 层的 MultipleInputTransformation 以及对应的 MultipleInputStreamTask。这些 api 满足了上述条件 2,而 Flink 1.12 在此基础上在 SQL 层中实现了满足条件 1 的新算子——multiple input operator,能够参考 FLIP 文档 [1]。

Multiple input operator 是 table 层一个可插拔的优化。它位于 table 层优化的最初一步,遍历生成的执行打算并将不被 exchange 阻隔的相邻算子整合进一个 multiple input operator 中。图 4 展现了该优化对本来 SQL 优化步骤的批改。

图 4 – 退出 multiple input operator 后的优化步骤

读者可能会有疑难:为什么不在现有的 operator chaining 上进行批改,而要重整旗鼓呢?实际上,multiple input operator 除了要实现 operator chaining 的工作之外,还须要对各个输出的优先级进行排序。这是因为一部分多输出算子(如 hash join 与 nested loop join)对输出有严格的程序限度,若输出优先级排序不当很可能造成死锁。因为算子输出优先级的信息仅在 table 层的算子中有形容,更加天然的形式是在 table 层引入该优化机制。

值得注意的是,multiple input operator 不同于治理多个 operator 的 operator chaining,其自身就是一整个大 operator,而其外部运算在外界看来就是一个黑盒。Multiple input operator 的内部结构在 operator name 中齐全体现,读者在运行蕴含该 operator 的作业时,能够从 operator name 看到哪些算子以怎么的拓扑构造被组合进了 multiple input operator 中。

图 5 展现了通过 multiple input 优化后的算子的拓扑图以及 multiple input operator 的透视图。图中三个 hash join 算子之间的冗余的 shuffle 被移除后,它们能够在一个 task 里执行,只不过 operator chaining 没法解决这种多输出的状况,将它们放到 multiple input operator 里执行,由 multiple input operator 治理各个算子的输出程序和算子之间的调用关系。

图 5 – 通过 multiple input 优化后的算子拓扑图

Multiple input operator 的构建和运行过程较为简单,对此细节有趣味的读者能够参考设计文档 [2]。

Source 也不能脱漏:Source Chaining

通过 multiple input operator 的优化,咱们将图 1 中的执行打算优化为图 6,图 3 通过 operator chaining 优化后就变为图 6 的执行图。

图 6 – 通过 multiple input operator 优化后的执行打算

图 6 中从 store_sales 表产生的 forward shuffle(如红框所示)示意咱们仍有优化空间。正如序言中所说,在大部分作业中,从 source 间接产生的数据因为没有通过 join 等算子的筛选和加工,shuffle 的数据量是最大的。以 10T 数据下的 TPC-DS q96 为例,如果不进行进一步优化,蕴含 store_sales 源表的 task 将向网络中传输 1.03T 的数据,而通过一次 join 的筛选后,数据量急速降落至 16.5G。如果咱们能将源表的 forward shuffle 省去,作业整体执行效率又能后退一大步。

惋惜的是,multiple input operator 也不能笼罩 source shuffle 的场景,这是因为 source 不同于其它任何算子,它没有任何输出。Flink 1.12 为此给 operator chaining 新增了 source chaining 性能,将不被 shuffle 阻隔的 source 合并到 operator chaining 中,省去了 source 与上游算子之间的 forward shuffle。

目前仅有 FLIP-27 source 以及 multiple input operator 能够利用 source chaining 性能,不过这曾经足够解决本文中的优化场景。

联合 multiple input operator 与 source chaining 之后,图 7 展现了本文优化案例的最终执行计划。

图 7 – 优化后的执行计划

TPC-DS 测试后果

Multiple input operator 与 source chaining 对大部分作业,特地是批作业有显著的优化成果。咱们利用 TPC-DS 测试集对 Flink 1.12 的整体性能进行了测试,与 Flink 1.10 颁布的 12267s 总用时相比,Flink 1.12 的总用时仅为 8708s,缩短了近 30% 的运行工夫!

图 8 – TPC-DS 测试集总用时比照

图 9 – TPC-DS 局部测试点用时比照

将来打算

通过 TPC-DS 的测试成果看到,source chaining + multiple input 可能给咱们带来很大的性能晋升。目前整体框架已实现,罕用批算子已反对打消冗余 exchange 的推导逻辑,后续咱们将反对更多的批算子和更精密的推导算法。

流作业的数据 shuffle 尽管不须要像批作业一样将数据写入磁盘,但将网络传输变为内存传输带来的性能晋升也是十分可观的,因而流作业反对 source chaining + multiple input 也是一个十分令人期待的优化。同时,在流作业上反对该优化还须要很多工作,例如流算子上打消冗余 exchange 的推导逻辑暂未反对,一些算子须要重构以打消输出数据是 binary 的要求等等,这也是为什么 Flink 1.12 暂未在流作业中推出推出该优化的起因。后续版本咱们将逐渐实现这些工作,也心愿更多社区的力量退出咱们一起尽早的将更多的优化落地。

参考链接:

[1]https://cwiki.apache.org/conf…
[2]https://docs.google.com/docum…

正文完
 0