关于后端:Kubernetes-存储详解

1次阅读

共计 26137 个字符,预计需要花费 66 分钟才能阅读完成。

概述

因为容器默认状况下 rootfs 零碎树没有一个与之对应的存储设备,因而能够认为容器中任何文件操作都是临时性的,这样的设计带来了两个问题,其一是如果容器因为某些起因被 kubelet 重启后,会失落这些文件;其二是不同的容器之间无奈共享文件。为了解决上述问题,Kubernetes 设计了卷(Volume)这一形象来治理容器中所有须要应用到的内部文件。

并且因为容器自身存在申明周期以及容器的存储起源有多样性的特点,卷自身带有独立的状态标识实现其生命周期循环,杜绝业务场景的不同,卷又细分为长久卷、长期卷、投射卷三大类。

为了让集群管理员能够治理更多不同个性的长久卷,Kubernetes 又设计了存储类(StorageClass) 来治理每一类具备雷同个性的长久卷,在后续的 Kubernetes 版本迭代过程中陆续退出了一些其余的个性,例如:为了能够将任意第三方存储裸露给容器,减少了容器存储接口 (CSI);为了减少存储的可靠性,减少相似传统文件系统中快照概念的卷快照 (Snapshot) 资源;为了能够对接任意第三方对象存储,减少了容器对象存储接口(COSI)。

长久卷(Persistent Volume)

PersistentVolume 子系统为用户和管理员提供了一组 API,通过引入了两个新的 API 资源:PersistentVolume 存储提供者进行治理;PersistentVolumeClaim 被存储使用者援用。

长久卷(PersistentVolume,PV)是集群中的一块存储,能够由管理员当时制备 (Provision),或者应用存储类(Storage Class)来动静制备 (Provision)。长久卷是集群资源,就像节点也是集群资源一样。PV 长久卷和一般的 Volume 一样,也是应用卷插件来实现的,只是它们领有独立于任何应用 PV 的 Pod 的生命周期。

apiVersion: v1
kind: PersistentVolume
metadata:
  name: foo-pv
spec:
  storageClassName: ""
  claimRef:
    name: foo-pvc
    namespace: foo

长久卷申领(PersistentVolumeClaim,PVC)表白的是用户对存储的申请。概念上与 Pod 相似。Pod 会耗用节点资源,而 PVC 申领会耗用 PV 资源。Pod 能够申请特定数量的资源(CPU 和内存);同样 PVC 申领也能够申请特定的大小和拜访模式(例如,能够要求 PV 卷可能以 ReadWriteOnce、ReadOnlyMany 或 ReadWriteMany 模式之一来挂载,参见拜访模式)。

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: foo-pvc
  namespace: foo
spec:
  storageClassName: ""
  volumeName: foo-pv

只管 PersistentVolumeClaim 容许用户耗费形象的存储资源,常见的状况是针对不同的问题用户须要的是具备不同属性(如,性能)的 PersistentVolume 卷。集群管理员须要可能提供不同性质的 PersistentVolume,并且这些 PV 卷之间的差异不仅限于卷大小和拜访模式,同时又不能将卷是如何实现的这些细节裸露给用户。为了满足这类需要,就有了存储类(StorageClass)资源。

存储卷生命周期治理

1. 制备 (Provision)

制备 (Provision) 个别是只在 Kubernetes API Server 中的 PV 资源以及在存储系统中申请存储空间。PV 卷的制备有两种形式:动态制备或动静制备。

动态制备时,集群管理员创立若干 PV 卷,这些卷对象带有实在存储的细节信息。

动静制备时,集群管理员在 Kubernetes 上创立一个存储类,当集群发现没有 PV 能够与 PVC 匹配时,由存储类来治理 PV 资源的创立和存储空间的创立。

为了基于存储类实现动静的存储制备,集群管理员须要在 API 服务器上启用 DefaultStorageClass 准入控制器。

2. 绑定 (Bind)

后面咱们讲过 Pod 中每申明一个 PVC 资源,则生产一个与之对应的 PV 资源,这一过程也被咱们成为绑定 (Bind)。集群管制立体监测到新的 PVC 对象,并寻找一个与之匹配的 PV 卷,并将二者绑定到一起(将 PV 信息写入 PVC 对象的 ClaimRef 信息中,并将 PV 状态批改 Bound)。

如果找不到匹配的 PV 卷,PVC 会无限期地处于未绑定状态。例如,即便某集群上制备了很多 50 Gi 大小的 PV 卷,也无奈与申请 100 Gi 大小的存储的 PVC 匹配。当新的 100 Gi PV 卷被退出到集群时,该 PVC 才有可能被绑定。

3. 应用 (Using)

Pod 将 PVC 申领当做存储卷来应用。集群会检视 PVC 申领,找到所绑定的卷,并为 Pod 挂载该卷。对于反对多种拜访模式的卷,用户要在 Pod 中以卷的模式应用申领时指定冀望的拜访模式。

一旦用户有了申领对象并且该申领曾经被绑定,则所绑定的 PV 卷在用户依然须要它期间始终属于该用户。用户通过在 Pod 的 volumes 块中蕴含 persistentVolumeClaim 节区来调度 Pod,拜访所申领的 PV 卷。相干细节可参阅应用申领作为卷。

apiVersion: v1
kind: Pod
metadata:
  name: volume-test
