乐趣区

关于云原生:Volcano-源码解读一控制器

在云原生畛域工作了 1.5 年,总是被各种业务压着,都没工夫好好回头看看总结下。心愿可能通过平台来记些笔记,也算是对本人的认可吧。

一、Volcano 背景

先来看看官网形容:

Volcano 是 CNCF 下首个也是惟一的基于 Kubernetes 的容器批量计算平台,次要用于高性能计算场景。它提供了 Kubernetes 目前短少的一套机制,这些机制通常是机器学习大数据利用、科学计算、特效渲染等多种高性能工作负载所需的。作为一个通用批处理平台,Volcano 与简直所有的支流计算框 架无缝对接,如 Spark、TensorFlow、PyTorch、Flink、Argo、MindSpore、PaddlePaddle 等。它还提供了包含基于各种支流架构的 CPU、GPU 在内的异构设施混合调度能力。Volcano 的设计理念建设在 15 年来多种零碎和平台大规模运行各种高性能工作负载的应用教训之上,并联合来自开源社区的最佳思维和实际。

简略点说,Volcano 是一个批调度器,且定义了一些资源,用以反对批调度。本文旨在从源码上剖析 Volcano 调度机制(所有剖析以开源 tag v1.9.0-alpha.0 为准)。若读者对 Volcano 基本概念不理解,请自行浏览官网文档 Volcano 官网文档。

二、源码剖析

2.1 核心理念

Queue

Queue 用于治理和优先级排序工作。它容许用户依据业务需要或优先级,将作业分组到不同的队列中,各个队列所能应用的资源由用户自定义。这有助于更好地管制资源分配和调度优先级,确保高优先级的工作能够优先获取资源。

PodGroup

一组相干的 Pod 汇合。次要解决 Kubernetes 原生调度器中单个 Pod 调度的限度。通过将相干的 Pod 组织成 PodGroup,Volcano 可能更无效地解决那些须要多个 Pod 协同工作的简单工作。PodGroup 是调度器辨认的根本单位。
理论调度过程中由 Volcano 调度的 Pod,都通过 annotation 与 PodGroup 建设关联,且 spec.schedulerName 均为 Volcano 调度器。

VolcanoJob

VolcanoJob (前期简称 Job) 不仅包含了 Kubernetes Job 的所有个性,还退出了对批处理作业的额定反对,使得 Volcano 可能更好地适应高性能和大规模计算工作的需要。
Job Controller 会依据 Job 中定义的 Task 创立出 PodGroup 和 Pod。

2.2 控制器逻辑

Job Controller 监听 Job 资源,并根 Job 上的信息创立对应的 PodGroup 和 Pod。因而,本文从 Job Controller 开始介绍。其次

Job Controller

以 cmd/controller-manager/main.go 为入口,查看 Job 相干逻辑。main.go 中最次要的是 app.Run 办法,该办法中有个 startControllers(config, opt) 办法,startControllers(config, opt) 中有个 c.Run 办法,而 JobController 是 c 的实现之一,此时 Job Controller 启动运行。

