乐趣区

关于容器:CSI-工作原理与JuiceFS-CSI-Driver-的架构设计详解

容器存储接口(Container Storage Interface)简称 CSI,CSI 建设了行业标准接口的标准,借助 CSI 容器编排零碎(CO)能够将任意存储系统裸露给本人的容器工作负载。JuiceFS CSI Driver 通过实现 CSI 接口使得 Kubernetes 上的利用能够通过 PVC(PersistentVolumeClaim)应用 JuiceFS。本文将具体介绍 CSI 的工作原理以及 JuiceFS CSI Driver 的架构设计。

CSI 的根本组件

CSI 的 cloud providers 有两种类型,一种为 in-tree 类型,一种为 out-of-tree 类型。前者是指运行在 K8s 外围组件外部的存储插件;后者是指独立在 K8s 组件之外运行的存储插件。本文次要介绍 out-of-tree 类型的插件。

out-of-tree 类型的插件次要是通过 gRPC 接口跟 K8s 组件交互,并且 K8s 提供了大量的 SideCar 组件来配合 CSI 插件实现丰盛的性能。对于 out-of-tree 类型的插件来说,所用到的组件分为 SideCar 组件和第三方须要实现的插件。

SideCar 组件

external-attacher

监听 VolumeAttachment 对象,并调用 CSI driver Controller 服务的 ControllerPublishVolumeControllerUnpublishVolume 接口,用来将 volume 附着到 node 上,或从 node 上删除。

如果存储系统须要 attach/detach 这一步,就须要应用到这个组件,因为 K8s 外部的 Attach/Detach Controller 不会间接调用 CSI driver 的接口。

external-provisioner

监听 PVC 对象,并调用 CSI driver Controller 服务的 CreateVolumeDeleteVolume 接口,用来提供一个新的 volume。前提是 PVC 中指定的 StorageClass 的 provisioner 字段和 CSI driver Identity 服务的 GetPluginInfo 接口的返回值一样。一旦新的 volume 提供进去,K8s 就会创立对应的 PV。

而如果 PVC 绑定的 PV 的回收策略是 delete,那么 external-provisioner 组件监听到 PVC 的删除后,会调用 CSI driver Controller 服务的 DeleteVolume 接口。一旦 volume 删除胜利,该组件也会删除相应的 PV。

该组件还反对从快照创立数据源。如果在 PVC 中指定了 Snapshot CRD 的数据源,那么该组件会通过 SnapshotContent 对象获取无关快照的信息,并将此内容在调用 CreateVolume 接口的时候传给 CSI driver,CSI driver 须要依据数据源快照来创立 volume。

external-resizer

监听 PVC 对象,如果用户申请在 PVC 对象上申请更多存储,该组件会调用 CSI driver Controller 服务的 NodeExpandVolume 接口,用来对 volume 进行扩容。

external-snapshotter

该组件须要与 Snapshot Controller 配合应用。Snapshot Controller 会依据集群中创立的 Snapshot 对象创立对应的 VolumeSnapshotContent,而 external-snapshotter 负责监听 VolumeSnapshotContent 对象。当监听到 VolumeSnapshotContent 时,将其对应参数通过 CreateSnapshotRequest 传给 CSI driver Controller 服务,调用其 CreateSnapshot 接口。该组件还负责调用 DeleteSnapshotListSnapshots 接口。

livenessprobe

负责监测 CSI driver 的衰弱状况,并通过 Liveness Probe 机制汇报给 K8s,当监测到 CSI driver 有异样时负责重启 pod。

node-driver-registrar

通过间接调用 CSI driver Node 服务的 NodeGetInfo 接口,将 CSI driver 的信息通过 kubelet 的插件注册机制在对应节点的 kubelet 上进行注册。

external-health-monitor-controller

通过调用 CSI driver Controller 服务的 ListVolumes 或者 ControllerGetVolume 接口,来查看 CSI volume 的衰弱状况,并上报在 PVC 的 event 中。

external-health-monitor-agent

