关于阿里云:降本增效利器趣头条Spark-Remote-Shuffle-Service最佳实践

10次阅读

共计 6658 个字符,预计需要花费 17 分钟才能阅读完成。

  • 王振华,趣头条大数据总监,趣头条大数据负责人
  • 曹佳清,趣头条大数据离线团队高级研发工程师,曾就任于饿了么大数据 INF 团队负责存储层和计算层组件研发,目前负责趣头条大数据计算层组件 Spark 的建设
  • 范振,花名辰繁,阿里云计算平台 EMR 高级技术专家,目前次要关注开源大数据技术以及云原生技术。
  1. 业务场景与现状

趣头条是一家依赖大数据的科技公司,在 2018-2019 年经验了业务的高速倒退,主 App 和其余翻新 App 的日活减少了 10 倍以上,相应的大数据系统也从最后的 100 台机器减少到了 1000 台以上规模。多个业务线依赖于大数据平台开展业务,大数据系统的高效和稳固成了公司业务倒退的基石,在大数据的架构上咱们应用了业界成熟的计划,存储构建在 HDFS 上、计算资源调度依赖 Yarn、表元数据应用 Hive 治理、用 Spark 进行计算,具体如图 1 所示:

图 1 趣头条离线大数据平台架构图
其中 Yarn 集群应用了繁多大集群的计划,HDFS 应用了联邦的计划,同时基于老本因素,HDFS 和 Yarn 服务在 ECS 上进行了 DataNode 和 NodeManager 的混部。
在趣头条每天有 6W+ 的 Spark 工作跑在 Yarn 集群上,每天新增的 Spark 工作稳固在 100 左右,公司的迅速倒退要求需要疾速实现,积攒了很多治理欠债,种种问题体现进去集群稳定性须要晋升,其中 Shuffle 的稳定性越来越成为集群的枷锁,亟需解决。

  1. 以后大数据平台的挑战与思考

近半年大数据平台次要的业务指标是降本增效,一方面业务方心愿离线平台每天可能承载更多的作业,另一方面咱们本身有降本的需要,如何在降本的前提下撑持更多地业务量对于每个技术人都是十分大地挑战。相熟 Spark 的同学应该十分分明,在大规模集群场景下,Spark Shuffle 在实现上有比拟大的缺点,体现在以下的几个方面:

  • Spark Shuffle Fetch 过程存在大量的网络小包,现有的 External Shuffle Service 设计并没有十分粗疏的解决这些 RPC 申请,大规模场景下会有很多 connection reset 产生,导致 FetchFailed,从而导致 stage 重算。
  • Spark Shuffle Fetch 过程存在大量的随机读,大规模高负载集群条件下,磁盘 IO 负载高、CPU 满载时常产生,极容易产生 FetchFailed,从而导致 stage 重算。
  • 重算过程会放大集群的忙碌水平,抢占机器资源,导致恶性循环重大,SLA 完不成,须要运维人员手动将作业跑在闲暇的 Label 集群。
  • 计算和 Shuffle 过程架构不能拆开,不能把 Shuffle 限定在指定的集群内,不能利用局部 SSD 机器。
  • M* N 次的 shuffle 过程:对于 10K mapper,5K reducer 级别的作业,根本跑不完。
  • NodeManager 和 Spark Shuffle Service 是同一过程,Shuffle 过程太重,常常导致 NodeManager 重启,从而影响 Yarn 调度稳定性。

以上的这些问题对于 Spark 研发同学是十分苦楚的,好多作业每天运行时长方差会十分大,而且总有一些无奈实现的作业,要么业务进行拆分,要么跑到独有的 Yarn 集群中。除了现有面临的挑战之外,咱们也在踊跃构建下一代基础架构设施,随着云原生 Kubernetes 概念越来越火,Spark 社区也提供了 Spark on Kubernetes 版本,相比拟于 Yarn 来说,Kubernetes 可能更好的利用云原生的弹性,提供更加丰盛的运维、部署、隔离等个性。然而 Spark on Kubernetes 目前还存在很多问题没有解决,包含容器内的 Shuffle 形式、动静资源调度、调度性能无限等等。咱们针对 Kubernetes 在趣头条的落地,次要有以下几个方面的需要:

  • 实时集群、OLAP 集群和 Spark 集群之前都是互相独立的,怎么可能将这些资源造成对立大数据资源池。通过 Kubernetes 的天生隔离个性,更好的实现离线业务与实时业务混部,达到降本增效目标。
  • 公司的在线业务都运行在 Kubernetes 集群中,如何利用在线业务和大数据业务的不同特点进行错峰调度,达成 ECS 的总资源量起码。
  • 心愿可能基于 Kubernetes 来容纳在线服务、大数据、AI 等基础架构,做到运维体系统一化。

