Operator的整体架构:

次要包含3大组件:

  • Informer: 监听资源对象的变动,将变动转成事件放入WorkQueue;
  • WorkQueue: 保留变动的事件;
  • Control Loop: 生产WorkQueue中的事件,对事件做响应;

其中,Informer较为简单:

  • Reflector: 调用apiservier接口,应用List&Watch对指定类型的资源对象进行监控;
  • DeltaFIFO: 增量队列,保留Reflector监控到的change对象;
  • LocalStorage: informer的本地cache,用以查问特定类型的资源对象,以加重apiserver的查问压力;

1. Informer源码

对于要监控的资源类型,每种类型创立一个Informer,比方prometheus CRD:

// 代码入口// cmd/operator/main.gofunc Main() int {    .......    r := prometheus.NewRegistry()    po, err := prometheuscontroller.New(cfg, log.With(logger, "component", "prometheusoperator"), r)    if err != nil {        fmt.Fprint(os.Stderr, "instantiating prometheus controller failed: ", err)        return 1    }    .......}

这里能够看到,用prometheuscontroller.New()创立对应的operator,这也是常说的operator=CRD+Controller:

// pkg/prometheus/operator.go// New creates a new controller.func New(conf Config, logger log.Logger, r prometheus.Registerer) (*Operator, error) {    ....    c.promInf = cache.NewSharedIndexInformer(        c.metrics.NewInstrumentedListerWatcher(            listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.PrometheusAllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {                return &cache.ListWatch{                    ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {                        options.LabelSelector = c.config.PromSelector                        return mclient.MonitoringV1().Prometheuses(namespace).List(context.TODO(), options)                    },                    WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {                        options.LabelSelector = c.config.PromSelector                        return mclient.MonitoringV1().Prometheuses(namespace).Watch(context.TODO(), options)                    },                }            }),        ),        &monitoringv1.Prometheus{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},    )    .....}

创立了prometheus的Informor: c.promInf,能够看到,它监听对象变更应用的是List&Watch;
创立ok后,将该Informer Run起来:

// pkg/prometheus/operator.go// Run the controller.func (c *Operator) Run(stopc <-chan struct{}) error {    ......    go c.promInf.Run(stopc)    ......}

同时为该Informer增加handler,包含Add/Delete/Update:

// pkg/prometheus/operator.go// Run the controller.func (c *Operator) Run(stopc <-chan struct{}) error {    ......    if err := c.waitForCacheSync(stopc); err != nil {        return err    }    c.addHandlers()    ......}// addHandlers adds the eventhandlers to the informers.func (c *Operator) addHandlers() {    c.promInf.AddEventHandler(cache.ResourceEventHandlerFuncs{        AddFunc:    c.handlePrometheusAdd,        DeleteFunc: c.handlePrometheusDelete,        UpdateFunc: c.handlePrometheusUpdate,    })    .......}

2. WorkQueue源码

Informer发现监听的对象变更,调用handler,handler会将变更对象放入WorkQueue:
上面是Add Prometheus的事件:

// pkg/prometheus/operator.gofunc (c *Operator) handlePrometheusAdd(obj interface{}) {    key, ok := c.keyFunc(obj)    if !ok {        return    }    level.Debug(c.logger).Log("msg", "Prometheus added", "key", key)    c.metrics.TriggerByCounter(monitoringv1.PrometheusesKind, "add").Inc()    checkPrometheusSpecDeprecation(key, obj.(*monitoringv1.Prometheus), c.logger)    c.enqueue(key)}

3. Control Loop源码

operator会启动1个worker,来生产workQueue中的事件:

// pkg/prometheus/operator.go// Run the controller.func (c *Operator) Run(stopc <-chan struct{}) error {    .......    go c.worker()}
// pkg/prometheus/operator.go// worker runs a worker thread that just dequeues items, processes them, and// marks them done. It enforces that the syncHandler is never invoked// concurrently with the same key.func (c *Operator) worker() {    for c.processNextWorkItem() {    }}