通过调用 CSI driver Node 服务的 NodeGetVolumeStats 接口,来查看 CSI volume 的衰弱状况,并上报在 pod 的 event 中。

第三方插件

第三方存储提供方(即 SP,Storage Provider)须要实现 Controller 和 Node 两个插件,其中 Controller 负责 Volume 的治理,以 StatefulSet 模式部署;Node 负责将 Volume mount 到 pod 中,以 DaemonSet 模式部署在每个 node 中。

CSI 插件与 kubelet 以及 K8s 内部组件是通过 Unix Domani Socket gRPC 来进行交互调用的。CSI 定义了三套 RPC 接口,SP 须要实现这三组接口,以便与 K8s 内部组件进行通信。三组接口别离是:CSI Identity、CSI Controller 和 CSI Node,上面具体看看这些接口定义。

CSI Identity

用于提供 CSI driver 的身份信息,Controller 和 Node 都须要实现。接口如下:

service Identity {rpc GetPluginInfo(GetPluginInfoRequest)
    returns (GetPluginInfoResponse) {}

  rpc GetPluginCapabilities(GetPluginCapabilitiesRequest)
    returns (GetPluginCapabilitiesResponse) {}

  rpc Probe (ProbeRequest)
    returns (ProbeResponse) {}}

GetPluginInfo 是必须要实现的,node-driver-registrar 组件会调用这个接口将 CSI driver 注册到 kubelet;GetPluginCapabilities 是用来表明该 CSI driver 次要提供了哪些性能。

CSI Controller

用于实现创立 / 删除 volume、attach/detach volume、volume 快照、volume 扩缩容等性能,Controller 插件须要实现这组接口。接口如下:

service Controller {rpc CreateVolume (CreateVolumeRequest)
    returns (CreateVolumeResponse) {}

  rpc DeleteVolume (DeleteVolumeRequest)
    returns (DeleteVolumeResponse) {}

  rpc ControllerPublishVolume (ControllerPublishVolumeRequest)
    returns (ControllerPublishVolumeResponse) {}

  rpc ControllerUnpublishVolume (ControllerUnpublishVolumeRequest)
    returns (ControllerUnpublishVolumeResponse) {}

  rpc ValidateVolumeCapabilities (ValidateVolumeCapabilitiesRequest)
    returns (ValidateVolumeCapabilitiesResponse) {}

  rpc ListVolumes (ListVolumesRequest)
    returns (ListVolumesResponse) {}

  rpc GetCapacity (GetCapacityRequest)
    returns (GetCapacityResponse) {}

  rpc ControllerGetCapabilities (ControllerGetCapabilitiesRequest)
    returns (ControllerGetCapabilitiesResponse) {}

  rpc CreateSnapshot (CreateSnapshotRequest)
    returns (CreateSnapshotResponse) {}

  rpc DeleteSnapshot (DeleteSnapshotRequest)
    returns (DeleteSnapshotResponse) {}

  rpc ListSnapshots (ListSnapshotsRequest)
    returns (ListSnapshotsResponse) {}

  rpc ControllerExpandVolume (ControllerExpandVolumeRequest)
    returns (ControllerExpandVolumeResponse) {}

  rpc ControllerGetVolume (ControllerGetVolumeRequest)
    returns (ControllerGetVolumeResponse) {option (alpha_method) = true;
    }
}

在下面介绍 K8s 内部组件的时候曾经提到,不同的接口别离提供给不同的组件调用,用于配合实现不同的性能。比方 CreateVolume/DeleteVolume 配合 external-provisioner 实现创立 / 删除 volume 的性能;ControllerPublishVolume/ControllerUnpublishVolume 配合 external-attacher 实现 volume 的 attach/detach 性能等。

CSI Node

用于实现 mount/umount volume、查看 volume 状态等性能,Node 插件须要实现这组接口。接口如下:

