Kubernetes 调度器 Kubernetes 是一个基于容器的分布式调度器,实现了自己的调度模块。在 Kubernetes 集群中,调度器作为一个独立模块通过 pod 运行。从几个方面介绍 Kubernetes 调度器。
调度器工作方式 Kubernetes 中的调度器,是作为单独组件运行,一般运行在 Master 中,和 Master 数量保持一致。通过 Raft 协议选出一个实例作为 Leader 工作,其他实例 Backup。当 Master 故障,其他实例之间继续通过 Raft 协议选出新的 Master 工作。其工作模式如下:
调度器内部维护一个调度的 pods 队列 podQueue,并监听 APIServer。当我们创建 Pod 时,首先通过 APIServer 往 ETCD 写入 pod 元数据。调度器通过 Informer 监听 pods 状态,当有新增 pod 时,将 pod 加入到 podQueue 中。调度器中的主进程,会不断的从 podQueue 取出的 pod,并将 pod 进入调度分配节点环节调度环节分为两个步奏,Filter 过滤满足条件的节点、Prioritize 根据 pod 配置,例如资源使用率,亲和性等指标,给这些节点打分,最终选出分数最高的节点。分配节点成功,调用 apiServer 的 binding pod 接口,将 pod.Spec.NodeName 设置为所分配的那个节点。节点上的 kubelet 同样监听 ApiServer,如果发现有新的 pod 被调度到所在节点,调用本地的 dockerDaemon 运行容器。假如调度器尝试调度 Pod 不成功,如果开启了优先级和抢占功能,会尝试做一次抢占,将节点中优先级较低的 pod 删掉,并将待调度的 pod 调度到节点上。如果未开启,或者抢占失败,会记录日志,并将 pod 加入 podQueue 队尾。
实现细节 kube-scheduling 是一个独立运行的组件,主要工作内容在 Run 函数。
这里面主要做几件事情:
初始化一个 Scheduler 实例 sched,传入各种 Informer,为关心的资源变化建立监听并注册 handler,例如维护 podQuene 注册 events 组件,设置日志注册 http/https 监听,提供健康检查和 metrics 请求运行主要的调度内容入口 sched.run()。如果设置 –leader-elect=true,代表启动多个实例,通过 Raft 选主,实例只有当被选为 master 后运行主要工作函数 sched.run。调度核心内容在 sched.run() 函数,它会启动一个 go routine 不断运行 sched.scheduleOne,每次运行代表一个调度周期。
func (sched *Scheduler) Run() {
if !sched.config.WaitForCacheSync() {
return
}
go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
} 我们看下 sched.scheduleOne 主要做什么
func (sched *Scheduler) scheduleOne() { pod := sched.config.NextPod() …. // do some pre check scheduleResult, err := sched.schedule(pod)
if err != nil {
if fitError, ok := err.(*core.FitError); ok {
if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
….. // do some log
} else {
sched.preempt(pod, fitError)
}
}
}
…
// Assume volumes first before assuming the pod.
allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost)
…
fo func() {
// Bind volumes first before Pod
if !allBound {
err := sched.bindVolumes(assumedPod)
if err != nil {
klog.Errorf(“error binding volumes: %v”, err)
metrics.PodScheduleErrors.Inc()
return
}
}
err := sched.bind(assumedPod, &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
Target: v1.ObjectReference{
Kind: “Node”,
Name: scheduleResult.SuggestedHost,
},
})
}
} 在 sched.scheduleOne 中,主要会做几件事情
通过 sched.config.NextPod(), 从 podQuene 中取出 pod 运行 sched.schedule,尝试进行一次调度。假如调度失败,如果开启了抢占功能,会调用 sched.preempt 尝试进行抢占,驱逐一些 pod,为被调度的 pod 预留空间,在下一次调度中生效。如果调度成功,执行 bind 接口。在执行 bind 之前会为 pod volume 中声明的的 PVC 做 provision。sched.schedule 是主要的 pod 调度逻辑
func (g genericScheduler) Schedule(pod v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) {
// Get node list
nodes, err := nodeLister.List()
// Filter
filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
if err != nil {
return result, err
}
// Priority
priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
if err != nil {
return result, err
}
// SelectHost
host, err := g.selectHost(priorityList)
return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
FeasibleNodes: len(filteredNodes),
}, err
} 调度主要分为三个步奏:
Filters:过滤条件不满足的节点 PrioritizeNodes:在条件满足的节点中做 Scoring,获取一个最终打分列表 priorityListselectHost: 在 priorityList 中选取分数最高的一组节点,从中根据 round-robin 方式选取一个节点。
接下来我们继续拆解,分别看下这三个步奏会怎么做
FiltersFilters 相对比较容易,调度器默认注册了一系列的 predicates 方法,调度过程为并发调用每个节点的 predicates 方法。最终得到一个 node list,包含符合条件的节点对象。
func (g genericScheduler) findNodesThatFit(pod v1.Pod, nodes []v1.Node) ([]v1.Node, FailedPredicateMap, error) {
if len(g.predicates) == 0 {
filtered = nodes
} else {
allNodes := int32(g.cache.NodeTree().NumNodes())
numNodesToFind := g.numFeasibleNodesToFind(allNodes)
checkNode := func(i int) {
nodeName := g.cache.NodeTree().Next()
// 此处会调用这个节点的所有 predicates 方法
fits, failedPredicates, err := podFitsOnNode(
pod,
meta,
g.cachedNodeInfoMap[nodeName],
g.predicates,
g.schedulingQueue,
g.alwaysCheckAllPredicates,
)
if fits {
length := atomic.AddInt32(&filteredLen, 1)
if length > numNodesToFind {
// 如果当前符合条件的节点数已经足够,会停止计算。
cancel()
atomic.AddInt32(&filteredLen, -1)
} else {
filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
}
}
}
// 并发调用 checkNode 方法
workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
filtered = filtered[:filteredLen]
}
return filtered, failedPredicateMap, nil
} 值得注意的是,1.13 中引入了 FeasibleNodes 机制,为了提高大规模集群的调度性能。允许我们通过 bad-percentage-of-nodes-to-score 参数,设置 filter 的计算比例(默认 50%),当节点数大于 100 个,在 filters 的过程,只要满足条件的节点数超过这个比例,就会停止 filter 过程,而不是计算全部节点。举个例子,当节点数为 1000,我们设置的计算比例为 30%,那么调度器认为 filter 过程只需要找到满足条件的 300 个节点,filter 过程中当满足条件的节点数达到 300 个,filter 过程结束。这样 filter 不用计算全部的节点,同样也会降低 Prioritize 的计算数量。但是带来的影响是 pod 有可能没有被调度到最合适的节点。
PrioritizePrioritize 的目的是帮助 pod,为每个符合条件的节点打分,帮助 pod 找到最合适的节点。同样调度器默认注册了一系列 Prioritize 方法。这是 Prioritize 对象的数据结构
// PriorityConfig is a config used for a priority function.type PriorityConfig struct {
Name string
Map PriorityMapFunction
Reduce PriorityReduceFunction
// TODO: Remove it after migrating all functions to
// Map-Reduce pattern.
Function PriorityFunction
Weight int
} 每个 PriorityConfig 代表一个评分的指标,会考虑服务的均衡性,节点的资源分配等因素。一个 PriorityConfig 的主要 Scoring 过程分为 Map 和 Reduce,
Map 过程计算每个节点的分数值 Reduce 过程会将当前 PriorityConfig 的所有节点的打分结果再做一次处理。所有 PriorityConfig 计算完毕后,将每个 PriorityConfig 的数值乘以对应的权重,并按照节点再做一次聚合。
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
nodeInfo := nodeNameToInfo[nodes[index].Name]
for i := range priorityConfigs {
var err error
results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
}
})
for i := range priorityConfigs {
wg.Add(1)
go func(index int) {
defer wg.Done()
if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]);
}(i)
}
wg.Wait()
// Summarize all scores.
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
for i := range nodes {
result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
for j := range priorityConfigs {
result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
}
}
此外 Filter 和 Prioritize 都支持 extener scheduler 的调用,本文不做过多阐述。
现状目前 kubernetes 调度器的调度方式是 Pod-by-Pod,也是当前调度器不足的地方。主要瓶颈如下
kubernets 目前调度的方式,每个 pod 会对所有节点都计算一遍,当集群规模非常大,节点数很多时,pod 的调度时间会非常慢。这也是 percentage-of-nodes-to-score 尝试要解决的问题 pod-by-pod 的调度方式不适合一些机器学习场景。kubernetes 早期设计主要为在线任务服务,在一些离线任务场景,比如分布式机器学习中,我们需要一种新的算法 gang scheduler,pod 也许对调度的即时性要求没有那么高,但是提交任务后,只有当一个批量计算任务的所有 workers 都运行起来时,才会开始计算任务。pod-by-pod 方式在这个场景下,当资源不足时非常容易引起资源死锁。当前调度器的扩展性不是十分好,特定场景的调度流程都需要通过硬编码实现在主流程中,比如我们看到的 bindVolume 部分,同样也导致 Gang Scheduler 无法在当前调度器框架下通过原生方式实现。Kubernetes 调度器的发展社区调度器的发展,也是为了解决这些问题
调度器 V2 框架,增强了扩展性,也为在原生调度器中实现 Gang schedule 做准备。Kube-batch:一种 Gang schedule 的实现 https://github.com/kubernetes…poseidon:Firmament 一种基于网络图调度算法的调度器,poseidon 是将 Firmament 接入 Kubernetes 调度器的实现 https://github.com/kubernetes… 接下来,我们会分析一个具体的调度器方法实现,帮助理解拆解调度器的过程。并且关注分析调度器的社区动态。
本文作者:萧元
阅读原文
本文为云栖社区原创内容,未经允许不得转载。