config-reloader的源码:https://github.com/prometheus...

config-reloader的启动参数:

prometheus-config-reloader:
  Image: 178.104.162.39:443/kubernetes/amd64/prometheus-config-reloader:v0.40.0
  Args:
    --log-format=logfmt
    --reload-url=http://localhost:9090/-/reload
    --config-file=/etc/prometheus/config/prometheus.yaml.gz
    --config-envsubst-file=/etc/prometheus/config_out/prometheus.env.yaml

operator监听到prometheus配置变更后,会更新secret(其中的文件为prometheus.yaml.gz,使用gz压缩以保证大小<1M);config-reloader监控到prometheus.yaml.gz文件有变更,将其解压并写入prometheus.env.yaml,而后向prometheus发送reload请求。

1. config-reloader内的源码剖析

代码中监听了config-file和rules-dir(本例中未使用)的变动:

// cmd/prometheus-config-reloader/main.go

// main is the entry point of prometheus-config-reloader (excerpt). It reads the
// config-file, config-envsubst-file and rules-dir flags, builds a reloader from
// them, and runs the reloader's Watch loop inside a run.Group until the group
// returns. A non-nil error from the group is printed to stderr and the process
// exits with status 1.
// NOTE(review): app, logger and reloadURL are not declared in this excerpt —
// presumably they are defined in the omitted part of main; confirm upstream.
func main() {
	cfgFile := app.Flag("config-file", "config file watched by the reloader").String()
	cfgSubstFile := app.Flag("config-envsubst-file", "output file for environment variable substituted config file").String()
	rulesDir := app.Flag("rules-dir", "Rules directory to watch non-recursively").Strings()

	var g run.Group
	{
		ctx, cancel := context.WithCancel(context.Background())
		rel := reloader.New(logger, *reloadURL, *cfgFile, *cfgSubstFile, *rulesDir)
		g.Add(func() error {
			return rel.Watch(ctx)
		}, func(error) {
			// Interrupt function: cancel the context handed to Watch.
			cancel()
		})
	}

	if err := g.Run(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

监听的动作由reloader实现:

// New creates a new reloader that watches the given config file and rule directory// and triggers a Prometheus reload upon changes.// If cfgOutputFile is not empty the config file will be decompressed if needed, environment variables// will be substituted and the output written into the given path. Prometheus should then use// cfgOutputFile as its config file path.func New(logger log.Logger, reloadURL *url.URL, cfgFile string, cfgOutputFile string, ruleDirs []string) *Reloader {    if logger == nil {        logger = log.NewNopLogger()    }    return &Reloader{        logger:        logger,        reloadURL:     reloadURL,        cfgFile:       cfgFile,        cfgOutputFile: cfgOutputFile,        ruleDirs:      ruleDirs,        watchInterval: 3 * time.Minute,        retryInterval: 5 * time.Second,    }}

可以看到reloader每隔3min检查一次变动,同时使用fsnotify监听文件的变动:

// Watch starts to watch periodically the config file and rules and process them until the context// gets canceled. Config file gets env expanded if cfgOutputFile is specified and reload is trigger if// config or rules changed.// Watch watchers periodically based on r.watchInterval.// For config file it watches it directly as well via fsnotify.// It watches rule dirs as well, but lot's of edge cases are missing, so rely on interval mostly.func (r *Reloader) Watch(ctx context.Context) error {    // 应用fsnotity监听    watcher, err := fsnotify.NewWatcher()        // 监听cfgFile    watchables := map[string]struct{}{}    if r.cfgFile != "" {        watchables[filepath.Dir(r.cfgFile)] = struct{}{}        if err := watcher.Add(r.cfgFile); err != nil {            return errors.Wrapf(err, "add config file %s to watcher", r.cfgFile)        }        if err := r.apply(ctx); err != nil {            return err        }    }    tick := time.NewTicker(r.watchInterval)    defer tick.Stop()    for {        select {        case <-ctx.Done():            return nil        case <-tick.C:        case event := <-watcher.Events:            // TODO(bwplotka): Add metric if we are not cycling CPU here too much.            if _, ok := watchables[filepath.Dir(event.Name)]; !ok {                continue            }        case err := <-watcher.Errors:            level.Error(r.logger).Log("msg", "watch error", "err", err)            continue        }        // 监听到变动,apply        if err := r.apply(ctx); err != nil {            // Critical error.            return err        }    }}

监听到变动后会执行apply,apply中涉及的文件:

  • r.cfgFile=/etc/prometheus/config/prometheus.yaml.gz,它是1个gzip文件;
  • r.cfgOutputFile=/etc/prometheus/config_out/prometheus.env.yaml,它是真正被prometheus挂载并使用的配置文件;

apply中的文件操作:

  • gunzip解压缩cfgFile文件;
  • 将解压缩的文件内容,写入到cfgOutputFile;
  • 也就是说,gz文件是最新的配置,它被解压缩而后替换其中的env变量,最终生成新的配置文件prometheus.env.yaml;
// thanos-io/thanos/pkg/reloader/reloader.go

// apply triggers Prometheus reload if rules or config changed. If cfgOutputFile is set, we also
// expand env vars into config file before reloading.
// Reload is retried in retryInterval until watchInterval.
//
// This is an excerpt: the tail of the function is elided ("....").
// NOTE(review): retryCtx is not declared in the visible part — presumably a
// context bounded by watchInterval, per the comment above; confirm upstream.
func (r *Reloader) apply(ctx context.Context) error {
	if r.cfgFile != "" {
		if r.cfgOutputFile != "" {
			b, err := ioutil.ReadFile(r.cfgFile)
			if err != nil {
				return errors.Wrap(err, "read file")
			}

			// cfgFile is a gzip file here; its raw contents were read above.
			// Detect and extract gzipped file.
			// NOTE(review): b[0:3] is bounded by cap(b), not len(b); for a file
			// shorter than 3 bytes this either panics or compares bytes past
			// len(b). bytes.HasPrefix(b, firstGzipBytes) would avoid both —
			// confirm firstGzipBytes is 3 bytes long.
			if bytes.Equal(b[0:3], firstGzipBytes) {
				zr, err := gzip.NewReader(bytes.NewReader(b))
				if err != nil {
					return errors.Wrap(err, "create gzip reader")
				}
				defer runutil.CloseWithLogOnErr(r.logger, zr, "gzip reader close")

				b, err = ioutil.ReadAll(zr)
				if err != nil {
					return errors.Wrap(err, "read compressed config file")
				}
			}

			// Replace environment variable references with their actual values.
			b, err = expandEnv(b)
			if err != nil {
				return errors.Wrap(err, "expand environment variables")
			}

			tmpFile := r.cfgOutputFile + ".tmp"
			defer func() {
				// Best-effort cleanup of the temp file; the error is deliberately ignored.
				_ = os.Remove(tmpFile)
			}()

			// Write the processed cfgFile contents to a temp file first, then
			// rename it onto cfgOutputFile.
			if err := ioutil.WriteFile(tmpFile, b, 0666); err != nil {
				return errors.Wrap(err, "write file")
			}
			if err := os.Rename(tmpFile, r.cfgOutputFile); err != nil {
				return errors.Wrap(err, "rename file")
			}
		}
	}

	// Send the reload request to Prometheus, retrying every retryInterval.
	if err := runutil.RetryWithLog(r.logger, r.retryInterval, retryCtx.Done(), func() error {
		if err := r.triggerReload(ctx); err != nil {
			return errors.Wrap(err, "trigger reload")
		}
		....
	}
}

apply中的reload操作:向reloadURL发送POST请求,通知prometheus重新加载配置;

// thanos-io/thanos/pkg/reloader/reloader.gofunc (r *Reloader) triggerReload(ctx context.Context) error {    req, err := http.NewRequest("POST", r.reloadURL.String(), nil)    if err != nil {        return errors.Wrap(err, "create request")    }    req = req.WithContext(ctx)    resp, err := http.DefaultClient.Do(req)    if err != nil {        return errors.Wrap(err, "reload request failed")    }    defer runutil.ExhaustCloseWithLogOnErr(r.logger, resp.Body, "trigger reload resp body")    if resp.StatusCode != 200 {        return errors.Errorf("received non-200 response: %s; have you set `--web.enable-lifecycle` Prometheus flag?", resp.Status)    }    return nil}

2. config-reloader中prometheus.yaml.gz的由来

prometheus.yaml.gz是gzip文件,依据上面的分析,config-reloader通过fsnotify并每隔3min检查一次prometheus.yaml.gz文件的变动,而后将其gunzip再写入到prometheus.env.yaml,供prometheus使用。

那么,prometheus.yaml.gz文件是哪里来的,为什么要使用gz文件?

能够在prometheus-operator中找到答案:

  • prometheus.yaml.gz是由secret(prometheus-k8s)挂载而来,该secret由operator维护;
  • 使用gz文件是因为secret有1M的大小限制,压缩可以避免文件过大;

operator维护secret(prometheus-k8s):

  • 当prometheus CRD对象发生变更时,会产生变更事件,而后由goroutine在sync中消费;
  • operator依据prometheus CRD对象的配置,生成secret spec;
  • 如果secret不存在,则sClient.Create(); 否则sClient.Update();
// pkg/prometheus/operator.go

// sync (excerpt; the surrounding code is elided with "...") creates or updates
// the configuration Secret through createOrUpdateConfigurationSecret and wraps
// any error from that step.
// NOTE(review): p and ruleConfigMapNames are not declared in this excerpt —
// presumably set up in the elided code from key; confirm upstream.
func (c *Operator) sync(key string) error {
	...
	if err := c.createOrUpdateConfigurationSecret(p, ruleConfigMapNames); err != nil {
		return errors.Wrap(err, "creating config failed")
	}
	...
}
// pkg/prometheus/operator.go

// createOrUpdateConfigurationSecret (excerpt; parts are elided with "......")
// generates the Prometheus configuration for p, gzips it, stores it in the
// configuration Secret under configFilename, and then creates the Secret if it
// does not exist yet or updates it otherwise. It returns the error of the
// gzip step (wrapped) or of the Create/Update call.
// NOTE(review): smons, pmons, sClient and the other generateConfig arguments
// are not declared in this excerpt — presumably built in the elided code.
func (c *Operator) createOrUpdateConfigurationSecret(p *monitoringv1.Prometheus, ruleConfigMapNames []string) error {
	......
	// Update secret based on the most recent configuration.
	// NOTE(review): the err returned here is not checked in the visible code —
	// presumably the check was cut from the excerpt; confirm upstream.
	conf, err := c.configGenerator.generateConfig(
		p,
		smons,
		pmons,
		basicAuthSecrets,
		bearerTokens,
		additionalScrapeConfigs,
		additionalAlertRelabelConfigs,
		additionalAlertManagerConfigs,
		ruleConfigMapNames,
	)

	// Build the Secret spec.
	s := makeConfigSecret(p, c.config)
	s.ObjectMeta.Annotations = map[string]string{
		"generated": "true",
	}
	......

	// This is the reason the config file is gzipped.
	// Compress config to avoid 1mb secret limit for a while
	var buf bytes.Buffer
	if err = gzipConfig(&buf, conf); err != nil {
		return errors.Wrap(err, "couldnt gzip config")
	}
	// configFilename is "prometheus.yaml.gz" here.
	s.Data[configFilename] = buf.Bytes()

	// If the Secret does not exist yet, create it and return.
	curSecret, err := sClient.Get(context.TODO(), s.Name, metav1.GetOptions{})
	if apierrors.IsNotFound(err) {
		level.Debug(c.logger).Log("msg", "creating configuration")
		_, err = sClient.Create(context.TODO(), s, metav1.CreateOptions{})
		return err
	}
	......

	// The Secret already exists: update it.
	level.Debug(c.logger).Log("msg", "updating Prometheus configuration secret")
	_, err = sClient.Update(context.TODO(), s, metav1.UpdateOptions{})
	return err
}