简介:pv controller 是 kcm 的组件之一,它负责处理集群中的 pvc/pv 对象,对 pvc/pv 对象进行状态转换。本文将基于 kubernetes 1.23 进行解析。

作者 | 牧琦
来源 | 阿里技术公众号

基于 kubernetes 1.23

一 简介

pv controller 是 kcm(kube-controller-manager)的组件之一,它负责处理集群中的 pvc/pv 对象,对 pvc/pv 对象进行状态转换。

二 pvController 初始化

初始化代码在 pkg/controller/volume/persistentvolume/pv_controller_base.go 文件中,NewController 主要做了如下几件事情:

  • 初始化 eventRecorder
  • 初始化 PersistentVolumeController 对象,
  • 调用 VolumePluginMgr.InitPlugins() 方法初始化存储插件,代码位于 pkg/volume/plugins.go 文件中
  • 开始创建 informer 监听集群内的资源,初始化了如下 informer:
  • PersistentVolumeInformer
  • PersistentVolumeClaimInformer
  • StorageClassInformer
  • PodInformer
  • NodeInformer
  • 将 PV & PVC 的 event 分别放入 volumeQueue & claimQueue
  • 为了不必每次都遍历 pods,自定义一个通过 pvc 键索引 pod 的索引器
  • 初始化 in-tree 存储 -> csi 迁移相关功能的 manager

NewController 在 cmd/kube-controller-manager 的代码里面被调用,初始化成功之后紧接着调用 go Run() 方法运行 pvController

三 开始运行

// 开始运行 pvController func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {  defer utilruntime.HandleCrash()  defer ctrl.claimQueue.ShutDown()  defer ctrl.volumeQueue.ShutDown()  klog.Infof("Starting persistent volume controller")  defer klog.Infof("Shutting down persistent volume controller")  if !cache.WaitForNamedCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {    return  }  ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)  go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)  go wait.Until(ctrl.volumeWorker, time.Second, stopCh)  go wait.Until(ctrl.claimWorker, time.Second, stopCh)  metrics.Register(ctrl.volumes.store, ctrl.claims, &ctrl.volumePluginMgr)  <-stopCh}

同步缓存之后开始周期性执行 ctrl.resync、ctrl.volumeWorker、ctrl.claimWorker,我们先看下 initializeCaches 方法

