简介:阿里云 EMR 自 2020 年推出 Remote Shuffle Service(RSS)以来,帮忙了诸多客户解决 Spark 作业的性能、稳定性问题,并使得存算拆散架构得以施行,与此同时 RSS 也在跟合作方小米的共建下一直演进。本文将介绍 RSS 的最新架构,在小米的实际,以及开源。
作者 | 一锤、明济、紫槿
起源 | 阿里技术公众号
阿里云 EMR 自 2020 年推出 Remote Shuffle Service(RSS)以来,帮忙了诸多客户解决 Spark 作业的性能、稳定性问题,并使得存算拆散架构得以施行,与此同时 RSS 也在跟合作方小米的共建下一直演进。本文将介绍 RSS 的最新架构,在小米的实际,以及开源。
一 问题回顾
Shuffle 是大数据计算中最为重要的算子。首先,覆盖率高,超过 50% 的作业都蕴含至多一个 Shuffle[2]。其次,资源耗费大,阿里外部平台 Shuffle 的 CPU 占比超过 20%,LinkedIn 外部 Shuffle Read 导致的资源节约高达 15%[1],单 Shuffle 数据量超 100T[2]。第三,不稳固,硬件资源的稳定性 CPU> 内存 > 磁盘≈网络,而 Shuffle 的资源耗费是倒序。OutOfMemory 和 Fetch Failure 可能是 Spark 作业最常见的两种谬误,前者能够通过调参解决,而后者须要系统性重构 Shuffle。
传统 Shuffle 如下图所示,Mapper 把 Shuffle 数据按 PartitionId 排序写盘后交给 External Shuffle Service(ESS)治理,Reducer 从每个 Mapper Output 中读取属于本人的 Block。
传统 Shuffle 存在以下问题。
- 本地盘依赖限度了存算拆散。存算拆散是近年来衰亡的新型架构,它解耦了计算和存储,能够更灵便地做机型设计:计算节点强 CPU 弱磁盘,存储节点强磁盘强网络弱 CPU。计算节点无状态,可依据负载弹性伸缩。存储端,随着对象存储(OSS, S3)+ 数据湖格局(Delta, Iceberg, Hudi)+ 本地 / 近地缓存等计划的成熟,可当作容量有限的存储服务。用户通过计算弹性 + 存储按量付费取得老本节约。然而,Shuffle 对本地盘的依赖限度了存算拆散。
- 写放大。当 Mapper Output 数据量超过内存时触发外排,从而引入额定磁盘 IO。
- 大量随机读。Mapper Output 属于某个 Reducer 的数据量很小,如 Output 128M,Reducer 并发 2000,则每个 Reducer 只读 64K,从而导致大量小粒度随机读。对于 HDD,随机读性能极差;对于 SSD,会疾速耗费 SSD 寿命。
- 高网络连接数,导致线程池耗费过多 CPU,带来性能和稳定性问题。
- Shuffle 数据单正本,大规模集群场景坏盘 / 坏节点很广泛,Shuffle 数据失落引发的 Stage 重算带来性能和稳定性问题。
二 RSS 倒退历程
针对 Shuffle 的问题,工业界尝试了各种办法,近两年逐步收敛到 Push Shuffle 的计划。
1 Sailfish
Sailfish3 最早提出 Push Shuffle + Partition 数据聚合的办法,对大作业有 20%- 5 倍的性能晋升。Sailfish 魔改了分布式文件系统 KFS[4],不反对多正本。
2 Dataflow
Goolge BigQuery 和 Cloud Dataflow5 实现了 Shuffle 跟计算的解耦,采纳多层存储(内存 + 磁盘),除此之外没有披露更多技术细节。
3 Riffle
Facebook Riffle2 采纳了在 Mapper 端 Merge 的办法,物理节点上部署的 Riffle 服务负责把此节点上的 Shuffle 数据依照 PartitionId 做 Merge,从而肯定水平把小粒度的随机读合并成较大粒度。
4 Cosco
Facebook Cosco[6]7 采纳了 Sailfish 的办法并做了重设计,保留了 Push Shuffle + Parititon 数据聚合的外围办法,但应用了独立服务。服务端采纳 Master-Worker 架构,应用内存两正本,用 DFS 做长久化。Cosco 基本上定义了 RSS 的规范架构,但受到 DFS 的连累,性能上并没有显著晋升。
5 Zeus
Uber Zeus[8]9 同样采纳了去中心化的服务架构,但没有相似 etcd 的角色保护 Worker 状态,因而难以做状态治理。Zeus 通过 Client 双推的形式做多正本,采纳本地存储。
6 RPMP
Intel RPMP10 依附 RDMA 和 PMEM 的新硬件来减速 Shuffle,并没有做数据聚合。
7 Magnet
LinkedIn Magnet1 交融了本地 Shuffle+Push Shuffle,其设计哲学是 ” 尽力而为 ”,Mapper 的 Output 写完本地后,Push 线程会把数据推给远端的 ESS 做聚合,且不保障所有数据都会聚合。受害于本地 Shuffle,Magnet 在容错和 AE 的反对上的体现更好(间接 Fallback 到传统 Shuffle)。Magnet 的局限包含依赖本地盘,不反对存算拆散;数据合并依赖 ESS,对 NodeManager 造成额定压力;Shuffle Write 同时写本地和远端,性能达不到最优。Magnet 计划曾经被 Apache Spark 接收,成为默认的开源计划。
8 FireStorm
FireStorm11 混合了 Cosco 和 Zeus 的设计,服务端采纳 Master-Worker 架构,通过 Client 多写实现多正本。FireStorm 应用了本地盘 + 对象存储的多层存储,采纳较大的 PushBlock(默认 3M)。FireStorm 在存储端保留了 PushBlock 的元信息,并记录在索引文件中。FireStorm 的 Client 缓存数据的内存由 Spark MemoryManager 进行治理,并通过细颗粒度的内存调配 (默认 3K) 来尽量避免内存节约。
从上述形容可知,以后的计划根本收敛到 Push Shuffle,但在一些要害设计上的抉择各家不尽相同,次要体现在:
- 集成到 Spark 外部还是独立服务。
- RSS 服务侧架构,选项包含:Master-Worker,含轻量级状态治理的去中心化,齐全去中心化。
- Shuffle 数据的存储,选项包含:内存,本地盘,DFS,对象存储。
- 多正本的实现,选项包含:Client 多推,服务端做 Replication。
阿里云 RSS12 由 2020 年推出,外围设计参考了 Sailfish 和 Cosco,并且在架构和实现层面做了改进,下文将具体介绍。
三 阿里云 RSS 外围架构
针对上一节的要害设计,阿里云 RSS 的抉择如下:
- 独立服务。思考到将 RSS 集成到 Spark 外部无奈满足存算拆散架构,阿里云 RSS 将作为独立服务提供 Shuffle 服务。
- Master-Worker 架构。通过 Master 节点做服务状态治理十分必要,基于 etcd 的状态状态治理能力受限。
- 多种存储形式。目前反对本地盘 /DFS 等存储形式,主打本地盘,未来会往分层存储方向倒退。
- 服务端做 Replication。Client 多推会额定耗费计算节点的网络和计算资源,在独立部署或者服务化的场景下对计算集群不敌对。
下图展现了阿里云 RSS 的要害架构,蕴含 Client(RSS Client, Meta Service),Master(Resource Manager)和 Worker 三个角色。Shuffle 的过程如下:
- Mapper 在首次 PushData 时申请 Master 调配 Worker 资源,Worker 记录本人所须要服务的 Partition 列表。
- Mapper 把 Shuffle 数据缓存到内存,超过阈值时触发 Push。
- 附属同个 Partition 的数据被 Push 到同一个 Worker 做合并,主 Worker 内存接管到数据后立刻向从 Worker 发动 Replication,数据达成内存两正本后即向 Client 发送 ACK,Flusher 后盾线程负责刷盘。
- Mapper Stage 运行完结,MetaService 向 Worker 发动 CommitFiles 命令,把残留在内存的数据全副刷盘并返回文件列表。
- Reducer 从对应的文件列表中读取 Shuffle 数据。
阿里云 RSS 的外围架构和容错方面的介绍详见[13],本文接下来介绍阿里云 RSS 近一年的架构演进以及不同于其余零碎的特色。
1 状态下沉
RSS 采纳 Master-Worker 架构,最后的设计中 Master 对立负责集群状态治理和 Shuffle 生命周期治理。集群状态包含 Worker 的衰弱度和负载;生命周期包含每个 Shuffle 由哪些 Worker 服务,每个 Worker 所服务的 Partition 列表,Shuffle 所处的状态(Shuffle Write,CommitFile,Shuffle Read),是否有数据失落等。保护 Shuffle 生命周期须要较大数据量和简单数据结构,给 Master HA 的实现造成阻力。同时大量生命周期治理的服务调用使 Master 易成为性能瓶颈,限度 RSS 的扩展性。
为了缓解 Master 压力,咱们把生命周期状态治理下沉到 Driver,由 Application 治理本人的 Shuffle,Master 只需保护 RSS 集群自身的状态。这个优化大大降低 Master 的负载,并使得 Master HA 得以顺利实现。
2 Adaptive Pusher
在最后的设计中,阿里云 RSS 跟其余零碎一样采纳 Hash-Based Pusher,即 Client 会为每个 Partition 保护一个 (或多个[11]) 内存 Buffer,当 Buffer 超过阈值时触发推送。这种设计在并发度适中的状况下没有问题,而在超大并发度的状况下会导致 OOM。例如 Reducer 的并发 5W,在小 Buffer[13]的零碎中 (64K) 极其内存耗费为 64K5W=3G,在大 Buffer[11]的零碎中 (3M) 极其内存耗费为 3M5W=146G,这是不可承受的。针对这个问题,咱们开发了 Sort-Based Pusher,缓存数据时不辨别 Partition,当总的数据超过阈值 (i.e. 64M) 时对以后数据依照 PartitionId 排序,而后把数据 Batch 后推送,从而解决内存耗费过大的问题。
Sort-Based Pusher 会额定引入一次排序,性能上比 Hash-Based Pusher 略差。咱们在 ShuffleWriter 初始化阶段依据 Reducer 的并发度主动抉择适合的 Pusher。
3 磁盘容错
出于性能的思考,阿里云 RSS 举荐本地盘存储,因而解决坏 / 慢盘是保障服务可靠性的前提。Worker 节点的 DeviceMonitor 线程定时对磁盘进行查看,查看项包含 IOHang,使用量,读写异样等。此外 Worker 在所有磁盘操作处 (创立文件,刷盘) 都会捕获异样并上报。IOHang、读写异样被认为是 Critical Error,磁盘将被隔离并终止该磁盘上的存储服务。慢盘、使用量超警戒线等异样仅将磁盘隔离,不再承受新的 Partition 存储申请,但已有的 Partition 服务放弃失常。在磁盘被隔离后,Worker 的容量和负载将发生变化,这些信息将通过心跳发送给 Master。
4 滚动降级
RSS 作为常驻服务,有永不停服的要求,而零碎自身总在向前演进,因而滚动降级是必选的性能。只管通过 Sub-Cluster 部署形式能够绕过,即部署多个子集群,对子集群做灰度,灰度的集群暂停服务,但这种形式依赖调度零碎感知正在灰度的集群并动静批改作业配置。咱们认为 RSS 应该把滚动降级闭环掉,外围设计如下图所示。Client 向 Master 节点的 Leader 角色 (Master 实现了 HA,见上文) 发动滚动降级申请并把更新包上传给 Leader,Leader 通过 Raft 协定批改状态为滚动降级,并启动第一阶段的降级:降级 Master 节点。Leader 首先降级所有的 Follower,而后替换本地包并重启。在 Leader 节点扭转的状况下,降级过程不会中断或异样。Master 节点降级完结后进入第二阶段:Worker 节点降级。RSS 采纳滑动窗口做降级,窗口内的 Worker 尽量优雅下线,即回绝新的 Partition 申请,并期待本地 Shuffle 完结。为了防止等待时间过长,会设置超时工夫。此外,窗口内的 Worker 抉择会尽量避免同时蕴含主从两正本以升高数据失落的概率。
5 凌乱测试框架
对于服务来说,仅依附 UT、集成测试、e2e 测试等无奈保障服务可靠性,因为这些测试无奈笼罩线上简单环境,如坏盘、CPU 过载、网络过载、机器挂掉等。RSS 要求在呈现这些简单状况时放弃服务稳固,为了模仿线上环境,咱们开发了仿真 (凌乱) 测试框架,在测试环境中模仿线上可能呈现的异样,同时保障满足 RSS 运行的最小运行环境,即至多 3 个 Master 节点和 2 个 Worker 节点可用,并且每个 Worker 节点至多有一块盘。咱们继续对 RSS 做此类压力测试。
仿真测试框架架构如下图所示,首先定义测试 Plan 来形容事件类型、事件触发的程序及持续时间,事件类型包含节点异样,磁盘异样,IO 异样,CPU 过载等。客户端将 Plan 提交给 Scheduler,Scheduler 依据 Plan 的形容给每个节点的 Runner 发送具体的 Operation,Runner 负责具体执行并汇报以后节点的状态。在触发 Operation 之前,Scheduler 会推演该事件产生产生的结果,若导致无奈满足 RSS 的最小可运行环境,将回绝此事件。
咱们认为仿真测试框架的思路是通用设计,能够推广到更多的服务测试中。
6 多引擎反对
Shuffle 是通用操作,不跟引擎绑定,因而咱们尝试了多引擎反对。以后咱们反对了 Hive+RSS,同时也在摸索跟流计算引擎 (Flink),MPP 引擎(Presto) 联合的可能性。只管 Hive 和 Spark 都是批计算引擎,但 Shuffle 的行为并不统一,最大的差别是 Hive 在 Mapper 端做排序,Reducer 只做 Merge,而 Spark 在 Reducer 端做排序。因为 RSS 暂未反对计算,因而须要革新 Tez 反对 Reducer 排序。此外,Spark 有洁净的 Shuffle 插件接口,RSS 只需在外围扩大,而 Tez 没有相似形象,在这方面也有肯定侵入性。
以后大多数引擎都没有 Shuffle 插件化的形象,须要肯定水平的引擎批改。此外,流计算和 MPP 都是上游即时 Push 给上游的模式,而 RSS 是上游 Push,上游 Pull 的模式,这两者如何联合也是须要摸索的。
7 测试
咱们比照了阿里云 RSS、Magent 及开源零碎 X。因为大家的零碎还在向前演进,因而测试后果仅代表以后。
测试环境
Header 1: ecs.g6e.4xlarge, 16 2.5GHz/3.2GHz, 64GiB, 10Gbps
Worker 3: ecs.g6e.8xlarge, 32 2.5GHz/3.2GHz, 128GiB, 10Gbps
阿里云 RSS vs. Magnet
5T Terasort 的性能测试如下图所示,如上文形容,Magent 的 Shuffle Write 有额定开销,差于 RSS 和传统做法。Magent 的 Shuffle Read 有晋升,但差于 RSS。在这个 Benchmark 下,RSS 显著优于另外两个,Magent 的 e2e 工夫略好于传统 Shuffle。
阿里云 RSS vs. 开源零碎 X
RSS 跟开源零碎 X 在 TPCDS-3T 的性能比照如下,总工夫 RSS 快了 20%。
稳定性
在稳定性方面,咱们测试了 Reducer 大规模并发的场景,Magnet 能够跑通但工夫比 RSS 慢了数倍,System X 在 Shuffle Write 阶段报错。
四 阿里云 RSS 在小米的实际
1 现状及痛点
小米的离线集群以 Yarn+HDFS 为主,NodeManager 和 DataNode 混合部署。Spark 是次要的离线引擎,撑持着外围计算工作。Spark 作业以后最大的痛点集中在 Shuffle 导致的稳定性差,性能差和对存算拆散架构的限度。在进行资源保障和作业调优后,作业失败起因次要归结为 Fetch Failure,如下图所示。因为大部分集群应用的是 HDD,传统 Shuffle 的高随机读和高网络连接导致性能很差,低稳定性带来的 Stage 重算会进一步加剧性能回退。此外,小米始终在尝试利用存算拆散架构的计算弹性降低成本,但 Shuffle 对本地盘的依赖造成了妨碍。
2 RSS 在小米的落地
小米始终在关注 Shuffle 优化相干技术,21 年 1 月份跟阿里云 EMR 团队就 RSS 我的项目建设了共创关系,3 月份第一个生产集群上线,开始接入作业,6 月份第一个 HA 集群上线,规模达 100+ 节点,9 月份第一个 300+ 节点上线,集群默认开启 RSS,后续布局会进一步扩大 RSS 的灰度规模。
在落地的过程,小米主导了磁盘容错的开发,大大提高了 RSS 的服务稳定性,技术细节如上文所述。此外,在后期 RSS 还未齐全稳固阶段,小米在多个环节对 RSS 的作业进行了容错。在调度端,若开启 RSS 的 Spark 作业因 Shuffle 报错,则 Yarn 的下次重试会回退到 ESS。在 ShuffleWriter 初始化阶段,小米主导了自适应 Fallback 机制,依据以后 RSS 集群的负载和作业的特色 (如 Reducer 并发是否过大) 主动抉择 RSS 或 ESS,从而晋升稳定性。
3 成果
接入 RSS 后,Spark 作业的稳定性、性能都获得了显著晋升。之前因 Fetch Failure 失败的作业简直不再失败,性能均匀有 20% 的晋升。下图展现了接入 RSS 前后作业稳定性的比照。
ESS:
RSS:
下图展现了接入 RSS 前后作业运行工夫的比照。
ESS:
RSS:
在存算拆散方面,小米海内某集群接入 RSS 后,胜利上线了 1600+ Core 的弹性集群,且作业运行稳固。
在阿里云 EMR 团队及小米 Spark 团队的共同努力下,RSS 带来的稳定性和性能晋升失去了充沛的验证。后续小米将会继续扩充 RSS 集群规模以及作业规模,并且在弹性资源伸缩场景下施展更大的作用。
五 开源
重要的事说三遍:“阿里云 RSS 开源啦!”X 3
git 地址: https://github.com/alibaba/Re…
开源代码蕴含外围性能及容错,满足生产要求。
打算中的重要 Feature:
- AE
- Spark 多版本反对
- Better 流控
- Better 监控
- Better HA
- 多引擎反对
欢送各路开发者共建!
六 Reference
[1]Min Shen, Ye Zhou, Chandni Singh. Magnet: Push-based Shuffle Service for Large-scale Data Processing. VLDB 2020.
[2]Haoyu Zhang, Brian Cho, Ergin Seyfe, Avery Ching, Michael J. Freedman. Riffle: Optimized Shuffle Service for Large-Scale Data Analytics. EuroSys 2018.
[3]Sriram Rao, Raghu Ramakrishnan, Adam Silberstein. Sailfish: A Framework For Large Scale Data Processing. SoCC 2012.
[4]KFS. http://code.google.com/p/kosm…
[5]Google Dataflow Shuffle. https://cloud.google.com/blog…
[6]Cosco: An Efficient Facebook-Scale Shuffle Service. https://databricks.com/sessio…
[7]Flash for Apache Spark Shuffle with Cosco. https://databricks.com/sessio…
[8]Uber Zeus. https://databricks.com/sessio…
[9]Uber Zeus. https://github.com/uber/Remot…
[10]Intel RPMP. https://databricks.com/sessio…
[11]Tencent FireStorm. https://github.com/Tencent/Fi…
[12]Aliyun RSS 在趣头条的实际. https://developer.aliyun.com/…
[13]Aliyun RSS 架构. https://developer.aliyun.com/…
原文链接
本文为阿里云原创内容,未经容许不得转载。