Hadoop-YARN调度性能优化实践

33次阅读

共计 9995 个字符,预计需要花费 25 分钟才能阅读完成。

背景

YARN 作为 Hadoop 的资源管理系统,负责 Hadoop 集群上计算资源的管理和作业调度。

美团的 YARN 以社区 2.7.1 版本为基础构建分支。目前在 YARN 上支撑离线业务、实时业务以及机器学习业务。

  • 离线业务主要运行的是 Hive on MapReduce,Spark SQL 为主的数据仓库作业。
  • 实时业务主要运行 Spark Streaming,Flink 为主的实时流计算作业。
  • 机器学习业务主要运行 TensorFlow,MXNet,MLX(美团点评自研的大规模机器学习系统)等计算作业。

YARN 面临高可用、扩展性、稳定性的问题很多。其中扩展性上遇到的最严重的,是集群和业务规模增长带来的调度器性能问题。从业务角度来看,假设集群 1000 台节点,每个节点提供 100 个 CPU 的计算能力。每个任务使用 1 个 CPU,平均执行时间 1 分钟。集群在高峰期始终有超过 10 万 CPU 的资源需求。集群的调度器平均每分钟只能调度 5 万的任务。从分钟级别观察,集群资源使用率是 50000/(100*1000)=0.5,那么集群就有 50% 的计算资源因为调度能力的问题而无法使用。

随着集群规模扩大以及业务量的增长,集群调度能力会随着压力增加而逐渐下降。假设调度能力依然保持不变,每分钟调度 5 万个任务,按照 5000 台节点的规模计算,如果不做任何优化改进,那么集群资源使用率为:50000/(100*5000) = 10%,剩余的 90% 的机器资源无法被利用起来。

这个问题解决后,集群在有空余资源的情况下,作业资源需求可以快速得到满足,集群的计算资源得到充分地利用。

下文会逐步将 Hadoop YARN 调度系统的核心模块展开说明,揭开上述性能问题的根本原因,提出系统化的解决方案,最终 Hadoop YARN 达到支撑单集群万级别节点,支持并发运行数万作业的调度能力。

整体架构

YARN 架构

YARN 负责作业资源调度,在集群中找到满足业务的资源,帮助作业启动任务,管理作业的生命周期。

YARN 详细的架构设计请参考 Hadoop 官方文档。

资源抽象

YARN 在 cpu,memory 这两个资源维度对集群资源做了抽象。

class Resource{
  int cpu;   //cpu 核心个数
  int memory-mb; // 内存的 MB 数
}

作业向 YARN 申请资源的请求是:List[ResourceRequest]

class ResourceRequest{
  int numContainers; // 需要的 container 个数
  Resource capability;// 每个 container 的资源
}

YARN 对作业响应是:List[Container]

class Container{
  ContainerId containerId; //YARN 全局唯一的 container 标示
  Resource capability;  // 该 container 的资源信息
  String nodeHttpAddress; // 该 container 可以启动的 NodeManager 的 hostname
}

YARN 调度架构

名词解释

  • ResourceScheduler 是 YARN 的调度器,负责 Container 的分配。
  • AsyncDispatcher 是单线程的事件分发器,负责向调度器发送调度事件。
  • ResourceTrackerService 是资源跟踪服务,主要负责接收处理 NodeManager 的心跳信息。
  • ApplicationMasterService 是作业的 RPC 服务,主要负责接收处理作业的心跳信息。
  • AppMaster 是作业的程序控制器,负责跟 YARN 交互获取 / 释放资源。

调度流程

  1. 作业资源申请过程:AppMaster 通过心跳告知 YARN 资源需求(List[ResourceRequest]),并取回上次心跳之后,调度器已经分配好的资源(List[Container])。
  2. 调度器分配资源流程是:Nodemanager 心跳触发调度器为该 NodeManager 分配 Container。

资源申请和分配是异步进行的。ResourceScheduler 是抽象类,需要自行实现。社区实现了公平调度器 (FairScheduler) 和容量调度器(CapacityScheduler)。美团点评根据自身的业务模式的特点,采用的是公平调度器。

