简介:阿里云EMR自2020年推出Remote Shuffle Service(RSS)以来,帮忙了诸多客户解决Spark作业的性能、稳定性问题,并使得存算拆散架构得以施行,与此同时RSS也在跟合作方小米的共建下一直演进。本文将介绍RSS的最新架构,在小米的实际,以及开源。
作者 | 一锤、明济、紫槿
起源 | 阿里技术公众号
阿里云EMR自2020年推出Remote Shuffle Service(RSS)以来,帮忙了诸多客户解决Spark作业的性能、稳定性问题,并使得存算拆散架构得以施行,与此同时RSS也在跟合作方小米的共建下一直演进。本文将介绍RSS的最新架构,在小米的实际,以及开源。
一 问题回顾
Shuffle是大数据计算中最为重要的算子。首先,覆盖率高,超过50%的作业都蕴含至多一个Shuffle[2]。其次,资源耗费大,阿里外部平台Shuffle的CPU占比超过20%,LinkedIn外部Shuffle Read导致的资源节约高达15%[1],单Shuffle数据量超100T[2]。第三,不稳固,硬件资源的稳定性CPU>内存>磁盘≈网络,而Shuffle的资源耗费是倒序。OutOfMemory和Fetch Failure可能是Spark作业最常见的两种谬误,前者能够通过调参解决,而后者须要系统性重构Shuffle。
传统Shuffle如下图所示,Mapper把Shuffle数据按PartitionId排序写盘后交给External Shuffle Service(ESS)治理,Reducer从每个Mapper Output中读取属于本人的Block。
传统Shuffle存在以下问题。
- 本地盘依赖限度了存算拆散。存算拆散是近年来衰亡的新型架构,它解耦了计算和存储,能够更灵便地做机型设计:计算节点强CPU弱磁盘,存储节点强磁盘强网络弱CPU。计算节点无状态,可依据负载弹性伸缩。存储端,随着对象存储(OSS, S3)+数据湖格局(Delta, Iceberg, Hudi)+本地/近地缓存等计划的成熟,可当作容量有限的存储服务。用户通过计算弹性+存储按量付费取得老本节约。然而,Shuffle对本地盘的依赖限度了存算拆散。
- 写放大。当Mapper Output数据量超过内存时触发外排,从而引入额定磁盘IO。
- 大量随机读。Mapper Output属于某个Reducer的数据量很小,如Output 128M,Reducer并发2000,则每个Reducer只读64K,从而导致大量小粒度随机读。对于HDD,随机读性能极差;对于SSD,会疾速耗费SSD寿命。
- 高网络连接数,导致线程池耗费过多CPU,带来性能和稳定性问题。
- Shuffle数据单正本,大规模集群场景坏盘/坏节点很广泛,Shuffle数据失落引发的Stage重算带来性能和稳定性问题。
二 RSS倒退历程
针对Shuffle的问题,工业界尝试了各种办法,近两年逐步收敛到Push Shuffle的计划。
1 Sailfish
Sailfish3最早提出Push Shuffle + Partition数据聚合的办法,对大作业有20%-5倍的性能晋升。Sailfish魔改了分布式文件系统KFS[4],不反对多正本。
2 Dataflow
Goolge BigQuery和Cloud Dataflow5实现了Shuffle跟计算的解耦,采纳多层存储(内存+磁盘),除此之外没有披露更多技术细节。
3 Riffle
Facebook Riffle2采纳了在Mapper端Merge的办法,物理节点上部署的Riffle服务负责把此节点上的Shuffle数据依照PartitionId做Merge,从而肯定水平把小粒度的随机读合并成较大粒度。
4 Cosco
Facebook Cosco[6]7采纳了Sailfish的办法并做了重设计,保留了Push Shuffle + Parititon数据聚合的外围办法,但应用了独立服务。服务端采纳Master-Worker架构,应用内存两正本,用DFS做长久化。Cosco基本上定义了RSS的规范架构,但受到DFS的连累,性能上并没有显著晋升。
5 Zeus
Uber Zeus[8]9同样采纳了去中心化的服务架构,但没有相似etcd的角色保护Worker状态,因而难以做状态治理。Zeus通过Client双推的形式做多正本,采纳本地存储。
6 RPMP
Intel RPMP10依附RDMA和PMEM的新硬件来减速Shuffle,并没有做数据聚合。
7 Magnet
LinkedIn Magnet1交融了本地Shuffle+Push Shuffle,其设计哲学是"尽力而为",Mapper的Output写完本地后,Push线程会把数据推给远端的ESS做聚合,且不保障所有数据都会聚合。受害于本地Shuffle,Magnet在容错和AE的反对上的体现更好(间接Fallback到传统Shuffle)。Magnet的局限包含依赖本地盘,不反对存算拆散;数据合并依赖ESS,对NodeManager造成额定压力;Shuffle Write同时写本地和远端,性能达不到最优。Magnet计划曾经被Apache Spark接收,成为默认的开源计划。
8 FireStorm
FireStorm11混合了Cosco和Zeus的设计,服务端采纳Master-Worker架构,通过Client多写实现多正本。FireStorm应用了本地盘+对象存储的多层存储,采纳较大的PushBlock(默认3M)。FireStorm在存储端保留了PushBlock的元信息,并记录在索引文件中。FireStorm的Client缓存数据的内存由Spark MemoryManager进行治理,并通过细颗粒度的内存调配(默认3K)来尽量避免内存节约。
从上述形容可知,以后的计划根本收敛到Push Shuffle,但在一些要害设计上的抉择各家不尽相同,次要体现在:
- 集成到Spark外部还是独立服务。
- RSS服务侧架构,选项包含:Master-Worker,含轻量级状态治理的去中心化,齐全去中心化。
- Shuffle数据的存储,选项包含:内存,本地盘,DFS,对象存储。
- 多正本的实现,选项包含:Client多推,服务端做Replication。
阿里云RSS12由2020年推出,外围设计参考了Sailfish和Cosco,并且在架构和实现层面做了改进,下文将具体介绍。
三 阿里云RSS外围架构
针对上一节的要害设计,阿里云RSS的抉择如下:
- 独立服务。思考到将RSS集成到Spark外部无奈满足存算拆散架构,阿里云RSS将作为独立服务提供Shuffle服务。
- Master-Worker架构。通过Master节点做服务状态治理十分必要,基于etcd的状态状态治理能力受限。
- 多种存储形式。目前反对本地盘/DFS等存储形式,主打本地盘,未来会往分层存储方向倒退。
- 服务端做Replication。Client多推会额定耗费计算节点的网络和计算资源,在独立部署或者服务化的场景下对计算集群不敌对。
下图展现了阿里云RSS的要害架构,蕴含Client(RSS Client, Meta Service),Master(Resource Manager)和Worker三个角色。Shuffle的过程如下:
- Mapper在首次PushData时申请Master调配Worker资源,Worker记录本人所须要服务的Partition列表。
- Mapper把Shuffle数据缓存到内存,超过阈值时触发Push。
- 附属同个Partition的数据被Push到同一个Worker做合并,主Worker内存接管到数据后立刻向从Worker发动Replication,数据达成内存两正本后即向Client发送ACK,Flusher后盾线程负责刷盘。
- Mapper Stage运行完结,MetaService向Worker发动CommitFiles命令,把残留在内存的数据全副刷盘并返回文件列表。
- Reducer从对应的文件列表中读取Shuffle数据。
阿里云RSS的外围架构和容错方面的介绍详见[13],本文接下来介绍阿里云RSS近一年的架构演进以及不同于其余零碎的特色。
1 状态下沉
RSS采纳Master-Worker架构,最后的设计中Master对立负责集群状态治理和Shuffle生命周期治理。集群状态包含Worker的衰弱度和负载;生命周期包含每个Shuffle由哪些Worker服务,每个Worker所服务的Partition列表,Shuffle所处的状态(Shuffle Write,CommitFile,Shuffle Read),是否有数据失落等。保护Shuffle生命周期须要较大数据量和简单数据结构,给Master HA的实现造成阻力。同时大量生命周期治理的服务调用使Master易成为性能瓶颈,限度RSS的扩展性。
为了缓解Master压力,咱们把生命周期状态治理下沉到Driver,由Application治理本人的Shuffle,Master只需保护RSS集群自身的状态。这个优化大大降低Master的负载,并使得Master HA得以顺利实现。
2 Adaptive Pusher
在最后的设计中,阿里云RSS跟其余零碎一样采纳Hash-Based Pusher,即Client会为每个Partition保护一个(或多个[11])内存Buffer,当Buffer超过阈值时触发推送。这种设计在并发度适中的状况下没有问题,而在超大并发度的状况下会导致OOM。例如Reducer的并发5W,在小Buffer[13]的零碎中(64K)极其内存耗费为64K5W=3G,在大Buffer[11]的零碎中(3M)极其内存耗费为3M5W=146G,这是不可承受的。针对这个问题,咱们开发了Sort-Based Pusher,缓存数据时不辨别Partition,当总的数据超过阈值(i.e. 64M)时对以后数据依照PartitionId排序,而后把数据Batch后推送,从而解决内存耗费过大的问题。
Sort-Based Pusher会额定引入一次排序,性能上比Hash-Based Pusher略差。咱们在ShuffleWriter初始化阶段依据Reducer的并发度主动抉择适合的Pusher。
3 磁盘容错
出于性能的思考,阿里云RSS举荐本地盘存储,因而解决坏/慢盘是保障服务可靠性的前提。Worker节点的DeviceMonitor线程定时对磁盘进行查看,查看项包含IOHang,使用量,读写异样等。此外Worker在所有磁盘操作处(创立文件,刷盘)都会捕获异样并上报。IOHang、读写异样被认为是Critical Error,磁盘将被隔离并终止该磁盘上的存储服务。慢盘、使用量超警戒线等异样仅将磁盘隔离,不再承受新的Partition存储申请,但已有的Partition服务放弃失常。在磁盘被隔离后,Worker的容量和负载将发生变化,这些信息将通过心跳发送给Master。
4 滚动降级
RSS作为常驻服务,有永不停服的要求,而零碎自身总在向前演进,因而滚动降级是必选的性能。只管通过Sub-Cluster部署形式能够绕过,即部署多个子集群,对子集群做灰度,灰度的集群暂停服务,但这种形式依赖调度零碎感知正在灰度的集群并动静批改作业配置。咱们认为RSS应该把滚动降级闭环掉,外围设计如下图所示。Client向Master节点的Leader角色(Master实现了HA,见上文)发动滚动降级申请并把更新包上传给Leader,Leader通过Raft协定批改状态为滚动降级,并启动第一阶段的降级:降级Master节点。Leader首先降级所有的Follower,而后替换本地包并重启。在Leader节点扭转的状况下,降级过程不会中断或异样。Master节点降级完结后进入第二阶段:Worker节点降级。RSS采纳滑动窗口做降级,窗口内的Worker尽量优雅下线,即回绝新的Partition申请,并期待本地Shuffle完结。为了防止等待时间过长,会设置超时工夫。此外,窗口内的Worker抉择会尽量避免同时蕴含主从两正本以升高数据失落的概率。
5 凌乱测试框架
对于服务来说,仅依附UT、集成测试、e2e测试等无奈保障服务可靠性,因为这些测试无奈笼罩线上简单环境,如坏盘、CPU过载、网络过载、机器挂掉等。RSS要求在呈现这些简单状况时放弃服务稳固,为了模仿线上环境,咱们开发了仿真(凌乱)测试框架,在测试环境中模仿线上可能呈现的异样,同时保障满足RSS运行的最小运行环境,即至多3个Master节点和2个Worker节点可用,并且每个Worker节点至多有一块盘。咱们继续对RSS做此类压力测试。
仿真测试框架架构如下图所示,首先定义测试Plan来形容事件类型、事件触发的程序及持续时间,事件类型包含节点异样,磁盘异样,IO异样,CPU过载等。客户端将Plan提交给Scheduler,Scheduler依据Plan的形容给每个节点的Runner发送具体的Operation,Runner负责具体执行并汇报以后节点的状态。在触发Operation之前,Scheduler会推演该事件产生产生的结果,若导致无奈满足RSS的最小可运行环境,将回绝此事件。
咱们认为仿真测试框架的思路是通用设计,能够推广到更多的服务测试中。
6 多引擎反对
Shuffle是通用操作,不跟引擎绑定,因而咱们尝试了多引擎反对。以后咱们反对了Hive+RSS,同时也在摸索跟流计算引擎(Flink),MPP引擎(Presto)联合的可能性。只管Hive和Spark都是批计算引擎,但Shuffle的行为并不统一,最大的差别是Hive在Mapper端做排序,Reducer只做Merge,而Spark在Reducer端做排序。因为RSS暂未反对计算,因而须要革新Tez反对Reducer排序。此外,Spark有洁净的Shuffle插件接口,RSS只需在外围扩大,而Tez没有相似形象,在这方面也有肯定侵入性。
以后大多数引擎都没有Shuffle插件化的形象,须要肯定水平的引擎批改。此外,流计算和MPP都是上游即时Push给上游的模式,而RSS是上游Push,上游Pull的模式,这两者如何联合也是须要摸索的。
7 测试
咱们比照了阿里云RSS、Magent及开源零碎X。因为大家的零碎还在向前演进,因而测试后果仅代表以后。
测试环境
Header 1: ecs.g6e.4xlarge, 16 2.5GHz/3.2GHz, 64GiB, 10Gbps
Worker 3: ecs.g6e.8xlarge, 32 2.5GHz/3.2GHz, 128GiB, 10Gbps
阿里云RSS vs. Magnet
5T Terasort的性能测试如下图所示,如上文形容,Magent的Shuffle Write有额定开销,差于RSS和传统做法。Magent的Shuffle Read有晋升,但差于RSS。在这个Benchmark下,RSS显著优于另外两个,Magent的e2e工夫略好于传统Shuffle。
阿里云RSS vs. 开源零碎X
RSS跟开源零碎X在TPCDS-3T的性能比照如下,总工夫RSS快了20%。
稳定性
在稳定性方面,咱们测试了Reducer大规模并发的场景,Magnet能够跑通但工夫比RSS慢了数倍,System X在Shuffle Write阶段报错。
四 阿里云RSS在小米的实际
1 现状及痛点
小米的离线集群以Yarn+HDFS为主,NodeManager和DataNode混合部署。Spark是次要的离线引擎,撑持着外围计算工作。Spark作业以后最大的痛点集中在Shuffle导致的稳定性差,性能差和对存算拆散架构的限度。在进行资源保障和作业调优后,作业失败起因次要归结为Fetch Failure,如下图所示。因为大部分集群应用的是HDD,传统Shuffle的高随机读和高网络连接导致性能很差,低稳定性带来的Stage重算会进一步加剧性能回退。此外,小米始终在尝试利用存算拆散架构的计算弹性降低成本,但Shuffle对本地盘的依赖造成了妨碍。
2 RSS在小米的落地
小米始终在关注Shuffle优化相干技术,21年1月份跟阿里云EMR团队就RSS我的项目建设了共创关系,3月份第一个生产集群上线,开始接入作业,6月份第一个HA集群上线,规模达100+节点,9月份第一个300+节点上线,集群默认开启RSS,后续布局会进一步扩大RSS的灰度规模。
在落地的过程,小米主导了磁盘容错的开发,大大提高了RSS的服务稳定性,技术细节如上文所述。此外,在后期RSS还未齐全稳固阶段,小米在多个环节对RSS的作业进行了容错。在调度端,若开启RSS的Spark作业因Shuffle报错,则Yarn的下次重试会回退到ESS。在ShuffleWriter初始化阶段,小米主导了自适应Fallback机制,依据以后RSS集群的负载和作业的特色(如Reducer并发是否过大)主动抉择RSS或ESS,从而晋升稳定性。
3 成果
接入RSS后,Spark作业的稳定性、性能都获得了显著晋升。之前因Fetch Failure失败的作业简直不再失败,性能均匀有20%的晋升。下图展现了接入RSS前后作业稳定性的比照。
ESS:
RSS:
下图展现了接入RSS前后作业运行工夫的比照。
ESS:
RSS:
在存算拆散方面,小米海内某集群接入RSS后,胜利上线了1600+ Core的弹性集群,且作业运行稳固。
在阿里云EMR团队及小米Spark团队的共同努力下,RSS带来的稳定性和性能晋升失去了充沛的验证。后续小米将会继续扩充RSS集群规模以及作业规模,并且在弹性资源伸缩场景下施展更大的作用。
五 开源
重要的事说三遍:“阿里云RSS开源啦!” X 3
git地址: https://github.com/alibaba/Re...
开源代码蕴含外围性能及容错,满足生产要求。
打算中的重要Feature:
- AE
- Spark多版本反对
- Better 流控
- Better 监控
- Better HA
- 多引擎反对
欢送各路开发者共建!
六 Reference
[1]Min Shen, Ye Zhou, Chandni Singh. Magnet: Push-based Shuffle Service for Large-scale Data Processing. VLDB 2020.
[2]Haoyu Zhang, Brian Cho, Ergin Seyfe, Avery Ching, Michael J. Freedman. Riffle: Optimized Shuffle Service for Large-Scale Data Analytics. EuroSys 2018.
[3]Sriram Rao, Raghu Ramakrishnan, Adam Silberstein. Sailfish: A Framework For Large Scale Data Processing. SoCC 2012.
[4]KFS. http://code.google.com/p/kosm...
[5]Google Dataflow Shuffle. https://cloud.google.com/blog...
[6]Cosco: An Efficient Facebook-Scale Shuffle Service. https://databricks.com/sessio...
[7]Flash for Apache Spark Shuffle with Cosco. https://databricks.com/sessio...
[8]Uber Zeus. https://databricks.com/sessio...
[9]Uber Zeus. https://github.com/uber/Remot...
[10]Intel RPMP. https://databricks.com/sessio...
[11]Tencent FireStorm. https://github.com/Tencent/Fi...
[12]Aliyun RSS在趣头条的实际. https://developer.aliyun.com/...
[13]Aliyun RSS架构. https://developer.aliyun.com/...
原文链接
本文为阿里云原创内容,未经容许不得转载。