service Node {rpc NodeStageVolume (NodeStageVolumeRequest)
    returns (NodeStageVolumeResponse) {}

  rpc NodeUnstageVolume (NodeUnstageVolumeRequest)
    returns (NodeUnstageVolumeResponse) {}

  rpc NodePublishVolume (NodePublishVolumeRequest)
    returns (NodePublishVolumeResponse) {}

  rpc NodeUnpublishVolume (NodeUnpublishVolumeRequest)
    returns (NodeUnpublishVolumeResponse) {}

  rpc NodeGetVolumeStats (NodeGetVolumeStatsRequest)
    returns (NodeGetVolumeStatsResponse) {}

  rpc NodeExpandVolume(NodeExpandVolumeRequest)
    returns (NodeExpandVolumeResponse) {}

  rpc NodeGetCapabilities (NodeGetCapabilitiesRequest)
    returns (NodeGetCapabilitiesResponse) {}

  rpc NodeGetInfo (NodeGetInfoRequest)
    returns (NodeGetInfoResponse) {}}

NodeStageVolume 用来实现多个 pod 共享一个 volume 的性能,反对先将 volume 挂载到一个长期目录,而后通过 NodePublishVolume 将其挂载到 pod 中;NodeUnstageVolume 为其反操作。

工作流程

上面来看看 pod 挂载 volume 的整个工作流程。整个流程流程别离三个阶段:Provision/Delete、Attach/Detach、Mount/Unmount,不过不是每个存储计划都会经验这三个阶段,比方 NFS 就没有 Attach/Detach 阶段。

整个过程不仅仅波及到下面介绍的组件的工作,还波及 ControllerManager 的 AttachDetachController 组件和 PVController 组件以及 kubelet。上面别离详细分析一下 Provision、Attach、Mount 三个阶段。

Provision

先来看 Provision 阶段,整个过程如上图所示。其中 extenal-provisioner 和 PVController 均 watch PVC 资源。

  1. 当 PVController watch 到集群中有 PVC 创立时,会判断以后是否有 in-tree plugin 与之相符,如果没有则判断其存储类型为 out-of-tree 类型,于是给 PVC 打上注解 volume.beta.kubernetes.io/storage-provisioner={csi driver name}
  2. 当 extenal-provisioner watch 到 PVC 的注解 csi driver 与本人的 csi driver 统一时,调用 CSI Controller 的 CreateVolume 接口;
  3. 当 CSI Controller 的 CreateVolume 接口返回胜利时,extenal-provisioner 会在集群中创立对应的 PV;
  4. PVController watch 到集群中有 PV 创立时,将 PV 与 PVC 进行绑定。

Attach

Attach 阶段是指将 volume 附着到节点上,整个过程如上图所示。

  1. ADController 监听到 pod 被调度到某节点,并且应用的是 CSI 类型的 PV,会调用外部的 in-tree CSI 插件的接口,该接口会在集群中创立一个 VolumeAttachment 资源;
  2. external-attacher 组件 watch 到有 VolumeAttachment 资源创立进去时,会调用 CSI Controller 的 ControllerPublishVolume 接口;
  3. 当 CSI Controller 的 ControllerPublishVolume 接口调用胜利后,external-attacher 将对应的 VolumeAttachment 对象的 Attached 状态设为 true;
  4. ADController watch 到 VolumeAttachment 对象的 Attached 状态为 true 时,更新 ADController 外部的状态 ActualStateOfWorld。

Mount

最初一步将 volume 挂载到 pod 里的过程波及到 kubelet。整个流程简略地说是,对应节点上的 kubelet 在创立 pod 的过程中,会调用 CSI Node 插件,执行 mount 操作。上面再针对 kubelet 外部的组件细分进行剖析。

首先 kubelet 创立 pod 的主函数 syncPod 中,kubelet 会调用其子组件 volumeManager 的 WaitForAttachAndMount 办法,期待 volume mount 实现:

func (kl *Kubelet) syncPod(o syncPodOptions) error {
...
    // Volume manager will not mount volumes for terminated pods
    if !kl.podIsTerminated(pod) {
        // Wait for volumes to attach/mount
        if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
            klog.Errorf("Unable to attach or mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
            return err
        }
    }
...
}