func (ctrl *PersistentVolumeController) initializeCaches(volumeLister corelisters.PersistentVolumeLister, claimLister corelisters.PersistentVolumeClaimLister) {  // 这里不拜访 apiserver,是从本地缓存拿出的对象,这些对象不能够被内部函数批改  volumeList, err := volumeLister.List(labels.Everything())  if err != nil {    klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)    return  }  for _, volume := range volumeList {    // 咱们不能扭转 volume 对象,所以这里咱们copy一份新对象,对新对象进行操作    volumeClone := volume.DeepCopy()    if _, err = ctrl.storeVolumeUpdate(volumeClone); err != nil {      klog.Errorf("error updating volume cache: %v", err)    }  }  claimList, err := claimLister.List(labels.Everything())  if err != nil {    klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)    return  }  for _, claim := range claimList {    if _, err = ctrl.storeClaimUpdate(claim.DeepCopy()); err != nil {      klog.Errorf("error updating claim cache: %v", err)    }  }  klog.V(4).Infof("controller initialized")}type persistentVolumeOrderedIndex struct {  store cache.Indexer}

该方法将 lister(informer 本地缓存)里面的对象转存到 persistentVolumeOrderedIndex 中,它是按 AccessModes 索引并按存储容量排序的 persistentVolume 的缓存。

1 resync

func (ctrl *PersistentVolumeController) resync() {  klog.V(4).Infof("resyncing PV controller")  pvcs, err := ctrl.claimLister.List(labels.NewSelector())  if err != nil {    klog.Warningf("cannot list claims: %s", err)    return  }  for _, pvc := range pvcs {    ctrl.enqueueWork(ctrl.claimQueue, pvc)  }  pvs, err := ctrl.volumeLister.List(labels.NewSelector())  if err != nil {    klog.Warningf("cannot list persistent volumes: %s", err)    return  }  for _, pv := range pvs {    ctrl.enqueueWork(ctrl.volumeQueue, pv)  }}

这里将集群内所有的 pvc/pv 统一都放到对应的 claimQueue & volumeQueue 里面重新处理。这个 resyncPeriod 等于一个 random time.Duration * config.time(在 kcm 启动时设置)。

2 volumeWorker

一个无限循环,不断地处理从 volumeQueue 里面获取到的 PersistentVolume

// workFunc handles one key taken from volumeQueue. It returns true only when
// the queue reports that it is shutting down; in every other case it returns
// false so the caller keeps processing.
workFunc := func() bool {
	item, shutdown := ctrl.volumeQueue.Get()
	if shutdown {
		return true
	}
	defer ctrl.volumeQueue.Done(item)

	key := item.(string)
	klog.V(5).Infof("volumeWorker[%s]", key)

	_, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		klog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)
		return false
	}

	volume, err := ctrl.volumeLister.Get(name)
	switch {
	case err == nil:
		// Still present in the informer cache, so the event was an
		// add/update/sync.
		ctrl.updateVolume(volume)
		return false
	case !errors.IsNotFound(err):
		klog.V(2).Infof("error getting volume %q from informer: %v", key, err)
		return false
	}

	// Not in the informer cache any more, so the event was a delete. Look the
	// volume up in the controller's own cache to process the deletion.
	cached, found, err := ctrl.volumes.store.GetByKey(key)
	if err != nil {
		klog.V(2).Infof("error getting volume %q from cache: %v", key, err)
		return false
	}
	if !found {
		// The delete was already handled and the volume removed from the
		// controller cache.
		klog.V(2).Infof("deletion of volume %q was already processed", key)
		return false
	}

	volume, ok := cached.(*v1.PersistentVolume)
	if !ok {
		klog.Errorf("expected volume, got %+v", cached)
		return false
	}
	ctrl.deleteVolume(volume)
	return false
}

我们主要关注 ctrl.updateVolume(volume) 方法

updateVolume

updateVolume 方法是集群内 pv 相关 events 的实际 handler 方法,它里面主要调用了 ctrl.syncVolume 方法来处理

