本篇将对 Yarn 调度器中的资源抢占形式进行探索。剖析当集群资源有余时,占用量资源少的队列,是如何从其余队列中争夺资源的。咱们将深刻源码,一步步剖析争夺资源的具体逻辑。
一、简介
在资源调度器中,以 CapacityScheduler 为例(Fair 相似),每个队列可设置一个最小资源量和最大资源量。其中,最小资源量是资源紧缺状况下每个队列需保障的资源量,而最大资源量则是极其状况下队列也不能超过的资源使用量。
资源抢占产生的起因,是为了进步资源利用率,资源调度器(包含 Capacity Scheduler 和 Fair Scheduler)会将负载较轻的队列的资源临时调配给负载重的队列。
仅当负载较轻队列忽然收到新提交的应用程序时,调度器才进一步将本属于该队列的资源归还给它。
但因为此时资源可能正被其余队列应用,因而调度器必须期待其余队列开释资源后,能力将这些资源“完璧归赵”,为了避免应用程序等待时间过长,RM 在期待一段时间后强制回收。
开启容器抢占须要配置的参数 yarn-site.xml
:
yarn.resourcemanager.scheduler.monitor.enable
yarn.resourcemanager.scheduler.monitor.policies
二、抢占具体逻辑
这里咱们次要剖析如何选出待抢占容器这一过程。
整顿流程如下图所示:
接下来咱们深刻源码,看看具体的逻辑:
首先 ResourceManager 通过 ResourceManager#createPolicyMonitors
办法创立资源抢占服务:
protected void createPolicyMonitors() {
// 只有 capacity scheduler 实现了 PreemptableResourceScheduler 接口,fair 是如何实现资源抢占的?if (scheduler instanceof PreemptableResourceScheduler
&& conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) {LOG.info("Loading policy monitors");
// 是否配置了 scheduler.monitor.policies
// 默认值是 ProportionalCapacityPreemptionPolicy?代码中没看到默认值,然而 yarn-site.xml doc 中有默认值
List<SchedulingEditPolicy> policies = conf.getInstances(
YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
SchedulingEditPolicy.class);
if (policies.size() > 0) {for (SchedulingEditPolicy policy : policies) {LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
// periodically check whether we need to take action to guarantee
// constraints
// 此处创立了资源抢占服务类。// 当此服务启动时,会启动一个线程每隔 PREEMPTION_MONITORING_INTERVAL(默认 3s)调用一次
// ProportionalCapacityPreemptionPolicy 类中的 editSchedule 办法,//【重点】在此办法中实现了具体的资源抢占逻辑。SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy);
addService(mon);
}
资源抢占服务会启动一个线程每隔 3 秒钟调用配置的抢占规定,这里以 ProportionalCapacityPreemptionPolicy
(比例容量抢占规定)为例介绍其中的抢占具体逻辑(editSchedule
办法):
// ProportionalCapacityPreemptionPolicy#editSchedule
public void editSchedule() {updateConfigIfNeeded();
long startTs = clock.getTime();
CSQueue root = scheduler.getRootQueue();
// 获取集群以后资源快照
Resource clusterResources = Resources.clone(scheduler.getClusterResource());
// 具体的资源抢占逻辑
containerBasedPreemptOrKill(root, clusterResources);
if (LOG.isDebugEnabled()) {LOG.debug("Total time used=" + (clock.getTime() - startTs) + "ms.");
}
}
editSchedule
办法很简略,逻辑都被封装到 containerBasedPreemptOrKill()
办法中,咱们持续深刻。
其中次要分三步:
- 生成资源快照
- 依据规定找出各队列待抢占的容器(重点)
-
执行容器资源抢占 或 kill 超时未主动进行的容器
// 仅保留重要逻辑 private void containerBasedPreemptOrKill(CSQueue root, Resource clusterResources) { // ------------ 第一步 ------------(生成资源快照)// extract a summary of the queues from scheduler // 将所有队列信息拷贝到 queueToPartitions - Map< 队列名, Map< 资源池, 队列详情 >>。生成快照,避免队列变动造成计算问题。for (String partitionToLookAt : allPartitions) { cloneQueues(root, Resources .clone(nlm.getResourceByLabel(partitionToLookAt, clusterResources)), partitionToLookAt); } // ------------ 第二步 ------------(找出待抢占的容器)// compute total preemption allowed // based on ideal allocation select containers to be preemptionCandidates from each queue and each application // candidatesSelectionPolicies 默认会放入 FifoCandidatesSelector,// 如果配置了 INTRAQUEUE_PREEMPTION_ENABLED,会减少 IntraQueueCandidatesSelector for (PreemptionCandidatesSelector selector : candidatesSelectionPolicies) { //【外围办法】计算待抢占 Container 放到 preemptMap toPreempt = selector.selectCandidates(toPreempt, clusterResources, totalPreemptionAllowed); } // 这里有个相似 dryrun 的参数 yarn.resourcemanager.monitor.capacity.preemption.observe_only if (observeOnly) {return;} // ------------ 第三步 ------------(执行容器资源抢占 或 kill 超时未主动进行的容器)// preempt (or kill) the selected containers preemptOrkillSelectedContainerAfterWait(toPreempt); // cleanup staled preemption candidates cleanupStaledPreemptionCandidates();}
一)找出待抢占的容器
第一步资源快照没什么好说的,间接进入到重点:第二步找出待抢占的容器 。
即 selector.selectCandidates()
,以默认的 FifoCandidatesSelector
实现为例解说,其余的同理。
次要分两步:
- 依据使用量和需求量重新分配资源,失去各队列要被抢占的资源量
-
依据资源差额,计算要 kill 的 container
// yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates( Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates, Resource clusterResource, Resource totalPreemptionAllowed) { // ------------ 第一步 ------------(依据使用量和需求量重新分配资源)// Calculate how much resources we need to preempt // 计算出每个资源池每个队列以后资源分配量,和理论要 preempt 的量 preemptableAmountCalculator.computeIdealAllocation(clusterResource, totalPreemptionAllowed); // ------------ 第二步 ------------(依据资源差额,计算要 kill 的 container)// 选 container 是有优先级的:应用共享池的资源 -> 队列中后提交的工作 -> amContainer for (String queueName : preemptionContext.getLeafQueueNames()) {synchronized (leafQueue) { // 省略了大部分逻辑,在前面介绍 // 从 application 中选出要被抢占的容器 preemptFrom(fc, clusterResource, resToObtainByPartition, skippedAMContainerlist, skippedAMSize, selectedCandidates, totalPreemptionAllowed); } }
从新计算各队列调配的资源量
咱们先来看「依据使用量和需求量重新分配资源」,即 PreemptableResourceCalculator#computeIdealAllocation()
// 计算每个队列理论要被 preempt 的量
public void computeIdealAllocation(Resource clusterResource,
Resource totalPreemptionAllowed) {for (String partition : context.getAllPartitions()) {
TempQueuePerPartition tRoot = context.getQueueByPartition(CapacitySchedulerConfiguration.ROOT, partition);
// 这里计算好每个队列超出资源配置的局部,存在 TempQueuePerPartition
// preemptableExtra 示意能够被抢占的
// untouchableExtra 示意不可被抢占的(队列配置了不可抢占)// yarn.scheduler.capacity.<queue>.disable_preemption
updatePreemptableExtras(tRoot);
tRoot.idealAssigned = tRoot.getGuaranteed();
//【重点】遍历队列树,从新计算资源分配,并计算出每个队列打算要 Preempt 的量
recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
}
// 计算理论每个队列要被 Preempt 的量 actuallyToBePreempted(有个阻尼因子,不会一下把所有超量的都干掉)calculateResToObtainByPartitionForLeafQueues(context.getLeafQueueNames(),
clusterResource);
}
}
咱们间接深刻到 recursivelyComputeIdealAssignment()
办法中的外围逻辑:从新计算各队列资源分配值 AbstractPreemptableResourceCalculator#computeFixpointAllocation()
次要逻辑如下:
- 首先保障每个队列有本人配置的资源。若使用量小于配置量,多余的资源会被调配到其余队列
- 若队列有超出配置资源需要,则放到一个优先级队列中,按 (使用量 / 配置量) 从小到大排序
- 对于有资源需要的队列,在残余的资源中,按配置比例计算每个队列可调配的资源量
- 每次从优先级队列当选需要优先级最高的,进行调配
- 计算 min(可调配量, 队列最大残余用量, 需求量)。作为本次调配的资源。若仍有资源需要则放回优先级队列,期待下次调配
- 当满足所有队列资源需要,或者没有残余资源时完结
-
仍有资源需要的队列会记录在 underServedQueues
// 按肯定规定将资源分给各个队列 protected void computeFixpointAllocation(Resource totGuarant, Collection<TempQueuePerPartition> qAlloc, Resource unassigned, boolean ignoreGuarantee) { // 传进来 unassigned = totGuarant // 有序队列,(使用量 / 配置量) 从小到大排序 PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10, tqComparator); // idealAssigned = min(使用量,配置量)。对于不可抢占队列,则再加上超出的局部,避免资源被再调配。if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {q.idealAssigned = Resources.add(q.getGuaranteed(), q.untouchableExtra); } else {q.idealAssigned = Resources.clone(used); } // 如果该队列有超出配置资源需要,就把这个队列放到 orderedByNeed 有序队列中(即这个队列有资源缺口)if (Resources.lessThan(rc, totGuarant, q.idealAssigned, curPlusPend)) {orderedByNeed.add(q); } } // 此时 unassigned 是 整体可用资源 排除掉 所有已应用的资源(used)// 把未调配的资源(unassigned)调配进来 // 形式就是从 orderedByNeed 中每次取出 most under-guaranteed 队列,按规定调配一块资源给他,如果仍不满足就按程序再放回 orderedByNeed // 直到满足所有队列资源,或者没有资源可调配 while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, totGuarant, unassigned, Resources.none())) {Resource wQassigned = Resource.newInstance(0, 0); // 对于有资源缺口的队列,从新计算他们的资源保障比例:normalizedGuarantee。// 即(该队列保障量 / 所有资源缺口队列保障量)resetCapacity(unassigned, orderedByNeed, ignoreGuarantee); // 这里返回是个列表,是因为可能有需要度(优先级)相等的状况 Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues(orderedByNeed, tqComparator); for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i .hasNext();) {TempQueuePerPartition sub = i.next(); // 依照 normalizedGuarantee 比例能从残余资源中分走多少。Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1)); //【重点】按肯定规定将资源分配给队列,并返回剩下的资源。Resource wQidle = sub.offer(wQavail, rc, totGuarant, isReservedPreemptionCandidatesSelector); // 调配给队列的资源 Resource wQdone = Resources.subtract(wQavail, wQidle); // 这里 wQdone > 0 证实本次迭代调配进来了资源,那么还会放回到待分配资源的汇合中(哪怕本次已满足资源申请),直到未再分配资源了才退出。if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) {orderedByNeed.add(sub); } Resources.addTo(wQassigned, wQdone); } Resources.subtractFrom(unassigned, wQassigned); } // 这里有可能整个资源都调配完了,还有队列资源不满足 while (!orderedByNeed.isEmpty()) {TempQueuePerPartition q1 = orderedByNeed.remove(); context.addPartitionToUnderServedQueues(q1.queueName, q1.partition); } }
下面第 5 步是重点,也就是
sub.offer()
,是计算给该队列在保障值之外,还能提供多少资源:/** * 计算队列 idealAssigned,在原有根底上减少新调配的资源。同时返回 avail 中未应用的资源。* 参数阐明:* avail 按比例该队列能从残余资源中调配到的 * clusterResource 整体资源量 * considersReservedResource?* idealAssigned = min(使用量,配置量) */ Resource offer(Resource avail, ResourceCalculator rc, Resource clusterResource, boolean considersReservedResource) { // 计算的是还有多少可分配资源的空间(maxCapacity - assigned)Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(Resources.subtract(getMax(), idealAssigned), Resource.newInstance(0, 0)); // remain = avail - min(avail, (max - assigned), (current + pending - assigned)) // 队列承受资源的计算方法:可提供的资源,队列最大资源 - 已分配资源,以后已应用资源 + 未满足的资源 -min(使用量,配置量) 三者中的最小值。Resource accepted = Resources.min(rc, clusterResource, absMaxCapIdealAssignedDelta, Resources.min(rc, clusterResource, avail, Resources .subtract( Resources.add((considersReservedResource ? getUsed() : getUsedDeductReservd()), pending), idealAssigned))); Resource remain = Resources.subtract(avail, accepted); Resources.addTo(idealAssigned, accepted); return remain; }
外围的资源重新分配算法逻辑曾经计算结束,剩下的就是:
依据从新计算的资源分配,失去各队列超用的资源,这部分就是要被抢占的资源。
这里不会一下把队列超用的资源都干掉,有个阻尼因子,用于平滑抢占解决。
依据资源差额,计算要抢占的容器
回到 selector.selectCandidates()
,下面曾经介绍了各队列抢占量的计算逻辑,接下来介绍「如何选出各队列中的 container」
- 抢占该队列在共享池应用资源的 container
- 抢占后提交工作中,后生成的 container(也就是越晚生成的 container,会被先解决)
-
抢占 amContainer
public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates( Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates, Resource clusterResource, Resource totalPreemptionAllowed) { // ...... // ------------ 第二步 ------------(依据资源差额,计算要 kill 的 container)// 依据计算失去的要抢占的量,计算各资源池各队列要 kill 的 container List<RMContainer> skippedAMContainerlist = new ArrayList<>(); // Loop all leaf queues // 这里是有优先级的:应用共享池的资源 -> 队列中后提交的工作 -> amContainer for (String queueName : preemptionContext.getLeafQueueNames()) { // 获取该队列在每个资源池要被抢占的量 Map<String, Resource> resToObtainByPartition = CapacitySchedulerPreemptionUtils .getResToObtainByPartitionForLeafQueue(preemptionContext, queueName, clusterResource); synchronized (leafQueue) { // 应用共享池资源的,先解决 Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers = leafQueue.getIgnoreExclusivityRMContainers(); for (String partition : resToObtainByPartition.keySet()) {if (ignorePartitionExclusivityContainers.containsKey(partition)) { TreeSet<RMContainer> rmContainers = ignorePartitionExclusivityContainers.get(partition); // 最初提交的工作,会被最先抢占 for (RMContainer c : rmContainers.descendingSet()) { if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c, selectedCandidates)) { // Skip already selected containers continue; } // 将 Container 放到待抢占汇合 preemptMap 中 boolean preempted = CapacitySchedulerPreemptionUtils .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, resToObtainByPartition, c, clusterResource, selectedCandidates, totalPreemptionAllowed); } } } // preempt other containers Resource skippedAMSize = Resource.newInstance(0, 0); // 默认是 FifoOrderingPolicy,desc 也就是最初提交的在最后面 Iterator<FiCaSchedulerApp> desc = leafQueue.getOrderingPolicy().getPreemptionIterator(); while (desc.hasNext()) {FiCaSchedulerApp fc = desc.next(); if (resToObtainByPartition.isEmpty()) {break;} // 从 application 中选出要被抢占的容器(前面介绍)preemptFrom(fc, clusterResource, resToObtainByPartition, skippedAMContainerlist, skippedAMSize, selectedCandidates, totalPreemptionAllowed); } // Can try preempting AMContainers Resource maxAMCapacityForThisQueue = Resources.multiply( Resources.multiply(clusterResource, leafQueue.getAbsoluteCapacity()), leafQueue.getMaxAMResourcePerQueuePercent()); preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist, resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue, totalPreemptionAllowed); } } return selectedCandidates; }
二)执行容器资源抢占
把要被抢占的 container 都选出来之后,就剩最初一步,kill 这些 container。
回到 containerBasedPreemptOrKill()
:
private void containerBasedPreemptOrKill(CSQueue root,
Resource clusterResources) {
// ......
// ------------ 第三步 ------------(执行容器资源抢占 或 kill 超时未主动进行的容器)// preempt (or kill) the selected containers
preemptOrkillSelectedContainerAfterWait(toPreempt);
// cleanup staled preemption candidates
cleanupStaledPreemptionCandidates();}
三、总结
至此,剖析结束整个资源抢占的过程。
总结一下次要逻辑:
- 从新计算各资源池中各队列应调配的资源;
- 与当初已应用的资源进行比照,如果超过新计算的调配量,(超用的局部 * 阻尼系数)就是要被抢占的资源量;
- 各队列依据要被抢占的资源量,选出要被 kill 的 container。优先度低的 container 就会被先解决(应用了共享资源的、后生成的 container);
- 通过心跳告诉 AM 要被 kill 的 container,或者解决掉告诉过已超时的 container。
参考文章:
Yarn FairScheduler 的抢占机制详解_小昌昌的博客的博客 -CSDN 博客
Yarn 抢占最外围分析_Geoffrey Turing 的博客 -CSDN 博客 – 针对 fair
Yarn 调度之 CapacityScheduler 源码剖析资源抢占
Better SLAs via Resource-preemption in YARN’s CapacityScheduler – Cloudera Blog