关于prometheus:prometheusoperator源码分析-prometheus配置自动更新之configreloader二

37次阅读

共计 6951 个字符,预计需要花费 18 分钟才能阅读完成。

config-reloader 的源码:https://github.com/prometheus…

config-reloader 的启动参数:

prometheus-config-reloader:
Image:         178.104.162.39:443/kubernetes/amd64/prometheus-config-reloader:v0.40.0
Args:
      --log-format=logfmt
      --reload-url=http://localhost:9090/-/reload
      --config-file=/etc/prometheus/config/prometheus.yaml.gz
      --config-envsubst-file=/etc/prometheus/config_out/prometheus.env.yaml

operator 监听到 prometheus 配置变更,会更新 secret(文件 prometheus.yaml.gz,使用 gz 压缩以保证 <1M),config-reloader 监控到 prometheus.yaml.gz 文件有变更,将其解压至 prometheus.env.yaml,而后发送 reload 给 prometheus。

1. config-reloader 内的源码剖析

代码中监听了 config-file 和 rules-dir(未使用) 的变动:

// cmd/prometheus-config-reloader/main.go
// main registers the reloader's command-line flags, constructs a Reloader from
// their values and runs its Watch loop inside a run.Group. If the group returns
// an error, it is printed to stderr and the process exits with status 1.
func main() {
    configPath := app.Flag("config-file", "config file watched by the reloader").String()
    substPath := app.Flag("config-envsubst-file", "output file for environment variable substituted config file").String()
    ruleDirs := app.Flag("rules-dir", "Rules directory to watch non-recursively").Strings()

    var group run.Group

    // The interrupt function cancels the context that Watch is running with.
    watchCtx, stop := context.WithCancel(context.Background())
    rel := reloader.New(logger, *reloadURL, *configPath, *substPath, *ruleDirs)
    group.Add(
        func() error { return rel.Watch(watchCtx) },
        func(error) { stop() },
    )

    err := group.Run()
    if err == nil {
        return
    }
    fmt.Fprintln(os.Stderr, err)
    os.Exit(1)
}

监听的动作由 reloader 实现:

// New returns a Reloader configured with the given reload URL, config file,
// config output file and rule directories.
//
// A nil logger is replaced with a no-op logger. The watch interval is fixed at
// 3 minutes and the retry interval at 5 seconds.
func New(logger log.Logger, reloadURL *url.URL, cfgFile string, cfgOutputFile string, ruleDirs []string) *Reloader {
    rel := &Reloader{
        logger:        logger,
        reloadURL:     reloadURL,
        cfgFile:       cfgFile,
        cfgOutputFile: cfgOutputFile,
        ruleDirs:      ruleDirs,
        watchInterval: 3 * time.Minute,
        retryInterval: 5 * time.Second,
    }
    if logger == nil {
        // Fall back to a logger that discards everything.
        rel.logger = log.NewNopLogger()
    }
    return rel
}

可以看到 reloader 每隔 3min 检查一次变动,同时使用 fsnotify 监听文件的变动:

// Watch runs the reload loop until ctx is canceled.
//
// When r.cfgFile is set, the file is registered with an fsnotify watcher and the
// configuration is applied once before the loop starts. Afterwards apply is called
// again on every tick of r.watchInterval and on every fsnotify event whose path lies
// in the config file's directory. Watcher errors are logged and do not end the loop.
//
// It returns nil when ctx is canceled, and a non-nil error when the watcher cannot be
// created, the config file cannot be added to it, or apply fails.
func (r *Reloader) Watch(ctx context.Context) error {
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        // Without this check a failed NewWatcher would leave watcher nil and the
        // calls below would dereference it.
        return errors.Wrap(err, "create watcher")
    }
    // Close the watcher when Watch returns so it is not left open.
    defer runutil.CloseWithLogOnErr(r.logger, watcher, "config watcher close")

    // Directories whose events are allowed to trigger an apply.
    watchables := map[string]struct{}{}
    if r.cfgFile != "" {
        watchables[filepath.Dir(r.cfgFile)] = struct{}{}
        if err := watcher.Add(r.cfgFile); err != nil {
            return errors.Wrapf(err, "add config file %s to watcher", r.cfgFile)
        }
        // Apply once up front so the output is produced before any change is observed.
        if err := r.apply(ctx); err != nil {
            return err
        }
    }

    tick := time.NewTicker(r.watchInterval)
    defer tick.Stop()

    for {
        select {
        case <-ctx.Done():
            return nil
        case <-tick.C:
            // Periodic re-apply; fall through to apply below.
        case event := <-watcher.Events:
            // TODO(bwplotka): Add metric if we are not cycling CPU here too much.
            if _, ok := watchables[filepath.Dir(event.Name)]; !ok {
                continue
            }
        case err := <-watcher.Errors:
            level.Error(r.logger).Log("msg", "watch error", "err", err)
            continue
        }
        // A tick or a relevant file event arrived: apply the configuration.
        if err := r.apply(ctx); err != nil {
            // Critical error.
            return err
        }
    }
}

