背景
上篇文章咱们说了 Volcano 控制器原理,这篇文章来看下调度器外围逻辑。
调度器简介
接着终于到了 Volcano 的外围控制器局部。其实上局部 Controller 所有的调谐,最终都是为了能做好批调度。和其余文章一样,先看看官网的形容:
官网形容:
- 客户端提交的 Job 被 scheduler 察看到并缓存起来;
- 周期性的开启会话,一个调度周期开始;
- 将没有被调度的 Job 发送到会话的待调度队列中;
- 遍历所有的待调度 Job,依照定义的秩序顺次执行 enqueue、allocate、preempt、reclaim、backfill 等动作,为每个 Job 找到一个最合适的节点。将该 Job 绑定到这个节点。action 中执行的具体算法逻辑取决于注册的 plugin 中各函数的实现
- 敞开本次会话。
艰深点:Volcano 调度过程中会执行一系列的 Action,Action 中执行什么算法逻辑,就取决于注册进去的 plugins。具体如何配置 Actions 和 Plugins,这须要扒扒源码了。次要其实代码中多了个 shuffle Action,官网文档有些落后了。
调度器源码实现
从控制器 CRD 到调度器,调度器如何辨认 PodGroup
此时必须要留神,调度器代码中的 JobInfo 并非 控制器所辨认的 Volcano Job,而是对 PodGroup 的封装。
咱们先到调度器的 main 入口来看,除了框架类的代码外,只有一个 Run 函数。而 Run 函数中有个 NewScheduler 办法。NewScheduler 办法中有个 schedcache.New() 办法 …。顺着代码,终于来到了配角 addEventHandler()(此处只是为了看 PodGroup 到 TaskInfo 的映射,讲的比拟粗,细节后续会再提到)。
// cmd/scheduler/main.go
func main() {
// ...
if err := app.Run(s); err != nil {// ...}
}
// cmd/scheduler/app/server.go
// Run the volcano scheduler.
func Run(opt *options.ServerOption) error {
// ...
sched, err := scheduler.NewScheduler(config, opt)
// ...
}
// pkg/scheduler/scheduler.go
// NewScheduler returns a Scheduler
func NewScheduler(config *rest.Config, opt *options.ServerOption) (*Scheduler, error) {
// ...
cache := schedcache.New(config, opt.SchedulerNames, opt.DefaultQueue, opt.NodeSelector, opt.NodeWorkerThreads, opt.IgnoredCSIProvisioners)
scheduler := &Scheduler{
schedulerConf: opt.SchedulerConf,
fileWatcher: watcher,
cache: cache,
schedulePeriod: opt.SchedulePeriod,
dumper: schedcache.Dumper{Cache: cache, RootDir: opt.CacheDumpFileDir},
}
return scheduler, nil
}
// pkg/scheduler/cache/cache.go
// New returns a Cache implementation.
func New(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string, nodeWorkers uint32, ignoredProvisioners []string) Cache {return newSchedulerCache(config, schedulerNames, defaultQueue, nodeSelectors, nodeWorkers, ignoredProvisioners)
}
// pkg/scheduler/cache/cache.go
func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string, nodeWorkers uint32, ignoredProvisioners []string) *SchedulerCache {
// ...
// add all events handlers
sc.addEventHandler()
// ...
}
从 addEventHandler() 中能够看到调度器监听了好多资源,包含 PodGroup,次要看看其中的 AddFunc,也就是 pkg/scheduler/cache/event_handlers.go 中的 AddPodGroupV1beta1 办法。此时会看到一个 schedulingapi.PodGroup 构造体,留神该构造体并非控制器中的 PodGroup crd,而是一个 Wrapper。
// PodGroup is a collection of Pod; used for batch workload.
type PodGroup struct {
scheduling.PodGroup
// Version represents the version of PodGroup
Version string
}
// pkg/scheduler/cache/event_handlers.go
// AddPodGroupV1beta1 add podgroup to scheduler cache
func (sc *SchedulerCache) AddPodGroupV1beta1(obj interface{}) {ss, ok := obj.(*schedulingv1beta1.PodGroup)
podgroup := scheduling.PodGroup{}
pg := &schedulingapi.PodGroup{PodGroup: podgroup, Version: schedulingapi.PodGroupVersionV1Beta1}
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
if err := sc.setPodGroup(pg); err != nil {}}
在 setGroup 中能够发现把 PodGroup 塞到了 Jobs map 中,而 Jobs map 类型为 map[schedulingapi.JobID]*schedulingapi.JobInfo。此时 PodGroup 变成了 JobInfo。
// pkg/scheduler/cache/event_handlers.go
// Assumes that lock is already acquired.
func (sc *SchedulerCache) setPodGroup(ss *schedulingapi.PodGroup) error {job := getJobID(ss)
if _, found := sc.Jobs[job]; !found {sc.Jobs[job] = schedulingapi.NewJobInfo(job)
}
sc.Jobs[job].SetPodGroup(ss)
// TODO(k82cn): set default queue in admission.
if len(ss.Spec.Queue) == 0 {sc.Jobs[job].Queue = schedulingapi.QueueID(sc.defaultQueue)
}
metrics.UpdateE2eSchedulingStartTimeByJob(sc.Jobs[job].Name, string(sc.Jobs[job].Queue), sc.Jobs[job].Namespace,
sc.Jobs[job].CreationTimestamp.Time)
return nil
}
调度器如何辨认 Pod
此时必须要留神,调度器代码中的 TaskInfo 并非 控制器所辨认的 Volcano Job 中的 Task,而是对 Pod 的封装。
在上一节的 addEventHandler() 中,同样能够看到对 Pod 资源的监听,同样来看下其中的 addFunc,也就是 pkg/scheduler/cache/event_handlers.go 中的 AddPod。
// AddPod add pod to scheduler cache
func (sc *SchedulerCache) AddPod(obj interface{}) {pod, ok := obj.(*v1.Pod)
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
err := sc.addPod(pod)
}
AddPod 的动作很显著了,就是将 Pod 转换为 Task,并执行 addTasks 函数(将 TaskInfo 保留到 Tasks map 中。同时更新 Nodes map)。
// Assumes that lock is already acquired.
func (sc *SchedulerCache) addPod(pod *v1.Pod) error {pi, err := sc.NewTaskInfo(pod)
return sc.addTask(pi)
}
启动调度
上节说到,调度器的 Run() 函数中有个 NewScheduler 函数,而在 Run 中,除了初始化 Scheduler 对象外,另一个重要的动作就是 Scheduler 对象的 Run 办法。
// cmd/scheduler/app/server.go
// Run the volcano scheduler.
func Run(opt *options.ServerOption) error {
// ...
sched, err := scheduler.NewScheduler(config, opt)
// ...
run := func(ctx context.Context) {sched.Run(ctx.Done())
<-ctx.Done()}
}
先来瞅一眼 Scheduler 对象的构造,其中最值的关注的天然是 actions 和 plugins。
// Scheduler watches for new unscheduled pods for volcano. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
cache schedcache.Cache
schedulerConf string
fileWatcher filewatcher.FileWatcher
schedulePeriod time.Duration
once sync.Once
mutex sync.Mutex
actions []framework.Action
plugins []conf.Tier
configurations []conf.Configuration
metricsConf map[string]string
dumper schedcache.Dumper
}
接着来到 Run 函数,其中有个定时执行的逻辑 runOnce。
// pkg/scheduler/scheduler.go
// Run initializes and starts the Scheduler. It loads the configuration,
// initializes the cache, and begins the scheduling process.
func (pc *Scheduler) Run(stopCh <-chan struct{}) {go wait.Until(pc.runOnce, pc.schedulePeriod, stopCh)
}
而在 runOnce 函数中首先初始化了 Session 对象,其次顺次执行其对应的 actions。此时对官网图中 Session 开始对应起来。
// pkg/scheduler/scheduler.go
func (pc *Scheduler) runOnce() {
// ...
ssn := framework.OpenSession(pc.cache, plugins, configurations)
for _, action := range actions {actionStartTime := time.Now()
action.Execute(ssn)
metrics.UpdateActionDuration(action.Name(), metrics.Duration(actionStartTime))
}
// ...
}
那么这里调用了那些 plugins 和 actions 呢,各个 action 中又要执行那些 plugins 呢。别急,本文一一道来。
首先看看有哪些 actions 和 plguins。在 Scheduler.Run() 中有个 loadSchedulerConf 办法。该办法中默认在文件中读取 actions 和 plugins 配置。
// pkg/scheduler/scheduler.go
func (pc *Scheduler) loadSchedulerConf() {
// ...
var config string
if len(pc.schedulerConf) != 0 {confData, err := os.ReadFile(pc.schedulerConf)
if err != nil {
klog.Errorf("Failed to read the Scheduler config in'%s', using previous configuration: %v",
pc.schedulerConf, err)
return
}
config = strings.TrimSpace(string(confData))
}
actions, plugins, configurations, metricsConf, err := unmarshalSchedulerConf(config)
// ...
}
默认配置如下变量 defaultSchedulerConf 所示,蕴含 enqueue、allocate 和 backfill 三个 action 以及多个 plugins。
var defaultSchedulerConf = `
actions: "enqueue, allocate, backfill"
tiers:
- plugins:
- name: priority
- name: gang
- name: conformance
- plugins:
- name: overcommit
- name: drf
- name: predicates
- name: proportion
- name: nodeorder
`
看到这儿后,有些读者费解,这么多 plugins 如何与 actions 一一对应上呢,某个 action 外面须要执行哪些 plugins 呢。其实在 OpenSession 办法中,尝试调用了每个 plugin 的 OnSessionOpen 办法,该办法由各个 plugin 方本人实现,次要将本身逻辑注册到各个 action 的 funcmap 中。
// OpenSession start the session
func OpenSession(cache cache.Cache, tiers []conf.Tier, configurations []conf.Configuration) *Session {
// ...
for _, tier := range tiers {
for _, plugin := range tier.Plugins {if pb, found := GetPluginBuilder(plugin.Name); !found {klog.Errorf("Failed to get plugin %s.", plugin.Name)
} else {plugin := pb(plugin.Arguments)
ssn.plugins[plugin.Name()] = plugin
plugin.OnSessionOpen(ssn)
}
}
}
return ssn
}
以经典的 gang plugin 为例,看看 OnSessionOpen 办法,其余插件读者可自行查看。
// pkg/scheduler/plugins/gang/gang.go
func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) {
// ....
// 将 preemptableFn 注册到 Reclaimable 和 Preemptable 两个 action 的 map 中
preemptableFn := func(preemptor *api.TaskInfo, preemptees []*api.TaskInfo) ([]*api.TaskInfo, int) {var victims []*api.TaskInfo
jobOccupiedMap := map[api.JobID]int32{}
for _, preemptee := range preemptees {job := ssn.Jobs[preemptee.Job]
if _, found := jobOccupiedMap[job.UID]; !found {jobOccupiedMap[job.UID] = job.ReadyTaskNum()}
if jobOccupiedMap[job.UID] > job.MinAvailable {jobOccupiedMap[job.UID]--
victims = append(victims, preemptee)
} else {klog.V(4).Infof("Can not preempt task <%v/%v> because job %s ready num(%d) <= MinAvailable(%d) for gang-scheduling",
preemptee.Namespace, preemptee.Name, job.Name, jobOccupiedMap[job.UID], job.MinAvailable)
}
}
return victims, util.Permit
}
ssn.AddReclaimableFn(gp.Name(), preemptableFn)
ssn.AddPreemptableFn(gp.Name(), preemptableFn)
}
经典动作 Enqueue
Enqueue action 负责通过一系列的过滤算法筛选出符合要求的待调度工作并将它们送入待调度队列。通过这个 action,工作的状态将由 pending 变为 inqueue。
前文提到,runOnce 办法中会调用各个 action 的 Execute 办法。总的来说,Enqueue action 的 Execute 办法定义了三个局部变量 queues、queueSet 和 jobsMap,而后执行了两个 for 循环。其中 queues 是一个以 queue 为 item 的优先队列,queueSet 是一个队列 id list 造成的字符串,jobsMap 是队列 id 到 队列的映射。
// pkg/scheduler/actions/enqueue/enqueue.go
func (enqueue *Action) Execute(ssn *framework.Session) {
// ......
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
queueSet := sets.NewString()
jobsMap := map[api.QueueID]*util.PriorityQueue{}
for _, job := range ssn.Jobs {// ......}
klog.V(3).Infof("Try to enqueue PodGroup to %d Queues", len(jobsMap))
for {// ......}
}
第一个 for 循环代码如下。首先遍历 jobs(对应 PodGroup Wrapper)的过程中判断用到了哪些 Volcano Queue,将这些 Queue 保留到 queueSet 和 queues 中;其次将处于 Pending 状态的 jobs 退出到 jobsMap 中。
// pkg/scheduler/actions/enqueue/enqueue.go
// 这个 Job 是 Volcano 自定义资源 Job,不是 K8s 里的 Job;这里开始遍历所有 jobs
for _, job := range ssn.Jobs {if job.ScheduleStartTimestamp.IsZero() {ssn.Jobs[job.UID].ScheduleStartTimestamp = metav1.Time{Time: time.Now(),
}
}
// 如果 job 中定义的 Queue 在 Session 中存在,那就执行
// queueSet.Insert(string(queue.UID)) 和
// queues.Push(queue);留神这里 Push 进去的是 queue
if queue, found := ssn.Queues[job.Queue]; !found {
klog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
job.Queue, job.Namespace, job.Name)
continue
} else if !queueSet.Has(string(queue.UID)) {klog.V(5).Infof("Added Queue <%s> for Job <%s/%s>",
queue.Name, job.Namespace, job.Name)
// 这里构建了一个 queue UID 的 set 和一个 queue 队列(优先级队列,heap 实现)queueSet.Insert(string(queue.UID))
queues.Push(queue)
}
if job.IsPending() {
// 如果 job 指定的 queue 还没存到 jobsMap 里,则创立一个对应的 PriorityQueue
if _, found := jobsMap[job.Queue]; !found {jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
}
klog.V(5).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
// 将 job 加到指定 queue 中
jobsMap[job.Queue].Push(job)
}
}
第二个 for 循环的代码如下。总的来说就是一直的从优先 queue 中获取最高优先级的 queue,而后依据 queue 的 UID 找到本地 jobsMap 里对应的 jobs 队列(PodGroup 的 Wrapper),最初从 jobs 队列中找到优先级最高的 Job,并将其状态置为 Inqueue。
for {
// 没有队列,退出循环
if queues.Empty() {break}
// 从优先级队列 queues 中 Pop 一个高优的队列进去
queue := queues.Pop().(*api.QueueInfo)
// 如果这个高优队列在 jobsMap 里没有保留相应的 jobs,也就是为空,那就持续下一轮循环
jobs, found := jobsMap[queue.UID]
if !found || jobs.Empty() {continue}
// jobs 也是一个优先级队列,Pop 一个高优 job 进去
job := jobs.Pop().(*api.JobInfo)
if job.PodGroup.Spec.MinResources == nil || ssn.JobEnqueueable(job) {ssn.JobEnqueued(job)
// Phase 更新为 "Inqueue"
job.PodGroup.Status.Phase = scheduling.PodGroupInqueue
// 将以后 job 退出到 ssn.Jobs map
ssn.Jobs[job.UID] = job
}
// 将后面 Pop 进去的 queue 加回到 queues 中,直到 queue 中没有 job,这样逐渐 queues 为空空,下面的 Empty() 办法就会返回 true,而后循环退出。queues.Push(queue)
}
经典动作 Allocate
Allocate action 负责通过一系列的预选和优选算法筛选出最适宜的节点。
如下代码所示,Allocate action 的 Execute 办法中定义了 queues 和 jobsMap 变量,而后执行了 pickUpQueuesAndJobs 和 allocateResources 办法。其中 queues 是一个元素为优先级队列的优先级队列;jobsMap 是一个 map,key 为 queue id,value 为优先级队列,也就是一个特定的 queue,queue 中存着 jobs。
// pkg/scheduler/actions/allocate/allocate.go
func (alloc *Action) Execute(ssn *framework.Session) {
// queues sort queues by QueueOrderFn.
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
// jobsMap is used to find job with the highest priority in given queue.
jobsMap := map[api.QueueID]*util.PriorityQueue{}
alloc.session = ssn
alloc.pickUpQueuesAndJobs(queues, jobsMap)
klog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))
alloc.allocateResources(queues, jobsMap)
}
而 pickUpQueuesAndJobs 办法中外围逻辑只有一个。遍历 jobs,将其依照 queue 不同存到 jobsMap 中。allocateResources 办法的逻辑次要是依照优先级顺次给 tasks 寻找最合适的 node,找到后“预占”资源,于是按程序逐渐给所有的 tasks 都找到了最佳节点(这块代码很简单,前期补上哈)。
未完待续
明天就先到这儿啦。看完后仍旧有些费解的中央,后续有空补上哈。
- 为啥 plugins 里面还要带上 tier 一层呢;
- Volcano 中几个常见的 plugin 是怎么运作的呢;
- allocateResources 办法前期补上哈。