prometheus-operator Source Code Analysis: prometheus-statefulset as an Example


The overall architecture of the Operator:

It mainly consists of three components:

  • Informer: watches for changes to resource objects and converts each change into an event that is put onto the WorkQueue;
  • WorkQueue: stores the change events;
  • Control Loop: consumes events from the WorkQueue and reacts to them;

The Informer itself is made up of the following parts (a minimal end-to-end sketch of the whole pattern follows this list):

  • Reflector: calls the apiserver API and uses List&Watch to monitor resource objects of the specified type;
  • DeltaFIFO: an incremental queue that stores the change objects observed by the Reflector;
  • LocalStorage: the Informer's local cache, used to query resource objects of a given type so as to reduce the query load on the apiserver;
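
For orientation, here is a minimal, self-contained sketch of this Informer + WorkQueue + Control Loop pattern built directly on client-go. It is not code from prometheus-operator; the clientset parameter, the Pod resource, and the queue name are placeholders chosen for illustration (assumed imports: k8s.io/client-go/kubernetes, k8s.io/client-go/tools/cache, k8s.io/client-go/util/workqueue, k8s.io/api/core/v1, k8s.io/apimachinery/pkg/apis/meta/v1, k8s.io/apimachinery/pkg/fields).

// A minimal sketch of the Informer / WorkQueue / Control Loop pattern using
// plain client-go; all names here are illustrative, not prometheus-operator code.
func runExampleController(clientset kubernetes.Interface, stopCh <-chan struct{}) {
    // WorkQueue: stores the change events (as namespace/name keys) with rate limiting.
    queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "example")

    // Informer: List&Watch Pods via the apiserver and keep a local, namespace-indexed cache.
    informer := cache.NewSharedIndexInformer(
        cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything()),
        &corev1.Pod{}, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
    )

    // Handlers: convert every change into a key and push it onto the WorkQueue.
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
                queue.Add(key)
            }
        },
    })
    go informer.Run(stopCh)

    // Control Loop: consume keys from the WorkQueue and react to them.
    for {
        key, quit := queue.Get()
        if quit {
            return
        }
        // ... reconcile the object identified by key.(string) here ...
        queue.Done(key)
    }
}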

1. Informer Source Code

One Informer is created for each resource type that needs to be watched; take the prometheus CRD as an example:

// code entry point
// cmd/operator/main.go
func Main() int {
    .......
    r := prometheus.NewRegistry()
    po, err := prometheuscontroller.New(cfg, log.With(logger, "component", "prometheusoperator"), r)
    if err != nil {
        fmt.Fprint(os.Stderr, "instantiating prometheus controller failed:", err)
        return 1
    }
    .......
}

Here you can see that prometheuscontroller.New() creates the corresponding operator, which is what the common formula operator = CRD + Controller refers to:

// pkg/prometheus/operator.go
// New creates a new controller.
func New(conf Config, logger log.Logger, r prometheus.Registerer) (*Operator, error) {
    ....
    c.promInf = cache.NewSharedIndexInformer(
        c.metrics.NewInstrumentedListerWatcher(
            listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.PrometheusAllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
                return &cache.ListWatch{
                    ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                        options.LabelSelector = c.config.PromSelector
                        return mclient.MonitoringV1().Prometheuses(namespace).List(context.TODO(), options)
                    },
                    WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                        options.LabelSelector = c.config.PromSelector
                        return mclient.MonitoringV1().Prometheuses(namespace).Watch(context.TODO(), options)
                    },
                }
            }),
        ),
        &monitoringv1.Prometheus{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
    )
    .....
}

This creates the Informer for prometheus, c.promInf. As the code shows, it uses List&Watch to monitor object changes.
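
The last two arguments of cache.NewSharedIndexInformer configure the local cache described earlier: resyncPeriod controls how often cached objects are periodically re-delivered to the handlers, and the namespace indexer stores objects under namespace/name keys. The cache can then be queried without going back to the apiserver; a small usage sketch (the key "monitoring/k8s" is just an illustrative namespace/name):

// Querying the Informer's local cache (the "LocalStorage" above) by namespace/name key.
// The key value is illustrative.
obj, exists, err := c.promInf.GetIndexer().GetByKey("monitoring/k8s")
if err == nil && exists {
    p := obj.(*monitoringv1.Prometheus)
    _ = p // use the cached Prometheus object without hitting the apiserver
}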
Once created, the Informer is started by calling Run:

// pkg/prometheus/operator.go
// Run the controller.
func (c *Operator) Run(stopc <-chan struct{}) error {
    ......
    go c.promInf.Run(stopc)
    ......
}

At the same time, handlers for Add/Delete/Update events are registered on the Informer:

// pkg/prometheus/operator.go
// Run the controller.
func (c *Operator) Run(stopc <-chan struct{}) error {
    ......
    if err := c.waitForCacheSync(stopc); err != nil {
        return err
    }
    c.addHandlers()
    ......
}

// addHandlers adds the eventhandlers to the informers.
func (c *Operator) addHandlers() {
    c.promInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    c.handlePrometheusAdd,
        DeleteFunc: c.handlePrometheusDelete,
        UpdateFunc: c.handlePrometheusUpdate,
    })
    .......
}
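
For reference, cache.ResourceEventHandlerFuncs in client-go fixes the shape of these callbacks: Add and Delete receive the object itself (for deletes this may be a cache.DeletedFinalStateUnknown tombstone), while Update receives both the old and the new version, which lets the update handler compare them before enqueueing. The type in client-go is essentially:

// k8s.io/client-go/tools/cache: the callback shapes expected by AddEventHandler.
type ResourceEventHandlerFuncs struct {
    AddFunc    func(obj interface{})
    UpdateFunc func(oldObj, newObj interface{})
    DeleteFunc func(obj interface{})
}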

2. WorkQueue Source Code

When the Informer detects a change to a watched object, it invokes the corresponding handler, which turns the object into a key and puts it onto the WorkQueue.
Below is the handler for the Add Prometheus event:

// pkg/prometheus/operator.go
func (c *Operator) handlePrometheusAdd(obj interface{}) {
    key, ok := c.keyFunc(obj)
    if !ok {
        return
    }
    level.Debug(c.logger).Log("msg", "Prometheus added", "key", key)
    c.metrics.TriggerByCounter(monitoringv1.PrometheusesKind, "add").Inc()
    checkPrometheusSpecDeprecation(key, obj.(*monitoringv1.Prometheus), c.logger)
    c.enqueue(key)
}
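
c.enqueue is a thin wrapper around the work queue. A simplified sketch of what it amounts to (treat this as an approximation of the helper in pkg/prometheus/operator.go rather than a verbatim copy):

// Simplified sketch of enqueue: convert the object to a namespace/name key if
// necessary and push it onto the rate-limited work queue.
func (c *Operator) enqueue(obj interface{}) {
    if obj == nil {
        return
    }
    key, ok := obj.(string) // callers may already pass a ready-made key
    if !ok {
        key, ok = c.keyFunc(obj)
        if !ok {
            return
        }
    }
    c.queue.Add(key)
}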

3. Control Loop Source Code

The operator starts a single worker to consume the events in the workQueue:

// pkg/prometheus/operator.go
// Run the controller.
func (c *Operator) Run(stopc <-chan struct{}) error {
    .......
    go c.worker()
    ......
}

// pkg/prometheus/operator.go
// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. It enforces that the syncHandler is never invoked
// concurrently with the same key.
func (c *Operator) worker() {
    for c.processNextWorkItem() {
    }
}

Let's look at what the consumption actually does:

// pkg/prometheus/operator.go
func (c *Operator) processNextWorkItem() bool {
    key, quit := c.queue.Get() // take an item from the queue
    if quit {
        return false
    }
    defer c.queue.Done(key)

    err := c.sync(key.(string)) // reconcile the object identified by the key
    if err == nil {
        c.queue.Forget(key) // processed successfully, Forget the key
        return true
    }
    c.metrics.ReconcileErrorsCounter().Inc()
    utilruntime.HandleError(errors.Wrap(err, fmt.Sprintf("Sync %q failed", key)))
    c.queue.AddRateLimited(key)
    return true
}
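
The Forget / AddRateLimited pair is what gives the controller its retry behaviour: a key whose sync fails is re-queued with an increasing backoff delay, and a successful sync resets that failure history via Forget. The queue itself is a client-go rate-limiting work queue; a construction sketch (the queue name string is illustrative):

// Constructing a rate-limited work queue with client-go; keys re-added via
// AddRateLimited are retried with backoff, and Forget clears the per-key backoff state.
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "prometheus")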

The event handling itself happens in c.sync(); taking the prometheus StatefulSet as an example:

  • first check whether the StatefulSet resource object exists; if it does not, create it and return;
  • otherwise, update the existing StatefulSet object;
  • if the update is rejected as invalid (HTTP 422), delete the StatefulSet object so that it can be recreated on a later sync;

// pkg/prometheus/operator.go
func (c *Operator) sync(key string) error {
    ......
    ssetClient := c.kclient.AppsV1().StatefulSets(p.Namespace)
    // Ensure we have a StatefulSet running Prometheus deployed.
    obj, exists, err = c.ssetInf.GetIndexer().GetByKey(prometheusKeyToStatefulSetKey(key))

    sset, err := makeStatefulSet(*p, &c.config, ruleConfigMapNames, newSSetInputHash)

    // if the StatefulSet does not exist yet, create it
    if !exists {
        level.Debug(c.logger).Log("msg", "no current Prometheus statefulset found")
        level.Debug(c.logger).Log("msg", "creating Prometheus statefulset")
        if _, err := ssetClient.Create(context.TODO(), sset, metav1.CreateOptions{}); err != nil {
            return errors.Wrap(err, "creating statefulset failed")
        }
        return nil
    }
    ......
    // otherwise, update it
    _, err = ssetClient.Update(context.TODO(), sset, metav1.UpdateOptions{})

    // if the update is rejected as invalid (422), delete the StatefulSet so it can be recreated
    if ok && sErr.ErrStatus.Code == 422 && sErr.ErrStatus.Reason == metav1.StatusReasonInvalid {
        level.Info(c.logger).Log("msg", "resolving illegal update of Prometheus StatefulSet", "details", sErr.ErrStatus.Details)
        if err := ssetClient.Delete(context.TODO(), sset.GetName(), metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil {
            return errors.Wrap(err, "failed to delete StatefulSet to avoid forbidden action")
        }
        return nil
    }
    ......
}
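
One detail worth calling out is the key translation at the GetByKey call above: the work-queue key identifies the Prometheus custom resource as namespace/name, while the StatefulSet managed by the operator has a derived name, so the key must be mapped before the local StatefulSet cache can be queried. A sketch of the idea, assuming the operator's convention of prefixing the Prometheus name with "prometheus-":

// Sketch of the key mapping: the "namespace/name" key of the Prometheus CR is
// translated to the key of its StatefulSet, which carries a "prometheus-" name prefix.
func prometheusKeyToStatefulSetKey(key string) string {
    keyParts := strings.Split(key, "/")
    return keyParts[0] + "/prometheus-" + keyParts[1]
}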

A quick look at what creating the StatefulSet involves: essentially, build the spec object from the CRD configuration and then create the StatefulSet via ssetClient.Create:

// pkg/prometheus/statefulset.go
func makeStatefulSet(
    p monitoringv1.Prometheus,
    config *Config,
    ruleConfigMapNames []string,
    inputHash string,
) (*appsv1.StatefulSet, error) {
    ......
    spec, err := makeStatefulSetSpec(p, config, ruleConfigMapNames)
    ......
}
func makeStatefulSetSpec(p monitoringv1.Prometheus, c *Config, ruleConfigMapNames []string) (*appsv1.StatefulSetSpec, error) {
    ......
    promArgs := []string{
        "-web.console.templates=/etc/prometheus/consoles",
        "-web.console.libraries=/etc/prometheus/console_libraries",
    }
    switch version.Major {
    case 1:
        ......
    case 2:
        retentionTimeFlag := "-storage.tsdb.retention="
        if version.Minor >= 7 {
            retentionTimeFlag = "-storage.tsdb.retention.time="
            if p.Spec.RetentionSize != "" {
                promArgs = append(promArgs,
                    fmt.Sprintf("-storage.tsdb.retention.size=%s", p.Spec.RetentionSize),
                )
            }
        }
        promArgs = append(promArgs,
            fmt.Sprintf("-config.file=%s", path.Join(confOutDir, configEnvsubstFilename)),
            fmt.Sprintf("-storage.tsdb.path=%s", storageDir),
            retentionTimeFlag+p.Spec.Retention,
            "-web.enable-lifecycle",
            "-storage.tsdb.no-lockfile",
        )
        ......
    
}
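
To make the version branch concrete: for a hypothetical Prometheus CR on version 2.x (minor >= 7) with Retention set to "15d" and RetentionSize set to "50GB", the flags accumulated by the code above would look roughly as follows. The config and storage paths stand in for the confOutDir, configEnvsubstFilename and storageDir constants and should be treated as illustrative values only:

// Illustrative contents of promArgs after makeStatefulSetSpec for a hypothetical
// Prometheus 2.x (minor >= 7) CR with Retention "15d" and RetentionSize "50GB".
promArgs := []string{
    "-web.console.templates=/etc/prometheus/consoles",
    "-web.console.libraries=/etc/prometheus/console_libraries",
    "-storage.tsdb.retention.size=50GB",
    "-config.file=/etc/prometheus/config_out/prometheus.env.yaml",
    "-storage.tsdb.path=/prometheus",
    "-storage.tsdb.retention.time=15d",
    "-web.enable-lifecycle",
    "-storage.tsdb.no-lockfile",
}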
