乐趣区

关于golang:Kubernetes学习笔记之CSI-External-Provisioner源码解析

Overview

最近在部署 K8s 长久化存储插件时,须要依照 CSI 官网阐明部署一个 Deployment pod,因为咱们的自研存储类型是文件存储不是块存储,所以部署 pod 不须要蕴含容器 external-attacher
只须要蕴含 external-provisioner sidecar container 和咱们自研的 csi-plugin 容器就行,部署 yaml 相似如下:


apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    deployment.kubernetes.io/revision: "2"
  name: sunnyfs-csi-controller-share
  namespace: sunnyfs
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: sunnyfs-csi-controller-share
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: sunnyfs-csi-controller-share
    spec:
      containers:
        - args:
            - --csi-address=/csi/sunnyfs-provisioner-share.sock
            - --timeout=150s
          image: quay.io/k8scsi/csi-provisioner:v2.0.2
          imagePullPolicy: IfNotPresent
          name: csi-provisioner
          resources:
            limits:
              cpu: "4"
              memory: 8000Mi
            requests:
              cpu: "2"
              memory: 8000Mi
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /csi
              name: socket-dir
        - args:
            - --v=5
            - --endpoint=unix:///csi/sunnyfs-provisioner-share.sock
            - --nodeid=$(NODE_ID)
            - --drivername=csi.sunnyfs.share.com
            - --version=v1.0.0
          env:
            - name: NODE_ID
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: spec.nodeName
          image: sunnyfs-csi-driver:v1.0.3
          imagePullPolicy: IfNotPresent
          lifecycle:
            preStop:
              exec:
                command:
                  - /bin/sh
                  - -c
                  - rm -rf /csi/sunnyfs-provisioner-share.sock
          name: sunnyfs-csi-plugin
          resources:
            limits:
              cpu: "2"
              memory: 4000Mi
            requests:
              cpu: "1"
              memory: 4000Mi
          securityContext:
            capabilities:
              add:
                - SYS_ADMIN
            privileged: true
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /csi
              name: socket-dir
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: sunnyfs-csi-controller-account
      serviceAccountName: sunnyfs-csi-controller-account
      terminationGracePeriodSeconds: 30
      volumes:
        - hostPath:
            path: /var/lib/kubelet/plugins/csi.sunnyfs.share.com
            type: DirectoryOrCreate
          name: socket-dir

当咱们新建一个带有 storage class 的 pvc 时,会动态创建 pv 对象,并在咱们自研的存储引擎服务创立对应的 volume。这也是利用了 storage class 来动态创建 pv 和存储服务对应的 volume。

重要问题是,这是如何做到的呢?

答案很简略:external-provisioner sidecar container 是一个 controller 去 watch pvc/pv 对象,当新建一个由 storageclass 创立 pv 的 pvc(或删除 pv 对象),该 sidecar container 会 grpc 调用
咱们自研的 csi-plugin CreateVolume(DeleteVolume)办法来理论创立一个内部存储 volume,并新建一个 pv 对象写入 k8s api server。

external-provisioner 源码解析

external-provisioner sidecar container 次要逻辑很简略:
先实例化 csiProvisioner 对象 ,而后应用
csiProvisioner 实例化 provisionController 对象,最初启动
provisionController.Run 去 watch pvc/pv 对象实现次要业务逻辑,
即依据新建的 pvc 去调用 csi-plugin CreateVolume 创立 volume,和新建一个 pv 对象写入 k8s api server。

provisionController 在实例化时,会 watch pvc/pv 对象,代码在 L695-L739


