乐趣区

关于数据库:TiDB-Operator-源码阅读-二-Operator-模式

在上一篇文章中咱们探讨了 TiDB Operator 的利用场景,理解了 TiDB Operator 能够在 Kubernetes 集群中治理 TiDB 的生命周期。可是,TiDB Operator 的代码是怎么运行起来的?TiDB 组件的生命周期治理的逻辑又是如何编排的呢?咱们将从 Operator 模式的视角,介绍 TiDB Operator 的代码实现,在这篇文章中咱们次要探讨 controller-manager 的实现,介绍从代码入口到组件的生命周期事件被触发两头的过程。

Operator 模式的演变: 从 Controller 模式到 Operator 模式

TiDB Operator 参考了 kube-controller-manager 的设计,理解 Kubernetes 的设计有助于理解 TiDB Operator 的代码逻辑。Kubernetes 内的 Resources 都是通过 Controller 实现生命周期治理的,例如 Namespace、Node、Deployment、Statefulset 等等,这些 Controller 的代码在 kube-controller-manager 中实现并由 kube-controller-manager 启动后调用。

为了反对用户自定义资源的开发需要,Kubernetes 社区基于下面的开发教训,提出了 Operator 模式。Kubernetes 反对通过 CRD(CustomResourceDefinition)来形容自定义资源,通过 CRD 创立 CR(CustomResource)对象,开发者实现相应 Controller 解决 CR 及关联资源的变更的需要,通过比对资源最新状态和冀望状态,逐渐实现运维操作,实现最终资源状态与冀望状态统一。通过定义 CRD 和实现对应 Controller,无需将代码合并到 Kubernetes 中编译应用,即可实现一个资源的生命周期治理。

TiDB Operator 的 Controller Manager

TiDB Operator 应用 tidb-controller-manager 治理各个 CRD 的 Controller。从 cmd/controller-manager/main.go 开始,tidb-controller-manager 首先加载了 kubeconfig,用于连贯 kube-apiserver,而后应用一系列 NewController 函数,加载了各个 Controller 的初始化函数。

controllers := []Controller{tidbcluster.NewController(deps),
    dmcluster.NewController(deps),
    backup.NewController(deps),
    restore.NewController(deps),
    backupschedule.NewController(deps),
    tidbinitializer.NewController(deps),
    tidbmonitor.NewController(deps),
}

在 Controller 的初始化函数过程中,会初始化一系列 Informer,这些 Informer 次要用来和 kube-apiserver 交互获取 CRD 和相干资源的变更。以 TiDBCluster 为例,在初始化函数 NewController 中,会初始化 Informer 对象:

tidbClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: c.enqueueTidbCluster,
        UpdateFunc: func(old, cur interface{}) {c.enqueueTidbCluster(cur)
        },
        DeleteFunc: c.enqueueTidbCluster,
    })
statefulsetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: c.addStatefulSet,
        UpdateFunc: func(old, cur interface{}) {c.updateStatefulSet(old, cur)
        },
        DeleteFunc: c.deleteStatefulSet,
    })
 

Informer 中增加了解决增加,更新,删除事件的 EventHandler,把监听到的事件波及到的 CR 的 Key 退出队列。

初始化实现后启动 InformerFactory 并期待 cache 同步实现。

informerFactories := []InformerFactory{
            deps.InformerFactory,
            deps.KubeInformerFactory,
            deps.LabelFilterKubeInformerFactory,
        }
        for _, f := range informerFactories {f.Start(ctx.Done())
            for v, synced := range f.WaitForCacheSync(wait.NeverStop) {
                if !synced {klog.Fatalf("error syncing informer for %v", v)
                }
            }
        }

随后 tidb-controller-manager 会调用各个 Controller 的 Run 函数,开始循环执行 Controller 的外部逻辑。

// Start syncLoop for all controllers
for _,controller := range controllers {
    c := controller
    go wait.Forever(func() {c.Run(cliCfg.Workers,ctx.Done()) },cliCfg.WaitDuration)
}

以 TiDBCluster Controller 为例,Run 函数会启动 worker 解决工作队列。

// Run runs the tidbcluster controller.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {defer utilruntime.HandleCrash()
    defer c.queue.ShutDown()
 
    klog.Info("Starting tidbcluster controller")
    defer klog.Info("Shutting down tidbcluster controller")
 
    for i := 0; i < workers; i++ {go wait.Until(c.worker, time.Second, stopCh)
    }
 
    <-stopCh
}

