The scrape module lives under the prometheus/scrape directory and is responsible for pulling metrics from the monitored targets.
1. Overall framework
The overall code structure:
- scrape.Manager manages all scrape targets;
- the scrape targets are grouped, and each group corresponds to one job_name;
- each group contains multiple scrape targets, i.e. the concrete metrics endpoints;
- for every metrics endpoint, a scrape goroutine is started that scrapes the target's metrics in a loop, once per interval.
If the scrape configuration in prometheus.yaml is:
scrape_configs:
  - job_name: "monitor"
    static_configs:
      - targets: ['192.168.101.9:11504']
  - job_name: 'node-exporter'
    static_configs:
      - targets: ['10.21.1.74:9100', '192.168.101.9:9100']
then the scrape targets are grouped into the following structure:
{
  "monitor": [
    {
      "Targets": [
        {"__address__": "192.168.101.9:11504"}
      ],
      "Labels": null,
      "Source": "0"
    }
  ],
  "node-exporter": [
    {
      "Targets": [
        {"__address__": "10.21.1.74:9100"},
        {"__address__": "192.168.101.9:9100"}
      ],
      "Labels": null,
      "Source": "0"
    }
  ]
}
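Each value in that map is a targetgroup.Group from the discovery package (the JSON field names above come straight from that struct). Below is a minimal sketch of how the "monitor" entry could be built in Go, assuming the usual import paths of the discovery and common/model packages:
package main

import (
    "encoding/json"
    "fmt"

    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/discovery/targetgroup"
)

func main() {
    // The map that the discovery component sends to scrape.Manager.Run;
    // keys are job names, values are the groups shown in the JSON above.
    tsets := map[string][]*targetgroup.Group{
        "monitor": {
            {
                Targets: []model.LabelSet{
                    {model.AddressLabel: "192.168.101.9:11504"}, // __address__
                },
                Source: "0",
            },
        },
    }
    out, _ := json.MarshalIndent(tsets, "", "  ")
    fmt.Println(string(out))
}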
2. The code logic of scrape.Manager
The entry point is Run(); its map[string][]*targetgroup.Group parameter is passed in by the discovery component:
// scrape/manager.go
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
    go m.reloader()
    for {
        select {
        case ts := <-tsets:
            m.updateTsets(ts)
            select {
            case m.triggerReload <- struct{}{}: // notify the reloader via channel m.triggerReload
            default:
            }
        case <-m.graceShut:
            return nil
        }
    }
}
// Waits for a signal on channel m.triggerReload, then performs the reload.
// Hot reload, checked at most once every 5 seconds.
func (m *Manager) reloader() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-m.graceShut:
            return
        case <-ticker.C:
            select {
            case <-m.triggerReload:
                m.reload()
            case <-m.graceShut:
                return
            }
        }
    }
}
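The ticker plus the non-blocking send in Run() effectively debounce reloads: no matter how many target-set updates arrive, reload() runs at most once per tick. A self-contained sketch of the same pattern (the names here are illustrative, not Prometheus APIs):
package main

import (
    "fmt"
    "time"
)

func main() {
    trigger := make(chan struct{}, 1) // capacity 1, like m.triggerReload
    done := make(chan struct{})

    // Consumer: coalesce any number of triggers into at most one reload per tick.
    go func() {
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()
        for range ticker.C {
            select {
            case <-trigger:
                fmt.Println("reload at", time.Now().Format(time.StampMilli))
            case <-done:
                return
            }
        }
    }()

    // Producer: a burst of 100 updates; the non-blocking send drops the extras.
    for i := 0; i < 100; i++ {
        select {
        case trigger <- struct{}{}:
        default: // a reload is already pending; skip
        }
    }
    time.Sleep(1500 * time.Millisecond) // only one "reload" line is printed
    close(done)
}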
The actual initialization happens in reload(). For each targetGroup it:
- creates a scrapePool;
- calls Sync on the scrapePool, which synchronizes the target information and starts the scraping.
// scrape/manager.go
// Iterate over every targetGroup: create a scrapePool, then Sync it.
func (m *Manager) reload() {
    m.mtxScrape.Lock()
    var wg sync.WaitGroup
    for setName, groups := range m.targetSets {
        if _, ok := m.scrapePools[setName]; !ok {
            scrapeConfig, ok := m.scrapeConfigs[setName]
            ...
            // create the scrapePool
            sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
            m.scrapePools[setName] = sp
        }
        wg.Add(1)
        // Run the sync in parallel as these take a while and at high load can't catch up.
        go func(sp *scrapePool, groups []*targetgroup.Group) {
            sp.Sync(groups) // the scrapePool's Sync; groups is a slice, but it usually has a single element
            wg.Done()
        }(m.scrapePools[setName], groups)
    }
    m.mtxScrape.Unlock()
    wg.Wait()
}
All target objects under one targetGroup share the same HTTP client and buffer pool:
// scrape/scrape.go
// create a scrapePool
func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger) (*scrapePool, error) {
    client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, false)
    // buffer pool shared by all scrape loops of this pool
    buffers := pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
    ctx, cancel := context.WithCancel(context.Background())
    sp := &scrapePool{
        cancel:        cancel,
        appendable:    app,
        config:        cfg,
        client:        client,
        activeTargets: map[uint64]*Target{},
        loops:         map[uint64]loop{},
        logger:        logger,
    }
    sp.newLoop = func(opts scrapeLoopOptions) loop {
        ......
        return newScrapeLoop(
            ctx,
            opts.scraper,
            log.With(logger, "target", opts.target),
            buffers,
            func(l labels.Labels) labels.Labels {
                return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc)
            },
            func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
            func() storage.Appender { return appender(app.Appender(), opts.limit) },
            cache,
            jitterSeed,
            opts.honorTimestamps,
        )
    }
    return sp, nil
}
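The buffers object created above is Prometheus's bucketed byte-slice pool (util/pool): each scrape reuses a previously allocated slice instead of allocating a fresh one. The sketch below illustrates just the Get/Put lifecycle with a plain sync.Pool; it is a simplified stand-in, not the actual pool.New API:
package main

import (
    "bytes"
    "fmt"
    "sync"
)

// bufPool is a simplified stand-in for the bucketed pool created by pool.New:
// it hands out byte slices that scrape iterations reuse.
var bufPool = sync.Pool{
    New: func() interface{} { return make([]byte, 0, 16*1024) },
}

func scrapeOnce(body string) {
    b := bufPool.Get().([]byte) // like sl.buffers.Get(sl.lastScrapeSize)
    buf := bytes.NewBuffer(b)

    buf.WriteString(body) // stands in for targetScraper.scrape writing the response into buf

    fmt.Printf("scraped %d bytes\n", buf.Len())
    bufPool.Put(buf.Bytes()[:0]) // return the (possibly grown) slice for the next scrape
}

func main() {
    scrapeOnce("# HELP up ...\nup 1\n")
    scrapeOnce("# HELP up ...\nup 1\n")
}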
3. The code logic of scrape.scrapePool
scrapePool creates one scrapeLoop per target in the targetGroup and then lets the scrapeLoops do the work.
// scrape/scrape.go
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
    // collect all targets
    var all []*Target
    sp.mtx.Lock()
    sp.droppedTargets = []*Target{}
    for _, tg := range tgs {
        targets, err := targetsFromGroup(tg, sp.config)
        ......
        for _, t := range targets {
            if t.Labels().Len() > 0 {
                all = append(all, t)
            }
            ......
        }
    }
    sp.mtx.Unlock()
    // hand the targets over to sync(), which puts them to work
    sp.sync(all)
    ......
}
For each target, newLoop() creates a scrapeLoop, and a goroutine is started that calls the scrapeLoop's run() method to pull metrics in a loop:
// scrape/scrape.go
// For each target in the group: create a targetScraper and a scrapeLoop, then let the scrapeLoop do the work.
func (sp *scrapePool) sync(targets []*Target) {
    for _, t := range targets {
        t := t
        hash := t.hash()
        uniqueTargets[hash] = struct{}{}
        if _, ok := sp.activeTargets[hash]; !ok {
            // create the targetScraper
            s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
            // create the scrapeLoop
            l := sp.newLoop(scrapeLoopOptions{
                target:          t,
                scraper:         s,
                limit:           limit,
                honorLabels:     honorLabels,
                honorTimestamps: honorTimestamps,
                mrc:             mrc,
            })
            sp.activeTargets[hash] = t
            sp.loops[hash] = l
            go l.run(interval, timeout, nil) // the scrapeLoop scrapes in a loop
        }
        ......
    }
    ...
    wg.Wait()
}
4. The code logic of scrapeLoop
Each scrapeLoop executes the following steps once per scrape interval:
- scrape: pull the metrics data;
- append: write the samples to the underlying storage;
- finally, report: update the scrapeLoop's state (mainly metric values).
// scrape/scrape.go
func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
    ......
    ticker := time.NewTicker(interval) // timer driving the periodic scrapes
    defer ticker.Stop()
    for {
        ......
        var (
            start             = time.Now()
            scrapeCtx, cancel = context.WithTimeout(sl.ctx, timeout)
        )
        ......
        // scrape: pull the metrics
        contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
        ......
        // append: write the scraped data to the underlying storage
        total, added, seriesAdded, appErr := sl.append(b, contentType, start)
        ......
        sl.buffers.Put(b) // return the buffer to the pool
        // report: update the scrapeLoop's state
        if err := sl.report(start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
            level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err)
        }
        select {
        ......
        case <-ticker.C: // next iteration
        }
    }
    ......
}
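One detail worth calling out: each iteration derives its own timeout context from the loop's parent context, so a slow target cannot stall the loop beyond the configured timeout. The standalone sketch below illustrates only that per-iteration timeout pattern; doScrape and the durations are hypothetical, not Prometheus code:
package main

import (
    "context"
    "fmt"
    "time"
)

// doScrape is a hypothetical stand-in for targetScraper.scrape.
func doScrape(ctx context.Context) error {
    select {
    case <-time.After(300 * time.Millisecond): // pretend the HTTP GET takes 300ms
        return nil
    case <-ctx.Done():
        return ctx.Err() // the scrape exceeded its timeout
    }
}

func main() {
    parent, stop := context.WithCancel(context.Background())
    defer stop()

    interval, timeout := 1*time.Second, 200*time.Millisecond
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for i := 0; i < 3; i++ {
        start := time.Now()
        scrapeCtx, cancel := context.WithTimeout(parent, timeout) // per-scrape deadline
        err := doScrape(scrapeCtx)
        cancel()
        fmt.Printf("scrape %d took %v, err=%v\n", i, time.Since(start).Round(time.Millisecond), err)
        <-ticker.C // wait for the next interval
    }
}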
5. The scrape logic of targetScraper
This is where the actual HTTP scraping happens:
- first, an HTTP GET is sent to the target's URL;
- then, the response body is written into an io.Writer (the buffer obtained from buffers above), from which the metrics are parsed later:
// scrape/scrape.go
// the actual scrape logic
func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
    if s.req == nil {
        req, err := http.NewRequest("GET", s.URL().String(), nil) // GET the /metrics endpoint
        if err != nil {
            return "", err
        }
        req.Header.Add("Accept", acceptHeader)
        req.Header.Add("Accept-Encoding", "gzip")
        req.Header.Set("User-Agent", userAgentHeader)
        req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", fmt.Sprintf("%f", s.timeout.Seconds()))
        s.req = req
    }
    resp, err := s.client.Do(s.req.WithContext(ctx)) // send the HTTP GET request
    if err != nil {
        return "", err
    }
    .......
    if resp.Header.Get("Content-Encoding") != "gzip" {
        _, err = io.Copy(w, resp.Body) // copy the response body into w
        if err != nil {
            return "", err
        }
        return resp.Header.Get("Content-Type"), nil
    }
    if s.gzipr == nil {
        s.buf = bufio.NewReader(resp.Body)
        s.gzipr, err = gzip.NewReader(s.buf)
        if err != nil {
            return "", err
        }
    }
    // decompress gzip and write into the io.Writer
    _, err = io.Copy(w, s.gzipr)
    s.gzipr.Close()
    return resp.Header.Get("Content-Type"), nil
}
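For comparison, the following self-contained sketch performs the same kind of request outside of Prometheus: an HTTP GET with Accept-Encoding: gzip, copying the (possibly gzip-compressed) body into a writer. The target URL and timeout are placeholders:
package main

import (
    "compress/gzip"
    "fmt"
    "io"
    "net/http"
    "os"
    "time"
)

func main() {
    // Placeholder endpoint; any exporter's /metrics works here.
    const target = "http://127.0.0.1:9100/metrics"

    client := &http.Client{Timeout: 10 * time.Second}
    req, err := http.NewRequest("GET", target, nil)
    if err != nil {
        panic(err)
    }
    // Setting the header ourselves disables the transport's automatic decompression,
    // so we must handle gzip manually, just like targetScraper.scrape does.
    req.Header.Add("Accept-Encoding", "gzip")

    resp, err := client.Do(req)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    var body io.Reader = resp.Body
    if resp.Header.Get("Content-Encoding") == "gzip" {
        gz, err := gzip.NewReader(resp.Body)
        if err != nil {
            panic(err)
        }
        defer gz.Close()
        body = gz
    }

    n, _ := io.Copy(os.Stdout, body)
    fmt.Fprintf(os.Stderr, "\nscraped %d bytes, Content-Type=%s\n", n, resp.Header.Get("Content-Type"))
}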