乐趣区

关于prometheus:prometheus源码分析discovery自动发现

discovery 反对文件、http、consul 等主动发现 targets,targets 会被发送到 scrape 模块进行拉取。

一. 整体框架

discovery 组件通过 Manager 对象治理所有的逻辑,当有数据变动时,通过 syncChannel 将数据发送给 scrape 组件。

discovery 组件会为每个 Job_name 创立一个 provider 对象,它蕴含 Discover 对象:

  • Discover 对象会主动发现 target;
  • 当有 targets 变动时:

    • 首先,通过 updateGroup() 更新 Manager 中的 targets 对象;
    • 而后,向 Manager 的 triggerSend channel 发送音讯,通知 Manager 要更新;
    • 最初,Manager 收到 triggerSend channel 中的音讯,将 Manager 中的所有 targets 发送给 syncChannel;

scrape 组件接管 syncChannel 中的数据,而后应用 reload() 进行抓取对象更新:

  • 若有新 job,则创立 scrapePool 并启动它;
  • 若有新 target,则创立 scrapeLoop 并启动它;
  • 若有隐没的 target,则进行其 scrapeLoop;

二.discovery 组件的代码入口

  • 先创立 provider 和 dicover 对象;
  • 再启动 provider;
// discovery/manager.go
func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error {
    .......
    m.targets = make(map[poolKey]map[string]*targetgroup.Group)
    // 依据配置创立 provider 和 discover 对象
    for name, scfg := range cfg {failedCount += m.registerProviders(scfg, name)
    }
    // 启动 provder
    for _, prov := range m.providers {m.startProvider(m.ctx, prov)
    }
    return nil
}

三.provider 和 discover 对象的初始化

provider 对象的初始化在 Manager.registerProviders():

  • 对主动发现的配置,为其每个配置创立 provider 和 discover 对象;
  • 将 provider 对象退出 m.providers;
// discovery/manager.go
// registerProviders returns a number of failed SD config.
func (m *Manager) registerProviders(cfg sd_config.ServiceDiscoveryConfig, setName string) int {add := func(cfg interface{}, newDiscoverer func() (Discoverer, error)) {t := reflect.TypeOf(cfg).String()
        // 创立 Discover 对象
        d, err := newDiscoverer()
        // 结构 provider 对象
        provider := provider{name:   fmt.Sprintf("%s/%d", t, len(m.providers)),
            d:      d,
            config: cfg,
            subs:   []string{setName},
        }
        // 退出 m.providers
        m.providers = append(m.providers, &provider)
    }
    // 对 file_sd_configs 中的每个配置,创立 provider 和 discover
    for _, c := range cfg.FileSDConfigs {add(c, func() (Discoverer, error) {return file.NewDiscovery(c, log.With(m.logger, "discovery", "file")), nil
        })
    }
    ......
    for _, c := range cfg.KubernetesSDConfigs {add(c, func() (Discoverer, error) {return kubernetes.New(log.With(m.logger, "discovery", "k8s"), c)
       })
    }
    ………
}

discover 对象的创立,以 file_sd.Discover 为例:

// discovery/file/file.go
// 应用 file_sd_config,创立 file Discovery 对象
func NewDiscovery(conf *SDConfig, logger log.Logger) *Discovery {
    disc := &Discovery{
        paths:      conf.Files,
        interval:   time.Duration(conf.RefreshInterval),
        timestamps: make(map[string]float64),
        logger:     logger,
    }
    fileSDTimeStamp.addDiscoverer(disc)
    return disc
}

discover 是 interface,不同的发现形式均实现了该 interface:

// discovery/manager.go
type Discoverer interface {Run(ctx context.Context, up chan<- []*targetgroup.Group)
}

discover 对象当发现有数据变动,会将数据写入 Run() 中的 chan 参数。

四.discover 对象产生数据

每个 provider 对象启动时:

  • 启动 1 个 goroutine: 执行 disover.Run() 去主动发现 targets;
  • 启动 1 个 goroutine: 执行 m.update() 更新数据并告诉 Manager;
// discovery/manager.go
func (m *Manager) startProvider(ctx context.Context, p *provider) {ctx, cancel := context.WithCancel(ctx)
    updates := make(chan []*targetgroup.Group)
    m.discoverCancel = append(m.discoverCancel, cancel)
    // 让 provider 中的 discover 对象,应用 Run() 去主动发现
    // 当有数据变动,将数据写入 updates
    go p.d.Run(ctx, updates)
    // 更新 Manager 中的 targets 并告诉 Manager
    go m.updater(ctx, p, updates)
}

1) goroutine1: p.d.Run()

以 file.Discovery 为例:

  • 它感知 file 内容的变动,应用 d.refresh() 解析 file 内容:

    • 通过 fsnotify 感知 file 变动;
    • 应用 Ticker 定期刷新 (默认 5min);
  • d.refresh() 负责读文件,而后将后果发送到 channel:
// discovery/file/file.go
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {watcher, err := fsnotify.NewWatcher()
    d.watcher = watcher
    d.refresh(ctx, ch)

    ticker := time.NewTicker(d.interval)    // 默认 5min
    defer ticker.Stop()
    for {
        select {case <-ctx.Done():
            return
        case event := <-d.watcher.Events:    // fsnotify 感知到 file 变动
            d.refresh(ctx, ch)
        case <-ticker.C:
            // Setting a new watch after an update might fail. Make sure we don't lose
            // those files forever.
            d.refresh(ctx, ch)
    }
}
// discovery/file/file.go
func (d *Discovery) refresh(ctx context.Context, ch chan<- []*targetgroup.Group) {for _, p := range d.listFiles() {tgroups, err := d.readFile(p)
        select {
        // 发送到 chan
        case ch <- tgroups:
        case <-ctx.Done():
            return
        }
    }
}

2) goroutine2: m.updater()

该 goroutine 负责将接管并解决上一步 chan 中的数据:

  • 首先,将接管的数据更新至 m.targets 中;
  • 而后,向 m.triggerSend 这个 chan 发送音讯;
// discovery/manager.go
func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
    for {
        select {case <-ctx.Done():
            return
        case tgs, ok := <-updates:
            // 更新 m.targets 中的数据
            for _, s := range p.subs {m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
            }
            // 向 m.triggerSend 发送音讯
            select {case m.triggerSend <- struct{}{}:
            default:
            }
        }
    }
}

Manager 对象会接管并解决 m.triggerSend 音讯:

  • 收到 m.triggerSend 后,将 Manager 中的所有 targets 发送到 m.syncChan;
  • scrape 组件会接管并解决 syncChan;
// discovery/manager.go
func (m *Manager) sender() {ticker := time.NewTicker(m.updatert)
    defer ticker.Stop()

    for {
        select {case <-m.ctx.Done():
            return
        case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
            select {
            // 接管 m.triggerSend 音讯
            case <-m.triggerSend:
                select {
                // 将 Manager 中的 targets 发送到 syncCh
                case m.syncCh <- m.allGroups():
                default:
            }
        }
    }
}

五.scrape 组件接收数据

上一步讲到,discovery.Manager 将以后所有的 targets,发送到 m.syncChan;

scrape 组件会接管并解决 m.syncChan 中的数据:

// cmd/prometheus/main.go
func main() {
    ......
    scrapeManager.Run(discoveryManagerScrape.SyncCh())
    ......
}

scrapeManager.Run() 收到数据后:

  • 读取 chan 中的数据,更新至 scrape.Manager.targets 中;
  • 发送 triggerReload 音讯,由 m.reload() 进行解决;
// scrape/manager.go
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {go m.reloader()        // 解决 reload 音讯
    for {
        select {
        case ts := <-tsets:
            m.updateTsets(ts)    // 更新 Manager 中的 targets 对象

            select {case m.triggerReload <- struct{}{}:    // 发送 reload 音讯
            default:
            }
        }
    }
}

m.reloader() 解决 m.triggerReload 音讯:

  • 接管 m.triggerReload 音讯;
  • 调用 m.reload() 进行解决;
// scrape/manager.go
func (m *Manager) reloader() {ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            select {
            case <-m.triggerReload:
                m.reload()
            case <-m.graceShut:
                return
            }
        }
    }
}

m.reload() 解析新的 targetSets:

  • 若有新 job_name,则创立 scrapePool;
  • 对每个 scrapePool 进行 sync();
func (m *Manager) reload() {m.mtxScrape.Lock()
    var wg sync.WaitGroup
    for setName, groups := range m.targetSets {
        // 新 job_name 配置,创立 scrapePool
        if _, ok := m.scrapePools[setName]; !ok {scrapeConfig, ok := m.scrapeConfigs[setName]
            sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
            m.scrapePools[setName] = sp
        }
        wg.Add(1)
        // 并行的同步每个 scrapePool
        // Run the sync in parallel as these take a while and at high load can't catch up.
        go func(sp *scrapePool, groups []*targetgroup.Group) {sp.Sync(groups)
            wg.Done()}(m.scrapePools[setName], groups)
    }
    m.mtxScrape.Unlock()
    wg.Wait()}

scrapePool.sync():

  • 对新的 target,创立 scrapeLoop 并执行;
  • 对隐没的 target,进行其 scrapeLoop 并删除其对象;
// scrape/scrape.go
func (sp *scrapePool) sync(targets []*Target) {
    for _, t := range targets {
        t := t
        hash := t.hash()
        if _, ok := sp.activeTargets[hash]; !ok {s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
            // 新 target,创立 scrapeLoop
            l := sp.newLoop(scrapeLoopOptions{
                target:          t,
                scraper:         s,
                limit:           limit,
                honorLabels:     honorLabels,
                honorTimestamps: honorTimestamps,
                mrc:             mrc,
            })
            sp.activeTargets[hash] = t
            sp.loops[hash] = l
            // 执行 scrapeLoop
            go l.run(interval, timeout, nil)
        }
    }
        
    var wg sync.WaitGroup
    for hash := range sp.activeTargets {
       // 隐没的 target
       if _, ok := uniqueTargets[hash]; !ok {wg.Add(1)
           go func(l loop) {l.stop()                    // 进行 scrapeLoop
              wg.Done()}(sp.loops[hash])
           delete(sp.loops, hash)        // 删除对象
           delete(sp.activeTargets, hash)
        }
    }
    wg.Wait()}
退出移动版