关于数据库:TiDB-Operator-源码阅读-四-组件的控制循环

14次阅读

共计 16885 个字符,预计需要花费 43 分钟才能阅读完成。

上篇文章中,咱们介绍了 TiDB Operator 的组件生命周期治理的编排,以 TiDBCluster Controller 为例介绍 Controller Manager 的实现。TiDBCluster Controller 负责了 TiDB 次要组件的生命周期治理,TiDB 各个组件的 Member Manager 封装了对应具体的生命周期治理逻辑。在上篇文章中,咱们形容了一个形象的组件生命周期治理的实现,本文中,咱们将以 PD 为例具体介绍组件生命周期治理的实现过程和相干代码,并且以 PD 的介绍为根底,介绍其余组件的局部差别。

PD 的生命周期治理

PD 生命周期治理的次要逻辑在 PD Member Manager 下保护,次要代码在 pkg/manager/member/pd_member_manager.go 文件中,扩缩容、降级、故障转移的逻辑别离封装在 PD Scaler、PD Upgrader、PD Failover 中,别离位于 pd_scaler.gopd_upgrader.gopd_failover.go 文件中。

依照前文形容,组件的生命周期治理次要须要实现以下过程:

  1. 同步 Service;
  2. 进入 StatefulSet 同步过程;
  3. 同步 Status;
  4. 同步 ConfigMap;
  5. 解决滚动更新;
  6. 解决扩容与缩容;
  7. 解决故障转移;
  8. 最终实现 StatefulSet 同步过程。

其中,同步 StatefulSet 过程为 PD 组件生命周期治理的次要逻辑,其余同步过程,诸如同步 Status、同步 ConfigMap、滚动更新、扩容与缩容、故障转移等工作,被别离定义为了一个个子函数,在同步 StatefulSet 的过程中被调用,这些子工作的实现会在介绍完 StatefulSet 的同步过程之后具体介绍。

同步 StatefulSet

  1. 应用 Stetefulset Lister 获取 PD 现有的 StatefulSet:
oldPDSetTmp, err := m.deps.StatefulSetLister.StatefulSets(ns).Get(controller.PDMemberName(tcName))
if err != nil && !errors.IsNotFound(err) {return fmt.Errorf("syncPDStatefulSetForTidbCluster: fail to get sts %s for cluster %s/%s, error: %s", controller.PDMemberName(tcName), ns, tcName, err)
}
setNotExist := errors.IsNotFound(err)
 
oldPDSet := oldPDSetTmp.DeepCopy()
  1. 应用 m.syncTidbClusterStatus(tc, oldPDSet) 获取最新状态。
if err := m.syncTidbClusterStatus(tc, oldPDSet); err != nil {klog.Errorf("failed to sync TidbCluster: [%s/%s]'s status, error: %v", ns, tcName, err)
}
  1. 查看 TidbCluster 是否处于 Paused 状态,如果是,则进行接下来的 Reconcile 过程。
if tc.Spec.Paused {klog.V(4).Infof("tidb cluster %s/%s is paused, skip syncing for pd statefulset", tc.GetNamespace(), tc.GetName())
    return nil
}
  1. 依据最新的 tc.Spec,对 ConfigMap 进行同步。
cm, err := m.syncPDConfigMap(tc, oldPDSet)
  1. 依据最新的 tc.Spectc.Status 以及上一步获取到的 ConfigMap,生成最新的 StatefulSet 的模板。
newPDSet, err := getNewPDSetForTidbCluster(tc, cm)
  1. 如果 PD 的 StatefulSet 还未创立,那么在这一轮同步中会先创立 PD 的 StatefulSet。
