简介:阿里云EMR自2020年推出Remote Shuffle Service(RSS)以来,帮忙了诸多客户解决Spark作业的性能、稳定性问题,并使得存算拆散架构得以施行。为了更不便大家应用和扩大,RSS在2022年初开源,欢送各路开发者共建。本文将介绍RSS最新的两个重要性能:反对Adaptive Query Execution(AQE),以及流控。

作者 | 一锤、明济
起源 | 阿里开发者公众号

阿里云EMR自2020年推出Remote Shuffle Service(RSS)以来,帮忙了诸多客户解决Spark作业的性能、稳定性问题,并使得存算拆散架构得以施行。为了更不便大家应用和扩大,RSS在2022年初开源,欢送各路开发者共建。RSS的整体架构请参考[1],本文将介绍RSS最新的两个重要性能:反对Adaptive Query Execution(AQE),以及流控。

一 RSS反对AQE

1 AQE简介

自适应执行(Adaptive Query Execution, AQE)是Spark3的重要性能[2],通过收集运行时Stats,来动静调整后续的执行打算,从而解决因为Optimizer无奈精确预估Stats导致生成的执行打算不够好的问题。AQE次要有三个优化场景: Partition合并(Partition Coalescing), Join策略切换(Switch Join Strategy),以及歪斜Join优化(Optimize Skew Join)。这三个场景都对Shuffle框架的能力提出了新的需要。

Partition合并

Partition合并的目标是尽量让reducer解决的数据量适中且平均,做法是首先Mapper按较多的Partition数目进行Shuffle Write,AQE框架统计每个Partition的Size,若间断多个Partition的数据量都比拟小,则将这些Partition合并成一个,交由一个Reducer去解决。过程如下所示。

由上图可知,优化后的Reducer2需读取原属于Reducer2-4的数据,对Shuffle框架的需要是ShuffleReader须要反对范畴Partition:

def getReader[K, C](    handle: ShuffleHandle,    startPartition: Int,    endPartition: Int,    context: TaskContext): ShuffleReader[K, C]

Join策略切换

Join策略切换的目标是修改因为Stats预估不准导致Optimizer把本应做的Broadcast Join谬误的抉择了SortMerge Join或ShuffleHash Join。具体而言,在Join的两张表做完Shuffle Write之后,AQE框架统计了理论大小,若发现小表合乎Broadcast Join的条件,则将小表Broadcast进来,跟大表的本地Shuffle数据做Join。流程如下:

Join策略切换有两个优化:1. 改写成Broadcast Join; 2. 大表的数据通过LocalShuffleReader直读本地。其中第2点对Shuffle框架提的新需要是反对Local Read。

歪斜Join优化

歪斜Join优化的目标是让歪斜的Partition由更多的Reducer去解决,从而防止长尾。具体而言,在Shuffle Write完结之后,AQE框架统计每个Partition的Size,接着依据特定规定判断是否存在歪斜,若存在,则把该Partition决裂成多个Split,每个Split跟另外一张表的对应Partition做Join。如下所示。

Partiton决裂的做法是依照MapId的程序累加他们Shuffle Output的Size,累加值超过阈值时触发决裂。对Shuffle框架的新需要是ShuffleReader要能反对范畴MapId。综合Partition合并优化对范畴Partition的需要,ShuffleReader的接口演变为:

def getReader[K, C](    handle: ShuffleHandle,    startMapIndex: Int,    endMapIndex: Int,    startPartition: Int,    endPartition: Int,    context: TaskContext,    metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]

2 RSS架构回顾

RSS的外围设计是Push Shuffle + Partition数据聚合,即不同的Mapper把属于同一个Partition的数据推给同一个Worker做聚合,Reducer直读聚合后的文件。如下图所示。

在外围设计之外,RSS还实现了多正本,全链路容错,Master HA,磁盘容错,自适应Pusher,滚动降级等个性,详见[1]。

3 RSS反对Partition合并

Partition合并对Shuffle框架的需要是反对范畴Partition,在RSS中每个Partition对应着一个文件,因而人造反对,如下图所示。

4 RSS反对Join策略切换

Join策略切换对Shuffle框架的需要是可能反对LocalShuffleReader。因为RSS的Remote属性,数据寄存在RSS集群,仅当RSS和计算集群混部的场景下才会存在在本地,因而暂不反对Local Read(未来会优化混部场景并加以反对)。须要留神的是,只管不反对Local Read,但并不影响Join的改写,RSS反对Join改写优化如下图所示。

5 RSS反对Join歪斜优化

在AQE的三个场景中,RSS反对Join歪斜优化是最为艰难的一点。RSS的外围设计是Partition数据聚合,目标是把Shuffle Read的随机读转变为程序读,从而晋升性能和稳定性。多个Mapper同时推送给RSS Worker,RSS在内存聚合后刷盘,因而Partition文件中来自不同Mapper的数据是无序的,如下图所示。

Join歪斜优化须要读取范畴Map,例如读Map1-2的数据,惯例的做法有两种:

  • 读取残缺文件,并抛弃范畴之外的数据。
  • 引入索引文件,记录每个Block的地位及所属MapId,仅读取范畴内的数据。

这两种做法的问题不言而喻。办法1会导致大量冗余的磁盘读;办法2实质上回退成了随机读,丢失了RSS最外围的劣势,并且创立索引文件成为通用的Overhead,即便是针对非歪斜的数据(Shuffle Write过程中难以精确预测是否存在歪斜)。

为了解决以上两个问题,咱们提出了新的设计:被动Split + Sort On Read。

被动Split