公平调度器

作业的组织方式

在公平调度器中,作业 (App) 是挂载如下图的树形队列的叶子。

核心调度流程

  1. 调度器锁住 FairScheduler 对象,避免核心数据结构冲突。
  2. 调度器选取集群的一个节点(node),从树形队列的根节点 ROOT 开始出发,每层队列都会按照公平策略选择一个子队列,最后在叶子队列按照公平策略选择一个 App,为这个 App 在 node 上找一块适配的资源。

对于每层队列进行如下流程:

  1. 队列预先检查:检查队列的资源使用量是否已经超过了队列的 Quota
  2. 排序子队列 /App:按照公平调度策略,对子队列 /App 进行排序
  3. 递归调度子队列 /App

例如,某次调度的路径是 ROOT -> ParentQueueA -> LeafQueueA1 -> App11,这次调度会从 node 上给 App11 分配 Container。

伪代码

class FairScheduler{
  /* input:NodeId
   *  output:Resource 表示分配出来的某个 app 的一个 container 的资源量
   *  root 是树形队列 Queue 的根
   */
  synchronized Resource attemptScheduling(NodeId node){root.assignContainer(NodeId); 
  }
}

class Queue{Resource assignContainer(NodeId node){if(! preCheck(node) ) return;  // 预先检查
      sort(this.children);  // 排序
    if(this.isParent){for(Queue q: this.children)
        q.assignContainer(node);  // 递归调用
    }else{for(App app: this.runnableApps)
        app.assignContainer(node); 
    }
  }
}

class App{Resource assignContainer(NodeId node){......}
}

公平调度器架构

公平调度器是一个多线程异步协作的架构,而为了保证调度过程中数据的一致性,在主要的流程中加入了 FairScheduler 对象锁。其中核心调度流程是单线程执行的。这意味着 Container 分配是串行的,这是调度器存在性能瓶颈的核心原因。

  • scheduler Lock:FairScheduler 对象锁
  • AllocationFileLoaderService:负责公平策略配置文件的热加载,更新队列数据结构
  • Continuous Scheduling Thread:核心调度线程,不停地执行上节的核心调度流程
  • Update Thread:更新队列资源需求,执行 Container 抢占流程等
  • Scheduler Event Dispatcher Thread: 调度器事件的处理器,处理 App 新增,App 结束,node 新增,node 移除等事件

性能评估

上文介绍了公平调度器的架构,在大规模的业务压力下,这个系统存在性能问题。从应用层的表现看,作业资源需求得不到满足。从系统模块看,多个模块协同工作,每个模块多多少少都存在性能问题。如何评估系统性能已经可以满足线上业务的需求?如何评估系统的业务承载能力?我们需要找到一个系统的性能目标。因此在谈性能优化方案之前,需要先说一说调度系统性能评估方法。

一般来说,在线业务系统的性能是用该系统能够承载的 QPS 和响应的 TP99 的延迟时间来评估,而调度系统与在线业务系统不同的是:调度系统的性能不能用 RPC(ResourceManager 接收 NodeManager 和 AppMaster 的 RPC 请求)的响应延迟来评估。原因是:这些 RPC 调用过程跟调度系统的调度过程是异步的,因此不论调度性能多么差,RPC 响应几乎不受影响。同理,不论 RPC 响应多么差,调度性能也几乎不受影响。

业务指标 - 有效调度

首先从满足业务需求角度分析调度系统的业务指标。调度系统的业务目标是满足业务资源需求。指标是:有效调度(validSchedule)。在生产环境,只要 validSchedule 达标,我们就认为目前调度器是满足线上业务需求的。

定义 validSchedulePerMin 表示某一分钟的调度性能达标的情况。达标值为 1,不达标值为 0。