func (ctrl *PersistentVolumeController) syncVolume(ctx context.Context, volume *v1.PersistentVolume) error {  klog.V(4).Infof("synchronizing PersistentVolume[%s]: %s", volume.Name, getVolumeStatusForLogging(volume))    ...  // [Unit test set 4]  if volume.Spec.ClaimRef == nil {    // Volume is unused    klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is unused", volume.Name)    if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {      // Nothing was saved; we will fall back into the same      // condition in the next call to this method      return err    }    return nil  } else /* pv.Spec.ClaimRef != nil */ {    // Volume is bound to a claim.    if volume.Spec.ClaimRef.UID == "" {      // The PV is reserved for a PVC; that PVC has not yet been      // bound to this PV; the PVC sync will handle it.      klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is pre-bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))      if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {        // Nothing was saved; we will fall back into the same        // condition in the next call to this method        return err      }      return nil    }    klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))    // Get the PVC by _name_    var claim *v1.PersistentVolumeClaim    claimName := claimrefToClaimKey(volume.Spec.ClaimRef)    obj, found, err := ctrl.claims.GetByKey(claimName)    if err != nil {      return err    }    if !found {      if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {        obj, err = ctrl.claimLister.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name)        if err != nil && !apierrors.IsNotFound(err) {          return err        }        found = !apierrors.IsNotFound(err)        if !found {          obj, err = 
ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(context.TODO(), volume.Spec.ClaimRef.Name, metav1.GetOptions{})          if err != nil && !apierrors.IsNotFound(err) {            return err          }          found = !apierrors.IsNotFound(err)        }      }    }    if !found {      klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))      // Fall through with claim = nil    } else {      var ok bool      claim, ok = obj.(*v1.PersistentVolumeClaim)      if !ok {        return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)      }      klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s found: %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), getClaimStatusForLogging(claim))    }    if claim != nil && claim.UID != volume.Spec.ClaimRef.UID {      klog.V(4).Infof("Maybe cached claim: %s is not the newest one, we should fetch it from apiserver", claimrefToClaimKey(volume.Spec.ClaimRef))      claim, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(context.TODO(), volume.Spec.ClaimRef.Name, metav1.GetOptions{})      if err != nil && !apierrors.IsNotFound(err) {        return err      } else if claim != nil {        // Treat the volume as bound to a missing claim.        
if claim.UID != volume.Spec.ClaimRef.UID {          klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has a newer UID than pv.ClaimRef, the old one must have been deleted", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))          claim = nil        } else {          klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has a same UID with pv.ClaimRef", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))        }      }    }    if claim == nil {      if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {        // Also, log this only once:        klog.V(2).Infof("volume %q is released and reclaim policy %q will be executed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy)        if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {          // Nothing was saved; we will fall back into the same condition          // in the next call to this method          return err        }      }      if err = ctrl.reclaimVolume(volume); err != nil {        // Release failed, we will fall back into the same condition        // in the next call to this method        return err      }      if volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimRetain {        // volume is being retained, it references a claim that does not exist now.        
klog.V(4).Infof("PersistentVolume[%s] references a claim %q (%s) that is not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), volume.Spec.ClaimRef.UID)      }      return nil    } else if claim.Spec.VolumeName == "" {      if pvutil.CheckVolumeModeMismatches(&claim.Spec, &volume.Spec) {        volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name)        ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, events.VolumeMismatch, volumeMsg)        claimMsg := fmt.Sprintf("Cannot bind PersistentVolume %q to requested PersistentVolumeClaim due to incompatible volumeMode.", volume.Name)        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, claimMsg)        // Skipping syncClaim        return nil      }      if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {        // The binding is not completed; let PVC sync handle it        klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume not bound yet, waiting for syncClaim to fix it", volume.Name)      } else {        // Dangling PV; try to re-establish the link in the PVC sync        klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume was bound and got unbound (by user?), waiting for syncClaim to fix it", volume.Name)      }      ctrl.claimQueue.Add(claimToClaimKey(claim))      return nil    } else if claim.Spec.VolumeName == volume.Name {      // Volume is bound to a claim properly, update status if necessary      klog.V(4).Infof("synchronizing PersistentVolume[%s]: all is bound", volume.Name)      if _, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {        // Nothing was saved; we will fall back into the same        // condition in the next call to this method        return err      }      return nil    } else {      // Volume is bound to a claim, but the claim is bound elsewhere      if metav1.HasAnnotation(volume.ObjectMeta, 
pvutil.AnnDynamicallyProvisioned) && volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete {        if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {          // Also, log this only once:          klog.V(2).Infof("dynamically volume %q is released and it will be deleted", volume.Name)          if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {            // Nothing was saved; we will fall back into the same condition            // in the next call to this method            return err          }        }        if err = ctrl.reclaimVolume(volume); err != nil {          return err        }        return nil      } else {        if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {          klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by controller to a claim that is bound to another volume, unbinding", volume.Name)          if err = ctrl.unbindVolume(volume); err != nil {            return err          }          return nil        } else {          // The PV must have been created with this ptr; leave it alone.          klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by user to a claim that is bound to another volume, waiting for the claim to get unbound", volume.Name)          if err = ctrl.unbindVolume(volume); err != nil {            return err          }          return nil        }      }    }  }}

1、当 pv 的 Spec.ClaimRef 的值为空的时候,说明当前 pv 未被使用,调用 ctrl.updateVolumePhase 使得 pv 进入 Available 状态

2、当 pv 的 Spec.ClaimRef 的值不为空的时候,说明当前 pv 已绑定(或预绑定)一个 pvc

  • 当 Spec.ClaimRef.UID 为空的时候,说明 pvc 还未绑定 pv,调用 ctrl.updateVolumePhase 使得 pv 进入 Available 状态,方法返回,等待 pvc 的 syncClaim 方法处理
  • 使用 Spec.ClaimRef 相关的 pvc 信息获取 pv_controller 缓存的 pvc

如果 pvc 没有找到

  • 有可能是集群压力过大缓存没有更新,则进一步从 informer cache 中找,如果 informer cache 里面还是没有的话则进一步从 apiserver 中去找
  • 这里如果发现非 Released & 非 Failed 的 pv 经过上述步骤仍然找不到 pvc 的话,说明 pvc 被删除。在最新的 kubernetes 版本中会检查 reclaimPolicy,对 pv 的状态进行处理

找到 pvc 之后

1)如果 pvc 的 uid 和 Spec.ClaimRef.UID 不一致,这样一般是 pv 指向的 pvc 被删了,然后立刻创建了一个同名的 pvc,而缓存还没有更新,这时我们需要从 apiserver double check 一下;若 double check 之后 UID 仍旧不一致(或 pvc 已不存在),则判断是 pv 绑定了一个不存在的 pvc,将 pvc 置为空,执行上述 pvc 没有找到的逻辑

2)如果pvc 的 volumeName 为空

  • 检查 pvc 的 volumeMode 和 pv 的 volumeMode 是否一致,不一致则报 event 出来并返回
  • 如果发现这个 pv 有 AnnBoundByController = "pv.kubernetes.io/bound-by-controller" 这个 annotation,说明 pvc/pv 正在绑定流程中
  • 将 pvc 放到 claimQueue 里面,让 claimWorker 进行处理

3)如果 pvc.Spec.volumeName == pv.volumeName 的时候,直接将 pv 设置为 bound 状态