spec:
  containers:
  - name: container-test
    image: busybox:1.28
    volumeMounts:
    - name: foo
      mountPath: "/mnt/foo"
  volumes:
  - name: foo
    persistentVolumeClaim:
        claimName: foo-pvc

回收策略 (Reclaim Policy)

当用户不再应用其存储卷时,他们能够从 API 中将 PVC 对象删除,从而容许该资源被回收再利用。PersistentVolume 对象的回收策略通知集群,当其被从申领中开释时如何解决该数据卷。目前,数据卷能够被 Retained(保留)、Recycled(回收)或 Deleted(删除)。

1. 保留(Retain)

回收策略 Retain 使得用户能够手动回收资源。当 PersistentVolumeClaim 对象被删除时,PersistentVolume 卷依然存在,对应的数据卷被视为 ” 已开释(released)”。因为卷上依然存在这前一申领人的数据,该卷还不能用于其余申领。管理员能够通过上面的步骤来手动回收该卷:

删除 PersistentVolume 对象。与之相干的、位于内部基础设施中的存储资产(例如 AWS EBS、GCE PD、Azure Disk 或 Cinder 卷)在 PV 删除之后依然存在。
依据状况,手动革除所关联的存储资产上的数据。
手动删除所关联的存储资产。
如果你心愿重用该存储资产,能够基于存储资产的定义创立新的 PersistentVolume 卷对象。

2. 删除(Delete)

对于反对 Delete 回收策略的卷插件,删除动作会将 PersistentVolume 对象从 Kubernetes 中移除,同时也会从内部基础设施(如 AWS EBS、GCE PD、Azure Disk 或 Cinder 卷)中移除所关联的存储资产。动静制备的卷会继承其 StorageClass 中设置的回收策略,该策略默认为 Delete。管理员须要依据用户的冀望来配置 StorageClass;否则 PV 卷被创立之后必须要被编辑或者修补。参阅更改 PV 卷的回收策略。

3. 回收(Recycle)