validPending = min(queuePending, QueueMaxQuota)
if  (usage / total  > 90% || validPending == 0):   validSchedulePerMin = 1 // 集群资源使用率高于 90%,或者集群有效资源需求为 0,这时调度器性能达标。if (validPending > 0 &&  usage / total < 90%) : validSchedulePerMin = 0;// 集群资源使用率低于 90%,并且集群存在有效资源需求,这时调度器性能不达标。
  • validPending 表示集群中作业有效的资源需求量
  • queuePending 表示队列中所有作业的资源需求量
  • QueueMaxQuota 表示该队列资源最大限额
  • usage 表示集群已经使用的资源量
  • tatal 表示集群总体资源

设置 90% 的原因是:资源池中的每个节点可能都有一小部分资源因为无法满足任何的资源需求,出现的资源碎片问题。这个问题类似 linux 内存的碎片问题。由于离线作业的任务执行时间非常短,资源很快可以得到回收。在离线计算场景,调度效率的重要性远远大于更精确地管理集群资源碎片,因此离线调度策略暂时没有考虑资源碎片的问题。

validSchedulePerDay 表示调度性能每天的达标率。
validSchedulePerDay = ΣvalidSchedulePerMin /1440

目前线上业务规模下,业务指标如下:
validSchedulePerMin > 0.9; validSchedulePerDay > 0.99

系统性能指标 - 每秒调度 Container 数

调度系统的本质是为作业分配 Container,因此提出调度系统性能指标 CPS– 每秒调度 Container 数。
在生产环境,只要 validSchedule 达标,表明目前调度器是满足线上业务需求的。而在测试环境,需要关注不同压力条件下的 CPS,找到当前系统承载能力的上限,并进一步指导性能优化工作。

CPS 是与测试压力相关的,测试压力越大,CPS 可能越低。从上文公平调度器的架构可以看到,CPS 跟如下信息相关:

  • 集群总体资源数;集群资源越多,集群可以并发运行的的 Container 越多,对调度系统产生越大的调度压力。目前每台物理机的 cpu、memory 资源量差距不大,因此集群总体资源数主要看集群的物理机节点个数。
  • 集群中正在运行的 App 数;作业数越多,需要调度的信息越多,调度压力越大。
  • 集群中的队列个数;队列数越多,需要调度的信息越多,调度压力越大。
  • 集群中每个任务的执行时间;任务执行时间越短会导致资源释放越快,那么动态产生的空闲资源越多,对调度系统产生的压力越大。

例如,集群 1000 个节点,同时运行 1000 个 App,这些 App 分布在 500 个 Queue 上,每个 App 的每个 Container 执行时间是 1 分钟。在这样的压力条件下,调度系统在有大量资源需求的情况下,每秒可以调度 1000 个 Container。那么在这个条件下,调度系统的 CPS 是 1000/s。

调度压力模拟器

在线上环境中,我们可以通过观察上文提到的调度系统的指标来看当前调度性能是否满足业务需求。但我们做了一个性能优化策略,不能直接到在线上环境去实验,因此我们必须有能力在线下环境验证调度器的性能是满足业务需求的,之后才能把实验有效的优化策略推广到线上环境。

那我们在线下也搭建一套跟线上规模一样的集群,是否就可以进行调度器性能优化的分析和研究呢?理论上是可以的,但这需要大量的物理机资源,对公司来说是个巨大的成本。因此我们需要一个调度器的压力模拟器,在不需要大量物理机资源的条件下,能够模拟 YARN 的调度过程。

社区提供了开源调度器的压力模拟工具 –Scheduler Load Simulater(SLS)。

如上图,左侧是开源 SLS 的架构图,整体都在一个进程中,ResourceManager 模块里面有一个用线程模拟的 Scheduler。App 和 NM(NodeManager)都是由线程模拟。作业资源申请和 NM 节点心跳采用方法调用。

开源架构存在的问题有:

  • 模拟大规模 APP 和 NM 需要开启大量的线程,导致调度器线程和 NM/App 的模拟线程争抢 cpu 资源,影响调度器的评估。
  • SLS 的 Scheduler Wapper 中加入了不合理的逻辑,严重影响调度器的性能。
  • SLS 为了通用性考虑,没有侵入 FairScheduler 的调度过程获取性能指标,仅仅从外围获取了 Queue 资源需求,Queue 资源使用量,App 资源需求,App 资源使用量等指标。这些指标都不是性能指标,无法利用这些指标分析系统性能瓶颈。