4)如果 pvc.Spec.volumeName != pv.volumeName 的时候

  • 如果 pv 是动态创建的,并且 pv 的 ReclaimPolicy 是 delete,说明 pvc 已经绑定了其他 pv,将 pv 置为 released 的状态,等待 deleter 删除
  • 如果 pv 不是动态创建的情况下,将 pv 的 ClaimRef 字段置为空,将其 unbound 掉

3 claimWorker

一个无限循环,不断地处理从 claimQueue 里面获取到的 PersistentVolumeClaim

    // workFunc handles one key taken from claimQueue. It returns true only
    // when the queue reports that it is shutting down; in every other case it
    // returns false so the caller keeps processing.
    workFunc := func() bool {
    	item, shutdown := ctrl.claimQueue.Get()
    	if shutdown {
    		return true
    	}
    	defer ctrl.claimQueue.Done(item)

    	key := item.(string)
    	klog.V(5).Infof("claimWorker[%s]", key)

    	namespace, name, err := cache.SplitMetaNamespaceKey(key)
    	if err != nil {
    		klog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
    		return false
    	}

    	claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
    	switch {
    	case err == nil:
    		// Still present in the informer cache, so the event was an
    		// add/update/sync.
    		ctrl.updateClaim(claim)
    		return false
    	case !errors.IsNotFound(err):
    		klog.V(2).Infof("error getting claim %q from informer: %v", key, err)
    		return false
    	}

    	// Not in the informer cache any more, so the event was a delete. Look
    	// the claim up in the controller's own cache to process the deletion.
    	cached, found, err := ctrl.claims.GetByKey(key)
    	if err != nil {
    		klog.V(2).Infof("error getting claim %q from   cache: %v", key, err)
    		return false
    	}
    	if !found {
    		// The delete was already handled and the claim removed from the
    		// controller cache.
    		klog.V(2).Infof("deletion of claim %q was already processed", key)
    		return false
    	}

    	claim, ok := cached.(*v1.PersistentVolumeClaim)
    	if !ok {
    		klog.Errorf("expected claim, got %+v", cached)
    		return false
    	}
    	ctrl.deleteClaim(claim)
    	return false
    }

我们主要关注 ctrl.updateClaim(claim) 方法,与上面一样,它里面主要调用了 ctrl.syncClaim 方法来处理,在 syncClaim 里面根据 pvc 的状态分别调用了 ctrl.syncUnboundClaim & ctrl.syncBoundClaim 方法来处理

syncUnboundClaim