if setNotExist {if err := SetStatefulSetLastAppliedConfigAnnotation(newPDSet); err != nil {return err}
    if err := m.deps.StatefulSetControl.CreateStatefulSet(tc, newPDSet); err != nil {return err}
    tc.Status.PD.StatefulSet = &apps.StatefulSetStatus{}
    return controller.RequeueErrorf("TidbCluster: [%s/%s], waiting for PD cluster running", ns, tcName)
}
  1. 如果用户应用 Annotation 配置了强制降级,那么会在这一步间接设置 StatefulSet 进行滚动更新,用于有些场景下同步循环被阻塞始终无奈更新的状况。
if !tc.Status.PD.Synced && NeedForceUpgrade(tc.Annotations) {
    tc.Status.PD.Phase = v1alpha1.UpgradePhase
    setUpgradePartition(newPDSet, 0)
    errSTS := UpdateStatefulSet(m.deps.StatefulSetControl, tc, newPDSet, oldPDSet)
    return controller.RequeueErrorf("tidbcluster: [%s/%s]'s pd needs force upgrade, %v", ns, tcName, errSTS)
}
  1. 解决 Scale,调用 pd_scaler.go 内实现的扩缩容逻辑。
if err := m.scaler.Scale(tc, oldPDSet, newPDSet); err != nil {return err}
  1. 解决 Failover,调用 pd_failover.go 内实现的故障转移逻辑,先查看是否须要 Recover,再查看是否所有 Pod 都失常启动、是否所有 Members 状态都是衰弱的,再决定是否进入 Failover 逻辑。
if m.deps.CLIConfig.AutoFailover {if m.shouldRecover(tc) {m.failover.Recover(tc)
    } else if tc.PDAllPodsStarted() && !tc.PDAllMembersReady() || tc.PDAutoFailovering() {if err := m.failover.Failover(tc); err != nil {return err}
    }
}
  1. 解决 Upgrade,调用 pd_upgrader.go 内实现的更新逻辑,当后面生成的新 PD StatefulSet 和 Kubernetes 内现有的 PD StatefulSet 不统一,或者 StatefulSet 统一但 tc.Status.PD.Phase 里记录的状态是更新状态,都会进入到 Upgrader 外面去解决滚动更新相干逻辑。
if !templateEqual(newPDSet, oldPDSet) || tc.Status.PD.Phase == v1alpha1.UpgradePhase {if err := m.upgrader.Upgrade(tc, oldPDSet, newPDSet); err != nil {return err}
}
  1. 最初实现 PD StatefulSet 的同步,将新的 StatefulSet 更新到 Kubernetes 集群内。

同步 Service

PD 应用了 Service 和 Headless Service 两种 Service,由 syncPDServiceForTidbClustersyncPDHeadlessServiceForTidbCluster 两个函数治理。

Service 地址个别用于 TiKV、TiDB、TiFlash 配置的 PD Endpoint,例如 TiDB 的启动参数如下,其中 TiDB 应用的 --path=${CLUSTER_NAME}-pd:2379 就是 PD 的 Service 地址:

ARGS="--store=tikv \
--advertise-address=${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}.svc \
--path=${CLUSTER_NAME}-pd:2379 \

Headless Service 能够为每个 Pod 提供惟一网络标识,例如 PD 的启动参数如下所示,当 PD 通过以下参数启动时,该 PD Pod 在 PD Members 中注册的该 Pod 的 Endpoint 为 "${POD_NAME}.${PEER_SERVICE_NAME}.${NAMESPACE}.svc"

domain="${POD_NAME}.${PEER_SERVICE_NAME}.${NAMESPACE}.svc"
ARGS="--data-dir=/var/lib/pd \
--name=${POD_NAME} \
--peer-urls=http://0.0.0.0:2380 \
--advertise-peer-urls=http://${domain}:2380 \
--client-urls=http://0.0.0.0:2379 \
--advertise-client-urls=http://${domain}:2379 \
--config=/etc/pd/pd.toml \
"

同步 ConfigMap