针对存在的问题,我们进行了架构改造。右侧是改造后的架构图,从 SLS 中剥离 Scheduler Wapper 的模拟逻辑,用真实的 ResourceManager 代替。SLS 仅仅负责模拟作业的资源申请和节点的心跳汇报。ResourceManager 是真实的,线上生产环境和线下压测环境暴露的指标是完全一样的,因此线上线下可以很直观地进行指标对比。详细代码参考:YARN-7672

细粒度监控指标

利用调度压力模拟器进行压测,观察到 validSchedule 不达标,但依然不清楚性能瓶颈到底在哪里。因此需要细粒度指标来确定性能的瓶颈点。由于调度过程是单线程的,因此细粒度指标获取的手段是侵入 FairScheduler,在调度流程中采集关键函数每分钟的时间消耗。目标是找到花费时间占比最多的函数,从而定位系统瓶颈。例如:在 preCheck 函数的前后加入时间统计,就可以收集到调度过程中 preCheck 消耗的时间。

基于以上的思路,我们定义了 10 多个细粒度指标,比较关键的指标有:

  • 每分钟父队列 preCheck 时间
  • 每分钟父队列排序时间
  • 每分钟子队列 preCheck 时间
  • 每分钟子队列排序时间
  • 每分钟为作业分配资源的时间
  • 每分钟因为作业无资源需求而花费的时间

关键优化点

第一次做压测,给定的压力就是当时线上生产环境峰值的压力情况(1000 节点、1000 作业并发、500 队列、单 Container 执行时间 40 秒)。经过优化后,调度器性能提升,满足业务需求,之后通过预估业务规模增长来调整测试压力,反复迭代地进行优化工作。

下图是性能优化时间线,纵轴为调度性能 CPS。

优化排序比较函数

在核心调度流程中,第 2 步是排序子队列。观察细粒度指标,可以很清楚地看到每分钟调度流程总共用时 50 秒,其中排序时间占用了 30 秒,占了最大比例,因此首先考虑优化排序时间。

排序本身用的快速排序算法,已经没有优化空间。进一步分析排序比较函数,发现排序比较函数的时间复杂度非常高。

计算复杂度最高的部分是:需要获取队列 / 作业的资源使用情况(resourceUsage)。原算法中,每 2 个队列进行比较,需要获取 resourceUsage 的时候,程序都是现场计算。计算方式是递归累加该队列下所有作业的 resourceUsage。这造成了巨大的重复计算量。

优化策略:将现场计算优化为提前计算。

提前计算算法:当为某个 App 分配了一个 Container(资源量定义为 containerResource),那么递归调整父队列的 resourceUsage,让父队列的 resourceUsage += containerResource。当释放某个 App 的一个 Container,同样的道理,让父队列 resourceUsage -= containerResource。利用提前计算算法,队列 resourceUsage 的统计时间复杂度降低到 O(1)。

优化效果:排序相关的细粒度指标耗时明显下降。

红框中的指标表示每分钟调度器用来做队列 / 作业排序的时间。从图中可以看出,经过优化,排序时间从每分钟 30G(30 秒)下降到 5G(5 秒)以内。详细代码参考:YARN-5969

优化作业跳过时间

从上图看,优化排序比较函数后,蓝色的线有明显的增加,从 2 秒增加到了 20 秒。这条蓝线指标含义是每分钟调度器跳过没有资源需求的作业花费的时间。从时间占比角度来看,目前优化目标是减少这条蓝线的时间。

分析代码发现,所有队列 / 作业都会参与调度。但其实很多队列 / 作业根本没有资源需求,并不需要参与调度。因此优化策略是:在排序之前,从队列的 Children 中剔除掉没有资源需求的队列 / 作业。

优化效果:这个指标从 20 秒下降到几乎可以忽略不计。详细代码参考:YARN-3547