// syncUnboundClaim handles a claim that is not bound yet.
//
// When the claim does not name a volume, the best matching volume is looked
// up: if one is found the pair is bound; otherwise the claim either waits for
// delayed binding (an event is emitted), is handed to provisionClaim when it
// has a storage class, or gets a FailedBinding event, and is then marked
// Pending.
//
// When the claim names a volume, that volume is fetched from the controller's
// volume cache: a missing volume leaves the claim Pending; an unclaimed
// volume is checked against the claim and bound; a volume already pointing at
// this claim has its binding finished; a volume claimed by someone else
// leaves the claim Pending (user-requested volume) or yields an error
// (binding made by the controller).
//
// It returns an error when a lookup, a bind or a status update fails.
func (ctrl *PersistentVolumeController) syncUnboundClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
	if claim.Spec.VolumeName == "" {
		// User did not care which PV they get.
		delayBinding, err := pvutil.IsDelayBindingMode(claim, ctrl.classLister)
		if err != nil {
			return err
		}
		// [Unit test set 1]
		volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)
		if err != nil {
			klog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err)
			return fmt.Errorf("error finding PV for claim %q: %w", claimToClaimKey(claim), err)
		}
		if volume == nil {
			klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: no volume found", claimToClaimKey(claim))
			switch {
			case delayBinding && !pvutil.IsDelayBindingProvisioning(claim):
				// Delayed binding that has not been triggered yet: only emit
				// an event, then fall through to mark the claim Pending.
				if err = ctrl.emitEventForUnboundDelayBindingClaim(claim); err != nil {
					return err
				}
			case storagehelpers.GetPersistentVolumeClaimClass(claim) != "":
				// The claim has a storage class: try to provision a volume.
				if err = ctrl.provisionClaim(ctx, claim); err != nil {
					return err
				}
				return nil
			default:
				ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.FailedBinding, "no persistent volumes available for this claim and no storage class is set")
			}
			// Mark the claim as Pending and try to find a match in the next
			// periodic syncClaim
			if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
				return err
			}
			return nil
		} else /* pv != nil */ {
			// Found a PV for this claim
			// OBSERVATION: pvc is "Pending", pv is "Available"
			claimKey := claimToClaimKey(claim)
			klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimKey, volume.Name, getVolumeStatusForLogging(volume))
			if err = ctrl.bind(volume, claim); err != nil {
				// The metric is recorded for both the failed and the
				// successful bind.
				metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, err)
				return err
			}
			metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, nil)
			return nil
		}
	} else /* pvc.Spec.VolumeName != nil */ {
		klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested", claimToClaimKey(claim), claim.Spec.VolumeName)
		obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
		if err != nil {
			return err
		}
		if !found {
			klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and not found, will try again next time", claimToClaimKey(claim), claim.Spec.VolumeName)
			if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
				return err
			}
			return nil
		} else {
			volume, ok := obj.(*v1.PersistentVolume)
			if !ok {
				return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, obj)
			}
			klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
			if volume.Spec.ClaimRef == nil {
				klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume is unbound, binding", claimToClaimKey(claim))
				if err = checkVolumeSatisfyClaim(volume, claim); err != nil {
					klog.V(4).Infof("Can't bind the claim to volume %q: %v", volume.Name, err)
					// send an event
					msg := fmt.Sprintf("Cannot bind to requested volume %q: %s", volume.Name, err)
					ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, msg)
					// volume does not satisfy the requirements of the claim
					if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
						return err
					}
				} else if err = ctrl.bind(volume, claim); err != nil {
					// On any error saving the volume or the claim, subsequent
					// syncClaim will finish the binding.
					return err
				}
				// OBSERVATION: pvc is "Bound", pv is "Bound"
				return nil
			} else if pvutil.IsVolumeBoundToClaim(volume, claim) {
				// User asked for a PV that is claimed by this PVC
				// OBSERVATION: pvc is "Pending", pv is "Bound"
				klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))
				// Finish the volume binding by adding claim UID.
				if err = ctrl.bind(volume, claim); err != nil {
					return err
				}
				// OBSERVATION: pvc is "Bound", pv is "Bound"
				return nil
			} else {
				// User asked for a PV that is claimed by someone else
				// OBSERVATION: pvc is "Pending", pv is "Bound"
				if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController) {
					klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim by user, will retry later", claimToClaimKey(claim))
					claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name)
					ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg)
					// User asked for a specific PV, retry later
					if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
						return err
					}
					return nil
				} else {
					klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim %q by controller, THIS SHOULD NEVER HAPPEN", claimToClaimKey(claim), claimrefToClaimKey(volume.Spec.ClaimRef))
					claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name)
					ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg)
					return fmt.Errorf("invalid binding of claim %q to volume %q: volume already claimed by %q", claimToClaimKey(claim), claim.Spec.VolumeName, claimrefToClaimKey(volume.Spec.ClaimRef))
				}
			}
		}
	}
}

