乐趣区

关于hadoop:深入浅出-Yarn-架构与实现53-Yarn-调度器资源抢占模型

本篇将对 Yarn 调度器中的资源抢占形式进行探索。剖析当集群资源有余时,占用量资源少的队列,是如何从其余队列中争夺资源的。咱们将深刻源码,一步步剖析争夺资源的具体逻辑。

一、简介

在资源调度器中,以 CapacityScheduler 为例(Fair 相似),每个队列可设置一个最小资源量和最大资源量。其中,最小资源量是资源紧缺状况下每个队列需保障的资源量,而最大资源量则是极其状况下队列也不能超过的资源使用量。
资源抢占产生的起因,是为了进步资源利用率,资源调度器(包含 Capacity Scheduler 和 Fair Scheduler)会将负载较轻的队列的资源临时调配给负载重的队列。
仅当负载较轻队列忽然收到新提交的应用程序时,调度器才进一步将本属于该队列的资源归还给它。
但因为此时资源可能正被其余队列应用,因而调度器必须期待其余队列开释资源后,能力将这些资源“完璧归赵”,为了避免应用程序等待时间过长,RM 在期待一段时间后强制回收。

开启容器抢占须要配置的参数 yarn-site.xml

yarn.resourcemanager.scheduler.monitor.enable
yarn.resourcemanager.scheduler.monitor.policies

二、抢占具体逻辑

这里咱们次要剖析如何选出待抢占容器这一过程。
整顿流程如下图所示:

接下来咱们深刻源码,看看具体的逻辑:
首先 ResourceManager 通过 ResourceManager#createPolicyMonitors 办法创立资源抢占服务:

    protected void createPolicyMonitors() {
      // 只有 capacity scheduler 实现了 PreemptableResourceScheduler 接口,fair 是如何实现资源抢占的?if (scheduler instanceof PreemptableResourceScheduler
          && conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
          YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) {LOG.info("Loading policy monitors");
        // 是否配置了 scheduler.monitor.policies
        // 默认值是 ProportionalCapacityPreemptionPolicy?代码中没看到默认值,然而 yarn-site.xml doc 中有默认值
        List<SchedulingEditPolicy> policies = conf.getInstances(
            YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
            SchedulingEditPolicy.class);
        if (policies.size() > 0) {for (SchedulingEditPolicy policy : policies) {LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
            // periodically check whether we need to take action to guarantee
            // constraints
            // 此处创立了资源抢占服务类。// 当此服务启动时,会启动一个线程每隔 PREEMPTION_MONITORING_INTERVAL(默认 3s)调用一次
            // ProportionalCapacityPreemptionPolicy 类中的 editSchedule 办法,//【重点】在此办法中实现了具体的资源抢占逻辑。SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy);
            addService(mon);
          }

资源抢占服务会启动一个线程每隔 3 秒钟调用配置的抢占规定,这里以 ProportionalCapacityPreemptionPolicy(比例容量抢占规定)为例介绍其中的抢占具体逻辑(editSchedule 办法):

// ProportionalCapacityPreemptionPolicy#editSchedule
  public void editSchedule() {updateConfigIfNeeded();

    long startTs = clock.getTime();

    CSQueue root = scheduler.getRootQueue();
    // 获取集群以后资源快照
    Resource clusterResources = Resources.clone(scheduler.getClusterResource());
    // 具体的资源抢占逻辑
    containerBasedPreemptOrKill(root, clusterResources);

    if (LOG.isDebugEnabled()) {LOG.debug("Total time used=" + (clock.getTime() - startTs) + "ms.");
    }
  }

editSchedule 办法很简略,逻辑都被封装到 containerBasedPreemptOrKill() 办法中,咱们持续深刻。
其中次要分三步:

  1. 生成资源快照
  2. 依据规定找出各队列待抢占的容器(重点)
  3. 执行容器资源抢占 或 kill 超时未主动进行的容器

    // 仅保留重要逻辑
      private void containerBasedPreemptOrKill(CSQueue root,
       Resource clusterResources) {
     // ------------ 第一步 ------------(生成资源快照)// extract a summary of the queues from scheduler
     // 将所有队列信息拷贝到 queueToPartitions - Map< 队列名, Map< 资源池, 队列详情 >>。生成快照,避免队列变动造成计算问题。for (String partitionToLookAt : allPartitions) {
         cloneQueues(root, Resources
                 .clone(nlm.getResourceByLabel(partitionToLookAt, clusterResources)), partitionToLookAt);
       }
    
     // ------------ 第二步 ------------(找出待抢占的容器)// compute total preemption allowed
     // based on ideal allocation select containers to be preemptionCandidates from each queue and each application
     // candidatesSelectionPolicies 默认会放入 FifoCandidatesSelector,// 如果配置了 INTRAQUEUE_PREEMPTION_ENABLED,会减少 IntraQueueCandidatesSelector
     for (PreemptionCandidatesSelector selector :
         candidatesSelectionPolicies) {
       //【外围办法】计算待抢占 Container 放到 preemptMap
       toPreempt = selector.selectCandidates(toPreempt,
           clusterResources, totalPreemptionAllowed);
     }
    
     // 这里有个相似 dryrun 的参数 yarn.resourcemanager.monitor.capacity.preemption.observe_only
     if (observeOnly) {return;}
    
     // ------------ 第三步 ------------(执行容器资源抢占 或 kill 超时未主动进行的容器)// preempt (or kill) the selected containers
     preemptOrkillSelectedContainerAfterWait(toPreempt);
     // cleanup staled preemption candidates
     cleanupStaledPreemptionCandidates();}

一)找出待抢占的容器

