rules-reloader的源码: https://github.com/jimmidyson/configmap-reload

rules-reloader的启动参数:

Args:      --webhook-url=http://localhost:9090/-/reload      --volume-dir=/etc/prometheus/rules/prometheus-k8s-rulefiles-0

operator监听到prometheusrule配置变更,会更新configmap(目录prometheus-k8s-rulefiles-0),rules-reloader监控到prometheus-k8s-rulefiles-0目录有变更,发送reload给prometheus。

1. rules-reloader的源码剖析

rules-reloader的源码很简单:使用fsnotify监听--volume-dir指定的目录,发现变动就向--webhook-url发送HTTP请求:

// configmap-reload.go

// main registers the --volume-dir and --webhook-url flags, creates an
// fsnotify watcher, and starts a goroutine that, for every event received
// on watcher.Events, logs "config map updated" and issues an HTTP request to
// each configured webhook URL. It then adds every volume directory to the
// watcher, aborting via log.Fatal if a directory cannot be added.
//
// NOTE: this is an abridged excerpt; the dotted lines mark code that the
// article omitted from the original source.
func main() {
    flag.Var(&volumeDirs, "volume-dir", "the config map volume directory to watch for updates; may be used multiple times")
    flag.Var(&webhook, "webhook-url", "the url to send a request to when the specified config map volume directory has been updated")
    flag.Parse()

    watcher, err := fsnotify.NewWatcher()

    go func() {
        for {
            select {
                // A change was observed in a watched directory.
                case event := <-watcher.Events:
                    log.Println("config map updated")
                    for _, h := range webhook {
                        begun := time.Now()
                        // Build the webhook HTTP request (no body).
                        req, err := http.NewRequest(*webhookMethod, h.String(), nil)
                        // Send the request inside a retry loop that counts
                        // *webhookRetries down to zero.
                        for retries := *webhookRetries; retries != 0; retries-- {
                            resp, err := http.DefaultClient.Do(req)
                            .....
                        }
                    }
                case err := <-watcher.Errors:
            }
        }
    }()
    // Register every --volume-dir directory with the watcher.
    for _, d := range volumeDirs {
        log.Printf("Watching directory: %q", d)
        err = watcher.Add(d)
        if err != nil {
            log.Fatal(err)
        }
    }
    ......
}

rules-reloader监听的volume是挂载的ConfigMap:prometheus-k8s-rulefiles-0

--volume-dir=/etc/prometheus/rules/prometheus-k8s-rulefiles-0

该目录下维护的rules文件:

/etc/prometheus/rules/prometheus-k8s-rulefiles-0 $ ls
monitoring-prometheus-k8s-rules.yaml

2. operator维护configmap的源码

configMap是由operator维护的,它对应prometheusrule CRD对象。
1) 监听prometheusrule资源对象变更
与promInf类似,有一个ruleInf专门负责处理prometheusrule:

// pkg/prometheus/operator.go

// New creates a new controller.
//
// The part shown here builds c.ruleInf, a shared index informer for
// PrometheusRule objects. Its per-namespace ListWatch calls
// PrometheusRules(namespace).List and .Watch on the monitoring client, and
// the informer is indexed by namespace (cache.NamespaceIndex).
//
// NOTE: abridged excerpt; the dotted lines mark code omitted by the article.
func New(conf Config, logger log.Logger, r prometheus.Registerer) (*Operator, error) {
    ......
    c.ruleInf = cache.NewSharedIndexInformer(
        c.metrics.NewInstrumentedListerWatcher(
            // The namespace allow/deny lists from the operator config are
            // handed to MultiNamespaceListerWatcher together with a factory
            // that builds one ListerWatcher per namespace.
            listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.AllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
                return &cache.ListWatch{
                    ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                        return mclient.MonitoringV1().PrometheusRules(namespace).List(context.TODO(), options)
                    },
                    WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                        return mclient.MonitoringV1().PrometheusRules(namespace).Watch(context.TODO(), options)
                    },
                }
            }),
        ),
        &monitoringv1.PrometheusRule{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
    )
    ....
}

运行该ruleInf并增加handler:

// pkg/prometheus/operator.go