梳理下整体流程

如果当前 pvc 的 volumeName 为空

  • 判断当前 pvc 是否是延迟绑定的
  • 调用 volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding) 找出对应的 pv

如果找到 volume 的话

  • 调用 ctrl.bind(volume, claim) 方法进行绑定

如果没有找到 volume 的话

  • 如果是延迟绑定,并且还未触发(pod 未引用)则 emit event 到 pvc 上
  • 如果 pvc 指定了 sc(StorageClass),调用 ctrl.provisionClaim(ctx, claim) 方法

解析 pvc yaml,找到 provisioner driver

启动一个 goroutine

调用 ctrl.provisionClaimOperation(ctx, claim, plugin, storageClass) 进行创建工作

provisionClaimOperation

func (ctrl *PersistentVolumeController) provisionClaimOperation(  ctx context.Context,  claim *v1.PersistentVolumeClaim,  plugin vol.ProvisionableVolumePlugin,  storageClass *storage.StorageClass) (string, error) {  claimClass := storagehelpers.GetPersistentVolumeClaimClass(claim)  klog.V(4).Infof("provisionClaimOperation [%s] started, class: %q", claimToClaimKey(claim), claimClass)  pluginName := plugin.GetPluginName()  if pluginName != "kubernetes.io/csi" && claim.Spec.DataSource != nil {    strerr := fmt.Sprintf("plugin %q is not a CSI plugin. Only CSI plugin can provision a claim with a datasource", pluginName)    return pluginName, fmt.Errorf(strerr)  }  provisionerName := storageClass.Provisioner  // Add provisioner annotation to be consistent with external provisioner workflow  newClaim, err := ctrl.setClaimProvisioner(ctx, claim, provisionerName)  if err != nil {    // Save failed, the controller will retry in the next sync    klog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err)    return pluginName, err  }  claim = newClaim  pvName := ctrl.getProvisionedVolumeNameForClaim(claim)  volume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})  if err != nil && !apierrors.IsNotFound(err) {    klog.V(3).Infof("error reading persistent volume %q: %v", pvName, err)    return pluginName, err  }  if err == nil && volume != nil {    // Volume has been already provisioned, nothing to do.    
klog.V(4).Infof("provisionClaimOperation [%s]: volume already exists, skipping", claimToClaimKey(claim))    return pluginName, err  }  // Prepare a claimRef to the claim early (to fail before a volume is  // provisioned)  claimRef, err := ref.GetReference(scheme.Scheme, claim)  if err != nil {    klog.V(3).Infof("unexpected error getting claim reference: %v", err)    return pluginName, err  }  // Gather provisioning options  tags := make(map[string]string)  tags[CloudVolumeCreatedForClaimNamespaceTag] = claim.Namespace  tags[CloudVolumeCreatedForClaimNameTag] = claim.Name  tags[CloudVolumeCreatedForVolumeNameTag] = pvName  options := vol.VolumeOptions{    PersistentVolumeReclaimPolicy: *storageClass.ReclaimPolicy,    MountOptions:                  storageClass.MountOptions,    CloudTags:                     &tags,    ClusterName:                   ctrl.clusterName,    PVName:                        pvName,    PVC:                           claim,    Parameters:                    storageClass.Parameters,  }  // Refuse to provision if the plugin doesn't support mount options, creation  // of PV would be rejected by validation anyway  if !plugin.SupportsMountOption() && len(options.MountOptions) > 0 {    strerr := fmt.Sprintf("Mount options are not supported by the provisioner but StorageClass %q has mount options %v", storageClass.Name, options.MountOptions)    klog.V(2).Infof("Mount options are not supported by the provisioner but claim %q's StorageClass %q has mount options %v", claimToClaimKey(claim), storageClass.Name, options.MountOptions)    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)    return pluginName, fmt.Errorf("provisioner %q doesn't support mount options", plugin.GetPluginName())  }  // Provision the volume  provisioner, err := plugin.NewProvisioner(options)  if err != nil {    strerr := fmt.Sprintf("Failed to create provisioner: %v", err)    klog.V(2).Infof("failed to create provisioner for claim %q with 
StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)    return pluginName, err  }  var selectedNode *v1.Node = nil  if nodeName, ok := claim.Annotations[pvutil.AnnSelectedNode]; ok {    selectedNode, err = ctrl.NodeLister.Get(nodeName)    if err != nil {      strerr := fmt.Sprintf("Failed to get target node: %v", err)      klog.V(3).Infof("unexpected error getting target node %q for claim %q: %v", nodeName, claimToClaimKey(claim), err)      ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)      return pluginName, err    }  }  allowedTopologies := storageClass.AllowedTopologies  opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")  volume, err = provisioner.Provision(selectedNode, allowedTopologies)  opComplete(volumetypes.CompleteFuncParam{Err: &err})  if err != nil {    ctrl.rescheduleProvisioning(claim)    strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err)    klog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)    return pluginName, err  }  klog.V(3).Infof("volume %q for claim %q created", volume.Name, claimToClaimKey(claim))  // Create Kubernetes PV object for the volume.  
if volume.Name == "" {    volume.Name = pvName  }  // Bind it to the claim  volume.Spec.ClaimRef = claimRef  volume.Status.Phase = v1.VolumeBound  volume.Spec.StorageClassName = claimClass  // Add AnnBoundByController (used in deleting the volume)  metav1.SetMetaDataAnnotation(&volume.ObjectMeta, pvutil.AnnBoundByController, "yes")  metav1.SetMetaDataAnnotation(&volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned, plugin.GetPluginName())  // Try to create the PV object several times  for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {    klog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name)    var newVol *v1.PersistentVolume    if newVol, err = ctrl.kubeClient.CoreV1().PersistentVolumes().Create(context.TODO(), volume, metav1.CreateOptions{}); err == nil || apierrors.IsAlreadyExists(err) {      // Save succeeded.      if err != nil {        klog.V(3).Infof("volume %q for claim %q already exists, reusing", volume.Name, claimToClaimKey(claim))        err = nil      } else {        klog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim))        _, updateErr := ctrl.storeVolumeUpdate(newVol)        if updateErr != nil {          // We will get an "volume added" event soon, this is not a big error          klog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, updateErr)        }      }      break    }    // Save failed, try again after a while.    klog.V(3).Infof("failed to save volume %q for claim %q: %v", volume.Name, claimToClaimKey(claim), err)    time.Sleep(ctrl.createProvisionedPVInterval)  }  if err != nil {    strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. 
Deleting the volume.", claimToClaimKey(claim), err)    klog.V(3).Info(strerr)    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)    var deleteErr error    var deleted bool    for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {      _, deleted, deleteErr = ctrl.doDeleteVolume(volume)      if deleteErr == nil && deleted {        // Delete succeeded        klog.V(4).Infof("provisionClaimOperation [%s]: cleaning volume %s succeeded", claimToClaimKey(claim), volume.Name)        break      }      if !deleted {        klog.Errorf("Error finding internal deleter for volume plugin %q", plugin.GetPluginName())        break      }      // Delete failed, try again after a while.      klog.V(3).Infof("failed to delete volume %q: %v", volume.Name, deleteErr)      time.Sleep(ctrl.createProvisionedPVInterval)    }    if deleteErr != nil {      strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), deleteErr)      klog.V(2).Info(strerr)      ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningCleanupFailed, strerr)    }  } else {    klog.V(2).Infof("volume %q provisioned for claim %q", volume.Name, claimToClaimKey(claim))    msg := fmt.Sprintf("Successfully provisioned volume %s using %s", volume.Name, plugin.GetPluginName())    ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ProvisioningSucceeded, msg)  }  return pluginName, nil}

