关于阿里云:降本增效利器趣头条Spark-Remote-Shuffle-Service最佳实践

  • 王振华,趣头条大数据总监,趣头条大数据负责人
  • 曹佳清,趣头条大数据离线团队高级研发工程师,曾就任于饿了么大数据INF团队负责存储层和计算层组件研发,目前负责趣头条大数据计算层组件Spark的建设
  • 范振,花名辰繁,阿里云计算平台EMR高级技术专家,目前次要关注开源大数据技术以及云原生技术。
  1. 业务场景与现状

趣头条是一家依赖大数据的科技公司,在2018-2019年经验了业务的高速倒退,主App和其余翻新App的日活减少了10倍以上,相应的大数据系统也从最后的100台机器减少到了1000台以上规模。多个业务线依赖于大数据平台开展业务,大数据系统的高效和稳固成了公司业务倒退的基石,在大数据的架构上咱们应用了业界成熟的计划,存储构建在HDFS上、计算资源调度依赖Yarn、表元数据应用Hive治理、用Spark进行计算,具体如图1所示:

图1 趣头条离线大数据平台架构图
其中Yarn集群应用了繁多大集群的计划,HDFS应用了联邦的计划,同时基于老本因素,HDFS和Yarn服务在ECS上进行了DataNode和NodeManager的混部。
在趣头条每天有6W+的Spark工作跑在Yarn集群上,每天新增的Spark工作稳固在100左右,公司的迅速倒退要求需要疾速实现,积攒了很多治理欠债,种种问题体现进去集群稳定性须要晋升,其中Shuffle的稳定性越来越成为集群的枷锁,亟需解决。

  1. 以后大数据平台的挑战与思考

近半年大数据平台次要的业务指标是降本增效,一方面业务方心愿离线平台每天可能承载更多的作业,另一方面咱们本身有降本的需要,如何在降本的前提下撑持更多地业务量对于每个技术人都是十分大地挑战。相熟Spark的同学应该十分分明,在大规模集群场景下,Spark Shuffle在实现上有比拟大的缺点,体现在以下的几个方面:

  • Spark Shuffle Fetch过程存在大量的网络小包,现有的External Shuffle Service设计并没有十分粗疏的解决这些RPC申请,大规模场景下会有很多connection reset产生,导致FetchFailed,从而导致stage重算。
  • Spark Shuffle Fetch过程存在大量的随机读,大规模高负载集群条件下,磁盘IO负载高、CPU满载时常产生,极容易产生FetchFailed,从而导致stage重算。
  • 重算过程会放大集群的忙碌水平,抢占机器资源,导致恶性循环重大,SLA完不成,须要运维人员手动将作业跑在闲暇的Label集群。
  • 计算和Shuffle过程架构不能拆开,不能把Shuffle限定在指定的集群内,不能利用局部SSD机器。
  • M*N次的shuffle过程:对于10K mapper,5K reducer级别的作业,根本跑不完。
  • NodeManager和Spark Shuffle Service是同一过程,Shuffle过程太重,常常导致NodeManager重启,从而影响Yarn调度稳定性。

以上的这些问题对于Spark研发同学是十分苦楚的,好多作业每天运行时长方差会十分大,而且总有一些无奈实现的作业,要么业务进行拆分,要么跑到独有的Yarn集群中。除了现有面临的挑战之外,咱们也在踊跃构建下一代基础架构设施,随着云原生Kubernetes概念越来越火,Spark社区也提供了Spark on Kubernetes版本,相比拟于Yarn来说,Kubernetes可能更好的利用云原生的弹性,提供更加丰盛的运维、部署、隔离等个性。然而Spark on Kubernetes目前还存在很多问题没有解决,包含容器内的Shuffle形式、动静资源调度、调度性能无限等等。咱们针对Kubernetes在趣头条的落地,次要有以下几个方面的需要:

  • 实时集群、OLAP集群和Spark集群之前都是互相独立的,怎么可能将这些资源造成对立大数据资源池。通过Kubernetes的天生隔离个性,更好的实现离线业务与实时业务混部,达到降本增效目标。
  • 公司的在线业务都运行在Kubernetes集群中,如何利用在线业务和大数据业务的不同特点进行错峰调度,达成ECS的总资源量起码。
  • 心愿可能基于Kubernetes来容纳在线服务、大数据、AI等基础架构,做到运维体系统一化。

因为趣头条的大数据业务目前全都部署在阿里云上,阿里云EMR团队和趣头条的大数据团队进行了深刻技术共创,独特研发了Remote Shuffle Service(以下简称RSS),旨在解决Spark on Yarn层面提到的所有问题,并为Spark跑在Kubernetes上提供Shuffle根底组件。

  1. Remote Shuffle Service设计与实现

3.1 Remote Shuffle Service的背景