// 实例化 provisionController
func NewProvisionController(
    client kubernetes.Interface,
    provisionerName string,
    provisioner Provisioner,
    kubeVersion string,
    options ...func(*ProvisionController) error,
) *ProvisionController {
    // ...
    controller := &ProvisionController{
    client:                    client,
    provisionerName:           provisionerName,
    provisioner:               provisioner, // 在 sync pvc 时会调用 provisioner 来创立 volume
    // ...
    }
    
    controller.claimQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
    controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes")
    informer := informers.NewSharedInformerFactory(client, controller.resyncPeriod)
    // ----------------------
    // PersistentVolumeClaims
    claimHandler := cache.ResourceEventHandlerFuncs{AddFunc:    func(obj interface{}) {controller.enqueueClaim(obj) },
        UpdateFunc: func(oldObj, newObj interface{}) {controller.enqueueClaim(newObj) },
        DeleteFunc: func(obj interface{}) {
            // NOOP. The claim is either in claimsInProgress and in the queue, so it will be processed as usual
            // or it's not in claimsInProgress and then we don't care
        },
    }
    // ...
    // -----------------
    // PersistentVolumes
    volumeHandler := cache.ResourceEventHandlerFuncs{AddFunc:    func(obj interface{}) {controller.enqueueVolume(obj) },
        UpdateFunc: func(oldObj, newObj interface{}) {controller.enqueueVolume(newObj) },
        DeleteFunc: func(obj interface{}) {controller.forgetVolume(obj) },
    }

    // --------------
    // StorageClasses
    // no resource event handler needed for StorageClasses
    if controller.classInformer == nil {if controller.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.6.0")) {controller.classInformer = informer.Storage().V1().StorageClasses().Informer()} else {controller.classInformer = informer.Storage().V1beta1().StorageClasses().Informer()}
    }
    controller.classes = controller.classInformer.GetStore()
    
    if controller.createProvisionerPVLimiter != nil {
        // 会调用 volumeStore 来新建 pv 对象写入 api server 中
        controller.volumeStore = NewVolumeStoreQueue(client, controller.createProvisionerPVLimiter, controller.claimsIndexer, controller.eventRecorder)
    } else {// ...}

    return controller
}

这里次要看下新建一个 pvc 时,是如何调谐的,看代码 L933-L986


func (ctrl *ProvisionController) processNextVolumeWorkItem(ctx context.Context) bool {
    // ...
    err := func() error {
        // ...
        if err := ctrl.syncVolumeHandler(ctx, key); err != nil {// ...}
        ctrl.volumeQueue.Forget(obj)
        return nil
    }()
    // ...
    return true
}
func (ctrl *ProvisionController) syncClaimHandler(ctx context.Context, key string) error {
    // ...
    return ctrl.syncClaim(ctx, claimObj)
}
func (ctrl *ProvisionController) syncClaim(ctx context.Context, obj interface{}) error {
    // ...
    // 起始时,在 pv controller 调谐 pvc 去更新 pvc annotation 后,该 shouldProvision 才会返回 true
    should, err := ctrl.shouldProvision(ctx, claim)
    if err != nil {
        // ...
        return err
    } else if should {
        // 调用 provisioner 来创立后端存储服务的 volume,调用 volumeStore 对象创立 pv 对象并写入 k8s api server
        status, err := ctrl.provisionClaimOperation(ctx, claim)
        // ...
        return err
    }
    return nil
}

const (annStorageProvisioner = "volume.beta.kubernetes.io/storage-provisioner")
func (ctrl *ProvisionController) shouldProvision(ctx context.Context, claim *v1.PersistentVolumeClaim) (bool, error) {
    // ...
    // 这里次要查看 pvc 是否存在 "volume.beta.kubernetes.io/storage-provisioner" annotation,起初创立 pvc 时是没有该 annotation 的
    // 该 annotation 会由 kube-controller-manager 组件中 pv controller 去增加,该 pv controller 也会去 watch pvc 对象,当发现该 pvc 定义的 storage class
    // 的 provisioner 定义的 plugin 不是 k8s in-tree plugin,会给该 pvc 打上 "volume.beta.kubernetes.io/storage-provisioner" annotation
    // 能够参考办法 https://github.com/kubernetes/kubernetes/blob/release-1.19/pkg/controller/volume/persistentvolume/pv_controller_base.go#L544-L566
    // 所以起始时,在 pv controller 调谐 pvc 去更新 pvc annotation 后,该 shouldProvision 才会返回 true
    if provisioner, found := claim.Annotations[annStorageProvisioner]; found {if ctrl.knownProvisioner(provisioner) {claimClass := GetPersistentVolumeClaimClass(claim)
            class, err := ctrl.getStorageClass(claimClass)
            // ...
            if class.VolumeBindingMode != nil && *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer {if selectedNode, ok := claim.Annotations[annSelectedNode]; ok && selectedNode != "" {return true, nil}
                return false, nil
            }
            return true, nil
        }
    }
    
    return false, nil
}

所以,以上代码要害逻辑是 provisionClaimOperation 函数,该函数次要实现两个业务逻辑:调用 provisioner 来创立后端存储服务的 volume;调用 volumeStore 对象创立 pv 对象并写入 k8s api server。
查看下 provisionClaimOperation 代码


func (ctrl *ProvisionController) provisionClaimOperation(ctx context.Context, claim *v1.PersistentVolumeClaim) (ProvisioningState, error) {
    // ...
    // 筹备相干参数
    claimClass := util.GetPersistentVolumeClaimClass(claim)
    pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
    claimRef, err := ref.GetReference(scheme.Scheme, claim)
    class, err := ctrl.getStorageClass(claimClass)
    options := ProvisionOptions{
        StorageClass: class,
        PVName:       pvName,
        PVC:          claim,
        SelectedNode: selectedNode,
    }

    // (1) 调用 provisioner 来创立后端存储服务的 volume
    volume, result, err := ctrl.provisioner.Provision(ctx, options)

    volume.Spec.ClaimRef = claimRef
    // 增加 "pv.kubernetes.io/provisioned-by" annotation
    metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annDynamicallyProvisioned, ctrl.provisionerName)
    // (2) 调用 volumeStore 对象创立 pv 对象并写入 k8s api server
    if err := ctrl.volumeStore.StoreVolume(claim, volume); err != nil {return ProvisioningFinished, err}
    // 更新本地缓存
    if err = ctrl.volumes.Add(volume); err != nil {utilruntime.HandleError(err)
    }
    return ProvisioningFinished, nil
}

以上代码次要逻辑比较简单,要害逻辑是调用了 provisioner.Provision() 办法创立后端存储服务的 volume,看下要害逻辑代码 Provision()


func (p *csiProvisioner) Provision(ctx context.Context, options controller.ProvisionOptions) (*v1.PersistentVolume, controller.ProvisioningState, error) {pvName, err := makeVolumeName(p.volumeNamePrefix, fmt.Sprintf("%s", options.PVC.ObjectMeta.UID), p.volumeNameUUIDLength)
    req := csi.CreateVolumeRequest{
        Name:               pvName,
        Parameters:         options.StorageClass.Parameters,
        VolumeCapabilities: volumeCaps,
        CapacityRange: &csi.CapacityRange{RequiredBytes: int64(volSizeBytes),
        },
    }
    // 获取 provision secret credentials
    provisionerSecretRef, err := getSecretReference(provisionerSecretParams, options.StorageClass.Parameters, pvName, &v1.PersistentVolumeClaim{
        ObjectMeta: metav1.ObjectMeta{
            Name:      options.PVC.Name,
            Namespace: options.PVC.Namespace,
        },
    })
    provisionerCredentials, err := getCredentials(ctx, p.client, provisionerSecretRef)
    req.Secrets = provisionerCredentials
    // ...

    // 要害逻辑:通过 grpc 调用咱们自研 csi-plugin 中的 controller-service CreateVolume 办法,在后端存储服务中创立一个实在的 volume
    // 该 csiClient 为 controller-service client,controller-service rpc 规范能够参考官网文档 https://github.com/container-storage-interface/spec/blob/master/spec.md#controller-service-rpc
    rep, err = p.csiClient.CreateVolume(createCtx, &req)
    // ...
    pv := &v1.PersistentVolume{
        ObjectMeta: metav1.ObjectMeta{Name: pvName,},
        Spec: v1.PersistentVolumeSpec{
            AccessModes:  options.PVC.Spec.AccessModes,
            MountOptions: options.StorageClass.MountOptions,
            Capacity: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): bytesToGiQuantity(respCap),
            },
            // TODO wait for CSI VolumeSource API
            PersistentVolumeSource: v1.PersistentVolumeSource{
                CSI: &v1.CSIPersistentVolumeSource{
                    Driver:                     p.driverName,
                    VolumeHandle:               p.volumeIdToHandle(rep.Volume.VolumeId),
                    VolumeAttributes:           volumeAttributes,
                    ControllerPublishSecretRef: controllerPublishSecretRef,
                    NodeStageSecretRef:         nodeStageSecretRef,
                    NodePublishSecretRef:       nodePublishSecretRef,
                    ControllerExpandSecretRef:  controllerExpandSecretRef,
                },
            },
        },
    }

    return pv, controller.ProvisioningFinished, nil
}

