乐趣区

别被官方文档迷惑了!这篇文章帮你详解yarn公平调度

欢迎大家前往腾讯云 + 社区,获取更多腾讯海量技术实践干货哦~
本文由 @edwinhzhang 发表于云 + 社区专栏

FairScheduler 是 yarn 常用的调度器,但是仅仅参考官方文档,有很多参数和概念文档里没有详细说明,但是这些参明显会影响到集群的正常运行。本文的主要目的是通过梳理代码将关键参数的功能理清楚。下面列出官方文档中常用的参数:

yarn.scheduler.fair.preemption.cluster-utilization-threshold
The utilization threshold after which preemption kicks in. The utilization is computed as the maximum ratio of usage to capacity among all resources. Defaults to 0.8f.

yarn.scheduler.fair.update-interval-ms
The interval at which to lock the scheduler and recalculate fair shares, recalculate demand, and check whether anything is due for preemption. Defaults to 500 ms.

maxAMShare
limit the fraction of the queue’s fair share that can be used to run application masters. This property can only be used for leaf queues. For example, if set to 1.0f, then AMs in the leaf queue can take up to 100% of both the memory and CPU fair share. The value of -1.0f will disable this feature and the amShare will not be checked. The default value is 0.5f.

minSharePreemptionTimeout
number of seconds the queue is under its minimum share before it will try to preempt containers to take resources from other queues. If not set, the queue will inherit the value from its parent queue.

fairSharePreemptionTimeout
number of seconds the queue is under its fair share threshold before it will try to preempt containers to take resources from other queues. If not set, the queue will inherit the value from its parent queue.

fairSharePreemptionThreshold
If the queue waits fairSharePreemptionTimeout without receiving fairSharePreemptionThreshold*fairShare resources, it is allowed to preempt containers to take resources from other queues. If not set, the queue will inherit the value from its parent queue.