歪斜的Partition大概率Size十分大,极其状况会间接打爆磁盘,即便在非歪斜场景呈现大Partition的几率仍然不小。因而,从磁盘负载平衡的角度,监控Partition文件的Size并做被动Split(默认阈值256m)是十分必要的。

Split产生时,RSS会为以后Partition重新分配一对Worker(主正本),后续数据将推给新的Worker。为了防止Split对正在运行的Mapper产生影响,咱们提出了Soft Split的办法,即当触发Split时,RSS异步去筹备新的Worker,Ready之后去热更新Mapper的PartitionLocation信息,因而不会对Mapper的PushData产生任何烦扰。整体流程如下图所示。

Sort On Read

为了防止随机读的问题,RSS采纳了Sort On Read的策略。具体而言,File Split的首次Range读会触发排序(非Range读不会触发),排好序的文件连同其地位索引写回磁盘。后续的Range读即可保障是程序读取。如下图所示。

为了防止多个Sub-Reducer期待同一个File Split的排序,咱们打散了各个Sub-Reducer读取Split的程序,如下图所示。

Sort优化

Sort On Read能够无效防止冗余读和随机读,但须要对Split File(256m)做排序,本节探讨排序的实现及开销。文件排序包含3个步骤:读文件,对MapId做排序,写文件。RSS的Block默认256k,Block的数量大略是1000,因而排序的过程十分快,次要开销在文件读写。整个排序过程大抵有三种计划:

  • 事后调配文件大小的内存,文件整体读入,解析并排序MapId,按MapId程序把Block写回磁盘。
  • 不分配内存,Seek到每个Block的地位,解析并排序MapId,按MapId程序把原文件的Block transferTo新文件。
  • 调配小块内存(如256k),程序读完整个文件并解析和排序MapId,按MapId程序把原文件的Block transferTo新文件。

从IO的视角,乍看之下,计划1通过应用足量内存,不存在程序读写;计划2存在随机读和随机写;计划3存在随机写;直观上计划1性能更好。然而,因为PageCache的存在,计划3在写文件时原文件大概率缓存在PageCache中,因而实测下来计划3的性能更好,如下图所示。

同时计划3无需占用过程额定内存,故RSS采纳计划3的算法。咱们同时还测试了Sort On Read跟上述的不排序、仅做索引的随机读办法的比照,如下图所示。

整体流程

RSS反对Join歪斜优化的整体流程如下图所示。

二 RSS流控

流控的次要目标是避免RSS Worker内存被打爆。流控通常有两种形式:

  • Client在每次PushData前先向Worker预留内存,预留胜利才触发Push。
  • Worker端反压。

因为PushData是十分高频且性能要害的操作,若每次推送都额定进行一次RPC交互,则开销太大,因而咱们采纳了反压的策略。以Worker的视角,流入数据有两个源:

  • Client推送的数据
  • 主正本发送的数据

如下图所示,Worker2既接管来自Mapper推送的Partition3的数据,也接管Worker1发送的Partition1的正本数据,同时会把Partition3的数据发给对应的从正本。

其中,来自Mapper推送的数据,当且仅当同时满足以下条件时才会开释内存:

  • Replication执行胜利
  • 数据写盘胜利

来自主正本推送的数据,当且仅当满足以下条件时才会开释内存:

数据写盘胜利

咱们在设计流控策略时,不仅要思考限流(升高流入的数据),更要思考泄流(内存能及时开释)。具体而言,高水位咱们定义了两档内存阈值(别离对应85%和95%内存应用),低水位只有一档(50%内存应用)。达到高水位一档阈值时,触发流控,暂停接管Mapper推送的数据,同时强制刷盘,从而达到泄流的指标。仅限度来自Mapper的流入并不能管制来自主正本的流量,因而咱们定义了高水位第二档,达到此阈值时将同时暂停接管主正本发送的数据。当水位低于低水位后,恢复正常状态。整体流程如下图所示。

三 性能测试

咱们比照了RSS和原生的External Shufle Service(ESS)在Spark3.2.0开启AQE的性能。RSS采纳混部的形式,没有额定占用任何机器资源。此外,RSS所应用的内存为8g,仅占机器内存的2.3%(机器内存352g)。具体环境如下。

1 测试环境

硬件:

header 机器组 1x ecs.g5.4xlarge
worker 机器组 8x ecs.d2c.24xlarge,96 CPU,352 GB,12x 3700GB HDD。

Spark AQE相干配置:

spark.sql.adaptive.enabled truespark.sql.adaptive.coalescePartitions.enabled truespark.sql.adaptive.coalescePartitions.initialPartitionNum 1000spark.sql.adaptive.skewJoin.enabled truespark.sql.adaptive.localShuffleReader.enabled false

RSS相干配置:

RSS_MASTER_MEMORY=2gRSS_WORKER_MEMORY=1gRSS_WORKER_OFFHEAP_MEMORY=7g

2 TPCDS 10T测试集

咱们测试了10T的TPCDS,E2E来看,ESS耗时11734s,RSS单正本/两正本别离耗时8971s/10110s,别离比ESS快了23.5%/13.8%,如下图所示。咱们察看到RSS开启两正本时网络带宽达到下限,这也是两正本比单正本低的次要因素。

具体每个Query的工夫比照如下:

相干链接

欢送各位开发者参加探讨和共建!

github地址:

https://github.com/alibaba/Re...

Reference

[1]阿里云EMR Remote Shuffle Service在小米的实际,以及开源. https://developer.aliyun.com/...

[2]Adaptive Query Execution: Speeding Up Spark SQL at Runtime. https://databricks.com/blog/2...

原文链接
本文为阿里云原创内容,未经容许不得转载。