Worker 会调用 processNextWorkItem 函数,弹出队列的元素,而后调用 sync 函数进行同步:

// worker runs a worker goroutine that invokes processNextWorkItem until the the controller's queue is closed
func (c *Controller) worker() {for c.processNextWorkItem() {}}
 
// processNextWorkItem dequeues items, processes them, and marks them done. It enforces that the syncHandler is never
// invoked concurrently with the same key.
func (c *Controller) processNextWorkItem() bool {key, quit := c.queue.Get()
    if quit {return false}
    defer c.queue.Done(key)
    if err := c.sync(key.(string)); err != nil {if perrors.Find(err, controller.IsRequeueError) != nil {klog.Infof("TidbCluster: %v, still need sync: %v, requeuing", key.(string), err)
        } else {utilruntime.HandleError(fmt.Errorf("TidbCluster: %v, sync failed %v, requeuing", key.(string), err))
        }
        c.queue.AddRateLimited(key)
    } else {c.queue.Forget(key)
    }
    return true
}

Sync 函数会依据 Key 获取对应的 CR 对象,例如这里的 TiDBCluster 对象,而后对这个 TiDBCluster 对象进行同步。

// sync syncs the given tidbcluster.
func (c *Controller) sync(key string) error {startTime := time.Now()
    defer func() {klog.V(4).Infof("Finished syncing TidbCluster %q (%v)", key, time.Since(startTime))
    }()
 
    ns, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {return err}
    tc, err := c.deps.TiDBClusterLister.TidbClusters(ns).Get(name)
    if errors.IsNotFound(err) {klog.Infof("TidbCluster has been deleted %v", key)
        return nil
    }
    if err != nil {return err}
 
    return c.syncTidbCluster(tc.DeepCopy())
}
 
func (c *Controller) syncTidbCluster(tc *v1alpha1.TidbCluster) error {return c.control.UpdateTidbCluster(tc)
}

syncTidbCluster 函数调用 updateTidbCluster 函数,进而调用一系列组件的 Sync 函数实现 TiDB 集群治理的相干工作。在 pkg/controller/tidbcluster/tidb_cluster_control.go 的 updateTidbCluster 函数实现中,咱们能够看到各个组件的 Sync 函数在这里调用,在相干调用代码正文里形容着每个 Sync 函数执行的生命周期操作事件,能够帮忙了解每个组件的 Reconcile 须要实现哪些工作,例如 PD 组件:

// works that should do to making the pd cluster current state match the desired state:
//   - create or update the pd service
//   - create or update the pd headless service
//   - create the pd statefulset
//   - sync pd cluster status from pd to TidbCluster object
//   - upgrade the pd cluster
//   - scale out/in the pd cluster
//   - failover the pd cluster
if err := c.pdMemberManager.Sync(tc); err != nil {return err}

咱们将在下篇文章中介绍组件的 Sync 函数实现了哪些工作,TiDBCluster Controller 是怎么实现各个组件的生命周期治理。

小结

通过这篇文章,咱们理解到 TiDB Operator 如何从 cmd/controller-manager/main.go 初始化运行和如何实现对应的 Controller 对象,并以 TidbCluster Controller 为例介绍了 Controller 从初始化到理论工作的过程以及 Controller 外部的工作逻辑。通过下面的代码运行逻辑的介绍,咱们分明了组件的生命周期管制循环是如何被触发的,问题曾经被放大到如何细化这个管制循环,增加 TiDB 非凡的运维逻辑,使得 TiDB 能在 Kubernetes 上部署和失常运行,实现其余的生命周期操作。咱们将在下一篇文章中探讨如何细化这个管制循环,探讨组件的管制循环的实现。

咱们介绍了社区对于 Operator 模式的摸索和演变。对于一些心愿应用 Operator 模式开发资源管理零碎的小伙伴,Kubernetes 社区中提供了 Kubebuilder 和 Operator Framework 两个 Controller 脚手架我的项目。相比于参考 kubernetes/sample-controller 进行开发,Operator 脚手架基于 kubernetes-sigs/controller-runtime 生成 Controller 代码,缩小了许多反复引入的模板化的代码。开发者只须要专一于实现 CRD 对象的管制循环局部即可,而不须要关怀管制循环启动之前的筹备工作。

如果有什么好的想法,欢送通过 #sig-k8s 或 pingcap/tidb-operator 参加 TiDB Operator 社区交换。

退出移动版