关于 prometheus:prometheus-operator 源码分析 - prometheus 配置自动更新之 rules-reloader(三)

61次阅读

共计 5872 个字符,预计需要花费 15 分钟才能阅读完成。

rules-reloader 的源码:https://github.com/jimmidyson…

rules-reloader 的启动参数:

Args:
      --webhook-url=http://localhost:9090/-/reload
      --volume-dir=/etc/prometheus/rules/prometheus-k8s-rulefiles-0

operator 监听到 prometheusrule 配置变更,会更新 configmap(目录 prometheus-k8s-rulefiles-0),rules-reloader 监控到 prometheus-k8s-rulefiles-0 目录有变更,就发送 reload 请求给 prometheus。

1. rules-reloader 的源码剖析

rules-reloader 的源码很简单,使用 fsnotify 监听 --volume-dir,发现变动就向 --webhook-url 发送请求:

// configmap-reload.go
// main is the entry point of the reloader as excerpted here. It registers the
// volume-dir and webhook-url flags, creates an fsnotify watcher, starts a
// goroutine that sends an HTTP request to every configured webhook whenever the
// watcher reports an event, and then adds each volume directory to the watcher.
// Lines consisting only of dots mark code omitted from this excerpt.
func main() {flag.Var(&volumeDirs, "volume-dir", "the config map volume directory to watch for updates; may be used multiple times")
    flag.Var(&webhook, "webhook-url", "the url to send a request to when the specified config map volume directory has been updated")
    flag.Parse()
    
    // NOTE(review): err is not checked here in this excerpt — presumably the
    // check was omitted; confirm against the full source.
    watcher, err := fsnotify.NewWatcher()
    
    go func() {
        for {
            select {
                // The watcher reported an event.
                case event := <-watcher.Events:
                    log.Println("config map updated")
                    for _, h := range webhook {begun := time.Now()
                        // Build the HTTP request for this webhook URL using the configured method.
                        req, err := http.NewRequest(*webhookMethod, h.String(), nil)
                        // Count down from *webhookRetries, issuing the request on each pass;
                        // the response handling is omitted from this excerpt.
                        for retries := *webhookRetries; retries != 0; retries-- {resp, err := http.DefaultClient.Do(req)
                            .....
                        }
                    }
                // Watcher errors arrive on this channel; no handling is shown in this excerpt.
                case err := <-watcher.Errors:
            }
        }
    }()
    // Register every volume directory with the watcher; abort the process if one cannot be added.
    for _, d := range volumeDirs {log.Printf("Watching directory: %q", d)
        err = watcher.Add(d)
        if err != nil {log.Fatal(err)
        }
    }
    ......
}

rules-reloader 监听的 volume 是挂载的 ConfigMap:prometheus-k8s-rulefiles-0

--volume-dir=/etc/prometheus/rules/prometheus-k8s-rulefiles-0

该目录下维护的 rules 文件:

/etc/prometheus/rules/prometheus-k8s-rulefiles-0 $ ls
monitoring-prometheus-k8s-rules.yaml

2. operator 维护 configmap 的源码

configMap 是由 operator 维护的,它对应 prometheusrule CRD 对象。
1) 监听 prometheusrule 资源对象变更
与 promInf 类似,有一个 ruleInf 专门负责处理 prometheusrule:

// pkg/prometheus/operator.go
// New creates a new controller.
//
// The portion shown here builds c.ruleInf, a shared index informer for
// PrometheusRule objects. Its list and watch functions call the monitoring
// client's PrometheusRules(namespace).List and .Watch; they are wrapped in a
// multi-namespace lister-watcher configured with the namespace allow and deny
// lists, which is in turn wrapped by c.metrics.NewInstrumentedListerWatcher.
// The informer is indexed by namespace. Dot-only lines mark omitted code.
func New(conf Config, logger log.Logger, r prometheus.Registerer) (*Operator, error) {
    ......
    c.ruleInf = cache.NewSharedIndexInformer(
        c.metrics.NewInstrumentedListerWatcher(listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.AllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
                // Per-namespace ListWatch backed by the PrometheusRules client.
                return &cache.ListWatch{ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {return mclient.MonitoringV1().PrometheusRules(namespace).List(context.TODO(), options)
                    },
                    WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {return mclient.MonitoringV1().PrometheusRules(namespace).Watch(context.TODO(), options)
                    },
                }
            }),
        ),
        // Object type, resync period, and a namespace index for the informer.
        &monitoringv1.PrometheusRule{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
    )
    ....
}