// Run the controller.
//
// The part shown here starts the PrometheusRule informer (c.ruleInf) in its
// own goroutine, passing it the stopc channel.
//
// NOTE: abridged excerpt; the dotted lines mark code omitted by the article.
func (c *Operator) Run(stopc <-chan struct{}) error {
    .......
    go c.ruleInf.Run(stopc)
    ......
}

// addHandlers adds the eventhandlers to the informers.
//
// For the PrometheusRule informer it registers handleRuleAdd,
// handleRuleDelete and handleRuleUpdate as the add, delete and update
// callbacks respectively.
func (c *Operator) addHandlers() {
    ......
    c.ruleInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    c.handleRuleAdd,
        DeleteFunc: c.handleRuleDelete,
        UpdateFunc: c.handleRuleUpdate,
    })
    ......
}

一旦有资源变更,调用AddFunc/DeleteFunc/UpdateFunc:

// AddFunc
//
// handleRuleAdd is the informer callback for a newly added PrometheusRule.
// When c.getObject reports ok for obj, it logs a debug message, increments
// the "add" trigger counter for the PrometheusRule kind, and calls
// enqueueForMonitorNamespace with the object's namespace.
func (c *Operator) handleRuleAdd(obj interface{}) {
    o, ok := c.getObject(obj)
    if ok {
        level.Debug(c.logger).Log("msg", "PrometheusRule added")
        c.metrics.TriggerByCounter(monitoringv1.PrometheusRuleKind, "add").Inc()
        c.enqueueForMonitorNamespace(o.GetNamespace())
    }
}

// UpdateFunc
//
// handleRuleUpdate is the informer callback for an updated PrometheusRule.
// It returns immediately when old and cur carry the same ResourceVersion;
// otherwise it mirrors handleRuleAdd using the "update" trigger counter.
func (c *Operator) handleRuleUpdate(old, cur interface{}) {
    // Same ResourceVersion on both sides: nothing to do.
    // NOTE(review): both type assertions are unchecked, so a value that is
    // not a *monitoringv1.PrometheusRule would panic here.
    if old.(*monitoringv1.PrometheusRule).ResourceVersion == cur.(*monitoringv1.PrometheusRule).ResourceVersion {
        return
    }
    o, ok := c.getObject(cur)
    if ok {
        level.Debug(c.logger).Log("msg", "PrometheusRule updated")
        c.metrics.TriggerByCounter(monitoringv1.PrometheusRuleKind, "update").Inc()
        c.enqueueForMonitorNamespace(o.GetNamespace())
    }
}

看下入队到workQueue的到底是啥:

// pkg/prometheus/operator.go

// enqueueForMonitorNamespace calls enqueueForNamespace with the store of the
// c.nsMonInf informer and the given namespace name.
func (c *Operator) enqueueForMonitorNamespace(nsName string) {
    c.enqueueForNamespace(c.nsMonInf.GetStore(), nsName)
}

// enqueueForNamespace enqueues all Prometheus object keys that belong to the
// given namespace or select objects in the given namespace.
//
// It looks nsName up in store, then walks every object in the Prometheus
// informer store (c.promInf) and enqueues each Prometheus whose Namespace
// equals nsName.
//
// NOTE: abridged excerpt; the dotted line marks code omitted by the article,
// including the handling of exists/err and the end of the ListAll callback.
func (c *Operator) enqueueForNamespace(store cache.Store, nsName string) {
    nsObject, exists, err := store.GetByKey(nsName)
    ns := nsObject.(*v1.Namespace)

    err = cache.ListAll(c.promInf.GetStore(), labels.Everything(), func(obj interface{}) {
        // Check for Prometheus instances in the namespace.
        p := obj.(*monitoringv1.Prometheus)
        if p.Namespace == nsName {
            c.enqueue(p)
            return
        }
        ......
    }
}

能够看到,入队的是个prometheus CRD对象的key。

2) 资源对象的变更处理
直接来到prometheus workQueue的处理函数:

// pkg/prometheus/operator.go