监听到变动会 apply,apply 中涉及的文件:

  • r.cfgFile=/etc/prometheus/config/prometheus.yaml.gz,它是 1 个 gzip 文件;
  • r.cfgOutputFile=/etc/prometheus/config_out/prometheus.env.yaml,它是真正被 prometheus 挂载使用的配置文件;

apply 中的文件操作:

  • gunzip 解压缩 cfgFile 文件;
  • 将解压缩的文件内容,写入到 cfgOutputFile;
  • 也就是说,gz 文件是最新的配置,它被解压缩而后替换 env,最终生成新的配置文件 prometheus.env.yaml;
// thanos-io/thanos/pkg/reloader/reloader.go
// apply regenerates the output config (when both cfgFile and cfgOutputFile are set) and then
// triggers a Prometheus reload.
//
// For the output config it reads cfgFile, gunzips the content if it starts with the gzip
// magic bytes, runs it through expandEnv, writes the result to "<cfgOutputFile>.tmp" and
// renames that file onto cfgOutputFile.
// The reload is issued through runutil.RetryWithLog with r.retryInterval.
// NOTE(review): the tail of this function is elided ("....") in this excerpt, so the retry
// deadline and the final return value are not visible here.
func (r *Reloader) apply(ctx context.Context) error {
    if r.cfgFile != "" {
        if r.cfgOutputFile != "" {b, err := ioutil.ReadFile(r.cfgFile)
            if err != nil {return errors.Wrap(err, "read file")
            }
            // cfgFile may be a gzip file; its raw bytes have been read into b above.
            // Detect and extract gzipped file.
            // NOTE(review): b[0:3] is sliced without checking len(b); for a file shorter
            // than three bytes this either panics or compares bytes past len(b) — confirm.
            if bytes.Equal(b[0:3], firstGzipBytes) {zr, err := gzip.NewReader(bytes.NewReader(b))
                if err != nil {return errors.Wrap(err, "create gzip reader")
                }
                defer runutil.CloseWithLogOnErr(r.logger, zr, "gzip reader close")

                b, err = ioutil.ReadAll(zr)
                if err != nil {return errors.Wrap(err, "read compressed config file")
                }
            }
            // Replace environment variable references in the config with their actual values.
            b, err = expandEnv(b)
            if err != nil {return errors.Wrap(err, "expand environment variables")
            }
            tmpFile := r.cfgOutputFile + ".tmp"
            // Best-effort cleanup of the temp file; the removal error is deliberately ignored.
            defer func() {_ = os.Remove(tmpFile)
            }()
            // Write the processed content to the temp file, then rename it onto cfgOutputFile.
            if err := ioutil.WriteFile(tmpFile, b, 0666); err != nil {return errors.Wrap(err, "write file")
            }
            if err := os.Rename(tmpFile, r.cfgOutputFile); err != nil {return errors.Wrap(err, "rename file")
            }
        }
    }
    // Send the reload request to Prometheus.
    // NOTE(review): retryCtx is not defined in the visible lines; presumably created in elided code.
    if err := runutil.RetryWithLog(r.logger, r.retryInterval, retryCtx.Done(), func() error {if err := r.triggerReload(ctx); err != nil {return errors.Wrap(err, "trigger reload")
        }
        ....
    }
}

apply 中的 reload 操作:POST reloadURL 告诉 prometheus 配置变更;

// thanos-io/thanos/pkg/reloader/reloader.go
// triggerReload sends a POST request, bound to ctx, to r.reloadURL.
// It returns nil only when the response status code is 200. Request construction
// failures, transport failures and any other status code are returned as errors.
func (r *Reloader) triggerReload(ctx context.Context) error {
    request, err := http.NewRequest(http.MethodPost, r.reloadURL.String(), nil)
    if err != nil {
        return errors.Wrap(err, "create request")
    }

    response, err := http.DefaultClient.Do(request.WithContext(ctx))
    if err != nil {
        return errors.Wrap(err, "reload request failed")
    }
    defer runutil.ExhaustCloseWithLogOnErr(r.logger, response.Body, "trigger reload resp body")

    if response.StatusCode == http.StatusOK {
        return nil
    }
    return errors.Errorf("received non-200 response: %s; have you set `--web.enable-lifecycle` Prometheus flag?", response.Status)
}

2. config-reloader 中 prometheus.yaml.gz 的由来

prometheus.yaml.gz 是 gzip 文件,根据上面的分析,config-reloader 每隔 3min 检查一次 prometheus.yaml.gz 文件的变动,而后将其 gunzip 再写入到 prometheus.env.yaml,给 prometheus 使用。

那么,prometheus.yaml.gz 文件是哪里来的,为什么要使用 gz 文件?

可以在 prometheus-operator 中找到答案:

  • prometheus.yaml.gz 是 secret(prometheus-k8s) 挂载而来,该 secret 由 operator 维护;
  • 使用 gz 文件是因为 secret 有 1M 大小的限制,避免文件过大;

operator 维护 secret(prometheus-k8s):

  • 当 prometheus CRD 对象产生变更时,产生变更事件,而后由 goroutine 在 sync 中消费;
  • operator 依据 prometheus CRD 对象的配置,生成 secret spec;
  • 如果 secret 不存在,则 sClient.Create(); 否则 sClient.Update();
// pkg/prometheus/operator.go
// sync handles the object identified by key. Most of the body is elided ("...") in this
// excerpt; the visible step creates or updates the configuration secret and wraps any
// failure with "creating config failed".
// NOTE(review): p and ruleConfigMapNames are not defined in the visible lines; presumably
// they are set up in the elided code.
func (c *Operator) sync(key string) error {
    ...
    if err := c.createOrUpdateConfigurationSecret(p, ruleConfigMapNames); err != nil {return errors.Wrap(err, "creating config failed")
    }
    ...
}
// pkg/prometheus/operator.go
// createOrUpdateConfigurationSecret generates the Prometheus configuration for p, gzips it,
// stores it in a secret under configFilename, and then creates the secret if it does not
// exist or updates it otherwise. It returns the error from the Create/Update call, or a
// wrapped error if gzipping fails.
// NOTE(review): parts of the body are elided ("......") in this excerpt; inputs such as
// smons, pmons, the auth/token maps, the additional configs and sClient are presumably
// prepared in the elided code.
func (c *Operator) createOrUpdateConfigurationSecret(p *monitoringv1.Prometheus, ruleConfigMapNames []string) error {
    ......
    // Update secret based on the most recent configuration.
    // NOTE(review): err from generateConfig is not checked in the lines shown — confirm
    // it is handled in the original source.
    conf, err := c.configGenerator.generateConfig(
        p,
        smons,
        pmons,
        basicAuthSecrets,
        bearerTokens,
        additionalScrapeConfigs,
        additionalAlertRelabelConfigs,
        additionalAlertManagerConfigs,
        ruleConfigMapNames,
    )
    // Build the secret object and mark it as generated.
    s := makeConfigSecret(p, c.config)
    s.ObjectMeta.Annotations = map[string]string{"generated": "true",}
    ......
    // This is the reason the config file is gzipped.
    // Compress config to avoid 1mb secret limit for a while
    var buf bytes.Buffer
    if err = gzipConfig(&buf, conf); err != nil {return errors.Wrap(err, "couldnt gzip config")
    }
    // Per the article, configFilename is "prometheus.yaml.gz" (the constant is defined outside this excerpt).
    s.Data[configFilename] = buf.Bytes()
    // If the secret does not exist yet, create it and return.
    curSecret, err := sClient.Get(context.TODO(), s.Name, metav1.GetOptions{})
    if apierrors.IsNotFound(err) {level.Debug(c.logger).Log("msg", "creating configuration")
        _, err = sClient.Create(context.TODO(), s, metav1.CreateOptions{})
        return err
    }
    ......
    // The secret already exists: update it.
    level.Debug(c.logger).Log("msg", "updating Prometheus configuration secret")
    _, err = sClient.Update(context.TODO(), s, metav1.UpdateOptions{})
    return err
}

正文完
 0