// cmd/controller-manager/main.go
func main() {
    // ...
    if err := app.Run(s); err != nil {fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}

// cmd/controller-manager/app/server.go
// Run the controller.
func Run(opt *options.ServerOption) error {
    // ...
    run := startControllers(config, opt)
    // ...
}

// cmd/controller-manager/app/server.go
func startControllers(config *rest.Config, opt *options.ServerOption) func(ctx context.Context) {
    // ...
    go c.Run(ctx.Done())
    // ...
}

在 JobController 的 Run 中启动 worker(),持续往后跟 worker() 办法能找到一个 processNextReq() 办法,此时走到业务逻辑。其中 workers 是个 int 类型的整数,示意 worker 个数(目前不分明为什么没有选用 controller-runtime 框架,而且是应用这种比拟裸的形式)。

// pkg/controllers/job/job_controller.go
func (cc *jobcontroller) Run(stopCh <-chan struct{}) {
    // ...
    for i = 0; i < cc.workers; i++ {go func(num uint32) {
            wait.Until(func() {cc.worker(num)
                },
                time.Second,
                stopCh)
        }(i)
    }
}

func (cc *jobcontroller) worker(i uint32) {klog.Infof("worker %d start ......", i)

    for cc.processNextReq(i) {}}

processNextReq 蕴含了很多框架类的代码,比方获取 key,在 cache 中查看对象等,此类不再赘述。其中最次要的是 Execute 办法。

// pkg/controllers/job/job_controller.go
func (cc *jobcontroller) processNextReq(count uint32) bool {queue := cc.queueList[count]
    obj, shutdown := queue.Get()
    // ......

    jobInfo, err := cc.cache.Get(key)
    // ......

    st := state.NewState(jobInfo)
    // ......

    if err := st.Execute(action); err != nil {// ......}

    queue.Forget(req)

    return true
}

此时先来摸索下 st 为何物。st 获取 Job 的 Status.State.Phase 字段,并封装成 State 类型返回。Job 有多个 State 类型的对象,不同的对象后续有本人的 Execute 实现。

// pkg/controllers/job/state/factory.go
// NewState gets the state from the volcano job Phase.
func NewState(jobInfo *apis.JobInfo) State {
    job := jobInfo.Job
    switch job.Status.State.Phase {
    case vcbatch.Pending:
        return &pendingState{job: jobInfo}
    case vcbatch.Running:
        return &runningState{job: jobInfo}
    case ...
    }
}

Execute 函数有个 action 入参,action 参数和 state 类型独特决定了执行逻辑,而执行逻辑次要分为 SyncJob 和 KillJob。比方以 PendingState 为例(初始状态),syncJob 呈现了,也就是 Job 的外围逻辑。

// pkg/controllers/job/state/pending.go
func (ps *pendingState) Execute(action v1alpha1.Action) error {
    switch action {
    // ......
    default:
        return SyncJob(ps.job, func(status *vcbatch.JobStatus) bool {
            if ps.job.Job.Spec.MinAvailable <= status.Running+status.Succeeded+status.Failed {
                status.State.Phase = vcbatch.Running
                return true
            }
            return false
        })
    }
}

读者们如果关怀其余 State 类型的实现,可自行浏览源码。其中 action 参数和 state 类型独特决定了执行逻辑,对应关系如下表所示(空白示意是 KillJob):

Action\State Pending Aborting Aborted Running Restarting Completing Terminating Terminated Completed Failed
AbortJob
RestartJob
RestartTask SyncJob SyncJob
TerminateJob
CompleteJob
ResumeJob SyncJob SyncJob
SyncJob SyncJob SyncJob
EnqueueJob SyncJob SyncJob
SyncQueue SyncJob SyncJob
OpenQueue SyncJob SyncJob
CloseQueue SyncJob SyncJob

接着就来看下 reconcile 的外围逻辑:SyncJob。这里次要蕴含两个外围逻辑:

  1. 调用 initiateJob 办法初始化 Job。首先初始化 Job 状态为 Pending;其次调用 pluginOnJobAdd,其中调用 Job.Spec.Plugins 中插件的 OnJobAdd 办法设置 job.Status.ControlledResources 变量;再次调用 createJobIOIfNotExist 办法创立 pvc;最初用 createOrUpdatePodGroup 创立 PodGroup;
  2. 遍历 Job.Spec.Tasks,为其创立对应的 Pod 资源。

咱们先来看下创立 PodGroup 的逻辑,从代码中能够看 Job 和 PodGroup 是一一对应的关系。

// pkg/controllers/job/job_controller_actions.go
func (cc *jobcontroller) initiateJob(job *batch.Job) (*batch.Job, error) {
    // ......

    if err := cc.createOrUpdatePodGroup(newJob); err != nil {cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PodGroupError),
            fmt.Sprintf("Failed to create PodGroup, err: %v", err))
        return nil, err
    }

    return newJob, nil
}