volumeManager 中蕴含两个组件:desiredStateOfWorldPopulator 和 reconciler。这两个组件相互配合就实现了 volume 在 pod 中的 mount 和 umount 过程。整个过程如下:

desiredStateOfWorldPopulator 和 reconciler 的协同模式是生产者和消费者的模式。volumeManager 中保护了两个队列(严格来讲是 interface,但这里充当了队列的作用),即 DesiredStateOfWorld 和 ActualStateOfWorld,前者保护的是以后节点中 volume 的冀望状态;后者保护的是以后节点中 volume 的理论状态。

而 desiredStateOfWorldPopulator 在本人的循环中只做了两个事件,一个是从 kubelet 的 podManager 中获取以后节点新建的 Pod,将其须要挂载的 volume 信息记录到 DesiredStateOfWorld 中;另一件事是从 podManager 中获取以后节点中被删除的 pod,查看其 volume 是否在 ActualStateOfWorld 的记录中,如果没有,将其在 DesiredStateOfWorld 中也删除,从而保障 DesiredStateOfWorld 记录的是节点中所有 volume 的冀望状态。相干代码如下(为了精简逻辑,删除了局部代码):

// Iterate through all pods and add to desired state of world if they don't
// exist but should
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
    // Map unique pod name to outer volume name to MountedVolume.
    mountedVolumesForPod := make(map[volumetypes.UniquePodName]map[string]cache.MountedVolume)
    ...
    processedVolumesForFSResize := sets.NewString()
    for _, pod := range dswp.podManager.GetPods() {dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize)
    }
}

// processPodVolumes processes the volumes in the given pod and adds them to the
// desired state of the world.
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
    pod *v1.Pod,
    mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume,
    processedVolumesForFSResize sets.String) {uniquePodName := util.GetUniquePodName(pod)
    ...
    for _, podVolume := range pod.Spec.Volumes {   
        pvc, volumeSpec, volumeGidValue, err :=
            dswp.createVolumeSpec(podVolume, pod, mounts, devices)

        // Add volume to desired state of world
        _, err = dswp.desiredStateOfWorld.AddPodToVolume(uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)
        dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName)
    }
}

而 reconciler 就是消费者,它次要做了三件事:

  1. unmountVolumes():在 ActualStateOfWorld 中遍历 volume,判断其是否在 DesiredStateOfWorld 中,如果不在,则调用 CSI Node 的接口执行 unmount,并在 ActualStateOfWorld 中记录;
  2. mountAttachVolumes():从 DesiredStateOfWorld 中获取须要被 mount 的 volume,调用 CSI Node 的接口执行 mount 或扩容,并在 ActualStateOfWorld 中做记录;
  3. unmountDetachDevices():在 ActualStateOfWorld 中遍历 volume,若其曾经 attach,但没有应用的 pod,并在 DesiredStateOfWorld 也没有记录,则将其 unmount/detach 掉。

咱们以 mountAttachVolumes() 为例,看看其如何调用 CSI Node 的接口。

func (rc *reconciler) mountAttachVolumes() {
    // Ensure volumes that should be attached/mounted are attached/mounted.
    for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
        volumeToMount.DevicePath = devicePath
        if cache.IsVolumeNotAttachedError(err) {...} else if !volMounted || cache.IsRemountRequiredError(err) {
            // Volume is not mounted, or is already mounted, but requires remounting
            err := rc.operationExecutor.MountVolume(
                rc.waitForAttachTimeout,
                volumeToMount.VolumeToMount,
                rc.actualStateOfWorld,
                isRemount)
            ...
        } else if cache.IsFSResizeRequiredError(err) {
            err := rc.operationExecutor.ExpandInUseVolume(
                volumeToMount.VolumeToMount,
                rc.actualStateOfWorld)
            ...
        }
    }
}

执行 mount 的操作全在 rc.operationExecutor 中实现,再看 operationExecutor 的代码:

func (oe *operationExecutor) MountVolume(
    waitForAttachTimeout time.Duration,
    volumeToMount VolumeToMount,
    actualStateOfWorld ActualStateOfWorldMounterUpdater,
    isRemount bool) error {
    ...
    var generatedOperations volumetypes.GeneratedOperations
        generatedOperations = oe.operationGenerator.GenerateMountVolumeFunc(waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount)

    // Avoid executing mount/map from multiple pods referencing the
    // same volume in parallel
    podName := nestedpendingoperations.EmptyUniquePodName

    return oe.pendingOperations.Run(volumeToMount.VolumeName, podName, "" /* nodeName */, generatedOperations)
}

该函数先结构执行函数,再执行,那么再看构造函数:

func (og *operationGenerator) GenerateMountVolumeFunc(
    waitForAttachTimeout time.Duration,
    volumeToMount VolumeToMount,
    actualStateOfWorld ActualStateOfWorldMounterUpdater,
    isRemount bool) volumetypes.GeneratedOperations {

    volumePlugin, err :=
        og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)

    mountVolumeFunc := func() volumetypes.OperationContext {
        // Get mounter plugin
        volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
        volumeMounter, newMounterErr := volumePlugin.NewMounter(
            volumeToMount.VolumeSpec,
            volumeToMount.Pod,
            volume.VolumeOptions{})
        ...
        // Execute mount
        mountErr := volumeMounter.SetUp(volume.MounterArgs{FsUser:              util.FsUserFrom(volumeToMount.Pod),
            FsGroup:             fsGroup,
            DesiredSize:         volumeToMount.DesiredSizeLimit,
            FSGroupChangePolicy: fsGroupChangePolicy,
        })
        // Update actual state of world
        markOpts := MarkVolumeOpts{
            PodName:             volumeToMount.PodName,
            PodUID:              volumeToMount.Pod.UID,
            VolumeName:          volumeToMount.VolumeName,
            Mounter:             volumeMounter,
            OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName,
            VolumeGidVolume:     volumeToMount.VolumeGidValue,
            VolumeSpec:          volumeToMount.VolumeSpec,
            VolumeMountState:    VolumeMounted,
        }

        markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markOpts)
        ...
        return volumetypes.NewOperationContext(nil, nil, migrated)
    }

    return volumetypes.GeneratedOperations{
        OperationName:     "volume_mount",
        OperationFunc:     mountVolumeFunc,
        EventRecorderFunc: eventRecorderFunc,
        CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePluginName, volumeToMount.VolumeSpec), "volume_mount"),
    }
}

这里先去注册到 kubelet 的 CSI 的 plugin 列表中找到对应的插件,而后再执行 volumeMounter.SetUp,最初更新 ActualStateOfWorld 的记录。这里负责执行 external CSI 插件的是 csiMountMgr,代码如下:

func (c *csiMountMgr) SetUp(mounterArgs volume.MounterArgs) error {return c.SetUpAt(c.GetPath(), mounterArgs)
}

func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {csi, err := c.csiClientGetter.Get()
    ...

    err = csi.NodePublishVolume(
        ctx,
        volumeHandle,
        readOnly,
        deviceMountPath,
        dir,
        accessMode,
        publishContext,
        volAttribs,
        nodePublishSecrets,
        fsType,
        mountOptions,
    )
    ...
    return nil
}

能够看到,在 kubelet 中调用 CSI Node NodePublishVolume/NodeUnPublishVolume 接口的是 volumeManager 的 csiMountMgr。至此,整个 Pod 的 volume 流程就曾经梳理分明了。

JuiceFS CSI Driver 工作原理

接下来再来看看 JuiceFS CSI Driver 的工作原理。架构图如下:


JuiceFS 在 CSI Node 接口 NodePublishVolume 中创立 pod,用来执行 juicefs mount xxx,从而保障 juicefs 客户端运行在 pod 里。如果有多个的业务 pod 共用一份存储,mount pod 会在 annotation 进行援用计数,确保不会反复创立。具体的代码如下(为了不便浏览,省去了日志等无关代码):

