k8s原生并不反对就地降级。诸如deployment等工作负载在降级的过程中,间接对Pod进行recreate

实现容器的就地降级的另外一个前提是kubelet通过容器hash 来治理容器版本。

当创立一个Pod的时候,kubelet会计算每个容器的hash,并且把该hash值写到ContainerStatus中,该status的定义如下:

// Status represents the status of a container.type Status struct {    // ID of the container.    ID ContainerID    // Name of the container.    Name string    // Status of the container.    State State    // Creation time of the container.    CreatedAt time.Time    // Start time of the container.    StartedAt time.Time    // Finish time of the container.    FinishedAt time.Time    // Exit code of the container.    ExitCode int    // Name of the image, this also includes the tag of the image,    // the expected form is "NAME:TAG".    Image string    // ID of the image.    ImageID string    // Hash of the container, used for comparison.    Hash uint64    // Number of times that the container has been restarted.    RestartCount int    // A string explains why container is in such a status.    Reason string    // Message written by the container before exiting (stored in    // TerminationMessagePath).    Message string}

其中的Hashfiled 正是咱们明天讲到的hash。

如果spec.containers[i].image 发生变化之后,kubelet 通过计算hash值,判断出容器版本曾经变动,须要执行降级操作,拉取新的image,重启该容器,重启胜利之后,会把新的hash写到下面讲的ContainerStatus中。

kubelet 源码中提供了一个计算容器hash的辅助办法:

// HashContainer returns the hash of the container. It is used to compare// the running container with its desired spec.// Note: remember to update hashValues in container_hash_test.go as well.func HashContainer(container *v1.Container) uint64 {    hash := fnv.New32a()    // Omit nil or empty field when calculating hash value    // Please see https://github.com/kubernetes/kubernetes/issues/53644    containerJSON, _ := json.Marshal(container)    hashutil.DeepHashObject(hash, containerJSON)    return uint64(hash.Sum32())}

该办法,实现了容器hash的计算。

kubelet 通过hash,比拟理论运行的容器和spec中冀望的容器,判断是否容器曾经扭转,具体代码如下:

func containerChanged(container *v1.Container, containerStatus *kubecontainer.Status) (uint64, uint64, bool) {    expectedHash := kubecontainer.HashContainer(container)    return expectedHash, containerStatus.Hash, containerStatus.Hash != expectedHash}