PD 应用 ConfigMap 治理配置和启动脚本,syncPDConfigMap 函数调用 getPDConfigMap 取得最新的 ConfigMap,而后将最新的 ConfigMap 更新到 Kubernetes 集群中。ConfigMap 须要解决以下工作:

  1. 获取 PD Config,用于前面同步。为了兼容 TiDB Operator 1.0 版本应用 Helm 保护 ConfigMap 的状况,当 config 对象为空时,不同步 ConfigMap。
config := tc.Spec.PD.Config
if config == nil {return nil, nil}
  1. 批改配置中 TLS 无关配置,其中 4.0 以下不反对 Dashboard,因而 4.0 以下 PD 版本不必设置 Dashboard 证书。
// override CA if tls enabled
if tc.IsTLSClusterEnabled() {config.Set("security.cacert-path", path.Join(pdClusterCertPath, tlsSecretRootCAKey))
    config.Set("security.cert-path", path.Join(pdClusterCertPath, corev1.TLSCertKey))
    config.Set("security.key-path", path.Join(pdClusterCertPath, corev1.TLSPrivateKeyKey))
}
// Versions below v4.0 do not support Dashboard
if tc.Spec.TiDB != nil && tc.Spec.TiDB.IsTLSClientEnabled() && !tc.SkipTLSWhenConnectTiDB() && clusterVersionGE4 {config.Set("dashboard.tidb-cacert-path", path.Join(tidbClientCertPath, tlsSecretRootCAKey))
    config.Set("dashboard.tidb-cert-path", path.Join(tidbClientCertPath, corev1.TLSCertKey))
    config.Set("dashboard.tidb-key-path", path.Join(tidbClientCertPath, corev1.TLSPrivateKeyKey))
}
  1. 将 Config 转换成 TOML 格局用于 PD 应用。
confText, err := config.MarshalTOML()
  1. 应用 RenderPDStartScript 生成 PD 启动脚本,其中 PD 的启动脚本模板在 pkg/manager/member/template.go 中的 pdStartScriptTpl 变量中。PD 的启动脚本是一段 Bash 脚本,依据模板渲染的目标是将一些 TidbCluster 对象设置的变量和 Annotation 插入到启动脚本中,用于 PD 失常启动以及 debug 模式。
  2. 将上文生成的 PD 配置以及 PD 启动脚本,组装成 Kubernetes 的 ConfigMap 对象,返回给 syncPDConfigMap 函数。
cm := &corev1.ConfigMap{
    ObjectMeta: metav1.ObjectMeta{Name:            controller.PDMemberName(tc.Name),
        Namespace:       tc.Namespace,
        Labels:          pdLabel,
        OwnerReferences: []metav1.OwnerReference{controller.GetOwnerRef(tc)},
    },
    Data: map[string]string{"config-file":    string(confText),
        "startup-script": startScript,
    },
}

扩容与缩容

扩缩容实现在 pkg/manager/member/pd_scaler.go 文件中,用于解决 PD 扩缩容的需要。在 StatefulSet 中,会调用 Scale 函数进入扩缩容的逻辑。扩容缩容都是通过设置 StatefulSet 正本数量施行,理论扩缩容之前须要实现一些前置操作,例如缩容时须要被动迁徙 Leader、下线节点、为 PVC 增加延时删除的 Annotation、扩容时主动删除之前保留的 PVC。在实现前置操作后再调整 StatefulSet 正本数量,缩小扩缩容操作对集群的影响。这些前置操作也能够依据业务须要进行拓展。

func (s *pdScaler) Scale(meta metav1.Object, oldSet *apps.StatefulSet, newSet *apps.StatefulSet) error {scaling, _, _, _ := scaleOne(oldSet, newSet)
    if scaling > 0 {return s.ScaleOut(meta, oldSet, newSet)
    } else if scaling < 0 {return s.ScaleIn(meta, oldSet, newSet)
    }
    return s.SyncAutoScalerAnn(meta, oldSet)
}

这个 Scale 函数相当于一个路由,依据扩缩容的方向、步长、间隔决定调用计划。目前 PD 扩缩容每次只会扩缩容一个节点,步长为 1,方向依据 scaling 变量正负性决定,计划由 ScaleIn 和 ScaleOut 两个函数实现。