这时,从上图中可以明显看到有一条线呈现上升趋势,并且这个指标占了整个调度时间的最大比例。这条线对应的指标含义是确定要调度的作业后,调度器为这个作业分配出一个 Container 花费的时间。这部分逻辑平均执行一次的时间在 0.02ms 以内,并且不会随着集群规模、作业规模的增加而增加, 因此暂时不做进一步优化。

队列并行排序优化

从核心调度流程可以看出,分配每一个 Container,都需要进行队列的排序。排序的时间会随着业务规模增加(作业数、队列数的增加)而线性增加。

架构思考:对于公平调度器来说,排序是为了实现公平的调度策略,但资源需求是时时刻刻变化的,每次变化,都会引起作业资源使用的不公平。即使分配每一个 Container 时都进行排序,也无法在整个时间轴上达成公平策略。

例如,集群有 10 个 cpu,T1 时刻,集群只有一个作业 App1 在运行,申请了 10 个 cpu,那么集群会把这 10 个 cpu 都分配给 App1。T2 时刻(T2 > T1),集群中新来一个作业 App2,这时集群已经没有资源了,因此无法为 App2 分配资源。这时集群中 App1 和 App2 对资源的使用是不公平的。从这个例子看,仅仅通过调度的分配算法是无法在时间轴上实现公平调度。

目前公平调度器的公平策略是保证集群在某一时刻资源调度的公平。在整个时间轴上是需要抢占策略来补充达到公平的目标。因此从时间轴的角度考虑,没有必要在分配每一个 Container 时都进行排序。

综上分析,优化策略是排序过程与调度过程并行化。要点如下:

  1. 调度过程不再进行排序的步骤。
  2. 独立的线程池处理所有队列的排序,其中每个线程处理一个队列的排序。
  3. 排序之前,通过深度克隆队列 / 作业中用于排序部分的信息,保证排序过程中队列 / 作业的数据结构不变。

优化效果如下:

队列排序效率:利用线程池对 2000 个队列进行一次排序只需要 5 毫秒以内(2ms-5ms),在一秒内至少可以完成 200 次排序,对业务完全没有影响。

在并行运行 1 万作业,集群 1.2 万的节点,队列个数 2000,单 Container 执行时间 40 秒的压力下,调度 CPS 达到 5 万,在一分钟内可以将整个集群资源打满,并持续打满。

上图中,15:26 分,pending 值是 0,表示这时集群目前所有的资源需求已经被调度完成。15:27 分,resourceUsage 达到 1.0,表示集群资源使用率为 100%,集群没有空闲资源。pending 值达到 4M(400 万 mb 的内存需求)是因为没有空闲资源导致的资源等待。

稳定上线的策略

线下压测的结果非常好,最终要上到线上才能达成业务目标。然而稳定上线是有难度的,原因:

  • 线上环境和线下压测环境中的业务差别非常大。线下没问题,上线不一定没问题。
  • 当时 YARN 集群只有一个,那么调度器也只有一个,如果调度器出现异常,是整个集群的灾难,导致整个集群不可用。

除了常规的单元测试、功能测试、压力测试、设置报警指标之外,我们根据业务场景提出了针对集群调度系统的上线策略。

在线回滚策略

离线生产的业务高峰在凌晨,因此凌晨服务出现故障的概率是最大的。而凌晨 RD 同学接到报警电话,执行通常的服务回滚流程(回滚代码,重启服务)的效率是很低的。并且重启期间,服务不可用,对业务产生了更长的不可用时间。因此我们针对调度器的每个优化策略都有参数配置。只需要修改参数配置,执行配置更新命令,那么在不重启服务的情况下,就可以改变调度器的执行逻辑,将执行逻辑切换回优化前的流程。

这里的关键问题是:系统通过配置加载线程更新了调度器某个参数的值,而调度线程也同时在按照这个参数值进行工作。在一次调度过程中可能多次查看这个参数的值,并且根据参数值来执行相应的逻辑。调度线程在一次调度过程中观察到的参数值发生变化,就会导致系统异常。

