一.Prometheus agent模式

Prometheus agent是v2.32.0开始提供的一项性能:

应用起来很简略,间接在启动参数中减少--enable-feature=agent即可:

usage: prometheus [<flags>]The Prometheus monitoring serverFlags:  -h, --help                     Show context-sensitive help (also try --help-long and --help-man).      (... other flags)      --storage.tsdb.path="data/"                                 Base path for metrics storage. Use with server mode only.      --storage.agent.path="data-agent/"                                 Base path for metrics storage. Use with agent mode only.      (... other flags)      --enable-feature= ...      Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, remote-write-receiver,                                 extra-scrape-metrics, new-service-discovery-manager. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.

若Prometheus过程启用agent模式,原Prometheus的残缺性能,被精简为只有:

  • discovery
  • scrape
  • remoteWrite

也就是说,在agent模式下,它没有Prometheus的Query/Alert/Local Storage等性能。

Prometheus agent典型的利用场景:作为无状态的采集器(带主动发现),而后将采集的指标写入remote storage。

二.agent模式的原理

对于一般的Prometheus来说,它的采集并写入的流程:

  • scrape-->wal-->block-->block compact

对于agent模式的Prometheus,它的采集并写入的流程:

  • scrape-->wal
  • 同时定期的truncate wal

1.agent模式的初始化

配置 --enable-feature=agent,则启用agent模式:

func main() {    ...    if agentMode {        // WAL storage.        opts := cfg.agent.ToAgentOptions()        cancel := make(chan struct{})        g.Add(            func() error {                ...                // 创立db并运行                db, err := agent.Open(                    logger,                    prometheus.DefaultRegisterer,                    remoteStorage,                    localStoragePath,                    &opts,                )                ...                localStorage.Set(db, 0)                close(dbOpen)                <-cancel                return nil            },            func(e error) {                if err := fanoutStorage.Close(); err != nil {                    level.Error(logger).Log("msg", "Error stopping storage", "err", err)                }                close(cancel)            },        )    }    ...}

DB对象的创立,以及DB对象的run():

// tsdb/agent/db.gofunc Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir string, opts *Options) (*DB, error) {    ...    dir = filepath.Join(dir, "wal")    w, err := wal.NewSize(l, reg, dir, opts.WALSegmentSize, opts.WALCompression)    ...    db := &DB{        logger: l,        opts:   opts,        rs:     rs,        wal:    w,        locker: locker,        nextRef: atomic.NewUint64(0),        series:  newStripeSeries(opts.StripeSize),        deleted: make(map[chunks.HeadSeriesRef]int),        donec: make(chan struct{}),        stopc: make(chan struct{}),        metrics: newDBMetrics(reg),    }    ...    go db.run()        // 启动    return db, nil}

2.agent wal的定期truncate

在agent的db对象中,启动后盾线程,对wal进行定期的truncate:

  • 默认每隔2hour执行1次清理;
  • 计算ts,即ts之前的wal都能够被清理,计算方法:

    • ts = 已发送至remoteStorage的最小timestamp - 5min;(5min是平安buf);
    • 若某个remoteStorage写入很慢,累积了很多的wal,ts就会很小,那么规定:最长保留 now - 4hour 的wal;

      • 即 ts = max(ts, now-4hour);
// Default values for options.var (    DefaultTruncateFrequency = 2 * time.Hour    DefaultMinWALTime        = int64(5 * time.Minute / time.Millisecond)    DefaultMaxWALTime        = int64(4 * time.Hour / time.Millisecond))
// tsdb/agent/db.gofunc (db *DB) run() {    defer close(db.donec)Loop:    for {        select {        case <-db.stopc:            break Loop        case <-time.After(db.opts.TruncateFrequency):    // 默认2hour,即2hour执行1次            // 已发送至remote的最小工夫 - 5min            ts := db.rs.LowestSentTimestamp() - db.opts.MinWALTime            if ts < 0 {                ts = 0            }            // 不容许wal有限增长,最长保留now - 4hour            if maxTS := timestamp.FromTime(time.Now()) - db.opts.MaxWALTime; ts < maxTS {                ts = maxTS            }            // 清理ts之前的wal            level.Debug(db.logger).Log("msg", "truncating the WAL", "ts", ts)            if err := db.truncate(ts); err != nil {                level.Warn(db.logger).Log("msg", "failed to truncate WAL", "err", err)            }        }    }}

参考:

1.官网doc: https://prometheus.io/blog/#p...
2.categraf:https://github.com/flashcatcl...