对于 PD 缩容,为了不影响集群性能,缩容过程中须要被动实现 Leader 迁徙工作。否则缩容节点为 Leader 节点时,在 Leader 节点下线后残余节点才会在没有 Leader 的状况下被动开始选举 Leader,影响集群性能。被动迁徙 Leader 时,只须要将 Leader 迁徙到序号最小的节点,保障 PD Leader 迁徙的次数只有一次。

首先获取 PD Client,获取 PD Leader:

pdClient := controller.GetPDClient(s.deps.PDControl, tc)
leader, err := pdClient.GetPDLeader()

当 Leader Name 等于该节点 Name 的时候,执行 transferLeader 操作。如果节点数量为 1,此时没有足够 PD 节点实现 transferLeader 的操作,跳过该操作。

Leader 迁徙实现后,ScaleIn 函数会调用 PD 的 DeleteMember API 从 PD 成员中删除该节点,实现该节点的下线过程,最初调用 setReplicasAndDeleteSlots 调整 StatefulSet 的正本数量实现缩容。

对于 PD 扩容,此时缩容为了数据可靠性留下的 PVC,在扩容时须要删掉,避免应用旧数据,因而在扩容前会调用 deleteDeferDeletingPVC 删掉提早删除的 PVC。删除之后,调整 StatefulSet 的正本数量扩容即可。

对于 PD 的扩缩容,次要是通过设置 StatefulSet 正本数量实现扩缩容,因而在反对 Advanced StatefulSet 时,在计算正本数量须要思考留空 slots 的存在。

滚动更新

PD 降级在 pkg/manager/member/pd_upgrader.go 中实现,次要伎俩是应用 StatefulSet 的 UpdateStrategy 实现滚动更新。PD Upgrader 中会在调整 StatefulSet UpgradeStrategy 过程中插入 PD 的一些前置操作,缩小降级操作对 PD 集群的影响。对于具体如何管制 StatefulSet UpgradeStrategy,能够参考上篇文章。

在开始降级前,须要实现以下状态查看:

  1. 查看有无其余操作正在进行,次要是查看 TiCDC、TiFlash 是否处于降级状态,PD 是否处于扩缩容状态:
if tc.Status.TiCDC.Phase == v1alpha1.UpgradePhase ||
        tc.Status.TiFlash.Phase == v1alpha1.UpgradePhase ||
        tc.PDScaling()
  1. 在同步 StatefulSet 局部提到,进入 Upgrader 有两种条件,一是 newSet 和 oldSet Template Spec 不统一,这一状况产生在更新开始时,此时返回 nil,间接更新 StatefulSet 即可,不须要执行上面一一 Pod 查看的操作。如果是依据 tc.Status.PD.Phase == v1alpha1.UpgradePhase 条件进入的 Upgrader,则 newSetoldSet Template Spec 统一,此时须要继续执行上面的查看。
if !templateEqual(newSet, oldSet) {return nil}
  1. 比拟 tc.Status.PD.StatefulSet.UpdateRevisiontc.Status.PD.StatefulSet.CurrentRevision 来获取滚动更新状态,若两者相等阐明滚动更新操作实现,能够退出滚动更新过程。
if tc.Status.PD.StatefulSet.UpdateRevision == tc.Status.PD.StatefulSet.CurrentRevision
  1. 查看 StatefulSetUpdateStrategy 是否被手动批改,如果手动批改则沿用相应策略。
if oldSet.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType || oldSet.Spec.UpdateStrategy.RollingUpdate == nil {
        newSet.Spec.UpdateStrategy = oldSet.Spec.UpdateStrategy
        klog.Warningf("tidbcluster: [%s/%s] pd statefulset %s UpdateStrategy has been modified manually", ns, tcName, oldSet.GetName())
        return nil
    }

