本篇将对 Yarn 调度器中的资源抢占形式进行探索。剖析当集群资源有余时,占用量资源少的队列,是如何从其余队列中争夺资源的。咱们将深刻源码,一步步剖析争夺资源的具体逻辑。

一、简介

在资源调度器中,以 CapacityScheduler 为例(Fair 相似),每个队列可设置一个最小资源量和最大资源量。其中,最小资源量是资源紧缺状况下每个队列需保障的资源量,而最大资源量则是极其状况下队列也不能超过的资源使用量。
资源抢占产生的起因,是为了进步资源利用率,资源调度器(包含 Capacity Scheduler 和 Fair Scheduler)会将负载较轻的队列的资源临时调配给负载重的队列。
仅当负载较轻队列忽然收到新提交的应用程序时,调度器才进一步将本属于该队列的资源归还给它。
但因为此时资源可能正被其余队列应用,因而调度器必须期待其余队列开释资源后,能力将这些资源“完璧归赵”,为了避免应用程序等待时间过长,RM 在期待一段时间后强制回收。

开启容器抢占须要配置的参数 yarn-site.xml

yarn.resourcemanager.scheduler.monitor.enableyarn.resourcemanager.scheduler.monitor.policies

二、抢占具体逻辑

这里咱们次要剖析如何选出待抢占容器这一过程。
整顿流程如下图所示:

接下来咱们深刻源码,看看具体的逻辑:
首先 ResourceManager 通过 ResourceManager#createPolicyMonitors 办法创立资源抢占服务:

    protected void createPolicyMonitors() {      // 只有 capacity scheduler 实现了 PreemptableResourceScheduler 接口,fair 是如何实现资源抢占的?      if (scheduler instanceof PreemptableResourceScheduler          && conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,          YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) {        LOG.info("Loading policy monitors");        // 是否配置了 scheduler.monitor.policies        // 默认值是 ProportionalCapacityPreemptionPolicy? 代码中没看到默认值,然而 yarn-site.xml doc 中有默认值        List<SchedulingEditPolicy> policies = conf.getInstances(            YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,            SchedulingEditPolicy.class);        if (policies.size() > 0) {          for (SchedulingEditPolicy policy : policies) {            LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());            // periodically check whether we need to take action to guarantee            // constraints            // 此处创立了资源抢占服务类。            // 当此服务启动时,会启动一个线程每隔 PREEMPTION_MONITORING_INTERVAL(默认 3s)调用一次            // ProportionalCapacityPreemptionPolicy 类中的 editSchedule办法,            // 【重点】在此办法中实现了具体的资源抢占逻辑。            SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy);            addService(mon);          }

资源抢占服务会启动一个线程每隔 3 秒钟调用配置的抢占规定,这里以 ProportionalCapacityPreemptionPolicy(比例容量抢占规定)为例介绍其中的抢占具体逻辑(editSchedule 办法):

// ProportionalCapacityPreemptionPolicy#editSchedule  public void editSchedule() {    updateConfigIfNeeded();    long startTs = clock.getTime();    CSQueue root = scheduler.getRootQueue();    // 获取集群以后资源快照    Resource clusterResources = Resources.clone(scheduler.getClusterResource());    // 具体的资源抢占逻辑    containerBasedPreemptOrKill(root, clusterResources);    if (LOG.isDebugEnabled()) {      LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms.");    }  }

editSchedule 办法很简略,逻辑都被封装到 containerBasedPreemptOrKill() 办法中,咱们持续深刻。
其中次要分三步:

  1. 生成资源快照
  2. 依据规定找出各队列待抢占的容器(重点)
  3. 执行容器资源抢占 或 kill超时未主动进行的容器

    // 仅保留重要逻辑  private void containerBasedPreemptOrKill(CSQueue root,   Resource clusterResources) { // ------------ 第一步 ------------ (生成资源快照) // extract a summary of the queues from scheduler // 将所有队列信息拷贝到 queueToPartitions - Map<队列名, Map<资源池, 队列详情>>。生成快照,避免队列变动造成计算问题。   for (String partitionToLookAt : allPartitions) {     cloneQueues(root, Resources             .clone(nlm.getResourceByLabel(partitionToLookAt, clusterResources)), partitionToLookAt);   } // ------------ 第二步 ------------ (找出待抢占的容器) // compute total preemption allowed // based on ideal allocation select containers to be preemptionCandidates from each queue and each application // candidatesSelectionPolicies 默认会放入 FifoCandidatesSelector, // 如果配置了 INTRAQUEUE_PREEMPTION_ENABLED,会减少 IntraQueueCandidatesSelector for (PreemptionCandidatesSelector selector :     candidatesSelectionPolicies) {   // 【外围办法】 计算待抢占 Container 放到 preemptMap   toPreempt = selector.selectCandidates(toPreempt,       clusterResources, totalPreemptionAllowed); } // 这里有个相似 dryrun 的参数 yarn.resourcemanager.monitor.capacity.preemption.observe_only if (observeOnly) {   return; } // ------------ 第三步 ------------ (执行容器资源抢占 或 kill超时未主动进行的容器) // preempt (or kill) the selected containers preemptOrkillSelectedContainerAfterWait(toPreempt); // cleanup staled preemption candidates cleanupStaledPreemptionCandidates();  }

一)找出待抢占的容器