func (p *PodMount) JMount(jfsSetting *jfsConfig.JfsSetting) error {if err := p.createOrAddRef(jfsSetting); err != nil {return err}
    return p.waitUtilPodReady(GenerateNameByVolumeId(jfsSetting.VolumeId))
}

func (p *PodMount) createOrAddRef(jfsSetting *jfsConfig.JfsSetting) error {
    ...
    
    for i := 0; i < 120; i++ {
        // wait for old pod deleted
        oldPod, err := p.K8sClient.GetPod(podName, jfsConfig.Namespace)
        if err == nil && oldPod.DeletionTimestamp != nil {time.Sleep(time.Millisecond * 500)
            continue
        } else if err != nil {if K8serrors.IsNotFound(err) {newPod := r.NewMountPod(podName)
                if newPod.Annotations == nil {newPod.Annotations = make(map[string]string)
                }
                newPod.Annotations[key] = jfsSetting.TargetPath
                po, err := p.K8sClient.CreatePod(newPod)
                ...
                return err
            }
            return err
        }
      ...
        return p.AddRefOfMount(jfsSetting.TargetPath, podName)
    }
    return status.Errorf(codes.Internal, "Mount %v failed: mount pod %s has been deleting for 1 min", jfsSetting.VolumeId, podName)
}

func (p *PodMount) waitUtilPodReady(podName string) error {
    // Wait until the mount pod is ready
    for i := 0; i < 60; i++ {pod, err := p.K8sClient.GetPod(podName, jfsConfig.Namespace)
        ...
        if util.IsPodReady(pod) {return nil}
        time.Sleep(time.Millisecond * 500)
    }
    ...
    return status.Errorf(codes.Internal, "waitUtilPodReady: mount pod %s isn't ready in 30 seconds: %v", podName, log)
}

每当有业务 pod 退出时,CSI Node 会在接口 NodeUnpublishVolume 删除 mount pod annotation 中对应的计数,当最初一个记录被删除时,mount pod 才会被删除。具体代码如下(为了不便浏览,省去了日志等无关代码):

func (p *PodMount) JUmount(volumeId, target string) error {
   ...
    err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {po, err := p.K8sClient.GetPod(pod.Name, pod.Namespace)
        if err != nil {return err}
        annotation := po.Annotations
        ...
        delete(annotation, key)
        po.Annotations = annotation
        return p.K8sClient.UpdatePod(po)
    })
    ...

    deleteMountPod := func(podName, namespace string) error {return retry.RetryOnConflict(retry.DefaultBackoff, func() error {po, err := p.K8sClient.GetPod(podName, namespace)
            ...
            shouldDelay, err = util.ShouldDelay(po, p.K8sClient)
            if err != nil {return err}
            if !shouldDelay {
                // do not set delay delete, delete it now
                if err := p.K8sClient.DeletePod(po); err != nil {return err}
            }
            return nil
        })
    }

    newPod, err := p.K8sClient.GetPod(pod.Name, pod.Namespace)
    ...
    if HasRef(newPod) {return nil}
    return deleteMountPod(pod.Name, pod.Namespace)
}

CSI Driver 与 juicefs 客户端解耦,做降级不会影响到业务容器;将客户端独立在 pod 中运行也就使其在 K8s 的管控内,可观测性更强;同时 pod 的益处咱们也能享受到,比方隔离性更强,能够独自设置客户端的资源配额等。

总结

本文从 CSI 的组件、CSI 接口、volume 如何挂载到 pod 上,三个方面动手,剖析了 CSI 整个体系工作的过程,并介绍了 JuiceFS CSI Driver 的工作原理。CSI 是整个容器生态的规范存储接口,CO 通过 gRPC 形式和 CSI 插件通信,而为了做到普适,K8s 设计了很多内部组件来配合 CSI 插件来实现不同的性能,从而保障了 K8s 外部逻辑的纯正以及 CSI 插件的简略易用。

如有帮忙的话欢送关注咱们我的项目 Juicedata/JuiceFS 哟!(0ᴗ0✿)

退出移动版