如果上层的卷插件反对,回收策略 Recycle 会在卷上执行一些根本的擦除(rm -rf /thevolume/*)操作,之后容许该卷用于新的 PVC。

长期卷

有些应用程序须要额定的存储,但并不关怀数据在重启后是否依然可用。例如,缓存服务常常受限于内存大小,而且能够将不罕用的数据转移到比内存慢的存储中,对总体性能的影响并不大。

另有些应用程序须要以文件模式注入的只读数据,比方配置数据或密钥。

长期卷 就是为此类用例设计的。因为卷会听从 Pod 的生命周期,与 Pod 一起创立和删除,所以进行和重新启动 Pod 时,不会受长久卷在何处可用的限度。

长期卷在 Pod 规约中以 内联 形式定义,这简化了应用程序的部署和治理。

Kubernetes 为了不同的用处,反对几种不同类型的长期卷:

emptyDir:
Pod 启动时为空,存储空间来自本地的 kubelet 根目录(通常是根磁盘)或内存。

configMap、downwardAPI、secret:
将不同类型的 Kubernetes 数据注入到 Pod 中

CSI 长期卷:
相似于后面的卷类型,但由专门反对此个性 的指定 CSI 驱动程序提供。

通用长期卷:
它能够由所有反对长久卷的存储驱动程序提供。

emptyDir、configMap、downwardAPI、secret 是作为 本地长期存储 提供的。它们由各个节点上的 kubelet 治理。

CSI 长期卷 必须 由第三方 CSI 存储驱动程序提供。

通用长期卷 能够 由第三方 CSI 存储驱动程序提供,也能够由反对动静制备的任何其余存储驱动程序提供。一些专门为 CSI 长期卷编写的 CSI 驱动程序,不反对动静制备:因而这些驱动程序不能用于通用长期卷。

应用第三方驱动程序的劣势在于,它们能够提供 Kubernetes 自身不反对的性能,例如,与 kubelet 治理的磁盘具备不同性能特色的存储,或者用来注入不同的数据。

投射卷

一个 projected 卷能够将若干现有的卷源映射到同一个目录之上。

目前,源能够被投射的卷包含:secret, downwardAPI, configMap, serviceAccountToken。

所有的卷源都要求处于 Pod 所在的同一个名字空间内。进一步的详细信息,可参考 一体化卷设计文档。

apiVersion: v1
kind: Pod
metadata:
  name: volume-test
spec:
  containers:
  - name: container-test
    image: busybox:1.28
    volumeMounts:
    - name: all-in-one
      mountPath: "/projected-volume"
      readOnly: true
  volumes:
  - name: all-in-one
    projected:
      sources:
      - secret:
          name: mysecret
          items:
            - key: username
              path: my-group/my-username
      - downwardAPI:
          items:
            - path: "labels"
              fieldRef:
                fieldPath: metadata.labels
            - path: "cpu_limit"
              resourceFieldRef:
                containerName: container-test
                resource: limits.cpu
      - configMap:
          name: myconfigmap
          items:
            - key: config
              path: my-group/my-config

存储架构

为了存储卷治理的性能,Kubernetes 设计了 4 大组件,从顶层向一下顺次为:

Volume Plugins — 存储提供的扩大接口, 蕴含了各类存储提供者的 plugin 实现。其中 Volume Plugins 是一个根底部件,性能上来说 Volume Manager、PV Controller、AD Controller 三局部。

Volume Manager — 运行在 kubelet 里让存储 Ready 的部件,负责管理数据卷的 Mount/Umount 操作(也负责数据卷的 Attach/Detach 操作,需配置 kubelet 相干参数开启该个性)、卷设施的格式化等等。

PV Controller — 运行在管制立体的 kube-controller-manager 上的组件,次要负责 PV/PVC 绑定及周期治理,依据需要进行对 PVC 的 Provision/Delete 操作;所谓将一个 PV 与 PVC 进行“绑定”,其实就是将这个 PV 对象的名字,填在了 PVC 对象的 spec.volumeName 字段上。

AD(Attach/Detach) Controller — 运行在管制立体的 kube-controller-manager 上的组件,负责数据卷的 Attach/Detach 操作,将设施挂接到指标节点。

存储卷挂载

PV Controller 和 K8S 其它组件一样监听 API Server 中的资源更新,对于卷治理次要是监听 PV,PVC,SC 三类资源,当监听到这些资源的创立、删除、批改时,PV Controller 通过判断是须要做创立、删除、绑定、回收等动作(后续会开展介绍外部逻辑),而后依据须要调用 Volume Plugins 进行业务解决。

存储卷治理外围做三个动作:

Provision/Delete(创立 / 删除存储卷,解决 pv 和 pvc 之间的关系)

Attach/Detach(挂接和摘除存储卷,解决的是 volumes 和 node 上目录之间的关系)

Mount/Unmount(挂载和摘除目录,解决的是 volumes 和 pod 之间的关系)

CSI 存储插件流程剖析

以 nfs plugin 插件为例,剖析 Kubernetes 集群如何为用户提供存储资源。

CSI 简介

csi 插件的实现,官网曾经封装好的 lib,咱们只有实现对应的接口就能够了,目前咱们能够实现的插件有两种类型,如下:

Controller Plugin,负责存储对象(Volume)的生命周期治理,在集群中仅须要有一个即可;

Node Plugin,在必要时与应用 Volume 的容器所在的节点交互,提供诸如节点上的 Volume 挂载 / 卸载等动作反对,如有须要则在每个服务节点上均部署。

官网提供了 rpc 接口实现下面两个插件,如下:

CSI Identity Service(身份服务)

Node Plugin 和 Controller Plugin 都必须实现这些 RPC 集,接口如下:

service Identity {
    // GetPluginInfo,获取 Plugin 根本信息
  rpc GetPluginInfo(GetPluginInfoRequest)
    returns (GetPluginInfoResponse) {}

    // GetPluginCapabilities,获取 Plugin 反对的能力
  rpc GetPluginCapabilities(GetPluginCapabilitiesRequest)
    returns (GetPluginCapabilitiesResponse) {}

    //Probe,探测 Plugin 的衰弱状态
  rpc Probe (ProbeRequest)
    returns (ProbeResponse) {}}
CSI Controller Service(控制器服务)

控制器服务,Controller Plugin 必须实现这些 RPC 集。CSI Controller 服务里定义的这些操作有个独特特点,那就是它们都无需在宿主机上进行,而是属于 Kubernetes 里 Volume Controller 的逻辑,也就是属于 Master 节点的一部分。须要留神的是,正如我在后面提到的那样,CSI Controller 服务的理论调用者,并不是 Kubernetes(即:通过 pkg/volume/csi 发动 CSI 申请),而是 External Provisioner 和 External Attacher。这两个 External Components,别离通过监听 PVC 和 VolumeAttachement 对象,来跟 Kubernetes 进行合作。

service Controller {
    // Volume CRUD,包含了扩容和容量探测等 Volume 状态查看与操作接口
  rpc CreateVolume (CreateVolumeRequest)
    returns (CreateVolumeResponse) {}

  rpc DeleteVolume (DeleteVolumeRequest)
    returns (DeleteVolumeResponse) {}

    // Publish/Unpublish,也就是对 CSI Volume 进行 Attach/Dettach,还包含 Node 对 Volume 的拜访权限治理
  rpc ControllerPublishVolume (ControllerPublishVolumeRequest)
    returns (ControllerPublishVolumeResponse) {}

  rpc ControllerUnpublishVolume (ControllerUnpublishVolumeRequest)
    returns (ControllerUnpublishVolumeResponse) {}

  rpc ValidateVolumeCapabilities (ValidateVolumeCapabilitiesRequest)
    returns (ValidateVolumeCapabilitiesResponse) {}

  rpc ListVolumes (ListVolumesRequest)
    returns (ListVolumesResponse) {}

  rpc GetCapacity (GetCapacityRequest)
    returns (GetCapacityResponse) {}

  rpc ControllerGetCapabilities (ControllerGetCapabilitiesRequest)
    returns (ControllerGetCapabilitiesResponse) {}

    // Snapshot CRD,快照的创立和删除操作,目前 CSI 定义的 Snapshot 仅用于创立 Volume,未提供回滚的语义
  rpc CreateSnapshot (CreateSnapshotRequest)
    returns (CreateSnapshotResponse) {}

  rpc DeleteSnapshot (DeleteSnapshotRequest)
    returns (DeleteSnapshotResponse) {}

  rpc ListSnapshots (ListSnapshotsRequest)
    returns (ListSnapshotsResponse) {}

  rpc ControllerExpandVolume (ControllerExpandVolumeRequest)
    returns (ControllerExpandVolumeResponse) {}

  rpc ControllerGetVolume (ControllerGetVolumeRequest)
    returns (ControllerGetVolumeResponse) {option (alpha_method) = true;
    }
}
CSI Controller Service(控制器服务)

节点服务:Node Plugin 必须实现这些 RPC 集。CSI Volume 须要在宿主机上执行的操作,都定义在了 CSI Node 服务外面

service Node {
    // Node Stage/Unstage/Publish/Unpublish/GetStats Volume,节点上 Volume 的连贯状态治理,也就是 mount 是由 NodeStageVolume 和 NodePublishVolume 两个接口独特实现的。rpc NodeStageVolume (NodeStageVolumeRequest)
    returns (NodeStageVolumeResponse) {}

  rpc NodeUnstageVolume (NodeUnstageVolumeRequest)
    returns (NodeUnstageVolumeResponse) {}

  rpc NodePublishVolume (NodePublishVolumeRequest)
    returns (NodePublishVolumeResponse) {}

  rpc NodeUnpublishVolume (NodeUnpublishVolumeRequest)
    returns (NodeUnpublishVolumeResponse) {}

  rpc NodeGetVolumeStats (NodeGetVolumeStatsRequest)
    returns (NodeGetVolumeStatsResponse) {}

    // Node Expand Volume, 节点上的 Volume 扩容操作,在 volume 逻辑大小扩容之后,可能还须要同步的扩容 Volume 之上的文件系统并让应用 Volume 的 Container 感知到,所以在 Node Plugin 上须要有对应的接口
  rpc NodeExpandVolume(NodeExpandVolumeRequest)
    returns (NodeExpandVolumeResponse) {}

    // Node Get Capabilities/Info,Plugin 的根底属性与 Node 的属性查问

  rpc NodeGetCapabilities (NodeGetCapabilitiesRequest)
    returns (NodeGetCapabilitiesResponse) {}

  rpc NodeGetInfo (NodeGetInfoRequest)
    returns (NodeGetInfoResponse) {}}

Driver Registrar 组件,负责将插件注册到 kubelet 外面(这能够类比为,将可执行文件放在插件目录下)。而在具体实现上,Driver Registrar 须要申请 CSI 插件的 Identity 服务来获取插件信息。

External Provisioner 组件,负责的正是 Provision 阶段。在具体实现上,External Provisioner 监听(Watch)了 APIServer 里的 PVC 对象。当一个 PVC 被创立时,它就会调用 CSI Controller 的 CreateVolume 办法,为你创立对应 PV。

External Attacher 组件,负责的正是“Attach 阶段”。在具体实现上,它监听了 APIServer 里 VolumeAttachment 对象的变动。VolumeAttachment 对象是 Kubernetes 确认一个 Volume 能够进入“Attach 阶段”的重要标记。一旦呈现了 VolumeAttachment 对象,External Attacher 就会调用 CSI Controller 服务的 ControllerPublish 办法,实现它所对应的 Volume 的 Attach 阶段。

Volume 的“Mount 阶段”,并不属于 External Components 的职责。当 kubelet 的
VolumeManagerReconciler 管制循环查看到它须要执行 Mount 操作的时候,会通过 pkg/volume/csi 包,间接调用 CSI Node 服务实现 Volume 的“Mount 阶段”。

如果你要实现一个本人的 CSI Driver 你须要至多提供两个 gRPC 服务:CSI Identity service,负责 CSI 插件的辨认工作;CSI Node Driver 负责

nfs plugin 我的项目门路:https://github.com/kubernetes…
nfs controller 我的项目门路:https://github.com/kubernetes…

部署

部署形式

# 装置存储插件
curl -skSL https://raw.githubusercontent.com/kubernetes-csi/csi-driver-nfs/master/deploy/install-driver.sh | bash -s master --

# 查看 Pod 工作状态
kubectl -n kube-system get pod -o wide -l app=csi-nfs-controller
kubectl -n kube-system get pod -o wide -l app=csi-nfs-node

# 后果
NAME                                       READY   STATUS    RESTARTS   AGE     IP             NODE
csi-nfs-controller-56bfddd689-dh5tk       4/4     Running   0          35s     10.240.0.19    k8s-agentpool-22533604-0
csi-nfs-node-cvgbs                        3/3     Running   0          35s     10.240.0.35    k8s-agentpool-22533604-1
csi-nfs-node-dr4s4                        3/3     Running   0          35s     10.240.0.4     k8s-agentpool-22533604-0

csi nfs driver 蕴含 controller 和 nodeplugin 两局部,controller 负责存储资源管理,nodeplugin 负责将存储卷加载到容器中。

controller pod 中有两个工作组件,一个是 provisioner 容器,负责的正是 Provision 阶段;一个 nfsplugin 容器,负责存储卷申明周期治理。

node pod 中有两个工作组件,一个 node-driver-registrar 容器,负责向 Kubelet 注册一个 CSI 插件;一个 nfs 容器,负责 PV 挂载。

nfscontroller 源码解析

1. 服务入口

controller 服务的主流程间接写在了 main 函数中,间接看代码:

func main() {
    // ...

    // 客户端初始化
    grpcClient, err := ctrl.Connect(*csiEndpoint, metricsManager)
    if err != nil {klog.Error(err.Error())
        os.Exit(1)
    }

    err = ctrl.Probe(grpcClient, *operationTimeout)
    if err != nil {klog.Error(err.Error())
        os.Exit(1)
    }

    // 查问集群中 driver 对应的 provisionerName
    provisionerName, err := ctrl.GetDriverName(grpcClient, *operationTimeout)
    if err != nil {klog.Fatalf("Error getting CSI driver name: %s", err)
    }
    klog.V(2).Infof("Detected CSI driver %s", provisionerName)
    metricsManager.SetDriverName(provisionerName)

    translator := csitrans.New()
    supportsMigrationFromInTreePluginName := ""
    if translator.IsMigratedCSIDriverByName(provisionerName) {supportsMigrationFromInTreePluginName, err = translator.GetInTreeNameFromCSIName(provisionerName)
        if err != nil {klog.Fatalf("Failed to get InTree plugin name for migrated CSI plugin %s: %v", provisionerName, err)
        }
        klog.V(2).Infof("Supports migration from in-tree plugin: %s", supportsMigrationFromInTreePluginName)

        // Create a new connection with the metrics manager with migrated label
        metricsManager = metrics.NewCSIMetricsManagerWithOptions(provisionerName,
            // Will be provided via default gatherer.
            metrics.WithProcessStartTime(false),
            metrics.WithMigration())
        migratedGrpcClient, err := ctrl.Connect(*csiEndpoint, metricsManager)
        if err != nil {klog.Error(err.Error())
            os.Exit(1)
        }
        grpcClient.Close()
        grpcClient = migratedGrpcClient

        err = ctrl.Probe(grpcClient, *operationTimeout)
        if err != nil {klog.Error(err.Error())
            os.Exit(1)
        }
    }

    // 筹备资源采集、选主、监控查看
    mux := http.NewServeMux()
    gatherers := prometheus.Gatherers{
        legacyregistry.DefaultGatherer,
        metricsManager.GetRegistry(),}

    pluginCapabilities, controllerCapabilities, err := ctrl.GetDriverCapabilities(grpcClient, *operationTimeout)
    if err != nil {klog.Fatalf("Error getting CSI driver capabilities: %s", err)
    }

    // 为 provisioner 生成惟一的 ID
    timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
    identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName
    if *enableNodeDeployment {identity = identity + "-" + node}

    // 创立一个共享的 informer
    factory := informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)
    var factoryForNamespace informers.SharedInformerFactory 

    // 监听 StorageClass 和 PVC
    scLister := factory.Storage().V1().StorageClasses().Lister()
    claimLister := factory.Core().V1().PersistentVolumeClaims().Lister()

    var vaLister storagelistersv1.VolumeAttachmentLister
    if controllerCapabilities[csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME] {klog.Info("CSI driver supports PUBLISH_UNPUBLISH_VOLUME, watching VolumeAttachments")
        vaLister = factory.Storage().V1().VolumeAttachments().Lister()
    } else {klog.Info("CSI driver does not support PUBLISH_UNPUBLISH_VOLUME, not watching VolumeAttachments")
    }

    // 默认不开启 enableNodeDeployment
    var nodeDeployment *ctrl.NodeDeployment
    if *enableNodeDeployment {
        nodeDeployment = &ctrl.NodeDeployment{
            NodeName:         node,
            ClaimInformer:    factory.Core().V1().PersistentVolumeClaims(),
            ImmediateBinding: *nodeDeploymentImmediateBinding,
            BaseDelay:        *nodeDeploymentBaseDelay,
            MaxDelay:         *nodeDeploymentMaxDelay,
        }
        nodeInfo, err := ctrl.GetNodeInfo(grpcClient, *operationTimeout)
        if err != nil {klog.Fatalf("Failed to get node info from CSI driver: %v", err)
        }
        nodeDeployment.NodeInfo = *nodeInfo
    }

    // 节点监听
    var nodeLister listersv1.NodeLister
    var csiNodeLister storagelistersv1.CSINodeLister
    if ctrl.SupportsTopology(pluginCapabilities) {
        if nodeDeployment != nil {// ...} else {csiNodeLister = factory.Storage().V1().CSINodes().Lister()
            nodeLister = factory.Core().V1().Nodes().Lister()
        }
    }

    // PVC Informer
    rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)
    claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
    claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()

    // 为 external-provisioner 增加 options
    provisionerOptions := []func(*controller.ProvisionController) error{controller.LeaderElection(false), // Always disable leader election in provisioner lib. Leader election should be done here in the CSI provisioner level instead.
        controller.FailedProvisionThreshold(0),
        controller.FailedDeleteThreshold(0),
        controller.RateLimiter(rateLimiter),
        controller.Threadiness(int(*workerThreads)),
        controller.CreateProvisionedPVLimiter(workqueue.DefaultControllerRateLimiter()),
        controller.ClaimsInformer(claimInformer),
        controller.NodesLister(nodeLister),
    }

    if utilfeature.DefaultFeatureGate.Enabled(features.HonorPVReclaimPolicy) {provisionerOptions = append(provisionerOptions, controller.AddFinalizer(true))
    }

    if supportsMigrationFromInTreePluginName != "" {provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
    }

    // 创立一个 provisioner 和两个 controller 
    csiProvisioner := ctrl.NewCSIProvisioner(
        clientset,
        *operationTimeout,
        identity,
        *volumeNamePrefix,
        *volumeNameUUIDLength,
        grpcClient,
        snapClient,
        provisionerName,
        pluginCapabilities,
        controllerCapabilities,
        supportsMigrationFromInTreePluginName,
        *strictTopology,
        *immediateTopology,
        translator,
        scLister,
        csiNodeLister,
        nodeLister,
        claimLister,
        vaLister,
        *extraCreateMetadata,
        *defaultFSType,
        nodeDeployment,
        *controllerPublishReadOnly,
        *preventVolumeModeConversion,
    )

    // ...

    provisionController = controller.NewProvisionController(
        clientset,
        provisionerName,
        csiProvisioner,
        provisionerOptions...,
    )

    csiClaimController := ctrl.NewCloningProtectionController(
        clientset,
        claimLister,
        claimInformer,
        claimQueue,
        controllerCapabilities,
    )

    // 启动 http 服务
    if addr != "" {
        // ...

        go func() {klog.Infof("ServeMux listening at %q", addr)
            err := http.ListenAndServe(addr, mux)
            if err != nil {klog.Fatalf("Failed to start HTTP server at specified address (%q) and metrics path (%q): %s", addr, *metricsPath, err)
            }
        }()}

    run := func(ctx context.Context) {factory.Start(ctx.Done())
        if factoryForNamespace != nil {
            // Starting is enough, the capacity controller will
            // wait for sync.
            factoryForNamespace.Start(ctx.Done())
        }

        // informer 缓存同步
        cacheSyncResult := factory.WaitForCacheSync(ctx.Done())
        for _, v := range cacheSyncResult {
            if !v {klog.Fatalf("Failed to sync Informers!")
            }
        }

        // 启动控制器
        if capacityController != nil {go capacityController.Run(ctx, int(*capacityThreads))
        }
        if csiClaimController != nil {go csiClaimController.Run(ctx, int(*finalizerThreads))
        }
        provisionController.Run(ctx)
    }

    // ...
}

provisionController 是定义在 external-provisioner 官网 lib 库中所实现的控制器,上面看一下源码:

func (ctrl *ProvisionController) Run(ctx context.Context) {run := func(ctx context.Context) {klog.Infof("Starting provisioner controller %s!", ctrl.component)
        defer utilruntime.HandleCrash()
        defer ctrl.claimQueue.ShutDown()
        defer ctrl.volumeQueue.ShutDown()

        ctrl.hasRunLock.Lock()
        ctrl.hasRun = true
        ctrl.hasRunLock.Unlock()
        if ctrl.metricsPort > 0 {// ...}

        // 将 controller 初始化传入的 Informer 启动
        if !ctrl.customClaimInformer {go ctrl.claimInformer.Run(ctx.Done())
        }
        if !ctrl.customVolumeInformer {go ctrl.volumeInformer.Run(ctx.Done())
        }
        if !ctrl.customClassInformer {go ctrl.classInformer.Run(ctx.Done())
        }

        // informer 缓存同步
        if !cache.WaitForCacheSync(ctx.Done(), ctrl.claimInformer.HasSynced, ctrl.volumeInformer.HasSynced, ctrl.classInformer.HasSynced) {return}

        // 启动 goroutine 解决因为被监听到产生了增删改查事件的 PV 和 PVC 队列
        for i := 0; i < ctrl.threadiness; i++ {go wait.Until(func() {ctrl.runClaimWorker(ctx) }, time.Second, ctx.Done())
            go wait.Until(func() {ctrl.runVolumeWorker(ctx) }, time.Second, ctx.Done())
        }

        klog.Infof("Started provisioner controller %s!", ctrl.component)

        select {}}

    go ctrl.volumeStore.Run(ctx, DefaultThreadiness)

    if ctrl.leaderElection {// ...} else {run(ctx)
    }
}

事件处理的 runClaimWorker 和 runVolumeWorker 会去调用咱们申明在 external-provsioner 中的 Controller Service 接口,具体的调用过程这里不开展,如果有趣味能够到官网仓库查看源码

2. 控制器服务构造

func (p *csiProvisioner) Provision(ctx context.Context, options controller.ProvisionOptions) (*v1.PersistentVolume, controller.ProvisioningState, error) {
    claim := options.PVC
    provisioner, ok := claim.Annotations[annStorageProvisioner]
    if !ok {provisioner = claim.Annotations[annBetaStorageProvisioner]
    }

    // ...

    // 检测工作节点
    owned, err := p.checkNode(ctx, claim, options.StorageClass, "provision")
    if err != nil {
        return nil, controller.ProvisioningNoChange,
            fmt.Errorf("node check failed: %v", err)
    }
    if !owned {
        return nil, controller.ProvisioningNoChange, &controller.IgnoredError{Reason: fmt.Sprintf("not responsible for provisioning of PVC %s/%s because it is not assigned to node %q", claim.Namespace, claim.Name, p.nodeDeployment.NodeName),
        }
    }

    // provision 预处理
    result, state, err := p.prepareProvision(ctx, claim, options.StorageClass, options.SelectedNode)
    if result == nil {return nil, state, err}

    // ...

    // 创立 PV
    rep, err := p.csiClient.CreateVolume(createCtx, req)
    if err != nil {mayReschedule := p.supportsTopology() &&
            options.SelectedNode != nil
        state := checkError(err, mayReschedule)
        klog.V(5).Infof("CreateVolume failed, supports topology = %v, node selected %v => may reschedule = %v => state = %v: %v",
            p.supportsTopology(),
            options.SelectedNode != nil,
            mayReschedule,
            state,
            err)
        return nil, state, err
    }

    // ...


    // 将 PV 创立失去的返回值写回到预处理后果中
    if len(volCaps) == 1 && volCaps[0].GetAccessMode().GetMode() == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY && p.controllerPublishReadOnly {pvReadOnly = true}

    result.csiPVSource.VolumeHandle = p.volumeIdToHandle(rep.Volume.VolumeId)
    result.csiPVSource.VolumeAttributes = volumeAttributes
    result.csiPVSource.ReadOnly = pvReadOnly
    pv := &v1.PersistentVolume{
        ObjectMeta: metav1.ObjectMeta{Name: pvName,},
        Spec: v1.PersistentVolumeSpec{
            AccessModes:  options.PVC.Spec.AccessModes,
            MountOptions: options.StorageClass.MountOptions,
            Capacity: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): bytesToQuantity(respCap),
            },
            // TODO wait for CSI VolumeSource API
            PersistentVolumeSource: v1.PersistentVolumeSource{CSI: result.csiPVSource,},
        },
    }

    // 批改 PV 资源的 annDeletionSecretRefName 和 namespace
    if result.provDeletionSecrets != nil {klog.V(5).Infof("createVolumeOperation: set annotation [%s/%s] on pv [%s].", annDeletionProvisionerSecretRefNamespace, annDeletionProvisionerSecretRefName, pv.Name)
        metav1.SetMetaDataAnnotation(&pv.ObjectMeta, annDeletionProvisionerSecretRefName, result.provDeletionSecrets.name)
        metav1.SetMetaDataAnnotation(&pv.ObjectMeta, annDeletionProvisionerSecretRefNamespace, result.provDeletionSecrets.namespace)
    } else {metav1.SetMetaDataAnnotation(&pv.ObjectMeta, annDeletionProvisionerSecretRefName, "")
        metav1.SetMetaDataAnnotation(&pv.ObjectMeta, annDeletionProvisionerSecretRefNamespace, "")
    }

    if options.StorageClass.ReclaimPolicy != nil {pv.Spec.PersistentVolumeReclaimPolicy = *options.StorageClass.ReclaimPolicy}

    // ...

    return pv, controller.ProvisioningFinished, nil
}

解决好了 PV 和 PVC 的绑定当前,provisioner 的工作到这里就完结了,后须要解决将由 kube-controller-manager 中的 AD controller 来接管。

nfsplugin 源码解析

1. 程序入口

从程序入口登程,理论的服务启动逻辑在 driver.Run 函数中。

咱们能够看到服务启动时,三个 CSI Service(CSI Identity , CSI Controller , CSI Node)全都被初始化了,个别咱们开发插件时会把三者放在一个二进制程序中。

func (n *Driver) Run(testMode bool) {
    // ...

    n.ns = NewNodeServer(n, mount.New(""))
    s := NewNonBlockingGRPCServer()
    s.Start(n.endpoint,
        NewDefaultIdentityServer(n),
        // NFS plugin has not implemented ControllerServer
        // using default controllerserver.
        NewControllerServer(n),
        n.ns,
        testMode)
    s.Wait()}

2. 服务启动

func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, testMode bool) {proto, addr, err := ParseEndpoint(endpoint)
    if err != nil {klog.Fatal(err.Error())
    }

    if proto == "unix" {
        addr = "/" + addr
        if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {klog.Fatalf("Failed to remove %s, error: %s", addr, err.Error())
        }
    }

    listener, err := net.Listen(proto, addr)
    if err != nil {klog.Fatalf("Failed to listen: %v", err)
    }

    opts := []grpc.ServerOption{grpc.UnaryInterceptor(logGRPC),
    }
    server := grpc.NewServer(opts...)
    s.server = server

    // 注册 gRPC 服务
    if ids != nil {csi.RegisterIdentityServer(server, ids)
    }
    if cs != nil {csi.RegisterControllerServer(server, cs)
    }
    if ns != nil {csi.RegisterNodeServer(server, ns)
    }

    // ...

    err = server.Serve(listener)
    if err != nil {klog.Fatalf("Failed to serve grpc server: %v", err)
    }
}

3. 存储卷生命周期治理

通过 AD Controller 的调度后,Kubelet 晓得了以后节点有须要调用 nfscsiplugin 所注册的接口进行生命周期治理的存储卷存储,这个存储卷对应的生命周期是怎么的呢,依据官网上的定义,默认状况下,一个存储卷的生命周期如下:

   CreateVolume +------------+ DeleteVolume
 +------------->|  CREATED   +--------------+
 |              +---+----^---+              |
 |       Controller |    | Controller       v
+++         Publish |    | Unpublish       +++
|X|          Volume |    | Volume          | |
+-+             +---v----+---+             +-+
                | NODE_READY |
                +---+----^---+
               Node |    | Node
            Publish |    | Unpublish
             Volume |    | Volume
                +---v----+---+
                | PUBLISHED  |
                +------------+

接口下来看一下图表中的业务锁对应的代码

CreateVolume/DeleteVolume, ControllerPublishVolume/ControllerUnpublishVolume 这两组接口申明在
ControllerServer 服务中,代码如下:


func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {

    // ...

    // 创立存储卷
    nfsVol, err := newNFSVolume(name, reqCapacity, parameters)
    if err != nil {return nil, status.Error(codes.InvalidArgument, err.Error())
    }

    var volCap *csi.VolumeCapability
    if len(req.GetVolumeCapabilities()) > 0 {volCap = req.GetVolumeCapabilities()[0]
    }
    // Mount nfs base share so we can create a subdirectory
    if err = cs.internalMount(ctx, nfsVol, parameters, volCap); err != nil {return nil, status.Errorf(codes.Internal, "failed to mount nfs server: %v", err.Error())
    }
    defer func() {if err = cs.internalUnmount(ctx, nfsVol); err != nil {klog.Warningf("failed to unmount nfs server: %v", err.Error())
        }
    }()

    
    fileMode := os.FileMode(mountPermissions)
    internalVolumePath := getInternalVolumePath(cs.Driver.workingMountDir, nfsVol)
    if err = os.Mkdir(internalVolumePath, fileMode); err != nil && !os.IsExist(err) {return nil, status.Errorf(codes.Internal, "failed to make subdirectory: %v", err.Error())
    }
    if err = os.Chmod(internalVolumePath, fileMode); err != nil {klog.Warningf("failed to chmod subdirectory: %v", err.Error())
    }

    setKeyValueInMap(parameters, paramSubDir, nfsVol.subDir)
    return &csi.CreateVolumeResponse{
        Volume: &csi.Volume{
            VolumeId:      nfsVol.id,
            CapacityBytes: 0,
            VolumeContext: parameters,
        },
    }, nil
}

// 没实现
func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {return nil, status.Error(codes.Unimplemented, "")
}

NodePublishVolume/NodeUnpublishVolume 这组接口定义在 nodeserver 服务中,代码如下:

func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
    // 在节点上挂存储卷
    mountOptions := volCap.GetMount().GetMountFlags()
    if req.GetReadonly() {mountOptions = append(mountOptions, "ro")
    }

    var server, baseDir, subDir string
    subDirReplaceMap := map[string]string{}

    mountPermissions := ns.Driver.mountPermissions
    performChmodOp := (mountPermissions > 0)
    for k, v := range req.GetVolumeContext() {switch strings.ToLower(k) {
        case paramServer:
            server = v
        case paramShare:
            baseDir = v
        case paramSubDir:
            subDir = v
        case pvcNamespaceKey:
            subDirReplaceMap[pvcNamespaceMetadata] = v
        case pvcNameKey:
            subDirReplaceMap[pvcNameMetadata] = v
        case pvNameKey:
            subDirReplaceMap[pvNameMetadata] = v
        case mountOptionsField:
            if v != "" {mountOptions = append(mountOptions, v)
            }
        case mountPermissionsField:
            if v != "" {
                var err error
                var perm uint64
                if perm, err = strconv.ParseUint(v, 8, 32); err != nil {return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid mountPermissions %s", v))
                }
                if perm == 0 {performChmodOp = false} else {mountPermissions = perm}
            }
        }
    }

    if server == "" {return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("%v is a required parameter", paramServer))
    }
    if baseDir == "" {return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("%v is a required parameter", paramShare))
    }
    server = getServerFromSource(server)
    source := fmt.Sprintf("%s:%s", server, baseDir)
    if subDir != "" {
        // replace pv/pvc name namespace metadata in subDir
        subDir = replaceWithMap(subDir, subDirReplaceMap)

        source = strings.TrimRight(source, "/")
        source = fmt.Sprintf("%s/%s", source, subDir)
    }

    notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
    if err != nil {if os.IsNotExist(err) {if err := os.MkdirAll(targetPath, os.FileMode(mountPermissions)); err != nil {return nil, status.Error(codes.Internal, err.Error())
            }
            notMnt = true
        } else {return nil, status.Error(codes.Internal, err.Error())
        }
    }
    if !notMnt {return &csi.NodePublishVolumeResponse{}, nil
    }

    klog.V(2).Infof("NodePublishVolume: volumeID(%v) source(%s) targetPath(%s) mountflags(%v)", volumeID, source, targetPath, mountOptions)
    err = ns.mounter.Mount(source, targetPath, "nfs", mountOptions)
    if err != nil {if os.IsPermission(err) {return nil, status.Error(codes.PermissionDenied, err.Error())
        }
        if strings.Contains(err.Error(), "invalid argument") {return nil, status.Error(codes.InvalidArgument, err.Error())
        }
        return nil, status.Error(codes.Internal, err.Error())
    }

    if performChmodOp {if err := chmodIfPermissionMismatch(targetPath, os.FileMode(mountPermissions)); err != nil {return nil, status.Error(codes.Internal, err.Error())
        }
    } else {klog.V(2).Infof("skip chmod on targetPath(%s) since mountPermissions is set as 0", targetPath)
    }
    klog.V(2).Infof("volume(%s) mount %s on %s succeeded", volumeID, source, targetPath)
    return &csi.NodePublishVolumeResponse{}, nil}

更多技术分享浏览我的博客:

https://thierryzhou.github.io

参考

  • [1] kubenretes 官网文档——存储
  • [2] csi 官网文档
  • [3] 云计算 K8s 组件系列—- 存储 CSI

本文由 mdnice 多平台公布

正文完
 0