实现整体查看后,开始对每个 Pod 进行解决,进行滚动更新操作:

  1. 查看 PD Pod 是否曾经更新结束,通过查看 Pod Label 中的 controller-revision-hash 的值和 StatefulSetUpdateRevision 比拟,判断该 Pod 是曾经降级实现的 Pod 还是未解决的 Pod。对于已进行降级实现的 Pod,则查看该 Pod 对应的 PD Member 是否变为衰弱状态,若不是则返回谬误,期待下一次同步持续查看状态,若达到衰弱状态则开始解决下一个 Pod。
revision, exist := pod.Labels[apps.ControllerRevisionHashLabelKey]
        if !exist {return controller.RequeueErrorf("tidbcluster: [%s/%s]'s pd pod: [%s] has no label: %s", ns, tcName, podName, apps.ControllerRevisionHashLabelKey)
        }
 
        if revision == tc.Status.PD.StatefulSet.UpdateRevision {if member, exist := tc.Status.PD.Members[PdName(tc.Name, i, tc.Namespace, tc.Spec.ClusterDomain)]; !exist || !member.Health {return controller.RequeueErrorf("tidbcluster: [%s/%s]'s pd upgraded pod: [%s] is not ready", ns, tcName, podName)
            }
            continue
        }
  1. 对于 Pod revision != tc.Status.PD.StatefulSet.UpdateRevision 的 Pod,阐明该 Pod 还未执行滚动更新,调用 upgradePDPod 函数解决该 Pod。与缩容逻辑一样,当解决到 PD Leader Pod 时,会执行一段被动迁徙 Leader 的操作,而后才会执行更新该 Pod 的操作。
if tc.Status.PD.Leader.Name == upgradePdName || tc.Status.PD.Leader.Name == upgradePodName {
    var targetName string
    targetOrdinal := helper.GetMaxPodOrdinal(*newSet.Spec.Replicas, newSet)
    if ordinal == targetOrdinal {targetOrdinal = helper.GetMinPodOrdinal(*newSet.Spec.Replicas, newSet)
    }
    targetName = PdName(tcName, targetOrdinal, tc.Namespace, tc.Spec.ClusterDomain)
    if _, exist := tc.Status.PD.Members[targetName]; !exist {targetName = PdPodName(tcName, targetOrdinal)
    }
    
    if len(targetName) > 0 {err := u.transferPDLeaderTo(tc, targetName)
        if err != nil {klog.Errorf("pd upgrader: failed to transfer pd leader to: %s, %v", targetName, err)
            return err
        }
        klog.Infof("pd upgrader: transfer pd leader to: %s successfully", targetName)
        return controller.RequeueErrorf("tidbcluster: [%s/%s]'s pd member: [%s] is transferring leader to pd member: [%s]", ns, tcName, upgradePdName, targetName)
    }
}
setUpgradePartition(newSet, ordinal)

故障转移

PD 故障转移在 pd_failover.go 中实现,与其余组件 Failover 逻辑不同的是,PD 会通过被动删除故障 Pod 来修复问题。PD Failover 之前会做相应查看,当 PD 集群不可用,即过半 PD 节点不衰弱,此时重建 PD 节点并不会使集群复原,因而不会进行 Failover 工作。

  1. 遍历从 PD Client 获取到的 PD Members 衰弱状态,当 PD 成员衰弱状态为 Unhealthy 且 LastTransitionTime 距当初工夫超过 failoverDeadline 工夫,则标记为不衰弱成员,接下来的操作会将不衰弱的 PD 成员相干的 Pod 信息、PVC 信息记录到 tc.Status.PD.FailureMembers 中。