第一步资源快照没什么好说的,间接进入到重点:第二步找出待抢占的容器
selector.selectCandidates(),以默认的 FifoCandidatesSelector 实现为例解说,其余的同理。
次要分两步:

  1. 依据使用量和需求量重新分配资源,失去各队列要被抢占的资源量
  2. 依据资源差额,计算要 kill 的 container

    // yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
      public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource clusterResource, Resource totalPreemptionAllowed) {
     // ------------ 第一步 ------------(依据使用量和需求量重新分配资源)// Calculate how much resources we need to preempt
     // 计算出每个资源池每个队列以后资源分配量,和理论要 preempt 的量
     preemptableAmountCalculator.computeIdealAllocation(clusterResource,
         totalPreemptionAllowed);
    
     // ------------ 第二步 ------------(依据资源差额,计算要 kill 的 container)// 选 container 是有优先级的:应用共享池的资源 -> 队列中后提交的工作 -> amContainer
     for (String queueName : preemptionContext.getLeafQueueNames()) {synchronized (leafQueue) {
           // 省略了大部分逻辑,在前面介绍
           // 从 application 中选出要被抢占的容器
           preemptFrom(fc, clusterResource, resToObtainByPartition,
               skippedAMContainerlist, skippedAMSize, selectedCandidates,
               totalPreemptionAllowed);
         }
     }

从新计算各队列调配的资源量

咱们先来看「依据使用量和需求量重新分配资源」,即 PreemptableResourceCalculator#computeIdealAllocation()

  // 计算每个队列理论要被 preempt 的量
  public void computeIdealAllocation(Resource clusterResource,
      Resource totalPreemptionAllowed) {for (String partition : context.getAllPartitions()) {
      TempQueuePerPartition tRoot = context.getQueueByPartition(CapacitySchedulerConfiguration.ROOT, partition);
      // 这里计算好每个队列超出资源配置的局部,存在 TempQueuePerPartition
      // preemptableExtra 示意能够被抢占的
      // untouchableExtra 示意不可被抢占的(队列配置了不可抢占)// yarn.scheduler.capacity.<queue>.disable_preemption
      updatePreemptableExtras(tRoot);

      tRoot.idealAssigned = tRoot.getGuaranteed();
      //【重点】遍历队列树,从新计算资源分配,并计算出每个队列打算要 Preempt 的量
      recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
    }

    // 计算理论每个队列要被 Preempt 的量 actuallyToBePreempted(有个阻尼因子,不会一下把所有超量的都干掉)calculateResToObtainByPartitionForLeafQueues(context.getLeafQueueNames(),
        clusterResource);
  }
}