早在2019年初咱们就关注到了社区曾经有相应的探讨,如SPARK-25299。该Issue次要心愿解决的问题是在云原生环境下,Spark须要将Shuffle数据写出到近程的服务中。然而咱们通过调研后发现Spark 3.0(之前的master分支)只反对了局部的接口,而没有对应的实现。该接口次要心愿在现有的Shuffle代码框架下,将数据写到近程服务中。如果基于这种形式实现,比方间接将Shuffle以流的形式写入到HDFS或者Alluxio等高速内存零碎,会有相当大的性能开销,趣头条也做了一些相应的工作,并进行了局部的Poc,性能与原版Spark Shuffle实现相差特地多,最差性能可降落3倍以上。同时咱们也调研了一部分其余公司的实现计划,例如Facebook的Riffle计划以及LinkedIn开源的Magnet,这些实现计划是首先将Shuffle文件写到本地,而后在进行Merge或者Upload到近程的服务上,这和后续咱们的Kubernetes架构是不兼容的,因为Kubernetes场景下,本地磁盘Hostpath或者LocalPV并不是一个必选项,而且也会存在隔离和权限的问题。
基于上述背景,咱们与阿里云EMR团队共同开发了Remote Shuffle Service。RSS能够提供以下的能力,完满的解决了Spark Shuffle面临的技术挑战,为咱们集群的稳定性和容器化的落地提供了强有力的保障,次要体现在以下几个方面:

  • 高性能服务器的设计思路,不同于Spark原有Shuffle Service,RPC更轻量、通用和稳固。
  • 两正本机制,可能保障的Shuffle fetch极小概率(低于0.01%)失败。
  • 合并shuffle文件,从M*N次shuffle变成N次shuffle,程序读HDD磁盘会显著晋升shuffle heavy作业性能。
  • 缩小Executor计算时内存压力,防止map过程中Shuffle Spill。
  • 计算与存储拆散架构,能够将Shuffle Service部署到非凡硬件环境中,例如SSD机器,能够保障SLA极高的作业。
  • 完满解决Spark on Kubernetes计划中对于本地磁盘的依赖。

3.2 Remote Shuffle Service的实现

3.2.1 整体设计

Spark RSS架构蕴含三个角色: Master, Worker, Client。Master和Worker形成服务端,Client以不侵入的形式集成到Spark ShuffleManager里(RssShuffleManager实现了ShuffleManager接口)。

  • Master的主要职责是资源分配与状态治理。
  • Worker的主要职责是解决和存储Shuffle数据。
  • Client的主要职责是缓存和推送Shuffle数据。

整体流程如下所示(其中ResourceManager和MetaService是Master的组件),如图2。

图2 RSS整体架构图

3.2.2 实现流程

上面重点来讲一下实现的流程:

  • RSS采纳Push Style的shuffle模式,每个Mapper持有一个按Partition分界的缓存区,Shuffle数据首先写入缓存区,每当某个Partition的缓存满了即触发PushData。
  • Driver先和Master产生StageStart的申请,Master承受到该RPC后,会调配对应的Worker Partition并返回给Driver,Shuffle Client失去这些元信息后,进行后续的推送数据。
  • Client开始向主正本推送数据。主正本Worker收到申请后,把数据缓存到本地内存,同时把该申请以Pipeline的形式转发给从正本,从而实现了2正本机制。
  • 为了不阻塞PushData的申请,Worker收到PushData申请后会以纯异步的形式交由专有的线程池异步解决。依据该Data所属的Partition拷贝到当时调配的buffer里,若buffer满了则触发flush。RSS反对多种存储后端,包含DFS和Local。若后端是DFS,则主从正本只有一方会flush,依附DFS的双正本保障容错;若后端是Local,则主从单方都会flush。
  • 在所有的Mapper都完结后,Driver会触发StageEnd申请。Master接管到该RPC后,会向所有Worker发送CommitFiles申请,Worker收到后把属于该Stage buffer里的数据flush到存储层,close文件,并开释buffer。Master收到所有响应后,记录每个partition对应的文件列表。若CommitFiles申请失败,则Master标记此Stage为DataLost。
  • 在Reduce阶段,reduce task首先向Master申请该Partition对应的文件列表,若返回码是DataLost,则触发Stage重算或间接abort作业。若返回失常,则间接读取文件数据。

总体来讲,RSS的设计要点总结为3个层面:

  • 采纳PushStyle的形式做shuffle,防止了本地存储,从而适应了计算存储拆散架构。
  • 依照reduce做聚合,防止了小文件随机读写和小数据量网络申请。
  • 做了2正本,进步了零碎稳定性。

3.2.3 容错

