关于kubernetes:kubescheduler深度剖析与开发二

4次阅读

共计 14536 个字符,预计需要花费 37 分钟才能阅读完成。

为了深刻学习 kube-scheduler,本系从源码和实战角度深度学 习 kube-scheduler,该系列一共分 6 篇文章,如下:

  • kube-scheduler 整体架构
  • 初始化一个 scheduler
  • 一个 Pod 是如何调度的
  • 如何开发一个属于本人的 scheduler 插件
  • 开发一个 prefilter 扩大点的插件
  • 开发一个 socre 扩大点的插件

上一篇,咱们说了 kube-scheduler 的整体架构,是从整体的架构方面来思考的,本文咱们说说 kube-scheduler 是如何初始化进去的,kube-scheduler 外面都有些什么货色。

因为 kube-scheduler 源码内容比拟多,对于那些不是要害的货色,就疏忽不做探讨。

Scheduler 之 Profiles

上面咱们先看下 Scheduler 的构造

type Scheduler struct {
   Cache internalcache.Cache
   
   Extenders []framework.Extender
   
   NextPod func() *framework.QueuedPodInfo
   
   FailureHandler FailureHandlerFn
   
   SchedulePod func(ctx context.Context, fwk framework.Framework, state  *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)
   
   StopEverything <-chan struct{}
   
   SchedulingQueue internalqueue.SchedulingQueue
   
   Profiles profile.Map
   
   client clientset.Interface
   
   nodeInfoSnapshot *internalcache.Snapshot
   
   percentageOfNodesToScore int32
   
   nextStartNodeIndex int
}

上一篇咱们说过,为一个 Pod 抉择一个 Node 是依照固定程序运行扩大点的;在扩大点内,是依照插件注册的程序运行插件,如下图

下面的这些扩大点在 kube-scheduler 中是固定的,而且也不反对减少扩大点(实际上有这些扩大点曾经足够了),而且扩大点程序也是固定执行的。

下图是插件(以 preFilter 为例)运行的程序,扩大点内的插件,你既能够调整插件的执行程序(理论很少会批改默认的插件执行程序),能够敞开某个内置插件,还能够减少本人开发的插件。

那么这些插件是怎么注册的,注册在哪里呢,本人开发的插件又是怎么加进去的呢?

咱们来看下 Scheduler 外面最重要的一个成员:Profiles profile.Map

// 门路:pkg/scheduler/profile/profile.go

// Map holds frameworks indexed by scheduler name.
type Map map[string]framework.Framework

Profiles 是一个 key 为 scheduler name,value 是 framework.Framework 的 map,示意依据 scheduler name 来获取 framework.Framework 类型的值,所以能够有多个 scheduler。或者你在应用 k8s 的时候没有关注过 pod 或 deploment 外面的 scheduler,因为你没有指定的话,k8s 就会主动设置为默认的调度器,下图是 deployment 中未指定 schedulerName 被设置了默认调度器的一个 deployment

](/img/bVc7ywM)

假如当初我想要应用本人开发的一个名叫 my-scheduler-1 的调度器,这个调度器在 preFilter 扩大点中减少了 zoneLabel 插件,怎么做?

应用 kubeadm 部署的 k8s 集群,会在 /etc/kubernetes/manifests 目录下创立 kube-scheduler.yaml 文件,kubelet 会依据这个文件主动拉起来一个动态 Pod,一个 kube-scheduler pod 就被创立了,而且这个 kube-scheduler 运行的参数是间接在命令行上指定的。

apiVersion: v1
kind: Pod
metadata:
  creationTimestamp: null
  labels:
    component: kube-scheduler
    tier: control-plane
  name: kube-scheduler
  namespace: kube-system