咱们间接深刻到 recursivelyComputeIdealAssignment() 办法中的外围逻辑:从新计算各队列资源分配值 AbstractPreemptableResourceCalculator#computeFixpointAllocation()
次要逻辑如下:

  1. 首先保障每个队列有本人配置的资源。若使用量小于配置量,多余的资源会被调配到其余队列
  2. 若队列有超出配置资源需要,则放到一个优先级队列中,按 (使用量 / 配置量) 从小到大排序
  3. 对于有资源需要的队列,在残余的资源中,按配置比例计算每个队列可调配的资源量
  4. 每次从优先级队列当选需要优先级最高的,进行调配
  5. 计算 min(可调配量, 队列最大残余用量, 需求量)。作为本次调配的资源。若仍有资源需要则放回优先级队列,期待下次调配
  6. 当满足所有队列资源需要,或者没有残余资源时完结
  7. 仍有资源需要的队列会记录在 underServedQueues

      // 按肯定规定将资源分给各个队列
      protected void computeFixpointAllocation(Resource totGuarant,
       Collection<TempQueuePerPartition> qAlloc, Resource unassigned,
       boolean ignoreGuarantee) {
     // 传进来 unassigned = totGuarant
     // 有序队列,(使用量 / 配置量) 从小到大排序
     PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
         tqComparator);
    
       // idealAssigned = min(使用量,配置量)。对于不可抢占队列,则再加上超出的局部,避免资源被再调配。if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {q.idealAssigned = Resources.add(q.getGuaranteed(), q.untouchableExtra);
       } else {q.idealAssigned = Resources.clone(used);
       }
    
       // 如果该队列有超出配置资源需要,就把这个队列放到 orderedByNeed 有序队列中(即这个队列有资源缺口)if (Resources.lessThan(rc, totGuarant, q.idealAssigned, curPlusPend)) {orderedByNeed.add(q);
       }
     }
    
     // 此时 unassigned 是 整体可用资源 排除掉 所有已应用的资源(used)// 把未调配的资源(unassigned)调配进来
     // 形式就是从 orderedByNeed 中每次取出 most under-guaranteed 队列,按规定调配一块资源给他,如果仍不满足就按程序再放回 orderedByNeed
     // 直到满足所有队列资源,或者没有资源可调配
     while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, totGuarant,
         unassigned, Resources.none())) {Resource wQassigned = Resource.newInstance(0, 0);
       // 对于有资源缺口的队列,从新计算他们的资源保障比例:normalizedGuarantee。// 即(该队列保障量 / 所有资源缺口队列保障量)resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);
    
       // 这里返回是个列表,是因为可能有需要度(优先级)相等的状况
       Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues(orderedByNeed, tqComparator);
       for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
           .hasNext();) {TempQueuePerPartition sub = i.next();
         // 依照 normalizedGuarantee 比例能从残余资源中分走多少。Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned,
             sub.normalizedGuarantee, Resource.newInstance(1, 1));
         //【重点】按肯定规定将资源分配给队列,并返回剩下的资源。Resource wQidle = sub.offer(wQavail, rc, totGuarant,
             isReservedPreemptionCandidatesSelector);
         // 调配给队列的资源
         Resource wQdone = Resources.subtract(wQavail, wQidle);
    
         // 这里 wQdone > 0 证实本次迭代调配进来了资源,那么还会放回到待分配资源的汇合中(哪怕本次已满足资源申请),直到未再分配资源了才退出。if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) {orderedByNeed.add(sub);
         }
         Resources.addTo(wQassigned, wQdone);
       }
       Resources.subtractFrom(unassigned, wQassigned);
     }
    
     // 这里有可能整个资源都调配完了,还有队列资源不满足
     while (!orderedByNeed.isEmpty()) {TempQueuePerPartition q1 = orderedByNeed.remove();
       context.addPartitionToUnderServedQueues(q1.queueName, q1.partition);
     }
      }

    下面第 5 步是重点,也就是 sub.offer(),是计算给该队列在保障值之外,还能提供多少资源:

      /**
    * 计算队列 idealAssigned,在原有根底上减少新调配的资源。同时返回 avail 中未应用的资源。* 参数阐明:* avail 按比例该队列能从残余资源中调配到的
    * clusterResource 整体资源量
    * considersReservedResource?* idealAssigned = min(使用量,配置量)
    */
      Resource offer(Resource avail, ResourceCalculator rc,
       Resource clusterResource, boolean considersReservedResource) {
     // 计算的是还有多少可分配资源的空间(maxCapacity - assigned)Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(Resources.subtract(getMax(), idealAssigned),
         Resource.newInstance(0, 0));
     // remain = avail - min(avail, (max - assigned), (current + pending - assigned))
     // 队列承受资源的计算方法:可提供的资源,队列最大资源 - 已分配资源,以后已应用资源 + 未满足的资源 -min(使用量,配置量) 三者中的最小值。Resource accepted = Resources.min(rc, clusterResource,
         absMaxCapIdealAssignedDelta,
         Resources.min(rc, clusterResource, avail, Resources
             .subtract(
                 Resources.add((considersReservedResource
                     ? getUsed()
                     : getUsedDeductReservd()), pending),
                 idealAssigned)));
     Resource remain = Resources.subtract(avail, accepted);
     Resources.addTo(idealAssigned, accepted);
     return remain;
      }

    外围的资源重新分配算法逻辑曾经计算结束,剩下的就是:
    依据从新计算的资源分配,失去各队列超用的资源,这部分就是要被抢占的资源。
    这里不会一下把队列超用的资源都干掉,有个阻尼因子,用于平滑抢占解决。

依据资源差额,计算要抢占的容器