看下具体的生产动作:

// pkg/prometheus/operator.gofunc (c *Operator) processNextWorkItem() bool {    key, quit := c.queue.Get()    //取队列元素    if quit {        return false    }    defer c.queue.Done(key)    err := c.sync(key.(string))    //进行事件操作    if err == nil {        c.queue.Forget(key)        //处理完毕,Forget        return true    }    c.metrics.ReconcileErrorsCounter().Inc()    utilruntime.HandleError(errors.Wrap(err, fmt.Sprintf("Sync %q failed", key)))    c.queue.AddRateLimited(key)    return true}

事件处理在c.sync()中,以prometehus statefulset为例:

  • 首先查看资源对象是否存在,若不存在则间接创立,返回;
  • 否则,更新statefulset对象;
  • 若都不满足,则删除statefulset对象;
// pkg/prometheus/operator.gofunc (c *Operator) sync(key string) error {    ......    ssetClient := c.kclient.AppsV1().StatefulSets(p.Namespace)    // Ensure we have a StatefulSet running Prometheus deployed.    obj, exists, err = c.ssetInf.GetIndexer().GetByKey(prometheusKeyToStatefulSetKey(key))    sset, err := makeStatefulSet(*p, &c.config, ruleConfigMapNames, newSSetInputHash)    //不存在,就创立    if !exists {        level.Debug(c.logger).Log("msg", "no current Prometheus statefulset found")        level.Debug(c.logger).Log("msg", "creating Prometheus statefulset")        if _, err := ssetClient.Create(context.TODO(), sset, metav1.CreateOptions{}); err != nil {            return errors.Wrap(err, "creating statefulset failed")        }        return nil    }    ......    //否则,就更新    _, err = ssetClient.Update(context.TODO(), sset, metav1.UpdateOptions{})    // 都不满足,则删除    if ok && sErr.ErrStatus.Code == 422 && sErr.ErrStatus.Reason == metav1.StatusReasonInvalid {        level.Info(c.logger).Log("msg", "resolving illegal update of Prometheus StatefulSet", "details", sErr.ErrStatus.Details)        if err := ssetClient.Delete(context.TODO(), sset.GetName(), metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil {            return errors.Wrap(err, "failed to delete StatefulSet to avoid forbidden action")        }        return nil    }    ......}

简略看下创立statefulset做的事件,无外乎依据CRD的配置,结构spec对象,而后应用ssetClient.Create创立statefulset:

// pkg/prometheus/statefulset.gofunc makeStatefulSet(    p monitoringv1.Prometheus,    config *Config,    ruleConfigMapNames []string,    inputHash string,) (*appsv1.StatefulSet, error) {    ......    spec, err := makeStatefulSetSpec(p, config, ruleConfigMapNames)    ......}
func makeStatefulSetSpec(p monitoringv1.Prometheus, c *Config, ruleConfigMapNames []string) (*appsv1.StatefulSetSpec, error) {    ......    promArgs := []string{        "-web.console.templates=/etc/prometheus/consoles",        "-web.console.libraries=/etc/prometheus/console_libraries",    }    switch version.Major {    case 1:        ......    case 2:        retentionTimeFlag := "-storage.tsdb.retention="        if version.Minor >= 7 {            retentionTimeFlag = "-storage.tsdb.retention.time="            if p.Spec.RetentionSize != "" {                promArgs = append(promArgs,                    fmt.Sprintf("-storage.tsdb.retention.size=%s", p.Spec.RetentionSize),                )            }        }        promArgs = append(promArgs,            fmt.Sprintf("-config.file=%s", path.Join(confOutDir, configEnvsubstFilename)),            fmt.Sprintf("-storage.tsdb.path=%s", storageDir),            retentionTimeFlag+p.Spec.Retention,            "-web.enable-lifecycle",            "-storage.tsdb.no-lockfile",        )        ......    }