spec:
  containers:
  - command:
    - kube-scheduler
    - --address=0.0.0.0
    - --authentication-kubeconfig=/etc/kubernetes/scheduler.conf
    - --authorization-kubeconfig=/etc/kubernetes/scheduler.conf
    - --bind-address=127.0.0.1
    - --kubeconfig=/etc/kubernetes/scheduler.conf
    - --leader-elect=true
    image: k8s.gcr.io/kube-scheduler:v1.16.8
    
    ....

其实 kube-scheduler 运行的时候能够指定配置文件,而不间接把参数写在启动命令上, 如下模式。

./kube-scheduler --config /etc/kube-scheduler.conf

于是乎,咱们就能够在配置文件中配置咱们调度器的插件了

apiVersion: kubescheduler.config.k8s.io/v1beta2
kind: KubeSchedulerConfiguration
leaderElection:
  leaderElect: true
clientConnection:
  kubeconfig: "/etc/kubernetes/scheduler.conf"

profiles:
- schedulerName: my-scheduler
  plugins:
    preFilter:
      enabled:
        - name: zoneLabel
      disabled:
        - name: NodePorts

咱们能够应用 enabled,disabled 开关来敞开或关上某个插件。
通过配置文件,还能够管制扩大点的调用程序,规定如下:

  • 如果某个扩大点没有配置对应的扩大,调度框架将应用默认插件中的扩大
  • 如果为某个扩大点配置且激活了扩大,则调度框架将先调用默认插件的扩大,再调用配置中的扩大
  • 默认插件的扩大始终被最先调用,而后依照 KubeSchedulerConfiguration 中扩大的激活 enabled 程序一一调用扩大点的扩大
  • 能够先禁用默认插件的扩大,而后在 enabled 列表中的某个地位激活默认插件的扩大,这种做法能够扭转默认插件的扩大被调用时的程序

还能够增加多个调度器,在 deployment 等控制器中指定本人想要应用的调度器即可:

apiVersion: kubescheduler.config.k8s.io/v1beta2
kind: KubeSchedulerConfiguration
leaderElection:
  leaderElect: true
clientConnection:
  kubeconfig: "/etc/kubernetes/scheduler.conf"

profiles:
- schedulerName: my-scheduler-1
  plugins:
    preFilter:
      enabled:
        - name: zoneLabel
        
- schedulerName: my-scheduler-2
  plugins:
    queueSort:
      enabled:
        - name: mySort

当然了,当初咱们在配置文件中定义的 mySort,zoneLabel 这样的插件还不能应用,咱们须要开发具体的插件注册进去,能力失常运行,前面的文章会具体讲。

好了,当初 Profiles 成员(一个 map)曾经蕴含了两个元素,{“my-scheduler-1″: framework.Framework ,”my-scheduler-2”: framework.Framework}。当一个 Pod 须要被调度的时候,kube-scheduler 会先取出 Pod 的 schedulerName 字段的值,而后通过 Profiles[schedulerName],拿到 framework.Framework 对象,进而应用这个对象开始调度,咱们能够用上面这种张图总结下下面形容的各个对象的关系。

那么重点就来到了 framework.Framework,上面是 framework.Framework 的定义:

// pkg/scheduler/framework/interface.go

type Framework interface {
   Handle
   
   QueueSortFunc() LessFunc

   RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status)

   RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)

   RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status

   RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)

   RunReservePluginsReserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status

   RunReservePluginsUnreserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)

   RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status

   WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status

   RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status

   HasFilterPlugins() bool

   HasPostFilterPlugins() bool

   HasScorePlugins() bool

   ListPlugins() *config.Plugins

   ProfileName() string}

Framework 是一个接口,须要实现的办法大部分模式为:Run*Plugins,也就是运行某个扩大点的插件,那么只有实现这个 Framework 接口就能够对 Pod 进行调度了。那么须要用户本人实现么?答案是不必,kube-scheduler 曾经有一个该接口的实现:frameworkImpl

// pkg/scheduler/framework/runtime/framework.go