因为趣头条的大数据业务目前全都部署在阿里云上,阿里云 EMR 团队和趣头条的大数据团队进行了深刻技术共创,独特研发了 Remote Shuffle Service(以下简称 RSS),旨在解决 Spark on Yarn 层面提到的所有问题,并为 Spark 跑在 Kubernetes 上提供 Shuffle 根底组件。

  1. Remote Shuffle Service 设计与实现

3.1 Remote Shuffle Service 的背景

早在 2019 年初咱们就关注到了社区曾经有相应的探讨,如 SPARK-25299。该 Issue 次要心愿解决的问题是在云原生环境下,Spark 须要将 Shuffle 数据写出到近程的服务中。然而咱们通过调研后发现 Spark 3.0(之前的 master 分支)只反对了局部的接口,而没有对应的实现。该接口次要心愿在现有的 Shuffle 代码框架下,将数据写到近程服务中。如果基于这种形式实现,比方间接将 Shuffle 以流的形式写入到 HDFS 或者 Alluxio 等高速内存零碎,会有相当大的性能开销,趣头条也做了一些相应的工作,并进行了局部的 Poc,性能与原版 Spark Shuffle 实现相差特地多,最差性能可降落 3 倍以上。同时咱们也调研了一部分其余公司的实现计划,例如 Facebook 的 Riffle 计划以及 LinkedIn 开源的 Magnet,这些实现计划是首先将 Shuffle 文件写到本地,而后在进行 Merge 或者 Upload 到近程的服务上,这和后续咱们的 Kubernetes 架构是不兼容的,因为 Kubernetes 场景下,本地磁盘 Hostpath 或者 LocalPV 并不是一个必选项,而且也会存在隔离和权限的问题。
基于上述背景,咱们与阿里云 EMR 团队共同开发了 Remote Shuffle Service。RSS 能够提供以下的能力,完满的解决了 Spark Shuffle 面临的技术挑战,为咱们集群的稳定性和容器化的落地提供了强有力的保障,次要体现在以下几个方面:

  • 高性能服务器的设计思路,不同于 Spark 原有 Shuffle Service,RPC 更轻量、通用和稳固。
  • 两正本机制,可能保障的 Shuffle fetch 极小概率(低于 0.01%)失败。
  • 合并 shuffle 文件,从 M * N 次 shuffle 变成 N 次 shuffle,程序读 HDD 磁盘会显著晋升 shuffle heavy 作业性能。
  • 缩小 Executor 计算时内存压力,防止 map 过程中 Shuffle Spill。
  • 计算与存储拆散架构,能够将 Shuffle Service 部署到非凡硬件环境中,例如 SSD 机器,能够保障 SLA 极高的作业。
  • 完满解决 Spark on Kubernetes 计划中对于本地磁盘的依赖。

3.2 Remote Shuffle Service 的实现

3.2.1 整体设计

Spark RSS 架构蕴含三个角色: Master, Worker, Client。Master 和 Worker 形成服务端,Client 以不侵入的形式集成到 Spark ShuffleManager 里(RssShuffleManager 实现了 ShuffleManager 接口)。

  • Master 的主要职责是资源分配与状态治理。
  • Worker 的主要职责是解决和存储 Shuffle 数据。
  • Client 的主要职责是缓存和推送 Shuffle 数据。

整体流程如下所示 (其中 ResourceManager 和 MetaService 是 Master 的组件),如图 2。

图 2 RSS 整体架构图

3.2.2 实现流程

