1. Overall framework

  • fanoutStorage is passed into the scrapeLoop;
  • after each scrape finishes, the scrapeLoop calls the fanoutAppender's Add/AddFast/Commit on the time-series data;

    • Add/AddFast write samples into an in-memory buffer;
    • Commit writes the WAL + the head chunk;
    • committing data in batches improves write throughput (see the sketch after this list);
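
To make the batching point concrete, here is a minimal, self-contained sketch of the buffer-then-commit pattern; every name in it is invented for illustration and only mirrors the shape of Prometheus's Appender, not its real API:

package main

import "fmt"

// sample is a toy (series ref, timestamp, value) triple.
type sample struct {
    ref uint64
    t   int64
    v   float64
}

// bufferingAppender mimics the headAppender pattern: add only buffers,
// commit flushes the whole batch at once.
type bufferingAppender struct {
    buf []sample
}

func (a *bufferingAppender) add(ref uint64, t int64, v float64) {
    a.buf = append(a.buf, sample{ref, t, v}) // buffered only, not durable yet
}

func (a *bufferingAppender) commit() {
    // One flush per batch (in Prometheus: one WAL append per batch),
    // which is why batching improves write throughput.
    fmt.Printf("flushing %d samples in one batch\n", len(a.buf))
    a.buf = a.buf[:0]
}

func main() {
    app := &bufferingAppender{}
    for i := 0; i < 3; i++ {
        app.add(uint64(i), 1000+int64(i), float64(i))
    }
    app.commit()
}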

2. fanoutStorage is passed into the scrapeLoop

1) fanoutStorage is passed into scrape.Manager

// cmd/prometheus/main.go
// fanoutStorage is passed into scrape.Manager
scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
// scrape/manager.go
// NewManager is the Manager constructor
func NewManager(logger log.Logger, app storage.Appendable) *Manager {
    m := &Manager{
        append: app,
        ......
    }
    return m
}
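
For context on where fanoutStorage itself comes from: cmd/prometheus/main.go builds it with storage.NewFanout, which fans every append out to a primary storage (the local TSDB) and any secondaries (e.g. remote write). Roughly (a paraphrased one-liner; the exact surrounding code varies by Prometheus version):

// cmd/prometheus/main.go (paraphrased; surrounding code varies by version)
fanoutStorage := storage.NewFanout(logger, localStorage, remoteStorage)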

2) fanoutStorage is passed into the scrapePool

// scrape/manager.go
func (m *Manager) reload() {
    var wg sync.WaitGroup
    for setName, groups := range m.targetSets {
        if _, ok := m.scrapePools[setName]; !ok {
            scrapeConfig, ok := m.scrapeConfigs[setName]
            // create the scrapePool; m.append = fanoutStorage
            sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
            m.scrapePools[setName] = sp
        }
        wg.Add(1)
        // Run the sync in parallel as these take a while and at high load can't catch up.
        go func(sp *scrapePool, groups []*targetgroup.Group) {
            sp.Sync(groups)
            wg.Done()
        }(m.scrapePools[setName], groups)
    }
    m.mtxScrape.Unlock()
    wg.Wait()
}
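
The concurrency shape of reload() is a standard sync.WaitGroup fan-out: one goroutine per scrape pool, then block until every Sync returns. A self-contained sketch of just that pattern (pool names are invented; doSync stands in for sp.Sync(groups)):

package main

import (
    "fmt"
    "sync"
)

func main() {
    pools := []string{"node", "kubelet", "blackbox"}
    doSync := func(name string) { fmt.Println("synced pool", name) }

    var wg sync.WaitGroup
    for _, p := range pools {
        wg.Add(1)
        // Pass the loop variable as an argument so each goroutine
        // captures its own pool, exactly as reload() does.
        go func(name string) {
            defer wg.Done()
            doSync(name)
        }(p)
    }
    wg.Wait() // reload() likewise returns only after all pools have synced
}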

3) the fanoutAppender is passed into the scrapeLoop

// scrape/scrape.go
func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger) (*scrapePool, error) {
    sp := &scrapePool{
        appendable: app,
        ......
    }
    sp.newLoop = func(opts scrapeLoopOptions) loop {
        ......
        return newScrapeLoop(
            ......
            // the fanoutAppender is handed to the scrapeLoop as a factory closure
            func() storage.Appender { return appender(app.Appender(), opts.limit) },
            ......
        )
    }
    return sp, nil
}
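
Note that the scrapeLoop is not handed an Appender directly but a factory closure, func() storage.Appender. The point of the indirection: each scrape cycle calls the factory once and gets a fresh appender, so every cycle's batch can be committed or rolled back in isolation. A toy sketch (all names invented):

package main

import "fmt"

// batch is a stand-in for storage.Appender.
type batch struct{ n int }

func (b *batch) add()    { b.n++ }
func (b *batch) commit() { fmt.Printf("committed batch of %d samples\n", b.n) }

func main() {
    // Mirrors: func() storage.Appender { return appender(app.Appender(), opts.limit) }
    newAppender := func() *batch { return &batch{} }

    for cycle := 0; cycle < 2; cycle++ {
        app := newAppender() // a fresh appender per scrape cycle
        app.add()
        app.add()
        app.commit() // each cycle's batch commits independently
    }
}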

3. The scrapeLoop writes time-series data through the fanoutAppender

  • first, the scrape itself is executed;
  • then, the scrapeLoop uses the fanoutAppender to write the samples into headAppender's buffer;
  • finally, the scrapeLoop calls fanoutAppender.Commit() to commit the buffered data, which includes:

    • writing the WAL;
    • writing the head chunk;
// scrape/scrape.go
// scrapeAndReport performs a scrape and then appends the result to the storage
func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time.Time, errc chan<- error) time.Time {
    // perform the scrape
    contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
    // obtain a fanoutAppender
    app := sl.appender()
    defer func() {
        if err != nil {
            app.Rollback() // roll back on error
            return
        }
        err = app.Commit() // commit the batch
        if err != nil {
            level.Error(sl.l).Log("msg", "Scrape commit failed", "err", err)
        }
    }()
    // write the samples via app.Add/AddFast
    total, added, seriesAdded, appErr := sl.append(app, b, contentType, start)
    ......
}
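
The defer pattern above is worth isolating: because err is visible to the deferred closure, whatever error the function body ends with decides between Rollback and Commit. A self-contained sketch with toy types (not Prometheus's real API):

package main

import (
    "errors"
    "fmt"
)

// toyAppender stands in for storage.Appender; all names are invented.
type toyAppender struct{ buf []float64 }

func (a *toyAppender) Add(v float64) { a.buf = append(a.buf, v) }

func (a *toyAppender) Commit() error {
    fmt.Printf("commit: %d samples made durable\n", len(a.buf))
    return nil
}

func (a *toyAppender) Rollback() {
    a.buf = nil
    fmt.Println("rollback: batch discarded")
}

// scrapeOnce mirrors scrapeAndReport's transactional shape: the deferred
// closure inspects the named error return and either rolls back the whole
// batch or commits it.
func scrapeOnce(app *toyAppender, fail bool) (err error) {
    defer func() {
        if err != nil {
            app.Rollback()
            return
        }
        err = app.Commit()
    }()

    app.Add(1.0)
    app.Add(2.0)
    if fail {
        return errors.New("append failed mid-batch")
    }
    return nil
}

func main() {
    fmt.Println("err:", scrapeOnce(&toyAppender{}, false))
    fmt.Println("err:", scrapeOnce(&toyAppender{}, true))
}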

4. headAppender's Commit()

headAppender's Add/AddFast merely buffer the time-series data in headAppender.samples/series; nothing is actually committed (WAL write + head-chunk write) until Commit() is called.

// tsdb/head.go
type headAppender struct {
    head         *Head
    ......
    series       []record.RefSeries
    samples      []record.RefSample
    sampleSeries []*memSeries
    ......
}
func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
    ......
    s := a.head.series.getByID(ref)
    ......
    a.samples = append(a.samples, record.RefSample{
        Ref: ref,
        T:   t,
        V:   v,
    })
    a.sampleSeries = append(a.sampleSeries, s)
    return nil
}
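
A small design point visible above: a.samples and a.sampleSeries are kept index-aligned, so AddFast resolves the memSeries once (via getByID) at buffering time, and Commit() can later pair record.RefSample i with its series by position alone, with no per-sample map lookup in the hot commit path.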

The source code of Commit():

  • first, the batch is written to the WAL; if that write fails, the appender rolls back and returns;
  • then, the samples are written to the head chunks;
// tsdb/head.go
func (a *headAppender) Commit() error {
    // write the WAL
    if err := a.log(); err != nil {
        //nolint: errcheck
        a.Rollback() // Most likely the same error will happen again.
        return errors.Wrap(err, "write to WAL")
    }
    total := len(a.samples)
    var series *memSeries
    for i, s := range a.samples {
        series = a.sampleSeries[i]
        series.Lock()
        // append into the memSeries' head chunk
        ok, chunkCreated := series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper)
        ......
    }
    return nil
}
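
The ordering inside Commit() is the essence of write-ahead logging: the batch must be durable in the WAL before any in-memory head chunk is mutated, so that a crash at any point can be recovered by replaying the WAL. And because a failed WAL write triggers Rollback() before anything is applied, a batch is either fully logged or fully discarded.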