type frameworkImpl struct {
    registry             Registry
    snapshotSharedLister framework.SharedLister
    waitingPods          *waitingPodsMap
    scorePluginWeight    map[string]int
    queueSortPlugins     []framework.QueueSortPlugin
    preFilterPlugins     []framework.PreFilterPlugin
    filterPlugins        []framework.FilterPlugin
    postFilterPlugins    []framework.PostFilterPlugin
    preScorePlugins      []framework.PreScorePlugin
    scorePlugins         []framework.ScorePlugin
    reservePlugins       []framework.ReservePlugin
    preBindPlugins       []framework.PreBindPlugin
    bindPlugins          []framework.BindPlugin
    postBindPlugins      []framework.PostBindPlugin
    permitPlugins        []framework.PermitPlugin

    clientSet       clientset.Interface
    kubeConfig      *restclient.Config
    eventRecorder   events.EventRecorder
    informerFactory informers.SharedInformerFactory

    metricsRecorder *metricsRecorder
    profileName     string

    extenders []framework.Extender
    framework.PodNominator

    parallelizer parallelize.Parallelizer
}

frameworkImpl 这个构造体外面蕴含了每个扩大点插件数组,所以某个扩大点要被执行的时候,只有遍历这个数组外面的所有插件,而后执行这些插件就能够了。咱们看看 framework.FilterPlugin 是怎么定义的(其余的也相似):


type Plugin interface {Name() string
}

type FilterPlugin interface {
    Plugin
    Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status
}

插件数组的类型是一个接口,那么某个插件只有实现了这个接口就能够被运行。实际上,咱们后面说的那些默认插件,都实现了这个接口,在目录 pkg/scheduler/framework/plugins 目录上面蕴含了所有内置插件的实现,次要就是对下面说的这个插件接口的实现。咱们能够简略用图形容下 Pod 被调度的时候执行插件的流程

那么这些默认插件是怎么加到 framework 外面的,自定义插件又是怎么加进来的呢?

分三步:

  1. 依据配置文件(–config 指定的)、零碎默认的插件,依照扩大点生成须要被加载的插件数组(包含插件名字,权重信息),也就是初始化 KubeSchedulerConfiguration 中的 Profiles 成员。
type KubeSchedulerConfiguration struct {
  
  metav1.TypeMeta

  Parallelism int32

  LeaderElection componentbaseconfig.LeaderElectionConfiguration

  ClientConnection componentbaseconfig.ClientConnectionConfiguration
  
  HealthzBindAddress string

  MetricsBindAddress string

  componentbaseconfig.DebuggingConfiguration

  PercentageOfNodesToScore int32

  PodInitialBackoffSeconds int64

  PodMaxBackoffSeconds int64

  Profiles []KubeSchedulerProfile

  Extenders []Extender}
  1. 创立 registry 汇合,这个汇合内是每个插件实例化函数,也就是 插件名字 -> 插件实例化函数的映射,艰深一点说就是通知零碎:1. 我叫王二;2. 你应该怎么把我创立进去。那么张三、李四、王五别离通知零碎怎么创立本人,就组成了这个汇合。

type PluginFactory = func(configuration runtime.Object, f framework.Handle) (framework.Plugin, error)

type Registry map[string]PluginFactory

这个汇合是内置(叫 inTree)默认的插件映射和用户自定义(outOfTree)的插件映射的并集,内置的映射通过上面函数创立:

// pkg/scheduler/framework/plugins/registry.go