处理办法是通过复制资源的方式,避免多线程共享资源引起数据不一致的问题。调度线程在每次调度开始阶段,先将当前所有性能优化参数进行复制,确保在本次调度过程中观察到的参数不会变更。

数据自动校验策略

优化算法是为了提升性能,但要注意不能影响算法的输出结果,确保算法正确性。对于复杂的算法优化,确保算法正确性是一个很有难度的工作。

在“优化排序比较时间”的研发中,变更了队列 resourceUsage 的计算方法,从现场计算变更为提前计算。那么如何保证优化后算法计算出来的 resourceUsage 是正确的呢?

即使做了单元策略,功能测试,压力测试,但面对一个复杂系统,依然不能有 100% 的把握。另外,未来系统升级也可能引起这部分功能的 bug。

算法变更后,如果新的 resourceUsage 计算错误,那么就会导致调度策略一直错误执行下去。从而影响队列的资源分配。会对业务产生巨大的影响。例如,业务拿不到原本的资源量,导致业务延迟。

通过原先现场计算的方式得到的所有队列的 resourceUsage 一定是正确的,定义为 oldResourceUsage。算法优化后,通过提前计算的方式得到所有队列的 resourceUsage,定义为 newResourceUsage。

在系统中,定期对 oldResourceUsage 和 newResourceUsage 进行比较,如果发现数据不一致,说明优化的算法有 bug,newResourceUsage 计算错误。这时系统会向 RD 发送报警通知,同时自动地将所有计算错误的数据用正确的数据替换,使得错误得到及时自动修正。

总结与未来展望

本文主要介绍了美团点评 Hadoop YARN 集群公平调度器的性能优化实践。

  1. 做性能优化,首先要定义宏观的性能指标,从而能够评估系统的性能。
  2. 定义压测需要观察的细粒度指标,才能清晰看到系统的瓶颈。
  3. 工欲善其事,必先利其器。高效的压力测试工具是性能优化必备的利器。
  4. 优化算法的思路主要有:降低算法时间复杂度;减少重复计算和不必要的计算;并行化。
  5. 性能优化是永无止境的,要根据真实业务来合理预估业务压力,逐步开展性能优化的工作。
  6. 代码上线需谨慎,做好防御方案。

单个 YARN 集群调度器的性能优化总是有限的,目前我们可以支持 1 万节点的集群规模,那么未来 10 万,100 万的节点我们如何应对?

我们的解决思路是:基于社区的思路,设计适合美团点评的业务场景的技术方案。社区 Hadoop 3.0 研发了 Global Scheduling,完全颠覆了目前 YARN 调度器的架构,可以极大提高单集群调度性能。我们正在跟进这个 Feature。社区的 YARN Federation 已经逐步完善。该架构可以支撑多个 YARN 集群对外提供统一的集群计算服务,由于每个 YARN 集群都有自己的调度器,这相当于横向扩展了调度器的个数,从而提高集群整体的调度能力。我们基于社区的架构,结合美团点评的业务场景,正在不断地完善美团点评的 YARN Federation。

作者简介

世龙、廷稳,美团用户平台大数据与算法部研发工程师。

团队介绍

数据平台资源调度团队,目标是建设超大规模、高性能、支持异构计算资源和多场景的资源调度系统。目前管理的计算节点接近 3 万台,在单集群节点过万的规模下实现了单日数十万离线计算作业的高效调度,资源利用率超过 90%。资源调度系统同时实现了对实时计算作业、机器学习模型 Serving 服务等高可用场景的支持,可用性超过 99.9%。系统也提供了对 CPU/GPU 等异构资源的调度支持,实现了数千张 GPU 卡的高效调度,以及 CPU 资源的离线与训练混合调度,目前正在引入 NPU/FPGA 等更多异构资源,针对机器学习场景的特点实现更高效合理的调度策略。

我们有多个岗位正在招聘,如果你对超大规模系统的挑战感到兴奋,如果你对异构计算资源的调度策略感到好奇,欢迎加入我们,联系邮箱 sunyerui#meituan.com。

正文完
 0