简介:阿里云 EMR 自 2020 年推出 Remote Shuffle Service(RSS)以来,帮忙了诸多客户解决 Spark 作业的性能、稳定性问题,并使得存算拆散架构得以施行。为了更不便大家应用和扩大,RSS 在 2022 年初开源(https://github.com/alibaba/Re…),欢送各路开发者共建)
阿里云 RemoteShuffleService 新性能:AQE 和流控
阿里云 EMR 自 2020 年推出 Remote Shuffle Service(RSS) 以来,帮忙了诸多客户解决 Spark 作业的性能、稳定性问题,并使得存算拆散架构得以施行。为了更不便大家应用和扩大,RSS 在 2022 年初开源(https://github.com/alibaba/Re…),欢送各路开发者共建: ) RSS 的整体架构请参考[1],本文将介绍 RSS 最新的两个重要性能:反对 Adaptive Query Execution(AQE),以及流控。
RSS 反对 AQE
AQE 简介
自适应执行 (Adaptive Query Execution,AQE) 是 Spark3 的重要性能[2],通过收集运行时 Stats,来动静调整后续的执行打算,从而解决因为 Optimizer 无奈精确预估 Stats 导致生成的执行打算不够好的问题。AQE 次要有三个优化场景: Partition 合并(Partition Coalescing), Join 策略切换(Switch Join Strategy),以及歪斜 Join 优化(Optimize Skew Join)。这三个场景都对 Shuffle 框架的能力提出了新的需要。
Partition 合并
Partition 合并的目标是尽量让 reducer 解决的数据量适中且平均,做法是首先 Mapper 按较多的 Partition 数目进行 Shuffle Write,AQE 框架统计每个 Partition 的 Size,若间断多个 Partition 的数据量都比拟小,则将这些 Partition 合并成一个,交由一个 Reducer 去解决。过程如下所示。
由上图可知,优化后的 Reducer2 需读取原属于 Reducer2-4 的数据,对 Shuffle 框架的需要是 ShuffleReader 须要反对范畴 Partition:
def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C]
Join 策略切换
Join 策略切换的目标是修改因为 Stats 预估不准导致 Optimizer 把本应做的 Broadcast Join 谬误的抉择了 SortMerge Join 或 ShuffleHash Join。具体而言,在 Join 的两张表做完 Shuffle Write 之后,AQE 框架统计了理论大小,若发现小表合乎 Broadcast Join 的条件,则将小表 Broadcast 进来,跟大表的本地 Shuffle 数据做 Join。流程如下:
Join 策略切换有两个优化:1. 改写成 Broadcast Join; 2. 大表的数据通过 LocalShuffleReader 直读本地。其中第 2 点对 Shuffle 框架提的新需要是反对 Local Read。
歪斜 Join 优化
歪斜 Join 优化的目标是让歪斜的 Partition 由更多的 Reducer 去解决,从而防止长尾。具体而言,在 Shuffle Write 完结之后,AQE 框架统计每个 Partition 的 Size,接着依据特定规定判断是否存在歪斜,若存在,则把该 Partition 决裂成多个 Split,每个 Split 跟另外一张表的对应 Partition 做 Join。如下所示。
Partiton 决裂的做法是依照 MapId 的程序累加他们 Shuffle Output 的 Size,累加值超过阈值时触发决裂。对 Shuffle 框架的新需要是 ShuffleReader 要能反对范畴 MapId。综合 Partition 合并优化对范畴 Partition 的需要,ShuffleReader 的接口演变为:
def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
RSS 架构回顾
RSS 的外围设计是 Push Shuffle + Partition 数据聚合,即不同的 Mapper 把属于同一个 Partition 的数据推给同一个 Worker 做聚合,Reducer 直读聚合后的文件。如下图所示。
在外围设计之外,RSS 还实现了多正本,全链路容错,Master HA,磁盘容错,自适应 Pusher,滚动降级等个性,详见[1]。
RSS 反对 Partition 合并
Partition 合并对 Shuffle 框架的需要是反对范畴 Partition,在 RSS 中每个 Partition 对应着一个文件,因而人造反对,如下图所示。
RSS 反对 Join 策略切换
Join 策略切换对 Shuffle 框架的需要是可能反对 LocalShuffleReader。因为 RSS 的 Remote 属性,数据寄存在 RSS 集群,仅当 RSS 和计算集群混部的场景下才会存在在本地,因而暂不反对 Local Read(未来会优化混部场景并加以反对)。须要留神的是,只管不反对 Local Read,但并不影响 Join 的改写,RSS 反对 Join 改写优化如下图所示。
RSS 反对 Join 歪斜优化
在 AQE 的三个场景中,RSS 反对 Join 歪斜优化是最为艰难的一点。RSS 的外围设计是 Partition 数据聚合,目标是把 Shuffle Read 的随机读转变为程序读,从而晋升性能和稳定性。多个 Mapper 同时推送给 RSS Worker,RSS 在内存聚合后刷盘,因而 Partition 文件中来自不同 Mapper 的数据是无序的,如下图所示。
Join 歪斜优化须要读取范畴 Map,例如读 Map1- 2 的数据,惯例的做法有两种:
- 读取残缺文件,并抛弃范畴之外的数据。
- 引入索引文件,记录每个 Block 的地位及所属 MapId,仅读取范畴内的数据。
这两种做法的问题不言而喻。办法 1 会导致大量冗余的磁盘读;办法 2 实质上回退成了随机读,丢失了 RSS 最外围的劣势,并且创立索引文件成为通用的 Overhead,即便是针对非歪斜的数据(Shuffle Write 过程中难以精确预测是否存在歪斜)。
为了解决以上两个问题,咱们提出了新的设计:被动 Split + Sort On Read。
被动 Split
歪斜的 Partition 大概率 Size 十分大,极其状况会间接打爆磁盘,即便在非歪斜场景呈现大 Partition 的几率仍然不小。因而,从磁盘负载平衡的角度,监控 Partition 文件的 Size 并做被动 Split (默认阈值 256m)是十分必要的。
Split 产生时,RSS 会为以后 Partition 重新分配一对 Worker(主正本),后续数据将推给新的 Worker。为了防止 Split 对正在运行的 Mapper 产生影响,咱们提出了 Soft Split 的办法,即当触发 Split 时,RSS 异步去筹备新的 Worker,Ready 之后去热更新 Mapper 的 PartitionLocation 信息,因而不会对 Mapper 的 PushData 产生任何烦扰。整体流程如下图所示。
Sort On Read
为了防止随机读的问题,RSS 采纳了 Sort On Read 的策略。具体而言,File Split 的首次 Range 读会触发排序(非 Range 读不会触发),排好序的文件连同其地位索引写回磁盘。后续的 Range 读即可保障是程序读取。如下图所示。
为了防止多个 Sub-Reducer 期待同一个 File Split 的排序,咱们打散了各个 Sub-Reducer 读取 Split 的程序,如下图所示。
Sort 优化
Sort On Read 能够无效防止冗余读和随机读,但须要对 Split File(256m)做排序,本节探讨排序的实现及开销。文件排序包含 3 个步骤:读文件,对 MapId 做排序,写文件。RSS 的 Block 默认 256k,Block 的数量大略是 1000,因而排序的过程十分快,次要开销在文件读写。整个排序过程大抵有三种计划:
- 事后调配文件大小的内存,文件整体读入,解析并排序 MapId,按 MapId 程序把 Block 写回磁盘。
- 不分配内存,Seek 到每个 Block 的地位,解析并排序 MapId,按 MapId 程序把原文件的 Block transferTo 新文件。
- 调配小块内存(如 256k),程序读完整个文件并解析和排序 MapId,按 MapId 程序把原文件的 Block transferTo 新文件。
从 IO 的视角,乍看之下,计划 1 通过应用足量内存,不存在程序读写;计划 2 存在随机读和随机写;计划 3 存在随机写;直观上计划 1 性能更好。然而,因为 PageCache 的存在,计划 3 在写文件时原文件大概率缓存在 PageCache 中,因而实测下来计划 3 的性能更好,如下图所示。
同时计划 3 无需占用过程额定内存,故 RSS 采纳计划 3 的算法。咱们同时还测试了 Sort On Read 跟上述的不排序、仅做索引的随机读办法的比照,如下图所示。
整体流程
RSS 反对 Join 歪斜优化的整体流程如下图所示。
RSS 流控
流控的次要目标是避免 RSS Worker 内存被打爆。流控通常有两种形式:
- Client 在每次 PushData 前先向 Worker 预留内存,预留胜利才触发 Push。
- Worker 端反压。
因为 PushData 是十分高频且性能要害的操作,若每次推送都额定进行一次 RPC 交互,则开销太大,因而咱们采纳了反压的策略。以 Worker 的视角,流入数据有两个源:
- Client 推送的数据
- 主正本发送的数据
如下图所示,Worker2 既接管来自 Mapper 推送的 Partition3 的数据,也接管 Worker1 发送的 Partition1 的正本数据,同时会把 Partition3 的数据发给对应的从正本。
其中,来自 Mapper 推送的数据,当且仅当同时满足以下条件时才会开释内存:
- Replication 执行胜利
- 数据写盘胜利
来自主正本推送的数据,当且仅当满足以下条件时才会开释内存:
- 数据写盘胜利
咱们在设计流控策略时,不仅要思考限流(升高流入的数据),更要思考泄流(内存能及时开释)。具体而言,高水位咱们定义了两档内存阈值(别离对应 85% 和 95% 内存应用),低水位只有一档(50% 内存应用)。达到高水位一档阈值时,触发流控,暂停接管 Mapper 推送的数据,同时强制刷盘,从而达到泄流的指标。仅限度来自 Mapper 的流入并不能管制来自主正本的流量,因而咱们定义了高水位第二档,达到此阈值时将同时暂停接管主正本发送的数据。当水位低于低水位后,恢复正常状态。整体流程如下图所示。
性能测试
咱们比照了 RSS 和原生的 External Shufle Service(ESS) 在 Spark3.2.0 开启 AQE 的性能。RSS 采纳混部的形式,没有额定占用任何机器资源。此外,RSS 所应用的内存为 8g,仅占机器内存的 2.3%(机器内存 352g)。具体环境如下。
测试环境
硬件:
header 机器组 1x ecs.g5.4xlarge
worker 机器组 8x ecs.d2c.24xlarge,96 CPU,352 GB,12x 3700GB HDD。
Spark AQE 相干配置:
**spark.sql.adaptive.enabled true
spark.sql.adaptive.coalescePartitions.enabled true
spark.sql.adaptive.coalescePartitions.initialPartitionNum 1000
spark.sql.adaptive.skewJoin.enabled true
spark.sql.adaptive.localShuffleReader.enabled false**
RSS 相干配置:
RSS_MASTER_MEMORY=2g
RSS_WORKER_MEMORY=1g
RSS_WORKER_OFFHEAP_MEMORY=7g
TPCDS 10T 测试集
咱们测试了 10T 的 TPCDS,E2E 来看,ESS 耗时 11734s,RSS 单正本 / 两正本别离耗时 8971s/10110s,别离比 ESS 快了 23.5%/13.8%,如下图所示。咱们察看到 RSS 开启两正本时网络带宽达到下限,这也是两正本比单正本低的次要因素。
具体每个 Query 的工夫比照如下:
原文链接
本文为阿里云原创内容,未经容许不得转载。