func NewInTreeRegistry() runtime.Registry {

    fts := plfeature.Features{EnableReadWriteOncePod:                       feature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod),
        EnableVolumeCapacityPriority:                 feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
        EnableMinDomainsInPodTopologySpread:          feature.DefaultFeatureGate.Enabled(features.MinDomainsInPodTopologySpread),
        EnableNodeInclusionPolicyInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.NodeInclusionPolicyInPodTopologySpread),
    }

    return runtime.Registry{
        selectorspread.Name:                  selectorspread.New,
        imagelocality.Name:                   imagelocality.New,
        tainttoleration.Name:                 tainttoleration.New,
        nodename.Name:                        nodename.New,
        nodeports.Name:                       nodeports.New,
        nodeaffinity.Name:                    nodeaffinity.New,
        podtopologyspread.Name:               runtime.FactoryAdapter(fts, podtopologyspread.New),
        nodeunschedulable.Name:               nodeunschedulable.New,
        noderesources.Name:                   runtime.FactoryAdapter(fts, noderesources.NewFit),
        noderesources.BalancedAllocationName: runtime.FactoryAdapter(fts, noderesources.NewBalancedAllocation),
        volumebinding.Name:                   runtime.FactoryAdapter(fts, volumebinding.New),
        volumerestrictions.Name:              runtime.FactoryAdapter(fts, volumerestrictions.New),
        volumezone.Name:                      volumezone.New,
        nodevolumelimits.CSIName:             runtime.FactoryAdapter(fts, nodevolumelimits.NewCSI),
        nodevolumelimits.EBSName:             runtime.FactoryAdapter(fts, nodevolumelimits.NewEBS),
        nodevolumelimits.GCEPDName:           runtime.FactoryAdapter(fts, nodevolumelimits.NewGCEPD),
        nodevolumelimits.AzureDiskName:       runtime.FactoryAdapter(fts, nodevolumelimits.NewAzureDisk),
        nodevolumelimits.CinderName:          runtime.FactoryAdapter(fts, nodevolumelimits.NewCinder),
        interpodaffinity.Name:                interpodaffinity.New,
        queuesort.Name:                       queuesort.New,
        defaultbinder.Name:                   defaultbinder.New,
        defaultpreemption.Name:               runtime.FactoryAdapter(fts, defaultpreemption.New),
    }
}

那么用户自定义的插件怎么来的呢?这里咱们先不开展,在前面插件开发的时候再具体讲,不影响咱们了解。咱们假如用户自定义的也曾经生成了 registry,上面的代码就是把他们合并在一起

// pkg/scheduler/scheduler.go

registry := frameworkplugins.NewInTreeRegistry()

if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {return nil, err}

当初内置插件和零碎默认插件的实例化函数映射曾经创立好了

  1. 将(1)中每个扩大点的每个插件(就是插件名字)拿进去,去(2)的映射(map)中获取实例化函数,而后运行这个实例化函数,最初把这个实例化进去的插件(能够被运行的)追加到下面提到过的 frameworkImpl 中对应扩大点数组中,这样前面要运行某个扩大点插件的时候就能够遍历运行就能够了。咱们能够把上述过程用下图示意

Scheduler 之 SchedulingQueue

下面咱们介绍了 Scheduler 第一个要害成员 Profiles 的初始化和作用,上面咱们来谈谈第二个要害成员:SchedulingQueue

// pkg/scheduler/scheduler.go

podQueue := internalqueue.NewSchedulingQueue(profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
    informerFactory,
    // 1s
    internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
    // 10s
    internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
    internalqueue.WithPodNominator(nominator),
    internalqueue.WithClusterEventMap(clusterEventMap),
    // 5min
    internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
)
func NewSchedulingQueue(
    lessFn framework.LessFunc,
    informerFactory informers.SharedInformerFactory,
    opts ...Option) SchedulingQueue {return NewPriorityQueue(lessFn, informerFactory, opts...)
}
type PriorityQueue struct {
  
  framework.PodNominator

  stop  chan struct{}

  clock clock.Clock

  podInitialBackoffDuration time.Duration

  podMaxBackoffDuration time.Duration

  podMaxInUnschedulablePodsDuration time.Duration

  lock sync.RWMutex

  cond sync.Cond

  activeQ *heap.Heap

  podBackoffQ *heap.Heap

  unschedulablePods *UnschedulablePods

  schedulingCycle int64

  moveRequestCycle int64

  clusterEventMap map[framework.ClusterEvent]sets.String

  closed bool

  nsLister listersv1.NamespaceLister
}

