Prometheus source code analysis: from scrape to TSDB write


1. Overall framework

  • fanoutStorage is passed into scrapeLoop;
  • after each scrape, scrapeLoop calls the fanoutAppender to add/addFast/commit the time-series data;

    • add/addFast only write into an in-memory buffer;
    • commit writes the WAL and then the head chunk;
    • committing in batches improves write efficiency (a sketch of the Appender contract follows this list);
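
The whole flow hinges on the storage.Appender contract. Below is a minimal sketch of that interface as it looked in the Prometheus 2.x sources quoted in this article (newer releases merge Add/AddFast into a single Append method, so treat this as era-specific):

// Sketch of the storage.Appender contract (Prometheus 2.x era; simplified).
type Appender interface {
    // Add writes the first sample of a series and returns a ref
    // that callers can cache for later fast appends.
    Add(l labels.Labels, t int64, v float64) (uint64, error)
    // AddFast writes a sample for an already-known series ref.
    AddFast(ref uint64, t int64, v float64) error
    // Commit flushes the buffered batch: WAL first, then head chunks.
    Commit() error
    // Rollback discards the buffered batch.
    Rollback() error
}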

2. How fanoutStorage is passed down to scrapeLoop

1) fanoutStorage is passed into scrape.Manager

// cmd/prometheus/main.go
// fanoutStorage is passed into scrape.Manager
scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
// scrape/manager.go
// NewManager is the Manager constructor
func NewManager(logger log.Logger, app storage.Appendable) *Manager {
    m := &Manager{
        append:        app,
        ......
    }
    return m
}
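
For context, the fanoutStorage itself is assembled earlier in cmd/prometheus/main.go. A hedged sketch of that wiring (the storage.NewFanout constructor is real; the placeholder values are assumptions and the surrounding setup is simplified):

// Sketch: the fanout pairs the local TSDB with remote storage, so every
// Appender obtained from it fans samples out to all of them.
localStorage := ...  // the local TSDB, wrapped as a storage.Storage
remoteStorage := ... // remote-write storage
fanoutStorage := storage.NewFanout(logger, localStorage, remoteStorage)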

2) fanoutStorage is passed into scrapePool

// scrape/manager.go
func (m *Manager) reload() {
    var wg sync.WaitGroup
    for setName, groups := range m.targetSets {
        if _, ok := m.scrapePools[setName]; !ok {
            scrapeConfig, ok := m.scrapeConfigs[setName]
            ......
            // create the scrapePool; m.append is the fanoutStorage
            sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
            ......
            m.scrapePools[setName] = sp
        }
        wg.Add(1)
        // Run the sync in parallel as these take a while and at high load can't catch up.
        go func(sp *scrapePool, groups []*targetgroup.Group) {
            sp.Sync(groups)
            wg.Done()
        }(m.scrapePools[setName], groups)
    }
    m.mtxScrape.Unlock()
    wg.Wait()
}
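
Each sp.Sync(groups) call then materializes concrete targets from the discovery groups and starts one scrapeLoop per new target. A hedged sketch of that path (simplified from scrape/scrape.go of the same era; error handling shortened):

// Sketch: Sync builds targets from groups and delegates to sync(),
// which starts sp.newLoop(...) for targets not yet running.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
    var all []*Target
    for _, tg := range tgs {
        targets, err := targetsFromGroup(tg, sp.config)
        if err != nil {
            level.Error(sp.logger).Log("msg", "creating targets failed", "err", err)
            continue
        }
        all = append(all, targets...)
    }
    // stops loops for vanished targets and starts loops for new ones
    sp.sync(all)
}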

3) fanoutAppender is passed into scrapeLoop

// scrape/scrape.go
func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger) (*scrapePool, error) {
    sp := &scrapePool{
        appendable:    app,
        ......
    }
    sp.newLoop = func(opts scrapeLoopOptions) loop {
        ......
        return newScrapeLoop(
            ......
            // the fanoutAppender is passed into scrapeLoop
            func() storage.Appender { return appender(app.Appender(), opts.limit) },
            ......
        )
    }
    return sp, nil
}
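
Note the appender(app.Appender(), opts.limit) wrapper above: when a sample limit is configured, the raw fanoutAppender is decorated so that an oversized scrape fails and gets rolled back. A hedged sketch modeled on the limitAppender in scrape/scrape.go (the errSampleLimit sentinel is an assumption here):

// Sketch: a decorator that enforces sample_limit per scrape.
type limitAppender struct {
    storage.Appender     // the wrapped fanoutAppender
    limit int            // sample_limit from the scrape config
    i     int            // samples appended so far in this scrape
}

func (app *limitAppender) AddFast(ref uint64, t int64, v float64) error {
    if app.i+1 > app.limit {
        return errSampleLimit // assumed sentinel; triggers Rollback upstream
    }
    if err := app.Appender.AddFast(ref, t, v); err != nil {
        return err
    }
    app.i++
    return nil
}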

3. scrapeLoop uses the fanoutAppender to write time-series data

  • first, the scrape itself is executed;
  • then, scrapeLoop uses the fanoutAppender to write the scraped samples into the headAppender's buffer;
  • finally, scrapeLoop calls fanoutAppender.Commit() to commit the buffered data, which involves:

    • writing the WAL;
    • writing the headChunk;

// scrape/scrape.go
// scrapeAndReport performs a scrape and then appends the result to the storage
func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time.Time, errc chan<- error) time.Time {
    // perform the scrape
    contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
    // obtain the appender (the fanoutAppender)
    app := sl.appender()
    defer func() {
        if err != nil {
            app.Rollback() // roll back the buffered batch on any error
            return
        }
        err = app.Commit() // commit the batch
        if err != nil {
            level.Error(sl.l).Log("msg", "Scrape commit failed", "err", err)
        }
    }()
    // write the samples via app.Add/AddFast
    total, added, seriesAdded, appErr := sl.append(app, b, contentType, start)
    ......
}
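
Inside sl.append(), the scrape cache decides between the two write paths: a cache hit on the metric's raw byte string reuses the stored series ref for AddFast, while a miss takes the full Add path and caches the returned ref for the next scrape. A hedged, heavily simplified sketch (the variable names met, mets, lset, and hash follow the surrounding loop and are approximations; the real loop also handles relabeling, staleness markers, and error accounting):

// Sketch of the per-sample branch inside sl.append().
if ce, ok := sl.cache.get(yoloString(met)); ok {
    // fast path: series ref cached from an earlier scrape
    err = app.AddFast(ce.ref, t, v)
} else {
    // slow path: full Add resolves the label set and returns a ref
    var ref uint64
    ref, err = app.Add(lset, t, v)
    sl.cache.addRef(mets, ref, lset, hash)
}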

4. headAppender's Commit()

The add/addFast methods of headAppender only buffer the time-series data in headAppender.samples/series; nothing is actually committed (WAL write + headChunk write) until Commit() is called.

// tsdb/head.go
type headAppender struct {
    head         *Head
    ......
    series       []record.RefSeries
    samples      []record.RefSample
    sampleSeries []*memSeries
    ......
}
func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
    ......
    s := a.head.series.getByID(ref)
    ......
    a.samples = append(a.samples, record.RefSample{
        Ref: ref,
        T:   t,
        V:   v,
    })
    a.sampleSeries = append(a.sampleSeries, s)
    return nil
}
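
AddFast has a companion Add for series that are not yet known: it creates (or looks up) the memSeries, queues a record.RefSeries so the new series gets logged to the WAL, and then falls through to AddFast. A hedged sketch, simplified from the same era of tsdb/head.go:

// Sketch of the slow path that feeds headAppender.series.
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
    // get or create the in-memory series for this label set
    s, created := a.head.getOrCreate(lset.Hash(), lset)
    if created {
        // a brand-new series must be written to the WAL on Commit
        a.series = append(a.series, record.RefSeries{
            Ref:    s.ref,
            Labels: lset,
        })
    }
    return s.ref, a.AddFast(s.ref, t, v)
}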

The source of Commit():

  • first the WAL is written; if that fails, the appender rolls back and returns;
  • then the samples are written into the headChunk;

// tsdb/head.go
func (a *headAppender) Commit() error {
    // write the WAL first
    if err := a.log(); err != nil {
        //nolint: errcheck
        a.Rollback() // Most likely the same error will happen again.
        return errors.Wrap(err, "write to WAL")
    }
    total := len(a.samples)
    var series *memSeries
    for i, s := range a.samples {
        series = a.sampleSeries[i]
        series.Lock()
        // append into this memSeries' headChunk
        ok, chunkCreated := series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper)
        ......
    }
    return nil
}
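
The a.log() call at the top of Commit() is where the WAL write actually happens: the buffered series and samples are encoded into WAL records and logged before any headChunk is touched. A hedged sketch, simplified from tsdb/head.go (the buffer-pool helper names are approximations):

// Sketch of headAppender.log(): encode buffered records and write them
// to the WAL, new series first, then their samples.
func (a *headAppender) log() error {
    if a.head.wal == nil {
        return nil // WAL disabled
    }
    buf := a.head.getBytesBuffer() // pooled scratch buffer (name approximate)
    defer func() { a.head.putBytesBuffer(buf) }()

    var rec []byte
    var enc record.Encoder

    if len(a.series) > 0 {
        rec = enc.Series(a.series, buf)
        buf = rec[:0] // reuse the underlying array for the next record
        if err := a.head.wal.Log(rec); err != nil {
            return errors.Wrap(err, "log series")
        }
    }
    if len(a.samples) > 0 {
        rec = enc.Samples(a.samples, buf)
        buf = rec[:0]
        if err := a.head.wal.Log(rec); err != nil {
            return errors.Wrap(err, "log samples")
        }
    }
    return nil
}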