func (cc *jobcontroller) createOrUpdatePodGroup(job *batch.Job) error {
    pg := &scheduling.PodGroup{
        ObjectMeta: metav1.ObjectMeta{
            Namespace: job.Namespace,
            // 这个 pgName 内容是 job.Name + "-" + string(job.UID)
            Name:        pgName,
            Annotations: job.Annotations,
            Labels:      job.Labels,
            OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(job, helpers.JobKind),
            },
        },
        Spec: scheduling.PodGroupSpec{
            MinMember:         job.Spec.MinAvailable,
            MinTaskMember:     minTaskMember,
            Queue:             job.Spec.Queue,
            MinResources:      cc.calcPGMinResources(job),
            PriorityClassName: job.Spec.PriorityClassName,
        },
    }
}

咱们再来看看调谐 Pod 的过程。syncJob 遍历所有 tasks,而后生成 task 所需的 Pod 模版,并放到 podToCreate(类型是 map[string][]*v1.Pod)映射中。留神在 createJobPod 中会为 Pod 打上几行 annotations,其中有个 KubeGroupNameAnnotationKey = pgName,也就是 scheduling.k8s.io/group-name=pg-name。此时能够发现 Pod 与 PodGroup 是通过 Annotation 关联起来的。删除 Pod 的逻辑读者能够自行查看哈,也是在同一个办法外面。