SchedulingQueue 是一个 internalqueue.SchedulingQueue 的接口类型,PriorityQueue 对这个接口进行了实现,创立 Scheduler 的时候 SchedulingQueue 会被 PriorityQueue 类型对象赋值。

PriorityQueue 中有要害的 3 个成员:activeQ、podBackoffQ、unschedulablePods。

  • activeQ 是一个优先队列,用来寄存待调度的 Pod,Pod 依照优先级寄存在队列中
  • podBackoffQ 用来寄存异样的 Pod,该队列外面的 Pod 会期待肯定工夫后被挪动到 activeQ 外面从新被调度
  • unschedulablePods 中会寄存调度失败的 Pod,它不是队列,而是应用 map 来寄存的,这个 map 外面的 Pod 在肯定条件下会被挪动到 activeQ 或 podBackoffQ 中

PriorityQueue 还有两个办法:flushUnschedulablePodsLeftover 和 flushBackoffQCompleted

  • flushUnschedulablePodsLeftover:调度失败的 Pod 如果满足肯定条件,这个函数会将这种 Pod 挪动到 activeQ 或 podBackoffQ
  • flushBackoffQCompleted:运行异样的 Pod 等待时间实现后,flushBackoffQCompleted 将该 Pod 挪动到 activeQ

Scheduler 在启动的时候,会创立 2 个协程来定期运行这两个函数

func (p *PriorityQueue) Run() {go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
   go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)
}

下面是定期对 Pod 在这些队列之间的转换,那么除了定期刷新的形式,还有上面状况也会触发队列转换:

  • 有新节点退出集群
  • 节点配置或状态发生变化
  • 曾经存在的 Pod 发生变化
  • 集群内有 Pod 被删除

至于他们之间是如何转换的,咱们在下一篇文章外面具体介绍

Scheduler 之 cache

要说 cache 最大的作用就是晋升 Scheduler 的效率,升高 kube-apiserver(实质是 etcd) 的压力,在调用各个插件计算的时候所须要的 Node 信息和其余 Pod 信息都缓存在本地,在须要应用的时候间接从缓存获取即可,而不须要调用 api 从 kube-apiserver 获取。cache 类型是 internalcache.Cache 的接口,cacheImpl 实现了这个接口。上面是 cacheImpl 的构造

type Cache interface 
  NodeCount() int
  PodCount() (int, error)
  AssumePod(pod *v1.Pod) error
  FinishBinding(pod *v1.Pod) error
  ForgetPod(pod *v1.Pod) error
  AddPod(pod *v1.Pod) error
  UpdatePod(oldPod, newPod *v1.Pod) error
  RemovePod(pod *v1.Pod) error
  GetPod(pod *v1.Pod) (*v1.Pod, error)
  IsAssumedPod(pod *v1.Pod) (bool, error)
  AddNode(node *v1.Node) *framework.NodeInfo
  UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo
  RemoveNode(node *v1.Node) error
  UpdateSnapshot(nodeSnapshot *Snapshot) error
  Dump() *Dump}
type cacheImpl struct {stop   <-chan struct{}
  ttl    time.Duration
  period time.Duration
  mu sync.RWMutex
  assumedPods sets.String
  podStates map[string]*podState
  nodes     map[string]*nodeInfoListItem
  headNode *nodeInfoListItem
  nodeTree *nodeTree
  imageStates map[string]*imageState
}

