discovery 反对文件、http、consul 等主动发现 targets,targets 会被发送到 scrape 模块进行拉取。
一. 整体框架
discovery 组件通过 Manager 对象治理所有的逻辑,当有数据变动时,通过 syncChannel 将数据发送给 scrape 组件。
discovery 组件会为每个 Job_name 创立一个 provider 对象,它蕴含 Discover 对象:
- Discover 对象会主动发现 target;
-
当有 targets 变动时:
- 首先,通过 updateGroup() 更新 Manager 中的 targets 对象;
- 而后,向 Manager 的 triggerSend channel 发送音讯,通知 Manager 要更新;
- 最初,Manager 收到 triggerSend channel 中的音讯,将 Manager 中的所有 targets 发送给 syncChannel;
scrape 组件接管 syncChannel 中的数据,而后应用 reload() 进行抓取对象更新:
- 若有新 job,则创立 scrapePool 并启动它;
- 若有新 target,则创立 scrapeLoop 并启动它;
- 若有隐没的 target,则进行其 scrapeLoop;
二.discovery 组件的代码入口
- 先创立 provider 和 dicover 对象;
- 再启动 provider;
// discovery/manager.go
func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error {
.......
m.targets = make(map[poolKey]map[string]*targetgroup.Group)
// 依据配置创立 provider 和 discover 对象
for name, scfg := range cfg {failedCount += m.registerProviders(scfg, name)
}
// 启动 provder
for _, prov := range m.providers {m.startProvider(m.ctx, prov)
}
return nil
}
三.provider 和 discover 对象的初始化
provider 对象的初始化在 Manager.registerProviders():
- 对主动发现的配置,为其每个配置创立 provider 和 discover 对象;
- 将 provider 对象退出 m.providers;
// discovery/manager.go
// registerProviders returns a number of failed SD config.
func (m *Manager) registerProviders(cfg sd_config.ServiceDiscoveryConfig, setName string) int {add := func(cfg interface{}, newDiscoverer func() (Discoverer, error)) {t := reflect.TypeOf(cfg).String()
// 创立 Discover 对象
d, err := newDiscoverer()
// 结构 provider 对象
provider := provider{name: fmt.Sprintf("%s/%d", t, len(m.providers)),
d: d,
config: cfg,
subs: []string{setName},
}
// 退出 m.providers
m.providers = append(m.providers, &provider)
}
// 对 file_sd_configs 中的每个配置,创立 provider 和 discover
for _, c := range cfg.FileSDConfigs {add(c, func() (Discoverer, error) {return file.NewDiscovery(c, log.With(m.logger, "discovery", "file")), nil
})
}
......
for _, c := range cfg.KubernetesSDConfigs {add(c, func() (Discoverer, error) {return kubernetes.New(log.With(m.logger, "discovery", "k8s"), c)
})
}
………
}
discover 对象的创立,以 file_sd.Discover 为例:
// discovery/file/file.go
// 应用 file_sd_config,创立 file Discovery 对象
func NewDiscovery(conf *SDConfig, logger log.Logger) *Discovery {
disc := &Discovery{
paths: conf.Files,
interval: time.Duration(conf.RefreshInterval),
timestamps: make(map[string]float64),
logger: logger,
}
fileSDTimeStamp.addDiscoverer(disc)
return disc
}
discover 是 interface,不同的发现形式均实现了该 interface:
// discovery/manager.go
type Discoverer interface {Run(ctx context.Context, up chan<- []*targetgroup.Group)
}
discover 对象当发现有数据变动,会将数据写入 Run() 中的 chan 参数。
四.discover 对象产生数据
每个 provider 对象启动时:
- 启动 1 个 goroutine: 执行 disover.Run() 去主动发现 targets;
- 启动 1 个 goroutine: 执行 m.update() 更新数据并告诉 Manager;
// discovery/manager.go
func (m *Manager) startProvider(ctx context.Context, p *provider) {ctx, cancel := context.WithCancel(ctx)
updates := make(chan []*targetgroup.Group)
m.discoverCancel = append(m.discoverCancel, cancel)
// 让 provider 中的 discover 对象,应用 Run() 去主动发现
// 当有数据变动,将数据写入 updates
go p.d.Run(ctx, updates)
// 更新 Manager 中的 targets 并告诉 Manager
go m.updater(ctx, p, updates)
}
1) goroutine1: p.d.Run()
以 file.Discovery 为例:
-
它感知 file 内容的变动,应用 d.refresh() 解析 file 内容:
- 通过 fsnotify 感知 file 变动;
- 应用 Ticker 定期刷新 (默认 5min);
- d.refresh() 负责读文件,而后将后果发送到 channel:
// discovery/file/file.go
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {watcher, err := fsnotify.NewWatcher()
d.watcher = watcher
d.refresh(ctx, ch)
ticker := time.NewTicker(d.interval) // 默认 5min
defer ticker.Stop()
for {
select {case <-ctx.Done():
return
case event := <-d.watcher.Events: // fsnotify 感知到 file 变动
d.refresh(ctx, ch)
case <-ticker.C:
// Setting a new watch after an update might fail. Make sure we don't lose
// those files forever.
d.refresh(ctx, ch)
}
}
// discovery/file/file.go
func (d *Discovery) refresh(ctx context.Context, ch chan<- []*targetgroup.Group) {for _, p := range d.listFiles() {tgroups, err := d.readFile(p)
select {
// 发送到 chan
case ch <- tgroups:
case <-ctx.Done():
return
}
}
}
2) goroutine2: m.updater()
该 goroutine 负责将接管并解决上一步 chan 中的数据:
- 首先,将接管的数据更新至 m.targets 中;
- 而后,向 m.triggerSend 这个 chan 发送音讯;
// discovery/manager.go
func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
for {
select {case <-ctx.Done():
return
case tgs, ok := <-updates:
// 更新 m.targets 中的数据
for _, s := range p.subs {m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
}
// 向 m.triggerSend 发送音讯
select {case m.triggerSend <- struct{}{}:
default:
}
}
}
}
Manager 对象会接管并解决 m.triggerSend 音讯:
- 收到 m.triggerSend 后,将 Manager 中的所有 targets 发送到 m.syncChan;
- scrape 组件会接管并解决 syncChan;
// discovery/manager.go
func (m *Manager) sender() {ticker := time.NewTicker(m.updatert)
defer ticker.Stop()
for {
select {case <-m.ctx.Done():
return
case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
select {
// 接管 m.triggerSend 音讯
case <-m.triggerSend:
select {
// 将 Manager 中的 targets 发送到 syncCh
case m.syncCh <- m.allGroups():
default:
}
}
}
}
五.scrape 组件接收数据
上一步讲到,discovery.Manager 将以后所有的 targets,发送到 m.syncChan;
scrape 组件会接管并解决 m.syncChan 中的数据:
// cmd/prometheus/main.go
func main() {
......
scrapeManager.Run(discoveryManagerScrape.SyncCh())
......
}
scrapeManager.Run() 收到数据后:
- 读取 chan 中的数据,更新至 scrape.Manager.targets 中;
- 发送 triggerReload 音讯,由 m.reload() 进行解决;
// scrape/manager.go
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {go m.reloader() // 解决 reload 音讯
for {
select {
case ts := <-tsets:
m.updateTsets(ts) // 更新 Manager 中的 targets 对象
select {case m.triggerReload <- struct{}{}: // 发送 reload 音讯
default:
}
}
}
}
m.reloader() 解决 m.triggerReload 音讯:
- 接管 m.triggerReload 音讯;
- 调用 m.reload() 进行解决;
// scrape/manager.go
func (m *Manager) reloader() {ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
select {
case <-m.triggerReload:
m.reload()
case <-m.graceShut:
return
}
}
}
}
m.reload() 解析新的 targetSets:
- 若有新 job_name,则创立 scrapePool;
- 对每个 scrapePool 进行 sync();
func (m *Manager) reload() {m.mtxScrape.Lock()
var wg sync.WaitGroup
for setName, groups := range m.targetSets {
// 新 job_name 配置,创立 scrapePool
if _, ok := m.scrapePools[setName]; !ok {scrapeConfig, ok := m.scrapeConfigs[setName]
sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
m.scrapePools[setName] = sp
}
wg.Add(1)
// 并行的同步每个 scrapePool
// Run the sync in parallel as these take a while and at high load can't catch up.
go func(sp *scrapePool, groups []*targetgroup.Group) {sp.Sync(groups)
wg.Done()}(m.scrapePools[setName], groups)
}
m.mtxScrape.Unlock()
wg.Wait()}
scrapePool.sync():
- 对新的 target,创立 scrapeLoop 并执行;
- 对隐没的 target,进行其 scrapeLoop 并删除其对象;
// scrape/scrape.go
func (sp *scrapePool) sync(targets []*Target) {
for _, t := range targets {
t := t
hash := t.hash()
if _, ok := sp.activeTargets[hash]; !ok {s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
// 新 target,创立 scrapeLoop
l := sp.newLoop(scrapeLoopOptions{
target: t,
scraper: s,
limit: limit,
honorLabels: honorLabels,
honorTimestamps: honorTimestamps,
mrc: mrc,
})
sp.activeTargets[hash] = t
sp.loops[hash] = l
// 执行 scrapeLoop
go l.run(interval, timeout, nil)
}
}
var wg sync.WaitGroup
for hash := range sp.activeTargets {
// 隐没的 target
if _, ok := uniqueTargets[hash]; !ok {wg.Add(1)
go func(l loop) {l.stop() // 进行 scrapeLoop
wg.Done()}(sp.loops[hash])
delete(sp.loops, hash) // 删除对象
delete(sp.activeTargets, hash)
}
}
wg.Wait()}