关于kubernetes:Pod删除流程

4次阅读

共计 6317 个字符,预计需要花费 16 分钟才能阅读完成。

ApiServer 解决

不论是通过 kubectl 命令还是程序通过 api 接口删除 pod,最终都是通过 Api Server 进行解决。Api Server 提供了 restful 接口, 解决 DELETE 的办法地位在 k8s.io/apiserver/pkg/endpoints/install.go 文件中 registerResourceHandlers 函数,实现如下:

case "DELETE": // Delete a resource.
 article := GetArticleForNoun(kind, " ")
 doc := "delete" + article + kind
 if isSubresource {doc = "delete" + subresource + "of" + article + kind}
 deleteReturnType := versionedStatus
 if deleteReturnsDeletedObject {deleteReturnType = producedObject}
 handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, restfulDeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit))
 if enableWarningHeaders {handler = utilwarning.AddWarningsHandler(handler, warnings)
 }
 route := ws.DELETE(action.Path).To(handler).
  Doc(doc).
  Param(ws.QueryParameter("pretty", "If'true', then the output is pretty printed.")).
  Operation("delete"+namespaced+kind+strings.Title(subresource)+operationSuffix).
  Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
  Writes(deleteReturnType).
  Returns(http.StatusOK, "OK", deleteReturnType).
  Returns(http.StatusAccepted, "Accepted", deleteReturnType)
 if isGracefulDeleter {route.Reads(versionedDeleterObject)
  route.ParameterNamed("body").Required(false)
  if err := AddObjectParams(ws, route, versionedDeleteOptions); err != nil {return nil, nil, err}
 }
 addParams(route, action.Params)
 routes = append(routes, route)

其中调用了 restfulDeleteResource 办法

func restfulDeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {return func(req *restful.Request, res *restful.Response) {handlers.DeleteResource(r, allowsOptions, &scope, admit)(res.ResponseWriter, req.Request)
 }
}

DeleteResource 中调用 Delete 办法进行删除

result, err := finishRequest(timeout, func() (runtime.Object, error) {obj, deleted, err := r.Delete(ctx, name, rest.AdmissionToValidateObjectDeleteFunc(admit, staticAdmissionAttrs, scope), options)
 wasDeleted = deleted
 return obj, err
})

Delete 的实现在 k8s.iokubernetesstagingsrck8s.ioapiserverpkgregistrygenericregistrystore.go 文件中,次要的是通过 updateForGracefulDeletionAndFinalizers 函数

// 默认状况下,这个优雅的工夫是 30s
if graceful || pendingFinalizers || shouldUpdateFinalizers {err, ignoreNotFound, deleteImmediately, out, lastExisting = e.updateForGracefulDeletionAndFinalizers(ctx, name, key, options, preconditions, deleteValidation, obj)
 // Update the preconditions.ResourceVersion if set since we updated the object.
 if err == nil && deleteImmediately && preconditions.ResourceVersion != nil {accessor, err = meta.Accessor(out)
  if err != nil {return out, false, apierrors.NewInternalError(err)
  }
  resourceVersion := accessor.GetResourceVersion()
  preconditions.ResourceVersion = &resourceVersion
 }
}
// !deleteImmediately covers all cases where err != nil. We keep both to be future-proof.
// 如果是优雅删除此处 deleteImmediately 为 false,会返回
if !deleteImmediately || err != nil {return out, false, err}
// Going further in this function is not useful when we are
// performing a dry-run request. Worse, it will actually
// override "out" with the version of the object in database
// that doesn't have the finalizer and deletiontimestamp set
// (because the update above was dry-run too). If we already
// have that version available, let's just return it now,
// otherwise, we can call dry-run delete that will get us the
// latest version of the object.
if dryrun.IsDryRun(options.DryRun) && out != nil {return out, true, nil}
// delete immediately, or no graceful deletion supported
klog.V(6).Infof("going to delete %s from registry:", name)
out = e.NewFunc()
// 彻底删除清理存储
if err := e.Storage.Delete(ctx, key, out, &preconditions, storage.ValidateObjectFunc(deleteValidation), dryrun.IsDryRun(options.DryRun)); err != nil {
 // Please refer to the place where we set ignoreNotFound for the reason
 // why we ignore the NotFound error . if storage.IsNotFound(err) && ignoreNotFound && lastExisting != nil {
  // The lastExisting object may not be the last state of the object
 // before its deletion, but it's the best approximation. out, err := e.finalizeDelete(ctx, lastExisting, true)
  return out, true, err
 }
 return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name)
}
out, err = e.finalizeDelete(ctx, out, true)

