Prometheus source code analysis: the scrape module

The scrape module lives under the prometheus/scrape directory and is responsible for pulling metrics from the monitored targets.

1. Overall framework

The overall code structure:

  • scrape.Manager manages all scrape targets;
  • the scrape targets are organized into groups, one group per job_name;
  • each group contains multiple scrape targets, i.e. the concrete metric endpoints to scrape;
  • for each metric endpoint, a scrape goroutine is started that pulls the target's metrics in a loop at the configured interval (a toy sketch follows);
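
To make the last point concrete, here is a toy, self-contained sketch (not Prometheus code) of "one goroutine per endpoint, each scraping in a loop at a fixed interval". The job names and addresses are taken from the prometheus.yaml example below; the 1-second interval is an arbitrary stand-in for scrape_interval:

package main

import (
    "fmt"
    "time"
)

func main() {
    jobs := map[string][]string{
        "monitor":       {"192.168.101.9:11504"},
        "node-exporter": {"10.21.1.74:9100", "192.168.101.9:9100"},
    }
    interval := 1 * time.Second // stand-in for scrape_interval

    for job, targets := range jobs {
        for _, addr := range targets {
            // one scrape goroutine per endpoint, as the scrape module does
            go func(job, addr string) {
                ticker := time.NewTicker(interval)
                defer ticker.Stop()
                for range ticker.C {
                    fmt.Printf("scraping job=%s target=%s\n", job, addr)
                }
            }(job, addr)
        }
    }
    time.Sleep(3 * time.Second) // let the goroutines run for a few intervals
}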

If the scrape configuration in prometheus.yaml is:

scrape_configs:
  - job_name: "monitor"
    static_configs:
    - targets: ['192.168.101.9:11504']

  - job_name: 'node-exporter'
    static_configs:
    - targets: ['10.21.1.74:9100', '192.168.101.9:9100']

then the scrape targets are grouped into the following structure (the corresponding Go type is sketched right after this example):

{
    "monitor": [
        {
            "Targets": [
                {"__address__": "192.168.101.9:11504"}
            ],
            "Labels": null,
            "Source": "0"
        }
    ],
    "node-exporter": [
        {
            "Targets": [
                {"__address__": "10.21.1.74:9100"},
                {"__address__": "192.168.101.9:9100"}
            ],
            "Labels": null,
            "Source": "0"
        }
    ]
}
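
The structure above is the map[string][]*targetgroup.Group that the discovery component hands to the scrape manager. As a self-contained sketch (not Prometheus code), the simplified types below mirror the shape of targetgroup.Group and model.LabelSet; marshalling the "node-exporter" group reproduces the JSON above:

package main

import (
    "encoding/json"
    "fmt"
)

// simplified stand-in for model.LabelSet
type LabelSet map[string]string

// simplified stand-in for targetgroup.Group
type Group struct {
    Targets []LabelSet
    Labels  LabelSet
    Source  string
}

func main() {
    groups := map[string][]Group{
        "node-exporter": {{
            Targets: []LabelSet{
                {"__address__": "10.21.1.74:9100"},
                {"__address__": "192.168.101.9:9100"},
            },
            Source: "0",
        }},
    }
    out, _ := json.MarshalIndent(groups, "", "    ")
    fmt.Println(string(out))
}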

2. The code logic of scrape.Manager

The entry point is Manager.Run(); its map[string][]*targetgroup.Group parameter is fed in by the discovery component:

// scrape/manager.go
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
    go m.reloader()
    for {
        select {
        case ts := <-tsets:
            m.updateTsets(ts)

            select {
            case m.triggerReload <- struct{}{}:    // signal on channel m.triggerReload that a reload is needed
            default:
            }
        case <-m.graceShut:
            return nil
        }
    }
}

// reloader() waits for a signal on channel m.triggerReload, then performs the reload
// (hot reload of the scrape targets)
func (m *Manager) reloader() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-m.graceShut:
            return
        case <-ticker.C:
            select {
            case <-m.triggerReload:
                m.reload()
            case <-m.graceShut:
                return
            }
        }
    }
}
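
Run() and reloader() together form a "coalesce and throttle" pattern: the non-blocking send in Run() collapses a burst of target-set updates into a single pending trigger, and the 5-second ticker in reloader() makes sure reload() runs at most once per tick. Below is a minimal, standalone sketch of the same pattern (not Prometheus code; the 1-second tick and capacity-1 channel are assumptions for the demo):

package main

import (
    "fmt"
    "time"
)

func main() {
    trigger := make(chan struct{}, 1) // at most one pending reload signal

    // consumer: on each tick, wait for a pending trigger, then "reload"
    go func() {
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()
        for range ticker.C {
            <-trigger
            fmt.Println("reload") // in Prometheus this would be m.reload()
        }
    }()

    // producer: a burst of ten updates collapses into a single pending trigger
    for i := 0; i < 10; i++ {
        select {
        case trigger <- struct{}{}: // non-blocking send, as in Run()
        default: // a reload is already pending, drop the signal
        }
    }
    time.Sleep(2 * time.Second) // only one "reload" is printed for the whole burst
}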

The actual initialization happens in reload(). For each targetGroup it:

  • creates a scrapePool;
  • calls Sync on the scrapePool, which synchronizes the target information and starts the scraping;
// scrape/manager.go
// For each targetGroup: create a scrapePool (if it does not exist yet), then call Sync on it
func (m *Manager) reload() {
    m.mtxScrape.Lock()
    var wg sync.WaitGroup
    for setName, groups := range m.targetSets {
        if _, ok := m.scrapePools[setName]; !ok {
            scrapeConfig, ok := m.scrapeConfigs[setName]
            ...
            // create the scrapePool
            sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
            m.scrapePools[setName] = sp
        }

        wg.Add(1)
        // Run the sync in parallel as these take a while and at high load can't catch up.
        go func(sp *scrapePool, groups []*targetgroup.Group) {
            sp.Sync(groups)    // the scrapePool's Sync; groups is a slice but usually has only 1 element
            wg.Done()
        }(m.scrapePools[setName], groups)
    }
    m.mtxScrape.Unlock()
    wg.Wait()
}

All target objects under the same targetGroup share one HTTP client and one buffer pool (a simplified illustration of the buffer-pool idea follows the function):

// scrape/scrape.go
// create a scrapePool
func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger) (*scrapePool, error) {
    client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, false)
    // buffer pool shared by all scrape loops in this pool
    buffers := pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })

    ctx, cancel := context.WithCancel(context.Background())
    sp := &scrapePool{
        cancel:        cancel,
        appendable:    app,
        config:        cfg,
        client:        client,
        activeTargets: map[uint64]*Target{},
        loops:         map[uint64]loop{},
        logger:        logger,
    }
    sp.newLoop = func(opts scrapeLoopOptions) loop {
        ......
        return newScrapeLoop(
            ctx,
            opts.scraper,
            log.With(logger, "target", opts.target),
            buffers,
            func(l labels.Labels) labels.Labels {
                return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc)
            },
            func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
            func() storage.Appender { return appender(app.Appender(), opts.limit) },
            cache,
            jitterSeed,
            opts.honorTimestamps,
        )
    }
    return sp, nil
}
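
The buffer pool exists so that the byte buffer holding a scraped /metrics body is reused across scrape cycles instead of being reallocated every time. A minimal illustration of the idea using the standard library's sync.Pool (Prometheus' internal pool package additionally buckets buffers by size; this is only a sketch, not the real implementation):

package main

import (
    "fmt"
    "sync"
)

func main() {
    bufPool := sync.Pool{
        // called only when the pool has no free buffer
        New: func() interface{} { return make([]byte, 0, 1024) },
    }

    // one scrape cycle: take a buffer, fill it with the scraped body, hand it back
    buf := bufPool.Get().([]byte)
    buf = append(buf[:0], "metric_a 1\nmetric_b 2\n"...)
    fmt.Printf("scraped %d bytes\n", len(buf))
    bufPool.Put(buf[:0]) // the next scrape can reuse the same backing array
}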

3. The code logic of scrape.scrapePool

scrapePool creates one scrapeLoop for each target under the targetGroup, then lets the scrapeLoops do the work.

// scrape/scrape.go
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
    // all targets
    var all []*Target
    sp.mtx.Lock()
    sp.droppedTargets = []*Target{}
    for _, tg := range tgs {
        targets, err := targetsFromGroup(tg, sp.config)
        ......
        for _, t := range targets {
            if t.Labels().Len() > 0 {
                all = append(all, t)
            }
            ......
        }
    }
    sp.mtx.Unlock()
    // tell the targets to start working
    sp.sync(all)
    ......
}

For each target, sync() uses newLoop() to create a scrapeLoop, then starts a goroutine in which scrapeLoop.run() pulls metrics in a loop:

// scrape/scrape.go
// For each target in the group: create a targetScraper and a scrapeLoop, then let the scrapeLoop do the work
func (sp *scrapePool) sync(targets []*Target) {
    for _, t := range targets {
        t := t
        hash := t.hash()
        uniqueTargets[hash] = struct{}{}

        if _, ok := sp.activeTargets[hash]; !ok {
            s := &targetScraper{Target: t, client: sp.client, timeout: timeout}    // create the targetScraper
            l := sp.newLoop(scrapeLoopOptions{    // create the scrapeLoop
                target:          t,
                scraper:         s,
                limit:           limit,
                honorLabels:     honorLabels,
                honorTimestamps: honorTimestamps,
                mrc:             mrc,
            })

            sp.activeTargets[hash] = t
            sp.loops[hash] = l

            go l.run(interval, timeout, nil)  // the scrapeLoop pulls metrics in a loop
        }
        ......
    }
    ...
    wg.Wait()
}

4. The code logic of scrapeLoop

Each scrapeLoop runs the following steps in a loop, once per scrape interval:

  • scrape: pull the metric data from the target;
  • append: write the scraped data to the underlying storage;
  • finally, report: update the scrapeLoop's state (mainly the scrape health metrics such as up);
// scrape/scrape.go
func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
    ......
    ticker := time.NewTicker(interval)    // timer driving the periodic scrapes
    defer ticker.Stop()

    for {
        ......
        var (
            start             = time.Now()
            scrapeCtx, cancel = context.WithTimeout(sl.ctx, timeout)
        )
        ......
        contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf)    // scrape: pull the metrics
        ......

        // append: write the scraped data to the underlying storage
        total, added, seriesAdded, appErr := sl.append(b, contentType, start)
        ......

        sl.buffers.Put(b)    // return the buffer to the pool
        // report: update the scrapeLoop's state
        if err := sl.report(start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
            level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err)
        }

        select {
        ......
        case <-ticker.C:       // wait for the next tick, then loop
        }
    }
    ......
}

5. The scrape logic of targetScraper

This is where the actual HTTP scraping happens:

  • first, an HTTP GET is sent to the target's URL;
  • then the response body is written into the io.Writer (the buffers mentioned above), to be parsed into metrics later (a parsing sketch follows the function below):
// the scrape logic
func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
    if s.req == nil {
        req, err := http.NewRequest("GET", s.URL().String(), nil)    // GET the /metrics endpoint
        if err != nil {
            return "", err
        }
        req.Header.Add("Accept", acceptHeader)
        req.Header.Add("Accept-Encoding", "gzip")
        req.Header.Set("User-Agent", userAgentHeader)
        req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", fmt.Sprintf("%f", s.timeout.Seconds()))

        s.req = req
    }

    resp, err := s.client.Do(s.req.WithContext(ctx))    // send the HTTP GET request
    if err != nil {
        return "", err
    }
    .......

    if resp.Header.Get("Content-Encoding") != "gzip" {
        _, err = io.Copy(w, resp.Body)      // copy the response body into w
        if err != nil {
            return "", err
        }
        return resp.Header.Get("Content-Type"), nil
    }
    if s.gzipr == nil {
        s.buf = bufio.NewReader(resp.Body)
        s.gzipr, err = gzip.NewReader(s.buf)
        if err != nil {
            return "", err
        }
    }
    // decompress and copy into the io.Writer
    _, err = io.Copy(w, s.gzipr)
    s.gzipr.Close()
    return resp.Header.Get("Content-Type"), nil
}
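
To see what the bytes written into that io.Writer contain, here is a small, self-contained sketch (not Prometheus code) that fetches a /metrics endpoint into a bytes.Buffer and decodes the text exposition format with github.com/prometheus/common/expfmt. The node-exporter URL is only a placeholder, and Prometheus itself uses its own streaming parser inside scrapeLoop.append() rather than this one:

package main

import (
    "bytes"
    "fmt"
    "net/http"

    "github.com/prometheus/common/expfmt"
)

func main() {
    // fetch the metrics page into a buffer, analogous to targetScraper.scrape
    // copying the response body into the pooled buffer
    resp, err := http.Get("http://192.168.101.9:9100/metrics") // placeholder endpoint
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    var buf bytes.Buffer
    if _, err := buf.ReadFrom(resp.Body); err != nil {
        panic(err)
    }

    // decode the Prometheus text exposition format
    var parser expfmt.TextParser
    families, err := parser.TextToMetricFamilies(&buf)
    if err != nil {
        panic(err)
    }
    for name, mf := range families {
        fmt.Printf("%s: %d series\n", name, len(mf.GetMetric()))
    }
}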