如果曾经变动,则会执行重启操作。

                restart := shouldRestartOnFailure(pod)        if _, _, changed := containerChanged(&container, containerStatus); changed {            message = fmt.Sprintf("Container %s definition changed", container.Name)            // Restart regardless of the restart policy because the container            // spec changed.            restart = true        }

而后就实现了就地降级。

上述的逻辑位于 kuberuntime_manager.go 文件SyncPod办法中。该文件是kubelet的kuberuntimeManager 的实现。

// SyncPod syncs the running pod into the desired pod by executing following steps:////  1. Compute sandbox and container changes.//  2. Kill pod sandbox if necessary.//  3. Kill any containers that should not be running.//  4. Create sandbox if necessary.//  5. Create ephemeral containers.//  6. Create init containers.//  7. Create normal containers.func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {    // Step 1: Compute sandbox and container changes.    podContainerChanges := m.computePodActions(pod, podStatus)    klog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))    if podContainerChanges.CreateSandbox {        ref, err := ref.GetReference(legacyscheme.Scheme, pod)        if err != nil {            klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)        }        if podContainerChanges.SandboxID != "" {            m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")        } else {            klog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod))        }    }    // Step 2: Kill the pod if the sandbox has changed.    if podContainerChanges.KillPod {        if podContainerChanges.CreateSandbox {            klog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))        } else {            klog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))        }        killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)        result.AddPodSyncResult(killResult)        if killResult.Error() != nil {            klog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())            return        }        if podContainerChanges.CreateSandbox {            m.purgeInitContainers(pod, podStatus)        }    } else {        // Step 3: kill any running containers in this pod which are not to keep.        for containerID, containerInfo := range podContainerChanges.ContainersToKill {            klog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))            killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)            result.AddSyncResult(killContainerResult)            if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {                killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())                klog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)                return            }        }    }    // Keep terminated init containers fairly aggressively controlled    // This is an optimization because container removals are typically handled    // by container garbage collector.    m.pruneInitContainersBeforeStart(pod, podStatus)    // We pass the value of the PRIMARY podIP and list of podIPs down to    // generatePodSandboxConfig and generateContainerConfig, which in turn    // passes it to various other functions, in order to facilitate functionality    // that requires this value (hosts file and downward API) and avoid races determining    // the pod IP in cases where a container requires restart but the    // podIP isn't in the status manager yet. The list of podIPs is used to    // generate the hosts file.    //    // We default to the IPs in the passed-in pod status, and overwrite them if the    // sandbox needs to be (re)started.    var podIPs []string    if podStatus != nil {        podIPs = podStatus.IPs    }    // Step 4: Create a sandbox for the pod if necessary.    podSandboxID := podContainerChanges.SandboxID    if podContainerChanges.CreateSandbox {        var msg string        var err error        klog.V(4).Infof("Creating PodSandbox for pod %q", format.Pod(pod))        createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))        result.AddSyncResult(createSandboxResult)        podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)        if err != nil {            createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)            klog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)            ref, referr := ref.GetReference(legacyscheme.Scheme, pod)            if referr != nil {                klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)            }            m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed to create pod sandbox: %v", err)            return        }        klog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))        podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)        if err != nil {            ref, referr := ref.GetReference(legacyscheme.Scheme, pod)            if referr != nil {                klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)            }            m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err)            klog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod))            result.Fail(err)            return        }        // If we ever allow updating a pod from non-host-network to        // host-network, we may use a stale IP.        if !kubecontainer.IsHostNetworkPod(pod) {            // Overwrite the podIPs passed in the pod status, since we just started the pod sandbox.            podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, podSandboxStatus)            klog.V(4).Infof("Determined the ip %v for pod %q after sandbox changed", podIPs, format.Pod(pod))        }    }    // the start containers routines depend on pod ip(as in primary pod ip)    // instead of trying to figure out if we have 0 < len(podIPs)    // everytime, we short circuit it here    podIP := ""    if len(podIPs) != 0 {        podIP = podIPs[0]    }    // Get podSandboxConfig for containers to start.    configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)    result.AddSyncResult(configPodSandboxResult)    podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)    if err != nil {        message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)        klog.Error(message)        configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)        return    }    // Helper containing boilerplate common to starting all types of containers.    // typeName is a label used to describe this type of container in log messages,    // currently: "container", "init container" or "ephemeral container"    start := func(typeName string, spec *startSpec) error {        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)        result.AddSyncResult(startContainerResult)        isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)        if isInBackOff {            startContainerResult.Fail(err, msg)            klog.V(4).Infof("Backing Off restarting %v %+v in pod %v", typeName, spec.container, format.Pod(pod))            return err        }        klog.V(4).Infof("Creating %v %+v in pod %v", typeName, spec.container, format.Pod(pod))        // NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {            startContainerResult.Fail(err, msg)            // known errors that are logged in other places are logged at higher levels here to avoid            // repetitive log spam            switch {            case err == images.ErrImagePullBackOff:                klog.V(3).Infof("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg)            default:                utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))            }            return err        }        return nil    }    // Step 5: start ephemeral containers    // These are started "prior" to init containers to allow running ephemeral containers even when there    // are errors starting an init container. In practice init containers will start first since ephemeral    // containers cannot be specified on pod creation.    if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {        for _, idx := range podContainerChanges.EphemeralContainersToStart {            start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))        }    }    // Step 6: start the init container.    if container := podContainerChanges.NextInitContainerToStart; container != nil {        // Start the next init container.        if err := start("init container", containerStartSpec(container)); err != nil {            return        }        // Successfully started the container; clear the entry in the failure        klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))    }    // Step 7: start containers in podContainerChanges.ContainersToStart.    for _, idx := range podContainerChanges.ContainersToStart {        start("container", containerStartSpec(&pod.Spec.Containers[idx]))    }    return}

公布于 4 小时前