一.整体框架
- fanoutStorage被传入scrapeLoop;
scrapeLoop在抓取结束后,调用fanoutAppender去add/addFast/commit时序数据;
- add/addFast是写入缓存;
- commit是写wal + 写入head chunk;
- 批量数据commit进步了写入效率;
二.fanoutStorage被传入scrapeLoop
1)fanoutStorage被传入scrape.Manager
// cmd/prometheus/main.go// fanoutStorage被传入scrape.ManagerscrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
// scrape/manager.go// NewManager is the Manager constructorfunc NewManager(logger log.Logger, app storage.Appendable) *Manager { m := &Manager{ append: app, ...... } return m}
2)fanoutStorage被传入scrape.Pool
// scrape/manager.go func (m *Manager) reload() { var wg sync.WaitGroup for setName, groups := range m.targetSets { if _, ok := m.scrapePools[setName]; !ok { scrapeConfig, ok := m.scrapeConfigs[setName] // 创立scrape.Pool, m.append=fanoutStorage sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName)) m.scrapePools[setName] = sp } wg.Add(1) // Run the sync in parallel as these take a while and at high load can't catch up. go func(sp *scrapePool, groups []*targetgroup.Group) { sp.Sync(groups) wg.Done() }(m.scrapePools[setName], groups) } m.mtxScrape.Unlock() wg.Wait()}
3)fanoutAppender被传入scrapeLoop
//scrape/scrape.gofunc newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger) (*scrapePool, error) { sp := &scrapePool{ appendable: app, ...... } sp.newLoop = func(opts scrapeLoopOptions) loop { ...... return newScrapeLoop( ...... // fanoutAppender被传入scrapeLoop func() storage.Appender { return appender(app.Appender(), opts.limit) }, ..... ) } return sp, nil}
三.scrapeLoop应用fanoutAppender写入时序数据
- 首先,执行scrape;
- 而后,scrapeLoop应用fanoutAppender,将数据写入headAppender的缓存;
最初,scrapeLoop应用fanoutAppender.Commit()将缓存的数据提交,包含;
- 写wal;
- 写headChunk;
// scrape/scrape.go// scrapeAndReport performs a scrape and then appends the result to the storagefunc (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time.Time, errc chan<- error) time.Time { //执行scrape contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf) // 应用fanoutAppender app := sl.appender() defer func() { if err != nil { app.Rollback() //rollback回滚 return } err = app.Commit() //commit提交 if err != nil { level.Error(sl.l).Log("msg", "Scrape commit failed", "err", err) } }() // 利用app.add/addFast写入 total, added, seriesAdded, appErr := sl.append(app, b, contentType, start) ......}
四.headAppender的commit()
headAppender中的add/addFast,仅将时序数据缓存在headAppender.samples/series中,直到调用commmit才真正的提交(写wal + 写headChunk)。
// tsdb/head.gotype headAppender struct { head *Head ....... series []record.RefSeries samples []record.RefSample sampleSeries []*memSeries ......}
func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { ...... s := a.head.series.getByID(ref) ...... a.samples = append(a.samples, record.RefSample{ Ref: ref, T: t, V: v, }) a.sampleSeries = append(a.sampleSeries, s) return nil}
commit()的源码:
- 先写入wal,若写入失败,则回滚返回;
- 再写入headChunk;
// tsdb/head.gofunc (a *headAppender) Commit() error { // 写入WAL if err := a.log(); err != nil { //nolint: errcheck a.Rollback() // Most likely the same error will happen again. return errors.Wrap(err, "write to WAL") } total := len(a.samples) var series *memSeries for i, s := range a.samples { series = a.sampleSeries[i] series.Lock() ok, chunkCreated := series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper) // 写入memSeries的headChunk ...... } return nil}