上面重点来讲一下实现的流程:

  • RSS 采纳 Push Style 的 shuffle 模式,每个 Mapper 持有一个按 Partition 分界的缓存区,Shuffle 数据首先写入缓存区,每当某个 Partition 的缓存满了即触发 PushData。
  • Driver 先和 Master 产生 StageStart 的申请,Master 承受到该 RPC 后,会调配对应的 Worker Partition 并返回给 Driver,Shuffle Client 失去这些元信息后,进行后续的推送数据。
  • Client 开始向主正本推送数据。主正本 Worker 收到申请后,把数据缓存到本地内存,同时把该申请以 Pipeline 的形式转发给从正本,从而实现了 2 正本机制。
  • 为了不阻塞 PushData 的申请,Worker 收到 PushData 申请后会以纯异步的形式交由专有的线程池异步解决。依据该 Data 所属的 Partition 拷贝到当时调配的 buffer 里,若 buffer 满了则触发 flush。RSS 反对多种存储后端,包含 DFS 和 Local。若后端是 DFS,则主从正本只有一方会 flush,依附 DFS 的双正本保障容错;若后端是 Local,则主从单方都会 flush。
  • 在所有的 Mapper 都完结后,Driver 会触发 StageEnd 申请。Master 接管到该 RPC 后,会向所有 Worker 发送 CommitFiles 申请,Worker 收到后把属于该 Stage buffer 里的数据 flush 到存储层,close 文件,并开释 buffer。Master 收到所有响应后,记录每个 partition 对应的文件列表。若 CommitFiles 申请失败,则 Master 标记此 Stage 为 DataLost。
  • 在 Reduce 阶段,reduce task 首先向 Master 申请该 Partition 对应的文件列表,若返回码是 DataLost,则触发 Stage 重算或间接 abort 作业。若返回失常,则间接读取文件数据。

总体来讲,RSS 的设计要点总结为 3 个层面:

  • 采纳 PushStyle 的形式做 shuffle,防止了本地存储,从而适应了计算存储拆散架构。
  • 依照 reduce 做聚合,防止了小文件随机读写和小数据量网络申请。
  • 做了 2 正本,进步了零碎稳定性。

3.2.3 容错

对于 RSS 零碎,容错性是至关重要的,咱们分为以下几个维度来实现:

  • PushData 失败

    • 当 PushData 失败次数 (Worker 挂了,网络忙碌,CPU 忙碌等) 超过 MaxRetry 后,Client 会给 Master 发消息申请新的 Partition Location,尔后本 Client 都会应用新的 Location 地址,该阶段称为 Revive。
    • 若 Revive 是因为 Client 端而非 Worker 的问题导致,则会产生同一个 Partition 数据分布在不同 Worker 上的状况,Master 的 Meta 组件会正确处理这种情景。
    • 若产生 WorkerLost,则会导致大量 PushData 同时失败,此时会有大量同一 Partition 的 Revive 申请打到 Master。为了防止给同一个 Partition 调配过多的 Location,Master 保障仅有一个 Revive 申请真正失去解决,其余的申请塞到 pending queue 里,待 Revive 解决完结后返回同一个 Location。
  • Worker 宕机

    • 当产生 WorkerLost 时,对于该 Worker 上的正本数据,Master 向其 peer 发送 CommitFile 的申请,而后清理 peer 上的 buffer。若 Commit Files 失败,则记录该 Stage 为 DataLost;若胜利,则后续的 PushData 通过 Revive 机制从新申请 Location。
  • 数据去重

    • Speculation task 和 task 重算会导致数据反复。解决办法是每个 PushData 的数据片里编码了所属的 mapId,attemptId 和 batchId,并且 Master 为每个 map task 记录胜利 commit 的 attemtpId。read 端通过 attemptId 过滤不同的 attempt 数据,并通过 batchId 过滤同一个 attempt 的反复数据。
  • 多正本

    • RSS 目前反对 DFS 和 Local 两种存储后端。
    • 在 DFS 模式下,ReadPartition 失败会间接导致 Stage 重算或 abort job。在 Local 模式,ReadPartition 失败会触发从 peer location 读,若主从都失败则触发 Stage 重算或 abort job。

3.2.4 高可用