在上述参数描述中,timeout 等参数值没有给出默认值,没有告知不设置会怎样。minShare,fairShare 等概念也没有说清楚,很容易让人云里雾里。关于这些参数和概念的详细解释,在下面的分析中一一给出。
FairScheduler 整体结构
图(1)FairScheduler 运行流程图
公平调度器的运行流程就是 RM 去启动 FairScheduler,SchedulerDispatcher 两个服务,这两个服务各自负责 update 线程,handle 线程。
update 线程有两个任务:(1)更新各个队列的资源(Instantaneous Fair Share),(2)判断各个 leaf 队列是否需要抢占资源(如果开启抢占功能)
handle 线程主要是处理一些事件响应,比如集群增加节点,队列增加 APP,队列删除 APP,APP 更新 container 等。
FairScheduler 类图
图(2)FairScheduler 相关类图
队列继承模块:yarn 通过树形结构来管理队列。从管理资源角度来看,树的根节点 root 队列(FSParentQueue), 非根节点(FSParentQueue),叶子节点(FSLeaf),app 任务(FSAppAttempt,公平调度器角度的 App)都是抽象的资源,它们都实现了 Schedulable 接口,都是一个可调度资源对象。它们都有自己的 fair share(队列的资源量)方法(这里又用到了 fair share 概念),weight 属性(权重)、minShare 属性(最小资源量)、maxShare 属性(最大资源量),priority 属性(优先级)、resourceUsage 属性(资源使用量属性)以及资源需求量属性(demand),同时也都实现了 preemptContainer 抢占资源的方法,assignContainer 方法(为一个 ACCEPTED 的 APP 分配 AM 的 container)。
public interface Schedulable {
/**
* Name of job/queue, used for debugging as well as for breaking ties in
* scheduling order deterministically.
*/
public String getName();

/**
* Maximum number of resources required by this Schedulable. This is defined as
* number of currently utilized resources + number of unlaunched resources (that
* are either not yet launched or need to be speculated).
*/
public Resource getDemand();

/** Get the aggregate amount of resources consumed by the schedulable. */
public Resource getResourceUsage();

/** Minimum Resource share assigned to the schedulable. */
public Resource getMinShare();

/** Maximum Resource share assigned to the schedulable. */
public Resource getMaxShare();

/** Job/queue weight in fair sharing. */
public ResourceWeights getWeights();

/** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
public long getStartTime();

/** Job priority for jobs in FIFO queues; meaningless for QueueSchedulables. */
public Priority getPriority();

/** Refresh the Schedulable’s demand and those of its children if any. */
public void updateDemand();

/**
* Assign a container on this node if possible, and return the amount of
* resources assigned.
*/
public Resource assignContainer(FSSchedulerNode node);

/**
* Preempt a container from this Schedulable if possible.
*/
public RMContainer preemptContainer();

/** Get the fair share assigned to this Schedulable. */
public Resource getFairShare();

/** Assign a fair share to this Schedulable. */
public void setFairShare(Resource fairShare);
}
队列运行模块:从类图角度描述公平调度的工作原理。SchedulerEventDispatcher 类负责管理 handle 线程。FairScheduler 类管理 update 线程,通过 QueueManager 获取所有队列信息。
我们从 Instantaneous Fair Share 和 Steady Fair Share 这两个 yarn 的基本概念开始进行代码分析。
Instantaneous Fair Share & Steady Fair Share
Fair Share 指的都是 Yarn 根据每个队列的权重、最大,最小可运行资源计算的得到的可以分配给这个队列的最大可用资源。本文描述的是公平调度,公平调度的默认策略 FairSharePolicy 的规则是 single-resource,即只关注内存资源这一项指标。
Steady Fair Share:是每个队列内存资源量的固定理论值。Steady Fair Share 在 RM 初期工作后不再轻易改变,只有后续在增加节点(addNode)时才会重新计算。RM 的初期工作也是 handle 线程把集群的每个节点添加到调度器中(addNode)。
Instantaneous Fair Share:是每个队列的内存资源量的实际值,是在动态变化的。yarn 里的 fair share 如果没有专门指代,都是指的的 Instantaneous Fair Share。
1 Steady Fair Share 计算方式
图(3)steady fair share 计算流程
handle 线程如果接收到 NODE_ADDED 事件,会去调用 addNode 方法。
private synchronized void addNode(RMNode node) {
FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
nodes.put(node.getNodeID(), schedulerNode);
// 将该节点的内存加入到集群总资源
Resources.addTo(clusterResource, schedulerNode.getTotalResource());
// 更新 available 资源
updateRootQueueMetrics();
// 更新一个 container 的最大分配,就是 UI 界面里的 MAX(如果没有记错的话)
updateMaximumAllocation(schedulerNode, true);

// 设置 root 队列的 steadyFailr=clusterResource 的总资源
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
// 重新计算 SteadyShares
queueMgr.getRootQueue().recomputeSteadyShares();
LOG.info(“Added node ” + node.getNodeAddress() +
” cluster capacity: ” + clusterResource);
}
recomputeSteadyShares 使用广度优先遍历计算每个队列的内存资源量,直到叶子节点。
public void recomputeSteadyShares() {
// 广度遍历整个队列树
// 此时 getSteadyFairShare 为 clusterResource
policy.computeSteadyShares(childQueues, getSteadyFairShare());
for (FSQueue childQueue : childQueues) {
childQueue.getMetrics().setSteadyFairShare(childQueue.getSteadyFairShare());
if (childQueue instanceof FSParentQueue) {
((FSParentQueue) childQueue).recomputeSteadyShares();
}
}
}
computeSteadyShares 方法计算每个队列应该分配到的内存资源,总体来说是根据每个队列的权重值去分配,权重大的队列分配到的资源更多,权重小的队列分配到得资源少。但是实际的细节还会受到其他因素影响,是因为每队列有 minResources 和 maxResources 两个参数来限制资源的上下限。computeSteadyShares 最终去调用 computeSharesInternal 方法。比如以下图为例:
图中的数字是权重,假如有 600G 的总资源,parent=300G,leaf1=300G,leaf2=210G,leaf3=70G。
图(4)yarn 队列权重
computeSharesInternal 方法概括来说就是通过二分查找法寻找到一个资源比重值 R(weight-to-slots),使用这个 R 为每个队列分配资源(在该方法里队列的类型是 Schedulable, 再次说明队列是一个资源对象),公式是 steadyFairShare=R * QueueWeights。
computeSharesInternal 是计算 Steady Fair Share 和 Instantaneous Fair Share 共用的方法,根据参数 isSteadyShare 来区别计算。
之所以要做的这么复杂,是因为队列不是单纯的按照比例来分配资源的(单纯按权重比例,需要 maxR,minR 都不设置。maxR 的默认值是 0x7fffffff,minR 默认值是 0)。如果设置了 maxR,minR, 按比例分到的资源小于 minR, 那么必须满足 minR。按比例分到的资源大于 maxR,那么必须满足 maxR。因此想要找到一个 R(weight-to-slots)来尽可能满足:

R*(Queue1Weights + Queue2Weights+…+QueueNWeights)<=totalResource
R*QueueWeights >= minShare
R*QueueWeights <= maxShare

注:QueueNWeights 为队列各自的权重,minShare 和 maxShare 即各个队列的 minResources 和 maxResources
computcomputeSharesInternal 详细来说分为四个步骤:

确定可用资源:totalResources = min(totalResources-takenResources(fixedShare), totalMaxShare)

确定 R 上下限
二分查找法逼近 R
使用 R 设置 fair Share

private static void computeSharesInternal(
Collection<? extends Schedulable> allSchedulables,
Resource totalResources, ResourceType type, boolean isSteadyShare) {

Collection<Schedulable> schedulables = new ArrayList<Schedulable>();
// 第一步
// 排除有固定资源不能动的队列, 并得出固定内存资源
int takenResources = handleFixedFairShares(
allSchedulables, schedulables, isSteadyShare, type);

if (schedulables.isEmpty()) {
return;
}
// Find an upper bound on R that we can use in our binary search. We start
// at R = 1 and double it until we have either used all the resources or we
// have met all Schedulables’ max shares.
int totalMaxShare = 0;
// 遍历 schedulables(非固定 fixed 队列),将各个队列的资源相加得到 totalMaxShare
for (Schedulable sched : schedulables) {
int maxShare = getResourceValue(sched.getMaxShare(), type);
totalMaxShare = (int) Math.min((long)maxShare + (long)totalMaxShare,
Integer.MAX_VALUE);
if (totalMaxShare == Integer.MAX_VALUE) {
break;
}
}
// 总资源要减去 fiexd share
int totalResource = Math.max((getResourceValue(totalResources, type) –
takenResources), 0);
// 队列所拥有的最大资源是有集群总资源和每个队列的 MaxResource 双重限制
totalResource = Math.min(totalMaxShare, totalResource);
// 第二步: 设置 R 的上下限
double rMax = 1.0;
while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
< totalResource) {
rMax *= 2.0;
}