cacheImpl 中的 nodes 寄存集群内所有 Node 信息;podStates 寄存所有 Pod 信息;,assumedPods 寄存曾经调度胜利然而还没调用 kube-apiserver 的进行绑定的(也就是还没有执行 bind 插件)的 Pod,须要这个缓存的起因也是为了晋升调度效率,将绑定和调度离开,因为绑定须要调用 kube-apiserver,这是一个重操作会耗费比拟多的工夫,所以 Scheduler 乐观的假如调度曾经胜利,而后返回去调度其余 Pod,而这个 Pod 就会放入 assumedPods 中,并且也会放入到 podStates 中,后续其余 Pod 在进行调度的时候,这个 Pod 也会在插件的计算范畴内(如亲和性),而后会新起协程进行最初的绑定,要是最初绑定失败了,那么这个 Pod 的信息会从 assumedPods 和 podStates 移除,并且把这个 Pod 从新放入 activeQ 中,从新被调度。

Scheduler 在启动时首先会 list 一份全量的 Pod 和 Node 数据到上述的缓存中,后续通过 watch 的形式发现变动的 Node 和 Pod,而后将变动的 Node 或 Pod 更新到上述缓存中。

Scheduler 之 NextPod 和 SchedulePod

到了这里,调度框架 framework 和调度队列 SchedulingQueue 都曾经创立进去了,当初是时候开始调度 Pod 了。

Scheduler 中有个成员 NextPod 会从 activeQ 队列中尝试获取一个待调度的 Pod,该函数在 SchedulePod 中被调用,如下:

// 启动 Scheduler
func (sched *Scheduler) Run(ctx context.Context) {sched.SchedulingQueue.Run()
    go wait.UntilWithContext(ctx, sched.scheduleOne, 0)
    <-ctx.Done()
    sched.SchedulingQueue.Close()}


// 尝试调度一个 Pod,所以 Pod 的调度入口
func (sched *Scheduler) scheduleOne(ctx context.Context) {
    // 会始终阻塞,直到获取到一个 Pod
    ......
    podInfo := sched.NextPod()
    ......
}

NextPod 它被赋予如下函数:

// pkg/scheduler/internal/queue/scheduling_queue.go

func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {return func() *framework.QueuedPodInfo {podInfo, err := queue.Pop()
        if err == nil {klog.V(4).InfoS("About to try and schedule pod", "pod", klog.KObj(podInfo.Pod))
            for plugin := range podInfo.UnschedulablePlugins {metrics.UnschedulableReason(plugin, podInfo.Pod.Spec.SchedulerName).Dec()}
            return podInfo
        }
        klog.ErrorS(err, "Error while retrieving next pod from scheduling queue")
        return nil
    }
}

Pop 会始终阻塞,直到 activeQ 长度大于 0,而后去取出一个 Pod 返回

// pkg/scheduler/internal/queue/scheduling_queue.go

func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {p.lock.Lock()
    defer p.lock.Unlock()
    for p.activeQ.Len() == 0 {// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
        // When Close() is called, the p.closed is set and the condition is broadcast,
        // which causes this loop to continue and return from the Pop().
        if p.closed {return nil, fmt.Errorf(queueClosed)
        }
        p.cond.Wait()}
    obj, err := p.activeQ.Pop()
    if err != nil {return nil, err}
    pInfo := obj.(*framework.QueuedPodInfo)
    pInfo.Attempts++
    p.schedulingCycle++
    return pInfo, nil
}

到了这里咱们就介绍完了 Scheduler 中最重要的几个成员,简略总结下:

  • Profiles: 寄存插件对象,在运行时能够遍历扩大点内的所有插件运行
  • SchedulerQueue:用来寄存待调度 Pod,异样 Pod,调度失败 Pod,他们互相能够转换
  • cache:寄存 Pod 和 Node 的信息,晋升调度效率
  • NextPod 和 ScheduleOne:尝试从 activeQ 获取一个 Pod,开始调度。

本文就到这,下一篇,咱们会讲一讲一个 Pod 提交后的调度流程。


我是清风徐来,一起学习 k8s,支付 k8s、docker 等精髓学习资源

本文由博客一文多发平台 OpenWrite 公布!

正文完
 0