大家能够看到 RSS 的设计中 Master 是一个单点,尽管 Master 的负载很小,不会轻易地挂掉,然而这对于线上稳定性来说无疑是一个危险点。在我的项目的最后上线阶段,咱们心愿能够通过 SubCluster 的形式进行 workaround,即通过部署多套 RSS 来承载不同的业务,这样即便 RSS Master 宕机,也只会影响无限的一部分业务。然而随着零碎的深刻应用,咱们决定直面问题,引进高可用 Master。次要的实现如下:

  • 首先,Master 目前的元数据比拟多,咱们能够将一部分与 ApplD+ShuffleId 自身相干的元数据下沉到 Driver 的 ShuffleManager 中,因为元数据并不会很多,Driver 减少的内存开销十分无限。
  • 另外,对于全局负载平衡的元数据和调度相干的元数据,咱们利用 Raft 实现了 Master 组件的高可用,这样咱们通过部署 3 或 5 台 Master,真正的实现了大规模可扩大的需要。

实际效果与剖析


4.1 性能与稳定性

团队针对 TeraSort,TPC-DS 以及大量的外部作业进行了测试,在 Reduce 阶段缩小了随机读的开销,工作的稳定性和性能都有了大幅度晋升。
图 3 是 TeraSort 的 benchmark,以 10T Terasort 为例,Shuffle 量压缩后大概 5.6T。能够看出该量级的作业在 RSS 场景下,因为 Shuffle read 变为程序读,性能会有大幅晋升。

图 3 TeraSort 性能测试(RSS 性能更好)
图 4 是一个线上理论脱敏后的 Shuffle heavy 大作业,之前在混部集群中很小概率能够跑完,每天工作 SLA 不能按时达成,剖析起因次要是因为大量的 FetchFailed 导致 stage 进行重算。应用 RSS 之后每天能够稳固的跑完,2.1T 的 shuffle 也不会呈现任何 FetchFailed 的场景。在更大的数据集性能和 SLA 体现都更为显著。

图 4 理论业务的作业 stage 图(应用 RSS 保障稳定性和性能)

4.2 业务成果

在大数据团队和阿里云 EMR 团队的共同努力下,通过近半年的上线、经营 RSS,以及和业务部门的长时间测试,业务价值次要体现在以下方面:

  • 降本增效成果显著,在集群规模小幅降落的根底上,撑持了更多的计算工作,TCO 老本降落 20%。
  • SLA 显著晋升,大规模 Spark Shuffle 工作从跑不完到能跑完,咱们可能将不同 SLA 级别作业合并到同一集群,减小集群节点数量,达到对立治理,放大老本的目标。本来业务方有一部分 SLA 比拟高的作业在一个独有的 Yarn 集群 B 中运行,因为主 Yarn 集群 A 的负载十分高,如果跑到集群 A 中,会常常的挂掉。利用 RSS 之后能够释怀的将作业跑到主集群 A 中,从而开释掉独有 Yarn 集群 B。
  • 作业执行效率显著晋升,跑的慢 -> 跑的快。咱们比拟了几个典型的 Shuffle heavy 作业,一个重要的业务线作业本来须要 3 小时,RSS 版本须要 1.6 小时。抽取线上 5~10 个作业,大作业的性能晋升相当显著,不同作业均匀下来有 30% 以上的性能晋升,即便是 shuffle 量不大的作业,因为比较稳定不须要 stage 重算,长期运行均匀工夫也会缩小 10%-20%。
  • 架构灵活性显著晋升,降级为计算与存储拆散架构。Spark 在容器中运行的过程中,将 RSS 作为根底组件,能够使得 Spark 容器化可能大规模的落地,为离线在线对立资源、对立调度打下了根底。

将来瞻望


趣头条大数据平台和阿里云 EMR 团队后续会持续放弃深刻共创,将摸索更多的方向。次要有以下的一些思路:

  • RSS 存储能力优化,包含将云的对象存储作为存储后端。
  • RSS 多引擎反对,例如 MapReduce、Tez 等,晋升历史工作执行效率。
  • 减速大数据容器化落地,配合 RSS 能力,解决 K8s 调度器性能、调度策略等一系列挑战。
  • 继续优化老本,配合 EMR 的弹性伸缩性能,一方面 Spark 能够应用更多的阿里云 ECS/ECI 抢占式实例来进一步压缩老本,另一方面将已有机器包含阿里云 ACK,ECI 等资源造成对立大池子,将大数据的计算组件和在线业务进行错峰调度以及混部。

原文链接
本文为阿里云原创内容,未经容许不得转载。

正文完
 0