乐趣区

关于prometheus:prometheus-agent模式的使用及原理

一.Prometheus agent 模式

Prometheus agent 是 v2.32.0 开始提供的一项性能:

应用起来很简略,间接在启动参数中减少 –enable-feature=agent 即可:

usage: prometheus [<flags>]

The Prometheus monitoring server

Flags:
  -h, --help                     Show context-sensitive help (also try --help-long and --help-man).
      (... other flags)
      --storage.tsdb.path="data/"
                                 Base path for metrics storage. Use with server mode only.
      --storage.agent.path="data-agent/"
                                 Base path for metrics storage. Use with agent mode only.
      (... other flags)
      --enable-feature= ...      Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, remote-write-receiver,
                                 extra-scrape-metrics, new-service-discovery-manager. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.

若 Prometheus 过程启用 agent 模式,原 Prometheus 的残缺性能,被精简为只有:

  • discovery
  • scrape
  • remoteWrite

也就是说,在 agent 模式下,它没有 Prometheus 的 Query/Alert/Local Storage 等性能。

Prometheus agent 典型的 利用场景:作为无状态的采集器(带主动发现),而后将采集的指标写入 remote storage。

二.agent 模式的原理

对于一般的 Prometheus 来说,它的采集并写入的流程:

  • scrape–>wal–>block–>block compact

对于 agent 模式的 Prometheus,它的采集并写入的流程:

  • scrape–>wal
  • 同时定期的 truncate wal

1.agent 模式的初始化

配置 –enable-feature=agent,则启用 agent 模式:

func main() {
    ...
    if agentMode {
        // WAL storage.
        opts := cfg.agent.ToAgentOptions()
        cancel := make(chan struct{})
        g.Add(func() error {
                ...
                // 创立 db 并运行
                db, err := agent.Open(
                    logger,
                    prometheus.DefaultRegisterer,
                    remoteStorage,
                    localStoragePath,
                    &opts,
                )
                ...
                localStorage.Set(db, 0)
                close(dbOpen)
                <-cancel
                return nil
            },
            func(e error) {if err := fanoutStorage.Close(); err != nil {level.Error(logger).Log("msg", "Error stopping storage", "err", err)
                }
                close(cancel)
            },
        )
    }
    ...
}

DB 对象的创立,以及 DB 对象的 run():

// tsdb/agent/db.go
func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir string, opts *Options) (*DB, error) {
    ...
    dir = filepath.Join(dir, "wal")
    w, err := wal.NewSize(l, reg, dir, opts.WALSegmentSize, opts.WALCompression)
    ...
    db := &DB{
        logger: l,
        opts:   opts,
        rs:     rs,

        wal:    w,
        locker: locker,

        nextRef: atomic.NewUint64(0),
        series:  newStripeSeries(opts.StripeSize),
        deleted: make(map[chunks.HeadSeriesRef]int),

        donec: make(chan struct{}),
        stopc: make(chan struct{}),

        metrics: newDBMetrics(reg),
    }
    ...
    go db.run()        // 启动
    return db, nil
}

2.agent wal 的定期 truncate

在 agent 的 db 对象中,启动后盾线程,对 wal 进行定期的 truncate:

  • 默认每隔 2hour 执行 1 次清理;
  • 计算 ts,即 ts 之前的 wal 都能够被清理,计算方法:

    • ts = 已发送至 remoteStorage 的最小 timestamp – 5min;(5min 是平安 buf);
    • 若某个 remoteStorage 写入很慢,累积了很多的 wal,ts 就会很小,那么规定:最长保留 now – 4hour 的 wal;

      • 即 ts = max(ts, now-4hour);
// Default values for options.
var (
    DefaultTruncateFrequency = 2 * time.Hour
    DefaultMinWALTime        = int64(5 * time.Minute / time.Millisecond)
    DefaultMaxWALTime        = int64(4 * time.Hour / time.Millisecond)
)
// tsdb/agent/db.go
func (db *DB) run() {defer close(db.donec)

Loop:
    for {
        select {
        case <-db.stopc:
            break Loop
        case <-time.After(db.opts.TruncateFrequency):    // 默认 2hour,即 2hour 执行 1 次
            // 已发送至 remote 的最小工夫 - 5min
            ts := db.rs.LowestSentTimestamp() - db.opts.MinWALTime
            if ts < 0 {ts = 0}

            // 不容许 wal 有限增长,最长保留 now - 4hour
            if maxTS := timestamp.FromTime(time.Now()) - db.opts.MaxWALTime; ts < maxTS {ts = maxTS}
            // 清理 ts 之前的 wal
            level.Debug(db.logger).Log("msg", "truncating the WAL", "ts", ts)
            if err := db.truncate(ts); err != nil {level.Warn(db.logger).Log("msg", "failed to truncate WAL", "err", err)
            }
        }
    }
}

参考:

1. 官网 doc: https://prometheus.io/blog/#p…
2.categraf:https://github.com/flashcatcl…

退出移动版