// sync handles one work-queue key. In the part shown, it fetches the object
// for key from the Prometheus informer's indexer, asserts it to
// *monitoringv1.Prometheus, and calls createOrUpdateRuleConfigMaps for it,
// returning that call's error if there is one.
//
// NOTE: abridged excerpt; the dotted lines mark code omitted by the article.
func (c *Operator) sync(key string) error {
    obj, exists, err := c.promInf.GetIndexer().GetByKey(key)
    ......
    p := obj.(*monitoringv1.Prometheus)
    ruleConfigMapNames, err := c.createOrUpdateRuleConfigMaps(p)
    if err != nil {
        return err
    }
    .....
}

具体看下prometheusrule的处理函数:对于prometheusrule对应的ConfigMap,这里没有做更新,而是直接delete老的,然后create新的:

// pkg/prometheus/operator.go

// createOrUpdateRuleConfigMaps rebuilds the rule ConfigMaps for the given
// Prometheus object. In the part shown, it selects the rules via
// c.selectRules, turns them into ConfigMaps with makeRulesConfigMaps, deletes
// every ConfigMap in currentConfigMaps, and then creates every ConfigMap in
// newConfigMaps. All ConfigMap calls go through a client scoped to
// p.Namespace. A failed delete or create is returned wrapped with the
// ConfigMap name.
//
// NOTE: abridged excerpt; the dotted lines mark code omitted by the article
// (the definitions of namespaces and currentConfigMaps are not shown).
func (c *Operator) createOrUpdateRuleConfigMaps(p *monitoringv1.Prometheus) ([]string, error) {
    cClient := c.kclient.CoreV1().ConfigMaps(p.Namespace)
    ......
    newRules, err := c.selectRules(p, namespaces)
    ......
    newConfigMaps, err := makeRulesConfigMaps(p, newRules)
    ......
    // Delete the old ConfigMaps outright, then create the new ones.
    // Simply deleting old ConfigMaps and creating new ones for now. Could be
    // replaced by logic that only deletes obsolete ConfigMaps in the future.
    for _, cm := range currentConfigMaps {
        err := cClient.Delete(context.TODO(), cm.Name, metav1.DeleteOptions{})
        if err != nil {
            return nil, errors.Wrapf(err, "failed to delete current ConfigMap '%v'", cm.Name)
        }
    }
    for _, cm := range newConfigMaps {
        _, err = cClient.Create(context.TODO(), &cm, metav1.CreateOptions{})
        if err != nil {
            return nil, errors.Wrapf(err, "failed to create new ConfigMap '%v'", cm.Name)
        }
    }
    ......
}

构造rule的细节:rule文件名称={namespace}-{name}.yaml

// selectRules builds the map of rule files for a Prometheus object: each key
// is a file name of the form "{namespace}-{name}.yaml" and each value is the
// content generated from the matching PrometheusRule's spec.
//
// For every namespace in namespaces it lists the PrometheusRules from the
// rule informer's indexer that match ruleSelector, works on a deep copy of
// each, applies nsLabeler.EnforceNamespaceLabel, and stores the generated
// content. The first error from the listing or from within the callback is
// returned with a nil map.
//
// NOTE: abridged excerpt; the dotted lines mark code omitted by the article
// (the definitions of ruleSelector and nsLabeler are not shown).
func (c *Operator) selectRules(p *monitoringv1.Prometheus, namespaces []string) (map[string]string, error) {
    rules := map[string]string{}
    .....
    for _, ns := range namespaces {
        // The list callback has no return value, so failures inside it are
        // carried out through marshalErr and checked after the listing.
        var marshalErr error
        err := cache.ListAllByNamespace(c.ruleInf.GetIndexer(), ns, ruleSelector, func(obj interface{}) {
            promRule := obj.(*monitoringv1.PrometheusRule).DeepCopy()
            if err := nsLabeler.EnforceNamespaceLabel(promRule); err != nil {
                marshalErr = err
                return
            }
            content, err := generateContent(promRule.Spec)
            if err != nil {
                marshalErr = err
                return
            }
            rules[fmt.Sprintf("%v-%v.yaml", promRule.Namespace, promRule.Name)] = content    // rule file name: {namespace}-{name}.yaml
        })
        if err != nil {
            return nil, err
        }
        if marshalErr != nil {
            return nil, marshalErr
        }
    }
    ...
    return rules, nil
}