第一步资源快照没什么好说的,间接进入到重点:第二步找出待抢占的容器
selector.selectCandidates(),以默认的 FifoCandidatesSelector 实现为例解说,其余的同理。
次要分两步:

  1. 依据使用量和需求量重新分配资源,失去各队列要被抢占的资源量
  2. 依据资源差额,计算要 kill 的 container

    // yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java  public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(   Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,   Resource clusterResource, Resource totalPreemptionAllowed) { // ------------ 第一步 ------------ (依据使用量和需求量重新分配资源) // Calculate how much resources we need to preempt // 计算出每个资源池每个队列以后资源分配量,和理论要 preempt 的量 preemptableAmountCalculator.computeIdealAllocation(clusterResource,     totalPreemptionAllowed); // ------------ 第二步 ------------ (依据资源差额,计算要 kill 的 container) // 选 container 是有优先级的: 应用共享池的资源 -> 队列中后提交的工作 -> amContainer for (String queueName : preemptionContext.getLeafQueueNames()) {   synchronized (leafQueue) {       // 省略了大部分逻辑,在前面介绍       // 从 application 中选出要被抢占的容器       preemptFrom(fc, clusterResource, resToObtainByPartition,           skippedAMContainerlist, skippedAMSize, selectedCandidates,           totalPreemptionAllowed);     } }

从新计算各队列调配的资源量

咱们先来看「依据使用量和需求量重新分配资源」,即 PreemptableResourceCalculator#computeIdealAllocation()

  // 计算每个队列理论要被 preempt 的量  public void computeIdealAllocation(Resource clusterResource,      Resource totalPreemptionAllowed) {    for (String partition : context.getAllPartitions()) {      TempQueuePerPartition tRoot = context.getQueueByPartition(          CapacitySchedulerConfiguration.ROOT, partition);      // 这里计算好每个队列超出资源配置的局部,存在 TempQueuePerPartition      // preemptableExtra 示意能够被抢占的      // untouchableExtra 示意不可被抢占的(队列配置了不可抢占)      // yarn.scheduler.capacity.<queue>.disable_preemption      updatePreemptableExtras(tRoot);      tRoot.idealAssigned = tRoot.getGuaranteed();      // 【重点】遍历队列树,从新计算资源分配,并计算出每个队列打算要 Preempt 的量      recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);    }    // 计算理论每个队列要被 Preempt 的量 actuallyToBePreempted(有个阻尼因子,不会一下把所有超量的都干掉)    calculateResToObtainByPartitionForLeafQueues(context.getLeafQueueNames(),        clusterResource);  }}