for pdName, pdMember := range tc.Status.PD.Members {podName := strings.Split(pdName, ".")[0]
 
    failoverDeadline := pdMember.LastTransitionTime.Add(f.deps.CLIConfig.PDFailoverPeriod)
    _, exist := tc.Status.PD.FailureMembers[pdName]
 
    if pdMember.Health || time.Now().Before(failoverDeadline) || exist {continue}
 
    pod, _ := f.deps.PodLister.Pods(ns).Get(podName)
 
    pvcs, _ := util.ResolvePVCFromPod(pod, f.deps.PVCLister)
 
    f.deps.Recorder.Eventf(tc, apiv1.EventTypeWarning, "PDMemberUnhealthy", "%s/%s(%s) is unhealthy", ns, podName, pdMember.ID)
 
    pvcUIDSet := make(map[types.UID]struct{})
    for _, pvc := range pvcs {pvcUIDSet[pvc.UID] = struct{}{}
    }
    tc.Status.PD.FailureMembers[pdName] = v1alpha1.PDFailureMember{
        PodName:       podName,
        MemberID:      pdMember.ID,
        PVCUIDSet:     pvcUIDSet,
        MemberDeleted: false,
        CreatedAt:     metav1.Now(),}
    return controller.RequeueErrorf("marking Pod: %s/%s pd member: %s as failure", ns, podName, pdMember.Name)
}
  1. 调用 tryToDeleteAFailureMember 函数解决 FailureMemebrs,遍历 FailureMembers,当遇到 MemberDeleted 为 False 的成员,调用 PD Client 删除该 PD 成员,尝试复原 Pod。
