乐趣区

关于golang:Kubelet从入门到放弃识透CPU管理下

三、源码剖析

 在介绍代码之前,zouyee 先带各位看一看 CPU manager 的启动图(CPU manager 属于 Container Manager 模块的子系统)

对于上图的内容,zouyee 总结流程如下:

1、在命令行启动局部,Kubelet 中调用 NewContainerManager 构建 ContainerManager

2、NewContainerManager 函数调用 topologymanager.NewManager 构建拓扑管理器

3、NewContainerManager 函数调用 cpumanager.NewManager 构建 CPU 管理器

4、拓扑管理器应用 AddHintPriovider 办法将 CPU 管理器退出治理

5、回到命令行启动局部,调用 NewMainKubelet(),构建 Kubelet 构造体

6、构建 Kubelet 构造体时,将 CPU 管理器跟拓扑管理器封装为 InternalContainerLifecycle 接口,其实现 Pod 相干的生命周期资源管理操作,波及 CPU 相干的是 PreStart 办法

7、构建 Kubelet 构造体时,调用 AddPodmitHandler 将 GetAllocateResourcesPodAdmitHandler 办法退出到 Pod 准入插件中,在 Pod 创立时,资源预调配查看

8、构建 Kubelet 构造体后,调用 ContainerManager 的 Start 办法,ContainerManager 在 Start 办法中调用 CPU 管理器的 Start 办法,其做一些解决工作并孵化一个 goroutine,执行 reconcileState()

上面顺次进行解说。STEP 1

Kubelet 中调用 NewContainerManager 构建 ContainerManager, 波及代码为 cmd/kubelet/app/server.go

在 run 函数中实现 ContainerManager 初始化工作

func run(ctx context.Context, 参数太长, 不写全了){
    ....
    if kubeDeps.ContainerManager == nil {
    ...
    kubeDeps.ContainerManager, err = cm.NewContainerManager(...)
    ...
    }
}

STEP 2-4    

NewContainerManager 函数调用 topologymanager.NewManager 构建拓扑管理器,波及代码

pkg/kubelet/cm/container_manager_linux.go

func NewContainerManager(参数太长, 不写全了) {if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager){
        // 判断个性是否开启,构建拓扑治理
        cm.topologyManager, err = topologymanager.NewManager(
            machineInfo.Topology,
            nodeConfig.ExperimentalTopologyManagerPolicy,
            nodeConfig.ExperimentalTopologyManagerScope,
        )
    }
    // 判断个性是否开启,构建 CPU 治理
    if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
        cm.cpuManager, err = cpumanager.NewManager(
            nodeConfig.ExperimentalCPUManagerPolicy,
            nodeConfig.ExperimentalCPUManagerReconcilePeriod,
            machineInfo,
            nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
            cm.GetNodeAllocatableReservation(),
            nodeConfig.KubeletRootDir,
            cm.topologyManager,
        )
        if err != nil {klog.Errorf("failed to initialize cpu manager: %v", err)
            return nil, err
        }
        // 拓扑管理器应用 AddHintPriovider 办法将 CPU 管理器退出治理 
        cm.topologyManager.AddHintProvider(cm.cpuManager)
    }
}

其中对于 CPU 管理器的初始化:type CPUTopology struct {
    NumCPUs    int
    NumCores   int
    NumSockets int
    CPUDetails CPUDetails
}

type CPUDetails map[int]CPUInfo

type CPUInfo struct {
    NUMANodeID int
    SocketID   int
    CoreID     int
}

func NewManager(参数太多,省略了) (Manager, error) {
    // 依据 cpuPolicyName,决定初始化 policy,以后反对 none 和 static
    switch policyName(cpuPolicyName) {
        ...
        case PolicyStatic:
            // 1. 依据 cadvisor 的数据,生产 topology 构造体
            topo, err = topology.Discover(machineInfo)
            // 2. 查看 reserved 的 CPU 是否为 0,须要 kube+system reserved 的 CPU > 0
            // 3. 初始化 policy
            policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity)
        ...
    }
}
   

STEP 5-7

在 run 函数中实现 ContainerManager 初始化工作后,调用 RunKubelet 函数构建 Kubelet 构造体,其最终调用 NewMainKubelet(),实现 Kubelet 构造体构建。波及代码 pkg/kubelet/kubelet.go

func NewMainKubelet(参数太长, 不写全了)(*Kubelet, error) {
    ...
    klet := &Kubelet{
    ...
    containerManager: kubeDeps.ContainerManager,
    ...
    }
    ...
    runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
    ...
    // 构建 Kubelet 构造体时,将 CPU 管理器跟拓扑管理器封装为 InternalContainerLifecycle 接口
    kubeDeps.ContainerManager.InternalContainerLifecycle(),
    ...
    )
    ...
    // 调用 AddPodmitHandler 将 GetAllocateResourcesPodAdmitHandler 办法退出到 Pod 准入插件中,在 Pod 创立时,资源预调配查看
klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetAllocateResourcesPodAdmitHandler())
    ...
}

   其中在 InternalContainerLifecycle 接口,波及 CPU 局部在 PreStartContainer 办法,波及代码 pkg/kubelet/cm/internal_container_lifecycle.go

func (i *internalContainerLifecycleImpl) PreStartContainer(参数太长, 不写全了) error {
   if i.cpuManager != nil {i.cpuManager.AddContainer(pod, container, containerID)
   }

   if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {err := i.topologyManager.AddContainer(pod, containerID)
      if err != nil {return err}
   }
   return nil
}

那么何时调用呢?下面咱们提到了 kuberuntime.NewKubeGenericRuntimeManager, 该函数实例化 KubeGenericRuntimeManager 构造体 (后续具体介绍),而该构造体在 startContainer 办法中,进行调用,波及代码 pkg/kubelet/kuberuntime/kuberuntime_container.go

// 用于启动容器,该构造体实现了 Runtime 接口
func (m *kubeGenericRuntimeManager) startContainer(参数太多,不写了) (string, error) {
    ...
    err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
    ...
}

另外 GetAllocateResourcesPodAdmitHandler 须要实现返回的构造体须要实现 Admit 接口

波及代码 pkg/kubelet/cm/container_manager_linux.go

func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
   pod := attrs.Pod

   for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
      ...

      if m.cpuManager != nil {err = m.cpuManager.Allocate(pod, &container)
         ...
      }
   }

   return lifecycle.PodAdmitResult{Admit: true}
}

 理论调用逻辑为 m.cpuManager.Allocate->m.policy.Allocate->func (p *staticPolicy) Allocate (none 策略无需操作),波及代码 pkg/kubelet/cm/cpumanager/policy_static.go

func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
     // 1. 如介绍所说,查看是否满足调配,即 QOS 为 Guaranteed,且调配 CPU 为整型
   if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
           // 2. 获取是否调配过,调配过则更新即可
      if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {...}
      // 3. 获取亲和性拓扑
      hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
      // 4. 依据 numa 亲和性进行调配
            cpuset, err := p.allocateCPUs(..)
            // 5. 设置调配后果
            s.SetCPUSet(string(pod.UID), container.Name, cpuset)
            // 6. 设置 reuse 字段
            p.updateCPUsToReuse(pod, container, cpuset)

    }
    // container belongs in the shared pool (nothing to do; use default cpuset)
    return nil
}

STEP 8

构建实现 Kubelet 构造体后,在 Kubelet 办法 initializeRuntimeDependentModules 中调用 ContainerManager 的 Start 办法,波及代码 pkg/kubelet/kubelet.go

func (kl *Kubelet) initializeRuntimeDependentModules() {
    ...
    // 这里依据咱们后面阐明的,须要 cadvisor 的数据,因而须要提前启动
    if err := kl.containerManager.Start(省略); err != nil {...}
    ...
}

ContainerManager 在 Start 办法中调用 CPU 管理器的 Start 办法,具体步骤如下:

a. 构建 Checkpoint, 其中蕴含文件及内存的操作

b. 依据初始化的 policy,运行 Start, 理论只有 static 起到作用,次要是校验工作

c. 孵化一个 goroutine,执行 reconcileState()

func (cm *containerManagerImpl) Start(参数太多,省略) error {
    ...
    // 初始化 CPU 管理器
    if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
        ...
        err = cm.cpuManager.Start(参数太多,省略)
        ...
    }
    ...
}
// 波及代码 pkg/kubelet/cm/cpumanager/cpu_manager.go
func (m *manager) Start(参数太多,省略) error {
    ...
    // 该处为 Checkpoint 解决,理论为文件管理工作,即调配等状况的数据保留
    
    stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, cpuManagerStateFileName, m.policy.Name(), m.containerMap)
    ...
    // 孵化一个 goroutine,执行 reconcileState()
    // 解决以后理论 CPU 调配的工作,相似 actual 与 desired
    go wait.Until(func() {m.reconcileState() }, m.reconcilePeriod, wait.NeverStop)
}

    其中 reconcileState  次要实现以下工作

    a. 解决以后沉闷 Pod,更新 containerMap 构造体

    b. 通过 CRI 接口更新容器底层的 CPU 配置 (即 m.containerRuntime.UpdateContainerResources)

后续 zouyee 将带各位看看 ContainerManager 各大组件:拓扑治理、设施治理、容器治理等。

点击查看全文

后续相干内容,请查看公众号:DCOS

四、参考资料

1、cpuset

2、cpu topology

3、cpu manager policy

退出移动版