为了深刻学习 kube-scheduler,本系从源码和实战角度深度学 习kube-scheduler,该系列一共分6篇文章,如下:
- kube-scheduler 整体架构
- 初始化一个 scheduler
- 一个 Pod 是如何调度的
- 如何开发一个属于本人的scheduler插件
- 开发一个 prefilter 扩大点的插件
- 开发一个 socre 扩大点的插件
上一篇文章咱们讲了一个 kube-scheduler 是怎么初始化进去的,有了 调度器之后就得开始让他干活了 这一篇文章咱们来讲讲一个 Pod 是怎么被调度到某个 Node 的。
我把调度一个 Pod 分为3个阶段
- 获取须要被调度的 Pod
- 运行每个扩大点的所有插件,给 Pod 抉择一个最合适的 Node
- 将 Pod 绑定到选出来的 Node
感知 Pod
要可能获取到 Pod 的前提是:kube-scheduler 能感知到有 Pod 须要被调度,得悉有 Pod 须要被调度后还须要有中央寄存被调度的 Pod 的信息。为了感知有 Pod 须要被调度,kube-scheduler 启动时通过 Informer watch Pod 的变动,它把待调度的 Pod 分了两种状况,代码如下
// pkg/scheduler/eventhandlers.gofunc addAllEventHandlers(...) { //曾经调度过的 Pod 则加到本地缓存,并判断是退出到调度队列还是退出到backoff队列 informerFactory.Core().V1().Pods().Informer().AddEventHandler( cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { switch t := obj.(type) { case *v1.Pod: return assignedPod(t) case cache.DeletedFinalStateUnknown: if _, ok := t.Obj.(*v1.Pod); ok { // The carried object may be stale, so we don't use it to check if // it's assigned or not. Attempting to cleanup anyways. return true } utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched)) return false default: utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj)) return false } }, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: sched.addPodToCache, UpdateFunc: sched.updatePodInCache, DeleteFunc: sched.deletePodFromCache, }, }, ) // 没有调度过的Pod,放到调度队列 informerFactory.Core().V1().Pods().Informer().AddEventHandler( cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { switch t := obj.(type) { case *v1.Pod: return !assignedPod(t) && responsibleForPod(t, sched.Profiles) case cache.DeletedFinalStateUnknown: if pod, ok := t.Obj.(*v1.Pod); ok { // The carried object may be stale, so we don't use it to check if // it's assigned or not. return responsibleForPod(pod, sched.Profiles) } utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched)) return false default: utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj)) return false } }, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: sched.addPodToSchedulingQueue, UpdateFunc: sched.updatePodInSchedulingQueue, DeleteFunc: sched.deletePodFromSchedulingQueue, }, }, )......}
- 曾经调度过的 Pod
辨别是不是调度过的 Pod 是通过:len(pod.Spec.NodeName) != 0 来判断的,因为调度过的 Pod 这个字段总是会被赋予被选中的 Node 名字。然而,既然是调度过的 Pod 上面的代码中为什么还要辨别:sched.addPodToCache 和 sched.updatePodInCache 呢?起因在于咱们能够在创立 Pod 的时候人为给它调配一个 Node(即给 pod.Spec.NodeName 赋值),这样 kube-scheduler 在监听到该 Pod 后,判断这个 Pod 的该字段不为空就会认为这个 Pod 曾经调度过了,然而这个字段不为空并不是 kube-scheduler 调度的后果,而是人为赋值的,那么 kube-scheduler 的 cache(能够参考上一篇 cache 相干的内容)中没有这个 Pod 的信息,所以就须要将 Pod 信息退出到 cache 中。至于在监听到 Pod 后 sched.addPodToCache 和 sched.updatePodInCache 哪个会被调用,这是 Informer 决定的,它会依据监听到变动的 Pod 和 Informer 的本地缓存做比照,要是缓存中没有这个 Pod,那么就调用 add 函数,否则就调用 update 函数。
退出或更新缓存后,还须要做一件事:去 unschedulablePods(调度失败的Pod) 中获取 Pod,这些 Pod 的亲和性和刚刚退出的这个 Pod 匹配,而后依据上面的规定判断是把 Pod 放入 backoffQ 还是放入 activeQ
- 依据这个 Pod 尝试被调度的次数计算这个 Pod 下次调度应该期待的工夫,计算规定为指数级增长,即依照1s,2s,4s,8s这样的工夫进行期待,然而这个等待时间也不会有限减少,会受到 podMaxBackoffDuration(默认10s) 的限度,这个参数示意是一个 Pod 处于 backoff 的最大工夫,如果期待的工夫如果超过了 podMaxBackoffDuration,那么就只期待 podMaxBackoffDuration 就会再次被调度;
- 以后工夫 - 上次调度的工夫 > 依据(1)获取到的应该期待的工夫,那么就把Pod放到activeQ外面,将会被调度,否则Pod被放入 backoff 队列里期待。
从下面咱们能够看到,一个 Pod 的变动会触发此前调度失败的 Pod 从新判断是否能够被调度
- 没有调度过的 Pod
len(pod.Spec.NodeName) = 0,那么这个 Pod 没有被调度过或者是此前调度过然而调度失败的(用户批改了 Pod 的配置导致 Pod 发生变化,又被 kube-scheduler 感知到了),如果是没有调度过的 Pod 那么间接退出到 activeQ,如果是调度失败的 Pod 则根据上述规定判断是退出 backoffQ 还是 activeQ。退出到 activeQ 会马上被取走,而后开始调度。
那么那些因为调度失败而被放入 unscheduleable 的 Pod 还有其余机会(下面说的有新 Pod 创立是一个机会)从新被调度么?答案是有的,否则他们就“被饿死了”,有两种路径:1. 定期强制将 unscheduleable 的 Pod 放入 backoffQ 或 activeQ,定期将 backoffQ 期待超时的 Pod 放入 ac activeQ;2. 集群内其余相干资源变动时,判断 unscheduleable 中的 Pod 是不是要放入 backoffQ 或 activeQ,其实这跟有 Pod 发生变化的状况是一样的。
第一种状况
在 kube-scheduler启动的时候中会起两个协程,他们会定期把 backoffQ 和 unscheduleable 外面的 Pod拿到activeQ外面去
func (p *PriorityQueue) Run() { go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop) go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)}
flushUnschedulablePodsLeftover
func (p *PriorityQueue) flushUnschedulablePodsLeftover() { p.lock.Lock() defer p.lock.Unlock() var podsToMove []*framework.QueuedPodInfo currentTime := p.clock.Now() for _, pInfo := range p.unschedulablePods.podInfoMap { lastScheduleTime := pInfo.Timestamp if currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration { podsToMove = append(podsToMove, pInfo) } } if len(podsToMove) > 0 { p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout) }}
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent) { activated := false for _, pInfo := range podInfoList { // If the event doesn't help making the Pod schedulable, continue. // Note: we don't run the check if pInfo.UnschedulablePlugins is nil, which denotes // either there is some abnormal error, or scheduling the pod failed by plugins other than PreFilter, Filter and Permit. // In that case, it's desired to move it anyways. if len(pInfo.UnschedulablePlugins) != 0 && !p.podMatchesEvent(pInfo, event) { continue } pod := pInfo.Pod if p.isPodBackingoff(pInfo) { if err := p.podBackoffQ.Add(pInfo); err != nil { klog.ErrorS(err, "Error adding pod to the backoff queue", "pod", klog.KObj(pod)) } else { metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc() p.unschedulablePods.delete(pod) } } else { if err := p.activeQ.Add(pInfo); err != nil { klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod)) } else { metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc() p.unschedulablePods.delete(pod) } } } p.moveRequestCycle = p.schedulingCycle if activated { p.cond.Broadcast() } }x
将在 unscheduleable 外面停留时长超过 podMaxInUnschedulablePodsDuration(默认是5min)的pod放入到 ActiveQ 或 BackoffQueue,具体是放到哪个队列外面,还是依据咱们上文说的那个理论计算规定来。这么做的起因就是给那些“问题少年”一次重新做人的机会,也不能一犯错误(调度失败)就彻底打入死牢了。
flushBackoffQCompleted
去 backoffQ 获取期待完结的 Pod,放入 activeQ
func (p *PriorityQueue) flushBackoffQCompleted() { p.lock.Lock() defer p.lock.Unlock() activated := false for { rawPodInfo := p.podBackoffQ.Peek() if rawPodInfo == nil { break } pod := rawPodInfo.(*framework.QueuedPodInfo).Pod boTime := p.getBackoffTime(rawPodInfo.(*framework.QueuedPodInfo)) if boTime.After(p.clock.Now()) { break } _, err := p.podBackoffQ.Pop() if err != nil { klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod)) break } p.activeQ.Add(rawPodInfo) metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc() activated = true } if activated { p.cond.Broadcast() } }
第二种状况
集群内资源发生变化
- 有新节点退出集群
- 节点配置或状态发生变化
- 曾经存在的 Pod 发生变化
- 集群内有Pod被删除
informerFactory.Core().V1().Nodes().Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: sched.addNodeToCache, UpdateFunc: sched.updateNodeInCache, DeleteFunc: sched.deleteNodeFromCache, },)
新退出节点
func (sched *Scheduler) addNodeToCache(obj interface{}) { node, ok := obj.(*v1.Node) if !ok { klog.ErrorS(nil, "Cannot convert to *v1.Node", "obj", obj) return } nodeInfo := sched.Cache.AddNode(node) klog.V(3).InfoS("Add event for node", "node", klog.KObj(node)) sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd, preCheckForNode(nodeInfo))}
func preCheckForNode(nodeInfo *framework.NodeInfo) queue.PreEnqueueCheck { // Note: the following checks doesn't take preemption into considerations, in very rare // cases (e.g., node resizing), "pod" may still fail a check but preemption helps. We deliberately // chose to ignore those cases as unschedulable pods will be re-queued eventually. return func(pod *v1.Pod) bool { admissionResults := AdmissionCheck(pod, nodeInfo, false) if len(admissionResults) != 0 { return false } _, isUntolerated := corev1helpers.FindMatchingUntoleratedTaint(nodeInfo.Node().Spec.Taints, pod.Spec.Tolerations, func(t *v1.Taint) bool { return t.Effect == v1.TaintEffectNoSchedule }) return !isUntolerated }}
能够看到,当有节点退出集群的时候,会把 unscheduleable 外面的Pod 顺次拿进去做上面的判断:
- Pod 对 节点的亲和性
- Pod 中 Nodename不为空 那么判断新退出节点的Name判断pod Nodename是否相等
- 判断 Pod 中容器对端口的要求是否和新退出节点曾经被应用的端口抵触
- Pod 是否容忍了Node的Pod
只有上述4个条件都满足,那么新退出节点这个事件才会触发这个未被调度的Pod退出到 backoffQ 或者 activeQ,至于是退出哪个queue,下面曾经剖析过了
节点更新
func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) { oldNode, ok := oldObj.(*v1.Node) if !ok { klog.ErrorS(nil, "Cannot convert oldObj to *v1.Node", "oldObj", oldObj) return } newNode, ok := newObj.(*v1.Node) if !ok { klog.ErrorS(nil, "Cannot convert newObj to *v1.Node", "newObj", newObj) return } nodeInfo := sched.Cache.UpdateNode(oldNode, newNode) // Only requeue unschedulable pods if the node became more schedulable. if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != nil { sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(*event, preCheckForNode(nodeInfo)) }}
func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent { if nodeSpecUnschedulableChanged(newNode, oldNode) { return &queue.NodeSpecUnschedulableChange } if nodeAllocatableChanged(newNode, oldNode) { return &queue.NodeAllocatableChange } if nodeLabelsChanged(newNode, oldNode) { return &queue.NodeLabelChange } if nodeTaintsChanged(newNode, oldNode) { return &queue.NodeTaintChange } if nodeConditionsChanged(newNode, oldNode) { return &queue.NodeConditionChange } return nil}
首先是判断节点是何种配置产生了变动,有如下状况
- 节点可调度状况发生变化
- 节点可分配资源发生变化
- 节点标签发生变化
- 节点污点发生变化
- 节点状态发生变化
如果某个 Pod 调度失败的起因能够匹配到下面其中一个起因,那么节点更新这个事件才会触发这个未被调度的Pod退出到 backoffQ 或者 activeQ
informerFactory.Core().V1().Pods().Informer().AddEventHandler( cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { switch t := obj.(type) { case *v1.Pod: return assignedPod(t) case cache.DeletedFinalStateUnknown: if _, ok := t.Obj.(*v1.Pod); ok { // The carried object may be stale, so we don't use it to check if // it's assigned or not. Attempting to cleanup anyways. return true } utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched)) return false default: utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj)) return false } }, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: sched.addPodToCache, UpdateFunc: sched.updatePodInCache, DeleteFunc: sched.deletePodFromCache, }, },)
曾经存在的 Pod 发生变化
func (sched *Scheduler) addPodToCache(obj interface{}) { pod, ok := obj.(*v1.Pod) if !ok { klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", obj) return } klog.V(3).InfoS("Add event for scheduled pod", "pod", klog.KObj(pod)) if err := sched.Cache.AddPod(pod); err != nil { klog.ErrorS(err, "Scheduler cache AddPod failed", "pod", klog.KObj(pod)) } sched.SchedulingQueue.AssignedPodAdded(pod)}
func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) { p.lock.Lock() p.movePodsToActiveOrBackoffQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodAdd) p.lock.Unlock()}
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.QueuedPodInfo { var nsLabels labels.Set nsLabels = interpodaffinity.GetNamespaceLabelsSnapshot(pod.Namespace, p.nsLister) var podsToMove []*framework.QueuedPodInfo for _, pInfo := range p.unschedulablePods.podInfoMap { for _, term := range pInfo.RequiredAffinityTerms { if term.Matches(pod, nsLabels) { podsToMove = append(podsToMove, pInfo) break } } } return podsToMove}
能够看到,曾经存在的Pod发生变化后,会把这个Pod亲和性配置顺次和 unscheduleable 外面的Pod匹配,如果可能匹配上,那么节点更新这个事件才会触发这个未被调度的Pod退出到 backoffQ 或者 activeQ。
集群内有Pod删除
func (sched *Scheduler) deletePodFromCache(obj interface{}) { var pod *v1.Pod switch t := obj.(type) { case *v1.Pod: pod = t case cache.DeletedFinalStateUnknown: var ok bool pod, ok = t.Obj.(*v1.Pod) if !ok { klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", t.Obj) return } default: klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", t) return } klog.V(3).InfoS("Delete event for scheduled pod", "pod", klog.KObj(pod)) if err := sched.Cache.RemovePod(pod); err != nil { klog.ErrorS(err, "Scheduler cache RemovePod failed", "pod", klog.KObj(pod)) } sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete, nil)}
能够看到,Pod删除工夫不像其余工夫须要做额定的判断,这个preCheck函数是空的,所以所有 unscheduleable 外面的Pod都会被放到 activeQ 或 backoffQ 中。
从下面的状况,咱们能够看到,集群内有事件发生变化,是能够减速调度失败的Pod被从新调度的过程的。惯例的是,调度失败的 Pod 须要等5min 而后才会被重新加入 backoffQ 或 activeQ。backoffQ外面的Pod也须要等一段时间才会从新调度。这也就是为什么,当你批改节点配置的时候,能看到Pod马上从新被调度的起因
下面就是一个Pod调度失败后,从新触发调度的状况了。
取出 Pod
Scheduler 中有个成员 NextPod 会从 activeQ 队列中尝试获取一个待调度的 Pod,该函数在 SchedulePod 中被调用,如下:
// 启动 Schedulerfunc (sched *Scheduler) Run(ctx context.Context) { sched.SchedulingQueue.Run() go wait.UntilWithContext(ctx, sched.scheduleOne, 0) <-ctx.Done() sched.SchedulingQueue.Close()}// 尝试调度一个 Pod,所以 Pod 的调度入口func (sched *Scheduler) scheduleOne(ctx context.Context) { // 会始终阻塞,直到获取到一个Pod ...... podInfo := sched.NextPod() ......}
NextPod 它被赋予如下函数:
// pkg/scheduler/internal/queue/scheduling_queue.gofunc MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo { return func() *framework.QueuedPodInfo { podInfo, err := queue.Pop() if err == nil { klog.V(4).InfoS("About to try and schedule pod", "pod", klog.KObj(podInfo.Pod)) for plugin := range podInfo.UnschedulablePlugins { metrics.UnschedulableReason(plugin, podInfo.Pod.Spec.SchedulerName).Dec() } return podInfo } klog.ErrorS(err, "Error while retrieving next pod from scheduling queue") return nil }}
Pop 会始终阻塞,直到 activeQ 长度大于0,而后取出一个 Pod 返回
// pkg/scheduler/internal/queue/scheduling_queue.gofunc (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) { p.lock.Lock() defer p.lock.Unlock() for p.activeQ.Len() == 0 { // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the p.closed is set and the condition is broadcast, // which causes this loop to continue and return from the Pop(). if p.closed { return nil, fmt.Errorf(queueClosed) } p.cond.Wait() } obj, err := p.activeQ.Pop() if err != nil { return nil, err } pInfo := obj.(*framework.QueuedPodInfo) pInfo.Attempts++ p.schedulingCycle++ return pInfo, nil}
调度 Pod
func (sched *Scheduler) scheduleOne(ctx context.Context) { // 取出 Pod podInfo := sched.NextPod() ... // 依据 Pod 的调度名字,获取之前初始化好的调度框架(framework) fwk, err := sched.frameworkForPod(pod) ... // 开始执行插件,包含 filter, socre 两个扩大点内的所有插件,获取一个最合适 Pod 的节点 scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod) // 如果获取节点失败,则开始运行 postFilter 开始抢占一个 Pod if err != nil { result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap) } .... // 将 Pod 放入 assumedPod 存储,即假如 Pod 曾经调度胜利 err = sched.assume(assumedPod, scheduleResult.SuggestedHost) // 运行 Reserve 插件 fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) ... // 运行 Permit 插件 fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) ... // 启动一个协程,开始绑定,主流程到了这里就完结了,而后开始新的一轮调度; go func() { // 执行 preBind 插件 fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) ... // 执行绑定插件,会调用 kube-apiserver 写入etcd 调度后果,就是给 Pod 赋予 Nodename err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state) ... // 执行 postBind fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) }
- 执行 filter 类型扩大点(包含preFilter,filter,postFilter)插件,选出所有合乎 Pod 的 Node,如果无奈找到合乎的 Node, 则把 Pod 退出 unscheduleable 中,此次调度完结;
- 执行 score 扩大点插件,给所有 Node 打分;
- 拿出得分最高的 Node;
- assume Pod。这一步就是乐观假如 Pod 曾经调度胜利,更新缓存中 Node 和 PodStats 信息,到了这里scheduling cycle就曾经完结了,而后会开启新的一轮调度。至于真正的绑定,则会新起一个协程。
- 执行 reserve 插件;
- 启动协程绑定 Pod 到 Node上。实际上就是批改 Pod.spec.nodeName: 选定的node名字,而后调用 kube-apiserver 接口写入 etcd。如果绑定失败了,那么移除缓存中此前退出的信息,而后把 Pod 放入activeQ 中,后续从新调度。执行 postBinding,该步没有实现的插件没所以没有做任何事。
好了,到了这里一个 Pod 如果可能失常的被调度的话,那么流程就完结了。如果调度失败的话,Pod会被放入 unscheduleable 中,后续还会对 unscheduleable 中的 Pod 从新调度。
本文由博客一文多发平台 OpenWrite 公布!