咱们间接深刻到 recursivelyComputeIdealAssignment() 办法中的外围逻辑:从新计算各队列资源分配值 AbstractPreemptableResourceCalculator#computeFixpointAllocation()
次要逻辑如下:

  1. 首先保障每个队列有本人配置的资源。若使用量小于配置量,多余的资源会被调配到其余队列
  2. 若队列有超出配置资源需要,则放到一个优先级队列中,按 (使用量 / 配置量) 从小到大排序
  3. 对于有资源需要的队列,在残余的资源中,按配置比例计算每个队列可调配的资源量
  4. 每次从优先级队列当选需要优先级最高的,进行调配
  5. 计算 min(可调配量, 队列最大残余用量, 需求量)。作为本次调配的资源。若仍有资源需要则放回优先级队列,期待下次调配
  6. 当满足所有队列资源需要,或者没有残余资源时完结
  7. 仍有资源需要的队列会记录在 underServedQueues

      // 按肯定规定将资源分给各个队列  protected void computeFixpointAllocation(Resource totGuarant,   Collection<TempQueuePerPartition> qAlloc, Resource unassigned,   boolean ignoreGuarantee) { // 传进来 unassigned = totGuarant // 有序队列,(使用量 / 配置量) 从小到大排序 PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,     tqComparator);   // idealAssigned = min(使用量,配置量)。  对于不可抢占队列,则再加上超出的局部,避免资源被再调配。   if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {     q.idealAssigned = Resources.add(q.getGuaranteed(), q.untouchableExtra);   } else {     q.idealAssigned = Resources.clone(used);   }   // 如果该队列有超出配置资源需要,就把这个队列放到 orderedByNeed 有序队列中(即这个队列有资源缺口)   if (Resources.lessThan(rc, totGuarant, q.idealAssigned, curPlusPend)) {     orderedByNeed.add(q);   } } // 此时 unassigned 是 整体可用资源 排除掉 所有已应用的资源(used) // 把未调配的资源(unassigned)调配进来 // 形式就是从 orderedByNeed 中每次取出 most under-guaranteed 队列,按规定调配一块资源给他,如果仍不满足就按程序再放回 orderedByNeed // 直到满足所有队列资源,或者没有资源可调配 while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, totGuarant,     unassigned, Resources.none())) {   Resource wQassigned = Resource.newInstance(0, 0);   // 对于有资源缺口的队列,从新计算他们的资源保障比例:normalizedGuarantee。   // 即 (该队列保障量 / 所有资源缺口队列保障量)   resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);   // 这里返回是个列表,是因为可能有需要度(优先级)相等的状况   Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues(       orderedByNeed, tqComparator);   for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i       .hasNext();) {     TempQueuePerPartition sub = i.next();     // 依照 normalizedGuarantee 比例能从残余资源中分走多少。     Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned,         sub.normalizedGuarantee, Resource.newInstance(1, 1));     // 【重点】按肯定规定将资源分配给队列,并返回剩下的资源。     Resource wQidle = sub.offer(wQavail, rc, totGuarant,         isReservedPreemptionCandidatesSelector);     // 调配给队列的资源     Resource wQdone = Resources.subtract(wQavail, wQidle);     // 这里 wQdone > 0 证实本次迭代调配进来了资源,那么还会放回到待分配资源的汇合中(哪怕本次已满足资源申请),直到未再分配资源了才退出。     if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) {       orderedByNeed.add(sub);     }     Resources.addTo(wQassigned, wQdone);   }   Resources.subtractFrom(unassigned, wQassigned); } // 这里有可能整个资源都调配完了,还有队列资源不满足 while (!orderedByNeed.isEmpty()) {   TempQueuePerPartition q1 = orderedByNeed.remove();   context.addPartitionToUnderServedQueues(q1.queueName, q1.partition); }  }

    下面第 5 步是重点,也就是 sub.offer(),是计算给该队列在保障值之外,还能提供多少资源:

      /*** 计算队列 idealAssigned,在原有根底上减少新调配的资源。同时返回 avail 中未应用的资源。* 参数阐明:* avail 按比例该队列能从残余资源中调配到的* clusterResource 整体资源量* considersReservedResource ?* idealAssigned = min(使用量,配置量)*/  Resource offer(Resource avail, ResourceCalculator rc,   Resource clusterResource, boolean considersReservedResource) { // 计算的是还有多少可分配资源的空间( maxCapacity - assigned ) Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(     Resources.subtract(getMax(), idealAssigned),     Resource.newInstance(0, 0)); // remain = avail - min(avail, (max - assigned), (current + pending - assigned)) // 队列承受资源的计算方法:可提供的资源,队列最大资源-已分配资源,以后已应用资源+未满足的资源-min(使用量,配置量) 三者中的最小值。 Resource accepted = Resources.min(rc, clusterResource,     absMaxCapIdealAssignedDelta,     Resources.min(rc, clusterResource, avail, Resources         .subtract(             Resources.add((considersReservedResource                 ? getUsed()                 : getUsedDeductReservd()), pending),             idealAssigned))); Resource remain = Resources.subtract(avail, accepted); Resources.addTo(idealAssigned, accepted); return remain;  }

    外围的资源重新分配算法逻辑曾经计算结束,剩下的就是:
    依据从新计算的资源分配,失去各队列超用的资源,这部分就是要被抢占的资源。
    这里不会一下把队列超用的资源都干掉,有个阻尼因子,用于平滑抢占解决。

依据资源差额,计算要抢占的容器

回到 selector.selectCandidates(),下面曾经介绍了各队列抢占量的计算逻辑,接下来介绍「如何选出各队列中的 container」

  1. 抢占该队列在共享池应用资源的 container
  2. 抢占后提交工作中,后生成的 container(也就是越晚生成的 container,会被先解决)
  3. 抢占 amContainer

      public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(   Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,   Resource clusterResource, Resource totalPreemptionAllowed) { // ...... // ------------ 第二步 ------------ (依据资源差额,计算要 kill 的 container) // 依据计算失去的要抢占的量,计算各资源池各队列要 kill 的 container List<RMContainer> skippedAMContainerlist = new ArrayList<>(); // Loop all leaf queues // 这里是有优先级的: 应用共享池的资源 -> 队列中后提交的工作 -> amContainer for (String queueName : preemptionContext.getLeafQueueNames()) {   // 获取该队列在每个资源池要被抢占的量   Map<String, Resource> resToObtainByPartition =       CapacitySchedulerPreemptionUtils           .getResToObtainByPartitionForLeafQueue(preemptionContext,               queueName, clusterResource);   synchronized (leafQueue) {     // 应用共享池资源的,先解决     Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers =         leafQueue.getIgnoreExclusivityRMContainers();     for (String partition : resToObtainByPartition.keySet()) {       if (ignorePartitionExclusivityContainers.containsKey(partition)) {         TreeSet<RMContainer> rmContainers =             ignorePartitionExclusivityContainers.get(partition);         // 最初提交的工作,会被最先抢占         for (RMContainer c : rmContainers.descendingSet()) {           if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,               selectedCandidates)) {             // Skip already selected containers             continue;           }           // 将 Container 放到待抢占汇合 preemptMap 中           boolean preempted = CapacitySchedulerPreemptionUtils               .tryPreemptContainerAndDeductResToObtain(rc,                   preemptionContext, resToObtainByPartition, c,                   clusterResource, selectedCandidates,                   totalPreemptionAllowed);         }       }     }     // preempt other containers     Resource skippedAMSize = Resource.newInstance(0, 0);     // 默认是 FifoOrderingPolicy,desc 也就是最初提交的在最后面     Iterator<FiCaSchedulerApp> desc =         leafQueue.getOrderingPolicy().getPreemptionIterator();     while (desc.hasNext()) {       FiCaSchedulerApp fc = desc.next();       if (resToObtainByPartition.isEmpty()) {         break;       }       // 从 application 中选出要被抢占的容器(前面介绍)       preemptFrom(fc, clusterResource, resToObtainByPartition,           skippedAMContainerlist, skippedAMSize, selectedCandidates,           totalPreemptionAllowed);     }     // Can try preempting AMContainers     Resource maxAMCapacityForThisQueue = Resources.multiply(         Resources.multiply(clusterResource,             leafQueue.getAbsoluteCapacity()),         leafQueue.getMaxAMResourcePerQueuePercent());     preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist,         resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,         totalPreemptionAllowed);   } } return selectedCandidates;  }

二)执行容器资源抢占

把要被抢占的 container 都选出来之后,就剩最初一步, kill 这些 container。
回到 containerBasedPreemptOrKill()

  private void containerBasedPreemptOrKill(CSQueue root,      Resource clusterResources) {    // ......    // ------------ 第三步 ------------ (执行容器资源抢占 或 kill超时未主动进行的容器)    // preempt (or kill) the selected containers    preemptOrkillSelectedContainerAfterWait(toPreempt);    // cleanup staled preemption candidates    cleanupStaledPreemptionCandidates();  }

三、总结

至此,剖析结束整个资源抢占的过程。
总结一下次要逻辑:

  1. 从新计算各资源池中各队列应调配的资源;
  2. 与当初已应用的资源进行比照,如果超过新计算的调配量,(超用的局部*阻尼系数)就是要被抢占的资源量;
  3. 各队列依据要被抢占的资源量,选出要被 kill 的 container。优先度低的 container 就会被先解决(应用了共享资源的、后生成的 container);
  4. 通过心跳告诉 AM 要被 kill 的 container,或者解决掉告诉过已超时的 container。

参考文章:
Yarn FairScheduler的抢占机制详解_小昌昌的博客的博客-CSDN博客
Yarn抢占最外围分析_Geoffrey Turing的博客-CSDN博客 - 针对 fair
Yarn调度之CapacityScheduler源码剖析资源抢占
Better SLAs via Resource-preemption in YARN's CapacityScheduler - Cloudera Blog