以上代码也比拟清晰简略,要害逻辑是通过 grpc 调用咱们自研 csi-plugin 的 controller-service CreateVolume 办法来创立内部存储服务中的一个实在 volume。

同理,external-provisioner sidecar container 也会去 watch pv,如果删除 pv 时,会首先判断是否同时须要删除后端存储服务的实在 volume,如果须要
删除则调用 provisioner.Delete(),即自研 csi-plugin 的 controller-service DeleteVolume 办法去删除 volume。删除 volume 能够参考代码 deleteVolumeOperation

至此,就能够解释当咱们创立一个带有 storage class 的 pvc 时,external-provisioner sidecar container 会 watch pvc,并调用 provisioner.Provision 去
创立 volume,而 provisioner.CreateVolume 又会去调用自研 csi-plugin controller-service 的 CreateVolume()去实在创立一个 volume,最初再依据该 volume
获取相干 pv 对象参数,并新建一个 pv 对象写入 k8s api server 中。以上过程都是动态创建,自动化的,无需人工操作,这也是 storage class 的性能。

总结

本文次要学习了 external-provisioner sidecar container 相干原理逻辑,解释了创立一个带有 storage class 的 pvc 时,如何新建一个 k8s pv 对象,以及
如何创立一个后端存储服务的实在 volume。

至此,曾经有了一个 pvc 对象,且该 pvc 对象曾经 bound 了一个带有后端存储服务实在 volume 的 pv,当初就能够在 pod 内应用这个 pvc 了,pod containers 内的 mount path 能够像应用本地
目录一样应用这个 volume path。然而,该 volume path 是如何被 mount 到 pod containers 中的呢?后续有空再更新。

参考文献

一文读懂 K8s 长久化存储流程

从零开始入门 K8s | Kubernetes 存储架构及插件应用

Kubernetes Container Storage Interface (CSI) Documentation

node-driver-registrar

退出移动版