// 第三步:二分法逼近合理 R 值
// Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
double left = 0;
double right = rMax;
for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
double mid = (left + right) / 2.0;
int plannedResourceUsed = resourceUsedWithWeightToResourceRatio(
mid, schedulables, type);
if (plannedResourceUsed == totalResource) {
right = mid;
break;
} else if (plannedResourceUsed < totalResource) {
left = mid;
} else {
right = mid;
}
}
// 第四步:使用 R 值设置,确定各个非 fixed 队列的 fairShar, 意味着只有活跃队列可以分资源
// Set the fair shares based on the value of R we’ve converged to
for (Schedulable sched : schedulables) {
if (isSteadyShare) {
setResourceValue(computeShare(sched, right, type),
((FSQueue) sched).getSteadyFairShare(), type);
} else {
setResourceValue(
computeShare(sched, right, type), sched.getFairShare(), type);
}
}
}
(1) 确定可用资源
handleFixedFairShares 方法来统计出所有 fixed 队列的 fixed 内存资源(fixedShare)相加,并且 fixed 队列排除掉不得瓜分系统资源。yarn 确定 fixed 队列的标准如下:
private static int getFairShareIfFixed(Schedulable sched,
boolean isSteadyShare, ResourceType type) {

// 如果队列的 maxShare <=0 则是 fixed 队列,fixdShare=0
if (getResourceValue(sched.getMaxShare(), type) <= 0) {
return 0;
}

// 如果是计算 Instantaneous Fair Share, 并且该队列内没有 APP 再跑,
// 则是 fixed 队列,fixdShare=0
if (!isSteadyShare &&
(sched instanceof FSQueue) && !((FSQueue)sched).isActive()) {
return 0;
}

// 如果队列 weight<=0, 则是 fixed 队列
// 如果对列 minShare <=0,fixdShare=0, 否则 fixdShare=minShare
if (sched.getWeights().getWeight(type) <= 0) {
int minShare = getResourceValue(sched.getMinShare(), type);
return (minShare <= 0) ? 0 : minShare;
}

return -1;
}
(2)确定 R 上下限
R 的下限为 1.0,R 的上限是由 resourceUsedWithWeightToResourceRatio 方法来确定。该方法确定的资源值 W,第一步中确定的可用资源值 T:W>= T 时,R 才能确定。
// 根据 R 值去计算每个队列应该分配的资源
private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
Collection<? extends Schedulable> schedulables, ResourceType type) {
int resourcesTaken = 0;
for (Schedulable sched : schedulables) {
int share = computeShare(sched, w2rRatio, type);
resourcesTaken += share;
}
return resourcesTaken;
}
private static int computeShare(Schedulable sched, double w2rRatio,
ResourceType type) {
//share=R*weight,type 是内存
double share = sched.getWeights().getWeight(type) * w2rRatio;
share = Math.max(share, getResourceValue(sched.getMinShare(), type));
share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
return (int) share;
}
(3)二分查找法逼近 R
满足下面两个条件中的一个即可终止二分查找:

W == T(步骤 2 中的 W 和 T)
超过 25 次(COMPUTE_FAIR_SHARES_ITERATIONS)

(4)使用 R 设置 fair share
设置 fair share 时,可以看到区分了 Steady Fair Share 和 Instantaneous Fair Share。
for (Schedulable sched : schedulables) {
if (isSteadyShare) {
setResourceValue(computeShare(sched, right, type),
((FSQueue) sched).getSteadyFairShare(), type);
} else {
setResourceValue(
computeShare(sched, right, type), sched.getFairShare(), type);
}
}
2 Instaneous Fair Share 计算方式
图(5)Instaneous Fair Share 计算流程
该计算方式与 steady fair 的计算调用栈是一致的,最终都要使用到 computeSharesInternal 方法,唯一不同的是计算的时机不一样。steady fair 只有在 addNode 的时候才会重新计算一次,而 Instantaneous Fair Share 是由 update 线程定期去更新。
此处强调的一点是,在上文中我们已经分析如果是计算 Instantaneous Fair Share,并且队列为空,那么该队列就是 fixed 队列,也就是非活跃队列,那么计算 fair share 时,该队列是不会去瓜分集群的内存资源。
而 update 线程的更新频率就是由 yarn.scheduler.fair.update-interval-ms 来决定的。
private class UpdateThread extends Thread {

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
//yarn.scheduler.fair.update-interval-ms
Thread.sleep(updateInterval);
long start = getClock().getTime();
// 更新 Instantaneous Fair Share
update();
// 抢占资源
preemptTasksIfNecessary();
long duration = getClock().getTime() – start;
fsOpDurations.addUpdateThreadRunDuration(duration);
} catch (InterruptedException ie) {
LOG.warn(“Update thread interrupted. Exiting.”);
return;
} catch (Exception e) {
LOG.error(“Exception in fair scheduler UpdateThread”, e);
}
}
}
}
3 maxAMShare 意义
handle 线程如果接收到 NODE_UPDATE 事件,如果(1)该 node 的机器内存资源满足条件,(2)并且有 ACCEPTED 状态的 Application,那么将会为该待运行的 APP 的 AM 分配一个 container,使该 APP 在所处的 queue 中跑起来。但在分配之前还需要一道检查 canRuunAppAM。能否通过 canRuunAppAM, 就是由 maxAMShare 参数限制。
public boolean canRunAppAM(Resource amResource) {
// 默认是 0.5f
float maxAMShare =
scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName());
if (Math.abs(maxAMShare – -1.0f) < 0.0001) {
return true;
}
// 该队的 maxAMResource=maxAMShare * fair share(Instantaneous Fair Share)
Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare);
//amResourceUsage 是该队列已经在运行的 App 的 AM 所占资源累加和
Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
// 查看当前 ifRunAMResource 是否超过 maxAMResource
return !policy
.checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource);
}
上面代码我们用公式来描述:

队列中运行的 APP 为 An,每个 APP 的 AM 占用资源为 R
ACCEPTED 状态(待运行)的 APP 的 AM 大小为 R1
队列的 fair share 为 QueFS
队列的 maxAMResource=maxAMShare * QueFS

ifRunAMResource=A1.R+A2.R+…+An.R+R1

ifRunAMResource > maxAMResource,则该队列不能接纳待运行的 APP

之所以要关注这个参数,是因为 EMR 很多客户在使用公平队列时会反映集群的总资源没有用满,但是还有 APP 在排队,没有跑起来,如下图所示:
图(6)APP 阻塞实例
公平调度默认策略不关心 Core 的资源,只关心 Memory。图中 Memory 用了 292G,还有 53.6G 的内存没用,APP 就可以阻塞。原因就是 default 队列所有运行中 APP 的 AM 资源总和超过了(345.6 * 0.5),导致 APP 阻塞。
总结
通过分析 fair share 的计算流程,搞清楚 yarn 的基本概念和部分参数,从下面的表格对比中,我们也可以看到官方的文档对概念和参数的描述是比较难懂的。剩余的参数放在第二篇 - 公平调度之抢占中分析。

官方描述
总结

Steady Fair Share
The queue’s steady fair share of resources. These shares consider all the queues irrespective of whether they are active (have running applications) or not. These are computed less frequently and change only when the configuration or capacity changes.They are meant to provide visibility into resources the user can expect, and hence displayed in the Web UI.
每个非 fixed 队列内存资源量的固定理论值。Steady Fair Share 在 RM 初期工作后不再轻易改变,只有后续在增加节点改编配置(addNode)时才会重新计算。RM 的初期工作也是 handle 线程把集群的每个节点添加到调度器中(addNode)。

Instantaneous Fair Share
The queue’s instantaneous fair share of resources. These shares consider only actives queues (those with running applications), and are used for scheduling decisions. Queues may be allocated resources beyond their shares when other queues aren’t using them. A queue whose resource consumption lies at or below its instantaneous fair share will never have its containers preempted.
每个非 fixed 队列 (活跃队列) 的内存资源量的实际值,是在动态变化的,由 update 线程去定时更新队列的 fair share。yarn 里的 fair share 如果没有专门指代,都是指的的 Instantaneous Fair Share。

yarn.scheduler.fair.update-interval-ms
The interval at which to lock the scheduler and recalculate fair shares, recalculate demand, and check whether anything is due for preemption. Defaults to 500 ms.
update 线程的间隔时间,该线程的工作是 1 更新 fair share,2 检查是否需要抢占资源。

maxAMShare
limit the fraction of the queue’s fair share that can be used to run application masters. This property can only be used for leaf queues. For example, if set to 1.0f, then AMs in the leaf queue can take up to 100% of both the memory and CPU fair share. The value of -1.0f will disable this feature and the amShare will not be checked. The default value is 0.5f.
队列所有运行中的 APP 的 AM 资源总和必须不能超过 maxAMShare * fair share

问答如何将 yarn 升级到特定版本?相关阅读 Yarn 与 MesosSpark on Yarn | Spark,从入门到精通 YARN 三大模块介绍【每日课程推荐】机器学习实战!快速入门在线广告业务及 CTR 相应知识

退出移动版