运行该 ruleInf 并添加 handler:

// pkg/prometheus/operator.go
// Run the controller.
//
// The portion shown here starts the PrometheusRule informer (c.ruleInf) in its
// own goroutine, passing it stopc. Dot-only lines mark omitted code.
func (c *Operator) Run(stopc <-chan struct{}) error {
    .......
    go c.ruleInf.Run(stopc)
    ......
}
// addHandlers adds the eventhandlers to the informers.
//
// The portion shown here registers the add, delete and update callbacks for
// PrometheusRule objects on c.ruleInf. Dot-only lines mark omitted code.
func (c *Operator) addHandlers() {
    ......
    c.ruleInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    c.handleRuleAdd,
        DeleteFunc: c.handleRuleDelete,
        UpdateFunc: c.handleRuleUpdate,
    })
    ......
}

一旦有资源变更,调用 AddFunc/DeleteFunc/UpdateFunc:

// handleRuleAdd is the AddFunc callback registered on the PrometheusRule
// informer. When c.getObject accepts obj, it logs the event at debug level,
// increments the "add" trigger counter for the PrometheusRule kind, and
// enqueues work for the namespace of the added object.
func (c *Operator) handleRuleAdd(obj interface{}) {
    meta, accepted := c.getObject(obj)
    if !accepted {
        return
    }

    level.Debug(c.logger).Log("msg", "PrometheusRule added")
    c.metrics.TriggerByCounter(monitoringv1.PrometheusRuleKind, "add").Inc()

    ruleNamespace := meta.GetNamespace()
    c.enqueueForMonitorNamespace(ruleNamespace)
}

// handleRuleUpdate is the UpdateFunc callback registered on the PrometheusRule
// informer. It returns immediately when the old and current objects carry the
// same ResourceVersion. Otherwise, when c.getObject accepts cur, it logs the
// event at debug level, increments the "update" trigger counter for the
// PrometheusRule kind, and enqueues work for the namespace of the object.
func (c *Operator) handleRuleUpdate(old, cur interface{}) {
    previous := old.(*monitoringv1.PrometheusRule)
    current := cur.(*monitoringv1.PrometheusRule)
    if previous.ResourceVersion == current.ResourceVersion {
        return
    }

    meta, accepted := c.getObject(cur)
    if !accepted {
        return
    }

    level.Debug(c.logger).Log("msg", "PrometheusRule updated")
    c.metrics.TriggerByCounter(monitoringv1.PrometheusRuleKind, "update").Inc()
    c.enqueueForMonitorNamespace(meta.GetNamespace())
}

看下入队到 workQueue 的到底是啥:

// pkg/prometheus/operator.go
// enqueueForMonitorNamespace forwards nsName to enqueueForNamespace, using the
// store of the c.nsMonInf informer as the namespace lookup source.
func (c *Operator) enqueueForMonitorNamespace(nsName string) {
    namespaceStore := c.nsMonInf.GetStore()
    c.enqueueForNamespace(namespaceStore, nsName)
}

// enqueueForNamespace enqueues all Prometheus object keys that belong to the
// given namespace or select objects in the given namespace.
//
// store is the cache from which the namespace object named nsName is fetched.
// Dot-only lines mark code omitted from this excerpt.
func (c *Operator) enqueueForNamespace(store cache.Store, nsName string) {nsObject, exists, err := store.GetByKey(nsName)
    // NOTE(review): exists and err are not checked before this type assertion
    // in the excerpt — presumably the checks were omitted; confirm.
    ns := nsObject.(*v1.Namespace)
    
    // Visit every object in the Prometheus informer's store.
    err = cache.ListAll(c.promInf.GetStore(), labels.Everything(), func(obj interface{}) {
        // Check for Prometheus instances in the namespace.
        p := obj.(*monitoringv1.Prometheus)
        if p.Namespace == nsName {c.enqueue(p)
            return
        }
        ......
    }
}

可以看到,入队的是 prometheus CRD 对象的 key。

2) 资源对象的变更处理
直接来到 prometheus workQueue 的处理函数:

// pkg/prometheus/operator.go
// sync processes one work-queue key. In the portion shown, it looks the key up
// in the Prometheus informer's indexer, asserts the result to a Prometheus
// object, and calls createOrUpdateRuleConfigMaps for it, returning that call's
// error if there is one. Dot-only lines mark code omitted from this excerpt.
func (c *Operator) sync(key string) error {obj, exists, err := c.promInf.GetIndexer().GetByKey(key)
    ......
    p := obj.(*monitoringv1.Prometheus)
    // Bring the rule ConfigMaps for this Prometheus object up to date.
    ruleConfigMapNames, err := c.createOrUpdateRuleConfigMaps(p)
    if err != nil {return err}
    .....
}

具体看下 prometheusrule 的处理函数:对于 prometheusrule,这里没有做更新,而是直接 delete 老的 ConfigMap,然后 create 新的:

// pkg/prometheus/operator.go
// createOrUpdateRuleConfigMaps rebuilds the rule ConfigMaps for the Prometheus
// object p in p's namespace. In the portion shown, it selects the rules,
// builds the new ConfigMaps from them, deletes every ConfigMap in
// currentConfigMaps, and then creates each new ConfigMap. A failed delete or
// create is returned as a wrapped error. Dot-only lines mark omitted code.
// NOTE(review): the returned []string is presumably the ConfigMap names (the
// caller stores it in ruleConfigMapNames) — confirm against the full source.
func (c *Operator) createOrUpdateRuleConfigMaps(p *monitoringv1.Prometheus) ([]string, error) {cClient := c.kclient.CoreV1().ConfigMaps(p.Namespace)
    ......
    newRules, err := c.selectRules(p, namespaces)
    ......
    newConfigMaps, err := makeRulesConfigMaps(p, newRules)
    ......
    // Delete the old ConfigMaps outright, then create the new ones.
    // Simply deleting old ConfigMaps and creating new ones for now. Could be
    // replaced by logic that only deletes obsolete ConfigMaps in the future.
    for _, cm := range currentConfigMaps {err := cClient.Delete(context.TODO(), cm.Name, metav1.DeleteOptions{})
        if err != nil {return nil, errors.Wrapf(err, "failed to delete current ConfigMap'%v'", cm.Name)
        }
    }
    for _, cm := range newConfigMaps {_, err = cClient.Create(context.TODO(), &cm, metav1.CreateOptions{})
        if err != nil {return nil, errors.Wrapf(err, "failed to create new ConfigMap'%v'", cm.Name)
        }
    }
    ......
}

构造 rule 的细节:rule 文件名称 = {namespace}-{name}.yaml

// selectRules builds the rule files for the Prometheus object p. For each
// namespace in namespaces it lists the PrometheusRule objects in the rule
// informer's indexer that match ruleSelector, enforces the namespace label on
// a deep copy of each, generates the file content from the rule's Spec, and
// stores it in the result map under the key "<namespace>-<name>.yaml".
// It returns the first listing, labeling or content-generation error found.
// Dot-only lines mark code omitted from this excerpt.
func (c *Operator) selectRules(p *monitoringv1.Prometheus, namespaces []string) (map[string]string, error) {rules := map[string]string{}
    .....
    for _, ns := range namespaces {
        // The callback cannot return an error, so failures inside it are
        // captured here and checked after the listing finishes.
        var marshalErr error
        err := cache.ListAllByNamespace(c.ruleInf.GetIndexer(), ns, ruleSelector, func(obj interface{}) {promRule := obj.(*monitoringv1.PrometheusRule).DeepCopy()
            if err := nsLabeler.EnforceNamespaceLabel(promRule); err != nil {
                marshalErr = err
                return
            }
            content, err := generateContent(promRule.Spec)
            if err != nil {
                marshalErr = err
                return
            }
            rules[fmt.Sprintf("%v-%v.yaml", promRule.Namespace, promRule.Name)] = content    // rule file name: {namespace}-{name}.yaml
        })
        if err != nil {return nil, err}
        if marshalErr != nil {return nil, marshalErr}
    }
    ...
    return rules, nil
}

正文完
 0