provisionClaimOperation 的基本逻辑如下

  • 检查 driver,只有 csi 类型的 driver 才允许使用 dataSource 字段
  • 为 pvc 加 claim.Annotations["volume.kubernetes.io/storage-provisioner"] = class.Provisioner annotation
  • 根据规则拼出 pv Name = "pvc-" + pvc.UID
  • 如果找到了 pv,则说明 pv 已经存在,跳过 provision
  • 收集 pvc/pv 基本信息封装到 options 中
  • 对 plugin 进行校验,如果 plugin 不支持 mount options 而 StorageClass 设置了 mountOptions,则直接拒绝 provision 请求
  • 调用 plugin.NewProvisioner(options) 创建 provisioner,接口实现了 Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) 方法,注意,该方法为同步方法
  • Provision 方法返回了 PersistentVolume 实例
  • 为创建出来的 pv 关联 pvc 对象(ClaimRef),尝试创建 pv 对象(重复多次)
  • 如果创建 pv 失败,则尝试调用 Delete 方法删除创建的 volume 资源

syncBoundClaim

func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolumeClaim) error {  if claim.Spec.VolumeName == "" {    // Claim was bound before but not any more.    if _, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost reference to PersistentVolume. Data on the volume is lost!"); err != nil {      return err    }    return nil  }  obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)  if err != nil {    return err  }  if !found {    // Claim is bound to a non-existing volume.    if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost its PersistentVolume. Data on the volume is lost!"); err != nil {      return err    }    return nil  } else {    volume, ok := obj.(*v1.PersistentVolume)    if !ok {      return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)    }    klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))    if volume.Spec.ClaimRef == nil {      // Claim is bound but volume has come unbound.      // Or, a claim was bound and the controller has not received updated      // volume yet. We can't distinguish these cases.      // Bind the volume again and set all states to Bound.      klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume is unbound, fixing", claimToClaimKey(claim))      if err = ctrl.bind(volume, claim); err != nil {        // Objects not saved, next syncPV or syncClaim will try again        return err      }      return nil    } else if volume.Spec.ClaimRef.UID == claim.UID {      // All is well      // NOTE: syncPV can handle this so it can be left out.      // NOTE: bind() call here will do nothing in most cases as      // everything should be already set.      
klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: claim is already correctly bound", claimToClaimKey(claim))      if err = ctrl.bind(volume, claim); err != nil {        // Objects not saved, next syncPV or syncClaim will try again        return err      }      return nil    } else {      // Claim is bound but volume has a different claimant.      // Set the claim phase to 'Lost', which is a terminal      // phase.      if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimMisbound", "Two claims are bound to the same volume, this one is bound incorrectly"); err != nil {        return err      }      return nil    }  }}

1)如果 pvc.Spec.VolumeName 为空,说明这个 pvc 之前被 bound 过,但是已经不存在指向的 pv,将 pvc 状态更新为 Lost,报出 event 并返回

2)从 cache 里面找 pvc 绑定的 pv

  • 如果没找到,说明 pvc 绑定了一个不存在的 pv,将 pvc 状态更新为 Lost,报 event 并返回
  • 如果找到了 pv

检查 pv.Spec.ClaimRef 字段,如果为空,说明 pv 还没有绑定 pvc,调用 ctrl.bind(volume, claim) 方法进行绑定

pv.ClaimRef.UID == pvc.UID,调用 bind 方法,但是大多数情况会直接返回(因为所有的操作都已经做完了)

其他情况说明 volume 绑定了其他的 pvc,更新 pvc 的状态为 lost 并报出 event

四 总结

最后用一张 pvc/pv 的状态流转图来总结一下

原文链接

本文为阿里云原创内容,未经允许不得转载。