回到 selector.selectCandidates(),下面曾经介绍了各队列抢占量的计算逻辑,接下来介绍「如何选出各队列中的 container」

  1. 抢占该队列在共享池应用资源的 container
  2. 抢占后提交工作中,后生成的 container(也就是越晚生成的 container,会被先解决)
  3. 抢占 amContainer

      public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource clusterResource, Resource totalPreemptionAllowed) {
     // ......
    
     // ------------ 第二步 ------------(依据资源差额,计算要 kill 的 container)// 依据计算失去的要抢占的量,计算各资源池各队列要 kill 的 container
     List<RMContainer> skippedAMContainerlist = new ArrayList<>();
    
     // Loop all leaf queues
     // 这里是有优先级的:应用共享池的资源 -> 队列中后提交的工作 -> amContainer
     for (String queueName : preemptionContext.getLeafQueueNames()) {
       // 获取该队列在每个资源池要被抢占的量
       Map<String, Resource> resToObtainByPartition =
           CapacitySchedulerPreemptionUtils
               .getResToObtainByPartitionForLeafQueue(preemptionContext,
                   queueName, clusterResource);
    
       synchronized (leafQueue) {
         // 应用共享池资源的,先解决
         Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers =
             leafQueue.getIgnoreExclusivityRMContainers();
         for (String partition : resToObtainByPartition.keySet()) {if (ignorePartitionExclusivityContainers.containsKey(partition)) {
             TreeSet<RMContainer> rmContainers =
                 ignorePartitionExclusivityContainers.get(partition);
             // 最初提交的工作,会被最先抢占
             for (RMContainer c : rmContainers.descendingSet()) {
               if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
                   selectedCandidates)) {
                 // Skip already selected containers
                 continue;
               }
               // 将 Container 放到待抢占汇合 preemptMap 中
               boolean preempted = CapacitySchedulerPreemptionUtils
                   .tryPreemptContainerAndDeductResToObtain(rc,
                       preemptionContext, resToObtainByPartition, c,
                       clusterResource, selectedCandidates,
                       totalPreemptionAllowed);
             }
           }
         }
    
         // preempt other containers
         Resource skippedAMSize = Resource.newInstance(0, 0);
         // 默认是 FifoOrderingPolicy,desc 也就是最初提交的在最后面
         Iterator<FiCaSchedulerApp> desc =
             leafQueue.getOrderingPolicy().getPreemptionIterator();
         while (desc.hasNext()) {FiCaSchedulerApp fc = desc.next();
           if (resToObtainByPartition.isEmpty()) {break;}
    
           // 从 application 中选出要被抢占的容器(前面介绍)preemptFrom(fc, clusterResource, resToObtainByPartition,
               skippedAMContainerlist, skippedAMSize, selectedCandidates,
               totalPreemptionAllowed);
         }
    
         // Can try preempting AMContainers
         Resource maxAMCapacityForThisQueue = Resources.multiply(
             Resources.multiply(clusterResource,
                 leafQueue.getAbsoluteCapacity()),
             leafQueue.getMaxAMResourcePerQueuePercent());
    
         preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist,
             resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,
             totalPreemptionAllowed);
       }
     }
    
     return selectedCandidates;
      }

二)执行容器资源抢占

把要被抢占的 container 都选出来之后,就剩最初一步,kill 这些 container。
回到 containerBasedPreemptOrKill()

  private void containerBasedPreemptOrKill(CSQueue root,
      Resource clusterResources) {
    // ......

    // ------------ 第三步 ------------(执行容器资源抢占 或 kill 超时未主动进行的容器)// preempt (or kill) the selected containers
    preemptOrkillSelectedContainerAfterWait(toPreempt);
    // cleanup staled preemption candidates
    cleanupStaledPreemptionCandidates();}

三、总结

至此,剖析结束整个资源抢占的过程。
总结一下次要逻辑:

  1. 从新计算各资源池中各队列应调配的资源;
  2. 与当初已应用的资源进行比照,如果超过新计算的调配量,(超用的局部 * 阻尼系数)就是要被抢占的资源量;
  3. 各队列依据要被抢占的资源量,选出要被 kill 的 container。优先度低的 container 就会被先解决(应用了共享资源的、后生成的 container);
  4. 通过心跳告诉 AM 要被 kill 的 container,或者解决掉告诉过已超时的 container。

参考文章:
Yarn FairScheduler 的抢占机制详解_小昌昌的博客的博客 -CSDN 博客
Yarn 抢占最外围分析_Geoffrey Turing 的博客 -CSDN 博客 – 针对 fair
Yarn 调度之 CapacityScheduler 源码剖析资源抢占
Better SLAs via Resource-preemption in YARN’s CapacityScheduler – Cloudera Blog

退出移动版