The scrape module lives in the prometheus/scrape directory and is responsible for pulling metrics from the monitored targets.
1. Overall structure
The overall code structure:
- scrape.Manager manages all scrape targets;
- All scrape targets are organized into groups, and each group corresponds to one job_name;
- Each group contains multiple scrape targets, i.e. the concrete metrics endpoints;
- For each metrics endpoint, one scrape goroutine is started, which scrapes the target's metrics in a loop at the configured interval;
If the scrape configuration in prometheus.yaml is:
scrape_configs:
  - job_name: "monitor"
    static_configs:
      - targets: ['192.168.101.9:11504']
  - job_name: 'node-exporter'
    static_configs:
      - targets: ['10.21.1.74:9100', '192.168.101.9:9100']
then the scrape targets are grouped into the following structure:
{ "monitor": [ { "Targets": [ { "__address__": "192.168.101.9:11504" } ], "Labels": null, "Source": "0" } ], "node-exporter": [ { "Targets": [ { "__address__": "10.21.1.74:9100" }, { "__address__": "192.168.101.9:9100" } ], "Labels": null, "Source": "0" } ]}
2. Code logic of scrape.Manager
The entry point is Run(); its map[string][]*targetgroup.Group parameter is fed in by the discovery component:
// scrape/manager.go
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
    go m.reloader()
    for {
        select {
        case ts := <-tsets:
            m.updateTsets(ts)
            select {
            case m.triggerReload <- struct{}{}: // non-blocking send on channel m.triggerReload
            default:
            }
        case <-m.graceShut:
            return nil
        }
    }
}

// Wait for a signal on channel m.triggerReload, then perform the reload.
// Hot reload.
func (m *Manager) reloader() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-m.graceShut:
            return
        case <-ticker.C:
            select {
            case <-m.triggerReload:
                m.reload()
            case <-m.graceShut:
                return
            }
        }
    }
}
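The non-blocking send onto m.triggerReload combined with the 5-second ticker means that a burst of target updates causes at most one reload per tick. A self-contained sketch of the same coalescing pattern (all names and timings here are illustrative, not from the Prometheus source):

package main

import (
    "fmt"
    "time"
)

func main() {
    updates := make(chan int)         // stands in for the tsets channel
    trigger := make(chan struct{}, 1) // stands in for m.triggerReload

    // Producer: updates arrive much faster than we want to reload.
    go func() {
        for i := 0; i < 10; i++ {
            updates <- i
            time.Sleep(100 * time.Millisecond)
        }
    }()

    // "Run" side: remember the latest snapshot, then set the trigger without blocking.
    go func() {
        for range updates {
            select {
            case trigger <- struct{}{}:
            default: // a reload is already pending; coalesce this update into it
            }
        }
    }()

    // "reloader" side: check the trigger at most once per tick
    // (the real reloader blocks on the trigger after each tick instead of using default).
    ticker := time.NewTicker(500 * time.Millisecond)
    defer ticker.Stop()
    for i := 0; i < 4; i++ {
        <-ticker.C
        select {
        case <-trigger:
            fmt.Println("reload: apply the latest target snapshot")
        default:
        }
    }
}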
The actual initialization happens in reload(). For each targetGroup it:
- creates a scrapePool;
- calls Sync on the scrapePool to synchronize the target information and start scraping;
// scrape/manager.go
// For each targetGroup: create a scrapePool, then call Sync on it.
func (m *Manager) reload() {
    m.mtxScrape.Lock()
    var wg sync.WaitGroup
    for setName, groups := range m.targetSets {
        if _, ok := m.scrapePools[setName]; !ok {
            scrapeConfig, ok := m.scrapeConfigs[setName]
            ...
            // Create the scrapePool.
            sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
            m.scrapePools[setName] = sp
        }
        wg.Add(1)
        // Run the sync in parallel as these take a while and at high load can't catch up.
        go func(sp *scrapePool, groups []*targetgroup.Group) {
            sp.Sync(groups) // Sync on the scrapePool; groups is a slice but usually holds only one element.
            wg.Done()
        }(m.scrapePools[setName], groups)
    }
    m.mtxScrape.Unlock()
    wg.Wait()
}
All target objects under one targetGroup share the same HTTP client and buffer pool:
// scrape/scrape.go
// Create the scrapePool.
func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger) (*scrapePool, error) {
    client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, false)
    // bufferPool
    buffers := pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
    ctx, cancel := context.WithCancel(context.Background())
    sp := &scrapePool{
        cancel:        cancel,
        appendable:    app,
        config:        cfg,
        client:        client,
        activeTargets: map[uint64]*Target{},
        loops:         map[uint64]loop{},
        logger:        logger,
    }
    sp.newLoop = func(opts scrapeLoopOptions) loop {
        ......
        return newScrapeLoop(
            ctx,
            opts.scraper,
            log.With(logger, "target", opts.target),
            buffers,
            func(l labels.Labels) labels.Labels {
                return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc)
            },
            func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
            func() storage.Appender { return appender(app.Appender(), opts.limit) },
            cache,
            jitterSeed,
            opts.honorTimestamps,
        )
    }
    return sp, nil
}
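The buffers pool lets the byte slices that hold scrape responses be reused across scrapes rather than reallocated each time. A minimal, self-contained sketch of the same idea using the standard library's sync.Pool (the real code uses Prometheus' own bucketed pool package, which picks a bucket based on the requested size):

package main

import (
    "fmt"
    "sync"
)

// bufPool hands out reusable byte slices, roughly mimicking what the scrape
// loop does with sp.buffers: Get before a scrape, Put after append.
var bufPool = sync.Pool{
    New: func() interface{} { return make([]byte, 0, 1024) },
}

func main() {
    // Before a scrape: take a buffer from the pool.
    buf := bufPool.Get().([]byte)[:0]

    // The scraper writes the HTTP response body into the buffer.
    buf = append(buf, "node_load1 0.5\n"...)
    fmt.Printf("scraped %d bytes\n", len(buf))

    // After the samples have been appended to storage, return the buffer
    // so later scrapes can reuse it.
    bufPool.Put(buf)
}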
3. Code logic of scrape.scrapePool
The scrapePool creates one scrapeLoop for each target under the targetGroup, then lets the scrapeLoop do the work.
// scrape/scrape.go
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
    // All targets.
    var all []*Target
    sp.mtx.Lock()
    sp.droppedTargets = []*Target{}
    for _, tg := range tgs {
        targets, err := targetsFromGroup(tg, sp.config)
        ......
        for _, t := range targets {
            if t.Labels().Len() > 0 {
                all = append(all, t)
            }
            ......
        }
    }
    sp.mtx.Unlock()
    // Put the targets to work.
    sp.sync(all)
    ......
}
For each target, newLoop() creates a scrapeLoop, and a goroutine is started that calls scrapeLoop.run() to pull metrics in a loop:
// scrape/scrape.go
// For each target in the group: create a targetScraper and a scrapeLoop, then let the scrapeLoop do the work.
func (sp *scrapePool) sync(targets []*Target) {
    for _, t := range targets {
        t := t
        hash := t.hash()
        uniqueTargets[hash] = struct{}{}
        if _, ok := sp.activeTargets[hash]; !ok {
            s := &targetScraper{Target: t, client: sp.client, timeout: timeout} // Create the targetScraper.
            l := sp.newLoop(scrapeLoopOptions{ // Create the scrapeLoop.
                target:          t,
                scraper:         s,
                limit:           limit,
                honorLabels:     honorLabels,
                honorTimestamps: honorTimestamps,
                mrc:             mrc,
            })
            sp.activeTargets[hash] = t
            sp.loops[hash] = l
            go l.run(interval, timeout, nil) // The scrapeLoop scrapes in a loop.
        }
        ......
    }
    ...
    wg.Wait()
}
4. Code logic of scrapeLoop
Each scrapeLoop executes the following steps once per scrape interval:
- scrape: fetch the metrics from the target;
- append: write the scraped data to the underlying storage;
- finally, update the scrapeLoop's status (mainly the metric values of the scrape report);
// scrape/scrape.go
func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
    ......
    ticker := time.NewTicker(interval) // Timer: run once per interval.
    defer ticker.Stop()
    for {
        ......
        var (
            start             = time.Now()
            scrapeCtx, cancel = context.WithTimeout(sl.ctx, timeout)
        )
        ......
        contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf) // Scrape the target.
        ......
        // Write the scraped data to the underlying storage.
        total, added, seriesAdded, appErr := sl.append(b, contentType, start)
        ......
        sl.buffers.Put(b) // Return the buffer to the pool.
        // Update the scrapeLoop's status.
        if err := sl.report(start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
            level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err)
        }
        select {
        ......
        case <-ticker.C: // Loop.
        }
    }
    ......
}
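For reference, the buffer handed to sl.append() contains the target's raw response body in the Prometheus text exposition format, along these lines (values are made up):

# HELP node_load1 1m load average.
# TYPE node_load1 gauge
node_load1 0.58
# HELP node_cpu_seconds_total Seconds the CPUs spent in each mode.
# TYPE node_cpu_seconds_total counter
node_cpu_seconds_total{cpu="0",mode="idle"} 187091.3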
5. Scrape logic of targetScraper
This is where the actual HTTP scraping happens:
- first, an HTTP GET is sent to the target's URL;
- then, the response body is written into an io.Writer (the buffers mentioned above), to be parsed into metrics later:
// Scrape logic.
func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
    if s.req == nil {
        req, err := http.NewRequest("GET", s.URL().String(), nil) // GET the /metrics endpoint.
        if err != nil {
            return "", err
        }
        req.Header.Add("Accept", acceptHeader)
        req.Header.Add("Accept-Encoding", "gzip")
        req.Header.Set("User-Agent", userAgentHeader)
        req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", fmt.Sprintf("%f", s.timeout.Seconds()))
        s.req = req
    }
    resp, err := s.client.Do(s.req.WithContext(ctx)) // Send the HTTP GET request.
    if err != nil {
        return "", err
    }
    .......
    if resp.Header.Get("Content-Encoding") != "gzip" {
        _, err = io.Copy(w, resp.Body) // Write the response body into w.
        if err != nil {
            return "", err
        }
        return resp.Header.Get("Content-Type"), nil
    }
    if s.gzipr == nil {
        s.buf = bufio.NewReader(resp.Body)
        s.gzipr, err = gzip.NewReader(s.buf)
        if err != nil {
            return "", err
        }
    }
    // Decompress and write into the io.Writer.
    _, err = io.Copy(w, s.gzipr)
    s.gzipr.Close()
    return resp.Header.Get("Content-Type"), nil
}
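For completeness, the endpoint on the other side of this GET is typically served by the official Go client library. A minimal, hypothetical exporter that such a scrape would work against (the port and the demo metric are made up; promhttp.Handler() also honors the Accept-Encoding: gzip header sent above):

package main

import (
    "log"
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// A made-up counter so the /metrics page has something to show.
var scrapeDemo = promauto.NewCounter(prometheus.CounterOpts{
    Name: "demo_requests_total",
    Help: "Example counter exposed for scraping.",
})

func main() {
    scrapeDemo.Inc()
    // targetScraper.scrape() sends its GET to an endpoint like this one.
    http.Handle("/metrics", promhttp.Handler())
    log.Fatal(http.ListenAndServe(":9100", nil))
}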