// pkg/controllers/job/job_controller_actions.g
for _, ts := range job.Spec.Tasks {
    ts.Template.Name = ts.Name
    tc := ts.Template.DeepCopy()
    name := ts.Template.Name

    pods, found := jobInfo.Pods[name]
    if !found {pods = map[string]*v1.Pod{}}

    var podToCreateEachTask []*v1.Pod
    // 每个 Task 对应一组 pods,所以这里有一个循环
    for i := 0; i < int(ts.Replicas); i++ {podName := fmt.Sprintf(jobhelpers.PodNameFmt, job.Name, name, i)
        if pod, found := pods[podName]; !found {
            // 这个 createJobPod 只是组装 Pod 资源对象,类型是 *v1.Pod
            newPod := createJobPod(job, tc, ts.TopologyPolicy, i, jobForwarding)
            if err := cc.pluginOnPodCreate(job, newPod); err != nil {return err}
            // 加到队列中
            podToCreateEachTask = append(podToCreateEachTask, newPod)
            waitCreationGroup.Add(1)
        } else {// ......}
    }
    podToCreate[ts.Name] = podToCreateEachTask
    // ......
}

PodGroup Controller

PodGroup Controller 的逻辑比较简单,负责为未制订的 PodGroup 且调度器为 Volcano 的 Pod 调配 PodGroup。其中 pkg/controllers/podgroup/pg_controller.go 中入口逻辑逻辑与 Job Controller 相似,再次不再赘述,咱们间接看外围逻辑 processNextReq():

// pkg/controllers/podgroup/pg_controller.go
func (pg *pgcontroller) processNextReq() bool {
    // ...
    req := obj.(podRequest)
    defer pg.queue.Done(req)

    // 获取 Pod 对象
    pod, err := pg.podLister.Pods(req.podNamespace).Get(req.podName)
    if err != nil {klog.Errorf("Failed to get pod by <%v> from cache: %v", req, err)
        return true
    }
    // 依据调度器名称过滤,只关注调度器为 Volcano 的 Pod
    if !commonutil.Contains(pg.schedulerNames, pod.Spec.SchedulerName) {klog.V(5).Infof("pod %v/%v field SchedulerName is not matched", pod.Namespace, pod.Name)
        return true
    }
    // 若 Annotations 有值,则示意已关联,不做解决
    if pod.Annotations != nil && pod.Annotations[scheduling.KubeGroupNameAnnotationKey] != "" {klog.V(5).Infof("pod %v/%v has created podgroup", pod.Namespace, pod.Name)
        return true
    }

    // 为 Pod 调配 PodGroup
    if err := pg.createNormalPodPGIfNotExist(pod); err != nil {klog.Errorf("Failed to handle Pod <%s/%s>: %v", pod.Namespace, pod.Name, err)
        pg.queue.AddRateLimited(req)
        return true
    }
    // ...
}

createNormalPodPGIfNotExist 中逻辑大抵如下:

  1. 若名称为 “podgroup-$pod.Uid”(不存在 Pod Ownerrefrence.Controller 的状况,若存在,命名形式读者可自行理解)的 PodGroup 不存在,则创立 PodGroup,否则走到下一步;
  2. 更新 Pod.Annotation.[scheduling.k8s.io/group-name]=pg-name。

Queue Controller

不同于前两者,Queue Controller 同时监听 Queue 和 PodGroup。
Queue Controller 中的入口函数为 processNextWorkItem 下的 handleQueue 办法,本文间接进到此处来看。能够发现 Queue Controller 同样也会依据以后 State,调用不同的 Execute 办法。而 Execute 办法中也会依据不同的 Action,调用不同的原子办法。总结来说就是 State 和 Action 独特决定前面走哪段逻辑。

// pkg/controllers/queue/queue_controller.go
func (c *queuecontroller) handleQueue(req *apis.Request) error {
    // ...
    queueState := queuestate.NewState(queue)

    if err := queueState.Execute(req.Action); err != nil {
        return fmt.Errorf("sync queue %s failed for %v, event is %v, action is %s",
            req.QueueName, err, req.Event, req.Action)
    }
    // ...
}

“State 和 Action”与“调用办法”的对应关系如下表所示,波及到 SyncQueue、OpenQueue 和 CloseQueue:

Action\State Open Closed Closing Unknown
OpenQueue SyncQueue OpenQueue OpenQueue OpenQueue
CloseQueue CloseQueue SyncQueue SyncQueue CloseQueue
SyncQueue SyncQueue SyncQueue SyncQueue SyncQueue

SyncQueue、OpenQueue 和 CloseQueue 办法的概述如下:

  1. SyncQueue 用以统计以后 PodGroup 的个数与状态,并更新到 Queue 的 Status 中;
  2. OpenQueue 将 Queue 状态置为 Open;
  3. CloseQueue 将 Queue 状态置为 Closed。

上面以 SyncQueue 为例看下具体逻辑,另外两种办法读者可自行浏览。

// pkg/controllers/queue/queue_controller_action.go
func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateFn state.UpdateQueueStatusFn) error {
     // ...
    // 获取以后 PodGroups 
    podGroups := c.getPodGroups(queue.Name)

    // 统计 pg 资源状态
    for _, pgKey := range podGroups {
        switch pg.Status.Phase {
        case schedulingv1beta1.PodGroupPending:
            queueStatus.Pending++
        case schedulingv1beta1.PodGroupRunning:
            queueStatus.Running++
        case schedulingv1beta1.PodGroupUnknown:
            queueStatus.Unknown++
        case schedulingv1beta1.PodGroupInqueue:
            queueStatus.Inqueue++
        }
    }

    // updateStateFn 是在执行器中定义的函数,用于更新 queue 的状态
    if updateStateFn != nil {updateStateFn(&queueStatus, podGroups)
    } else {queueStatus.State = queue.Status.State}
    
    // ...
    // 更新 Queue 资源
    if _, err := c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil { }
    // ...
}

未完待续

好啦,明天就先到这儿啦。看完后还是有些不解的中央,后续再补上啦;

  1. job.Status.ControlledResources 到底有何用处,全局搜寻了代码,临时没找到什么端倪;
  2. controller 中状态转换比拟零散,本文也没好好整顿,后续补上捏;
  3. Admission 组件次要都做了啥呢;
  4. 调度器下篇文章补上哈。
退出移动版