func (f *pdFailover) tryToDeleteAFailureMember(tc *v1alpha1.TidbCluster) error {ns := tc.GetNamespace()
    tcName := tc.GetName()
    var failureMember *v1alpha1.PDFailureMember
    var failurePodName string
    var failurePDName string
 
    for pdName, pdMember := range tc.Status.PD.FailureMembers {
        if !pdMember.MemberDeleted {
            failureMember = &pdMember
            failurePodName = strings.Split(pdName, ".")[0]
            failurePDName = pdName
            break
        }
    }
    if failureMember == nil {klog.Infof("No PD FailureMembers to delete for tc %s/%s", ns, tcName)
        return nil
    }
 
    memberID, err := strconv.ParseUint(failureMember.MemberID, 10, 64)
    if err != nil {return err}
 
    if err := controller.GetPDClient(f.deps.PDControl, tc).DeleteMemberByID(memberID); err != nil {klog.Errorf("pd failover[tryToDeleteAFailureMember]: failed to delete member %s/%s(%d), error: %v", ns, failurePodName, memberID, err)
        return err
    }
    klog.Infof("pd failover[tryToDeleteAFailureMember]: delete member %s/%s(%d) successfully", ns, failurePodName, memberID)
  ...

删除该故障 Pod,

pod, err := f.deps.PodLister.Pods(ns).Get(failurePodName)
    if err != nil && !errors.IsNotFound(err) {return fmt.Errorf("pd failover[tryToDeleteAFailureMember]: failed to get pod %s/%s for tc %s/%s, error: %s", ns, failurePodName, ns, tcName, err)
    }
    if pod != nil {
        if pod.DeletionTimestamp == nil {if err := f.deps.PodControl.DeletePod(tc, pod); err != nil {return err}
        }
    } else {klog.Infof("pd failover[tryToDeleteAFailureMember]: failure pod %s/%s not found, skip", ns, failurePodName)
    }

删除 PVC,

for _, pvc := range pvcs {_, pvcUIDExist := failureMember.PVCUIDSet[pvc.GetUID()]
        // for backward compatibility, if there exists failureMembers and user upgrades operator to newer version
        // there will be failure member structures with PVCUID set from api server, we should handle this as pvcUIDExist == true
        if pvc.GetUID() == failureMember.PVCUID {pvcUIDExist = true}
        if pvc.DeletionTimestamp == nil && pvcUIDExist {if err := f.deps.PVCControl.DeletePVC(tc, pvc); err != nil {klog.Errorf("pd failover[tryToDeleteAFailureMember]: failed to delete PVC: %s/%s, error: %s", ns, pvc.Name, err)
                return err
            }
            klog.Infof("pd failover[tryToDeleteAFailureMember]: delete PVC %s/%s successfully", ns, pvc.Name)
        }
    }

标记 tc.Status.PD.FailureMembers 中的状态为 Deleted。

setMemberDeleted(tc, failurePDName)
  1. PD StatefulSet 的 replicas 正本数量由 tc.PDStsDesiredReplicas() 取得,StatefulSet 正本数量会加上曾经被删除的 FailureMembers 数量,此时同步 StatefulSet 过程中会调用扩容 StatefulSet 的逻辑减少一个用于故障转移的 PD Pod。
func (tc *TidbCluster) GetPDDeletedFailureReplicas() int32 {
    var deletedReplicas int32 = 0
    for _, failureMember := range tc.Status.PD.FailureMembers {
        if failureMember.MemberDeleted {deletedReplicas++}
    }
    return deletedReplicas
}
 
func (tc *TidbCluster) PDStsDesiredReplicas() int32 {return tc.Spec.PD.Replicas + tc.GetPDDeletedFailureReplicas()
}

其余组件的生命周期治理

在后面咱们通过 PD 具体介绍了组件的生命周期治理的代码实现,其余组件,包含 TiKV、TiFlash、TiDB、Pump、TiCDC,与 PD 的生命周期治理较为相似,在此不再赘述,在上面局部咱们着重强调和 PD 代码实现的差别。

TiKV/TiFlash 的生命周期治理

TiKV 与 TiFlash 生命周期治理比拟相似,了解 TiKV 的代码实现即可。TiKV 的生命周期治理与 PD 生命周期治理差别如下:

  1. 同步 StatefulSet 过程中,TiKV Member Manager 须要通过 setStoreLabelsForTiKV 设置 TiKV Store Label。setStoreLabelsForTiKV 函数中实现了将 Node 上的 Label 通过 PD Client 的 SetStoreLabels 接口为 TiKV store 设置 Label。
for _, store := range storesInfo.Stores {
    nodeName := pod.Spec.NodeName
    ls, _ := getNodeLabels(m.deps.NodeLister, nodeName, storeLabels)
 
    if !m.storeLabelsEqualNodeLabels(store.Store.Labels, ls) {set, err := pdCli.SetStoreLabels(store.Store.Id, ls)
        if err != nil {continue}
        if set {
            setCount++
            klog.Infof("pod: [%s/%s] set labels: %v successfully", ns, podName, ls)
        }
    }
}
  1. 同步 Status 方面,TiKV Member Manager 会调用 PD Client 的 GetStores 函数从 PD 取得 TiKV Stores 信息,并对失去的 Stores 信息进行分类,用于后续同步。这一过程相似 PD 的 Status 同步过程中对 GetMembers 接口的调用并对 PD Members 信息进行剖析和记录的过程。
  2. 同步 Service 方面,TiKV Member Manager 只创立 Headless Service,用于 TiKV Pod DNS 解析。
  3. 同步 ConfigMap 方面,TiKV Member Manager 与 PD Member Manager 相似,相干脚本模版实现在 templates.go 文件中,通过调用 RenderTiKVStartScript 生成 TiKV 的启动脚本,调用 transformTiKVConfigMap 取得 TiKV 的配置文件。
  4. 扩容与缩容方面,与 PD 生命周期治理缩容前中被动迁徙 Leader 节点相似,TiKV 生命周期治理中须要平安下线 TiKV Stores,TiKV Member Manager 调用 PD Client 的 DeleteStore 删除该 Pod 上对应的 Store。
  5. 滚动更新方面,TiKV 须要在滚动更新 Pod 之前,须要确保 Pod 上没有 Region Leader。在开始滚动更新前,TiKV Upgrader 中会通过 Pod Annotation 上有没有 EvictLeaderBeginTime 判断是否对该 Pod 进行过 EvictLeader 操作。如果没有,则调用 调用 PD Client 中的 BeginEvictLeader 函数,为 TiKV store 增加 evict leader scheduler 驱赶该 TiKV Store 上的 Region Leader。
_, evicting := upgradePod.Annotations[EvictLeaderBeginTime]
if !evicting {return u.beginEvictLeader(tc, storeID, upgradePod)
}

在 readyToUpgrade 函数中,当 Region Leader 为零,或者转移 Region Leader 工夫超过了 tc.Spec.TiKV.EvictLeaderTimeout 设置的工夫,则更新 StatefulSet UpgradeStrategy 中的 partition 配置,触发 Pod 降级。当 Pod 实现降级之后,调用 endEvictLeaderbyStoreID 完结 evictLeader 操作。

  1. 故障转移方面,TiKV Member Manager 在状态同步时,会记录最初一次 Store 状态变更的工夫。
status.LastTransitionTime = metav1.Now()
if exist && status.State == oldStore.State {status.LastTransitionTime = oldStore.LastTransitionTime}

当 Store 状态为 v1alpha1.TiKVStateDown,并且依据 LastTransitionTime 放弃 Down 的状态的工夫超过了配置中设置的 Failover 工夫限度,则将该 TiKV Pod 退出到 FailureStores 中,

if store.State == v1alpha1.TiKVStateDown && time.Now().After(deadline) && !exist {
    if tc.Status.TiKV.FailureStores == nil {tc.Status.TiKV.FailureStores = map[string]v1alpha1.TiKVFailureStore{}}
    if tc.Spec.TiKV.MaxFailoverCount != nil && *tc.Spec.TiKV.MaxFailoverCount > 0 {
        maxFailoverCount := *tc.Spec.TiKV.MaxFailoverCount
        if len(tc.Status.TiKV.FailureStores) >= int(maxFailoverCount) {klog.Warningf("%s/%s failure stores count reached the limit: %d", ns, tcName, tc.Spec.TiKV.MaxFailoverCount)
            return nil
        }
        tc.Status.TiKV.FailureStores[storeID] = v1alpha1.TiKVFailureStore{
            PodName:   podName,
            StoreID:   store.ID,
            CreatedAt: metav1.Now(),}
        msg := fmt.Sprintf("store[%s] is Down", store.ID)
        f.deps.Recorder.Event(tc, corev1.EventTypeWarning, unHealthEventReason, fmt.Sprintf(unHealthEventMsgPattern, "tikv", podName, msg))
    }
}

同步 TiKV StatefulSet 过程中,replicas 数量会加上 FailureStores 的数量,触发扩容逻辑,实现 failover 过程。

func (tc *TidbCluster) TiKVStsDesiredReplicas() int32 {return tc.Spec.TiKV.Replicas + int32(len(tc.Status.TiKV.FailureStores))
}

TiDB/TiCDC/Pump 的生命周期治理

TiDB、TiCDC、Pump 的生命周期治理比拟相似,与其余组件相比,次要须要管制滚动更新时成员状态为衰弱状态时才容许持续滚动更新过程。在扩容与缩容过程中,额定须要思考的是 PVC 的应用,与 PD 的 PVC 应用办法相似,须要在缩容时增加 deferDeleting 的设计保障数据安全、在扩容时移除该 PVC。故障转移方面,目前 TiDB 曾经实现故障转移的逻辑,TiCDC 和 Pump 临时没有故障转移相干的逻辑。

小结

这篇文章介绍了 TiDBCluster 组件的管制循环的具体实现,次要联合 PD 组件的上下文信息解释了上篇文章中介绍的通用逻辑设计,而后介绍了其余组件的局部差别。通过这篇文章与上一篇文章,咱们理解了 TiDB 次要组件的 Member Manager 局部的设计,理解了 TiDB 生命周期治理过程在 TiDB Operator 的实现。

如果有什么好的想法,欢送通过 #sig-k8s 或 pingcap/tidb-operator 参加 TiDB Operator 社区交换。

正文完
 0