对于RSS零碎,容错性是至关重要的,咱们分为以下几个维度来实现:

  • PushData失败

    • 当PushData失败次数(Worker挂了,网络忙碌,CPU忙碌等)超过MaxRetry后,Client会给Master发消息申请新的Partition Location,尔后本Client都会应用新的Location地址,该阶段称为Revive。
    • 若Revive是因为Client端而非Worker的问题导致,则会产生同一个Partition数据分布在不同Worker上的状况,Master的Meta组件会正确处理这种情景。
    • 若产生WorkerLost,则会导致大量PushData同时失败,此时会有大量同一Partition的Revive申请打到Master。为了防止给同一个Partition调配过多的Location,Master保障仅有一个Revive申请真正失去解决,其余的申请塞到pending queue里,待Revive解决完结后返回同一个Location。
  • Worker宕机

    • 当产生WorkerLost时,对于该Worker上的正本数据,Master向其peer发送CommitFile的申请,而后清理peer上的buffer。若Commit Files失败,则记录该Stage为DataLost;若胜利,则后续的PushData通过Revive机制从新申请Location。
  • 数据去重

    • Speculation task和task重算会导致数据反复。解决办法是每个PushData的数据片里编码了所属的mapId,attemptId和batchId,并且Master为每个map task记录胜利commit的attemtpId。read端通过attemptId过滤不同的attempt数据,并通过batchId过滤同一个attempt的反复数据。
  • 多正本

    • RSS目前反对DFS和Local两种存储后端。
    • 在DFS模式下,ReadPartition失败会间接导致Stage重算或abort job。在Local模式,ReadPartition失败会触发从peer location读,若主从都失败则触发Stage重算或abort job。

3.2.4 高可用

大家能够看到RSS的设计中Master是一个单点,尽管Master的负载很小,不会轻易地挂掉,然而这对于线上稳定性来说无疑是一个危险点。在我的项目的最后上线阶段,咱们心愿能够通过SubCluster的形式进行workaround,即通过部署多套RSS来承载不同的业务,这样即便RSS Master宕机,也只会影响无限的一部分业务。然而随着零碎的深刻应用,咱们决定直面问题,引进高可用Master。次要的实现如下:

  • 首先,Master目前的元数据比拟多,咱们能够将一部分与ApplD+ShuffleId自身相干的元数据下沉到Driver的ShuffleManager中,因为元数据并不会很多,Driver减少的内存开销十分无限。
  • 另外,对于全局负载平衡的元数据和调度相干的元数据,咱们利用Raft实现了Master组件的高可用,这样咱们通过部署3或5台Master,真正的实现了大规模可扩大的需要。

实际效果与剖析


4.1 性能与稳定性

团队针对TeraSort,TPC-DS以及大量的外部作业进行了测试,在Reduce阶段缩小了随机读的开销,工作的稳定性和性能都有了大幅度晋升。
图3是TeraSort的benchmark,以10T Terasort为例,Shuffle量压缩后大概5.6T。能够看出该量级的作业在RSS场景下,因为Shuffle read变为程序读,性能会有大幅晋升。

图3 TeraSort性能测试(RSS性能更好)
图4是一个线上理论脱敏后的Shuffle heavy大作业,之前在混部集群中很小概率能够跑完,每天工作SLA不能按时达成,剖析起因次要是因为大量的FetchFailed导致stage进行重算。应用RSS之后每天能够稳固的跑完,2.1T的shuffle也不会呈现任何FetchFailed的场景。在更大的数据集性能和SLA体现都更为显著。

图4 理论业务的作业stage图(应用RSS保障稳定性和性能)

4.2 业务成果

在大数据团队和阿里云EMR团队的共同努力下,通过近半年的上线、经营RSS,以及和业务部门的长时间测试,业务价值次要体现在以下方面:

  • 降本增效成果显著,在集群规模小幅降落的根底上,撑持了更多的计算工作,TCO老本降落20%。
  • SLA显著晋升,大规模Spark Shuffle工作从跑不完到能跑完,咱们可能将不同SLA级别作业合并到同一集群,减小集群节点数量,达到对立治理,放大老本的目标。本来业务方有一部分SLA比拟高的作业在一个独有的Yarn集群B中运行,因为主Yarn集群A的负载十分高,如果跑到集群A中,会常常的挂掉。利用RSS之后能够释怀的将作业跑到主集群A中,从而开释掉独有Yarn集群B。
  • 作业执行效率显著晋升,跑的慢 -> 跑的快。咱们比拟了几个典型的Shuffle heavy作业,一个重要的业务线作业本来须要3小时,RSS版本须要1.6小时。抽取线上5~10个作业,大作业的性能晋升相当显著,不同作业均匀下来有30%以上的性能晋升,即便是shuffle量不大的作业,因为比较稳定不须要stage重算,长期运行均匀工夫也会缩小10%-20%。
  • 架构灵活性显著晋升,降级为计算与存储拆散架构。Spark在容器中运行的过程中,将RSS作为根底组件,能够使得Spark容器化可能大规模的落地,为离线在线对立资源、对立调度打下了根底。

将来瞻望


趣头条大数据平台和阿里云EMR团队后续会持续放弃深刻共创,将摸索更多的方向。次要有以下的一些思路:

  • RSS存储能力优化,包含将云的对象存储作为存储后端。
  • RSS多引擎反对,例如MapReduce、Tez等,晋升历史工作执行效率。
  • 减速大数据容器化落地,配合RSS能力,解决K8s调度器性能、调度策略等一系列挑战。
  • 继续优化老本,配合EMR的弹性伸缩性能,一方面Spark能够应用更多的阿里云ECS/ECI抢占式实例来进一步压缩老本,另一方面将已有机器包含阿里云ACK,ECI等资源造成对立大池子,将大数据的计算组件和在线业务进行错峰调度以及混部。

原文链接
本文为阿里云原创内容,未经容许不得转载。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理