调用 BeforeDelete 办法扭转 pod 的外部信息,次要是 DeletionTimestampDeletionGracePeriodSeconds两个字段

now := metav1.NewTime(metav1.Now().Add(time.Second * time.Duration(*options.GracePeriodSeconds)))
objectMeta.SetDeletionTimestamp(&now)
objectMeta.SetDeletionGracePeriodSeconds(options.GracePeriodSeconds)

Kubelet 解决

解决流程

申请删除 Pod-->apiserver 更新 Pod 信息 -->kubelet 优雅开释 Pod 资源(批改 DeletionTimestamp 和 DeletionGracePeriodSeconds)-->kubelet 清理 pod 资源(canBeDeleted-->PodResourcesAreReclaimed)-->kubelet 调用 api server 接口删除 Pod(此时将 graceful 设置为 0)-->apiserver 删除 etcd 中 Pod 信息(deleteImmediately 此时为 true)-->kubelet 实现最终 Pod 的资源清理(执行 remove 操作)

代码剖析

kubelet 通过 statusManager 来同步 pod 的状态

// Syncs pods statuses with apiserver; also used as a cache of statuses.
statusManager status.Manager

manager start 函数如下:

func (m *manager) Start() {
 // Don't start the status manager if we don't have a client. This will happen
 // on the master, where the kubelet is responsible for bootstrapping the pods // of the master components. if m.kubeClient == nil {klog.Infof("Kubernetes client is nil, not starting status manager.")
  return
 }
 klog.Info("Starting to sync pod status with apiserver")
 //lint:ignore SA1015 Ticker can link since this is only called once and doesn't handle termination.
 syncTicker := time.Tick(syncPeriod)
 // syncPod and syncBatch share the same go routine to avoid sync races.
 go wait.Forever(func() {
  for {
   select {
   case syncRequest := <-m.podStatusChannel:
    klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
 syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
    m.syncPod(syncRequest.podUID, syncRequest.status)
   case <-syncTicker:
    klog.V(5).Infof("Status Manager: syncing batch")
    // remove any entries in the status channel since the batch will handle them
 for i := len(m.podStatusChannel); i > 0; i-- {<-m.podStatusChannel}
    m.syncBatch()}
  }
 }, 0)
}

在 syncPod 办法有上面的逻辑

// We don't handle graceful deletion of mirror pods.
// canBeDeleted 函数中会调用 PodResourcesAreReclaimed, 来查看 pod 资源是否曾经开释结束; 真正的回收工作在 cgc.evictContainers 中实现
if m.canBeDeleted(pod, status.status) {
 deleteOptions := metav1.DeleteOptions{GracePeriodSeconds: new(int64),
 // Use the pod UID as the precondition for deletion to prevent deleting a
 // newly created pod with the same name and namespace. Preconditions: metav1.NewUIDPreconditions(string(pod.UID)),
 }
 
 // 再次调用删除接口,此时 GracePeriodSeconds 曾经是 0,api server 会执行立刻删除操作
 err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions)
 if err != nil {klog.Warningf("Failed to delete status for pod %q: %v", format.Pod(pod), err)
  return
 }
 klog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))
 m.deletePodStatus(uid)
}
正文完
 0