The discovery component supports automatically discovering targets via files, HTTP, Consul, and other mechanisms; the discovered targets are then sent to the scrape module to be scraped.

1. Overall framework

The discovery component manages all of its logic through a Manager object; when the target data changes, it sends the data to the scrape component through syncChannel.

The discovery component creates one provider object per job_name; each provider contains a Discoverer object:

  • The Discoverer automatically discovers targets;
  • When the targets change:

    • first, updateGroup() updates the targets map held by the Manager;
    • then, a message is sent on the Manager's triggerSend channel, telling the Manager that an update is pending;
    • finally, on receiving from the triggerSend channel, the Manager sends all of its targets to syncChannel;

The scrape component receives the data from syncChannel and then uses reload() to update its scrape targets (a toy sketch of the whole pipeline follows this list):

  • for a new job, it creates a scrapePool and starts it;
  • for a new target, it creates a scrapeLoop and starts it;
  • for a target that has disappeared, it stops that target's scrapeLoop;
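Before walking through the real code, here is a toy, self-contained sketch of this pipeline. Everything in it (miniManager and friends) is hypothetical and only mirrors the shape of the real flow: a discoverer pushes target updates to the manager, the manager coalesces them through a trigger channel and forwards full snapshots on a sync channel, and the scrape side consumes the snapshots.

// A hypothetical miniature of the discovery→scrape pipeline (not Prometheus code).
package main

import (
    "fmt"
    "sync"
    "time"
)

type miniManager struct {
    mtx         sync.Mutex
    targets     map[string][]string // job_name -> target addresses
    triggerSend chan struct{}       // "something changed" signal
    syncCh      chan map[string][]string
}

// updater plays the role of Manager.updater(): store the update, then notify.
func (m *miniManager) updater(job string, updates <-chan []string) {
    for tgs := range updates {
        m.mtx.Lock()
        m.targets[job] = tgs // like updateGroup()
        m.mtx.Unlock()
        select { // non-blocking notify, like m.triggerSend <- struct{}{}
        case m.triggerSend <- struct{}{}:
        default:
        }
    }
}

// sender plays the role of Manager.sender(): forward a full snapshot.
func (m *miniManager) sender() {
    for range m.triggerSend {
        m.mtx.Lock()
        snapshot := make(map[string][]string, len(m.targets))
        for k, v := range m.targets {
            snapshot[k] = v
        }
        m.mtx.Unlock()
        m.syncCh <- snapshot // like m.syncCh <- m.allGroups()
    }
}

func main() {
    m := &miniManager{
        targets:     map[string][]string{},
        triggerSend: make(chan struct{}, 1),
        syncCh:      make(chan map[string][]string),
    }
    updates := make(chan []string)
    go m.updater("node", updates) // per-provider updater goroutine
    go m.sender()

    // a fake discoverer: "discovers" one target set
    go func() { updates <- []string{"localhost:9100"} }()

    // the scrape side, like scrapeManager.Run(discoveryManager.SyncCh())
    select {
    case ts := <-m.syncCh:
        fmt.Println("reload with targets:", ts)
    case <-time.After(time.Second):
    }
}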

2. Code entry point of the discovery component

  • first, the provider and discoverer objects are created;
  • then, the providers are started;
// discovery/manager.go
func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error {
    .......
    m.targets = make(map[poolKey]map[string]*targetgroup.Group)
    // create the provider and discoverer objects from the config
    for name, scfg := range cfg {
        failedCount += m.registerProviders(scfg, name)
    }
    // start the providers
    for _, prov := range m.providers {
        m.startProvider(m.ctx, prov)
    }
    return nil
}

3. Initialization of the provider and discoverer objects

The provider objects are initialized in Manager.registerProviders():

  • for each service-discovery configuration entry, a provider and a discoverer object are created;
  • the provider object is appended to m.providers;
// discovery/manager.go
// registerProviders returns a number of failed SD config.
func (m *Manager) registerProviders(cfg sd_config.ServiceDiscoveryConfig, setName string) int {
    add := func(cfg interface{}, newDiscoverer func() (Discoverer, error)) {
        t := reflect.TypeOf(cfg).String()
        // create the Discoverer object
        d, err := newDiscoverer()
        // construct the provider object
        provider := provider{
            name:   fmt.Sprintf("%s/%d", t, len(m.providers)),
            d:      d,
            config: cfg,
            subs:   []string{setName},
        }
        // append it to m.providers
        m.providers = append(m.providers, &provider)
    }
    // for each entry in file_sd_configs, create a provider and a discoverer
    for _, c := range cfg.FileSDConfigs {
        add(c, func() (Discoverer, error) {
            return file.NewDiscovery(c, log.With(m.logger, "discovery", "file")), nil
        })
    }
    ......
    for _, c := range cfg.KubernetesSDConfigs {
        add(c, func() (Discoverer, error) {
            return kubernetes.New(log.With(m.logger, "discovery", "k8s"), c)
        })
    }
    ………
}

As an example of how a discoverer object is created, take the file-based Discovery (file_sd):

// discovery/file/file.go
// NewDiscovery creates a file Discovery object from a file_sd_config entry.
func NewDiscovery(conf *SDConfig, logger log.Logger) *Discovery {
    disc := &Discovery{
        paths:      conf.Files,
        interval:   time.Duration(conf.RefreshInterval),
        timestamps: make(map[string]float64),
        logger:     logger,
    }
    fileSDTimeStamp.addDiscoverer(disc)
    return disc
}

Discoverer is an interface; every discovery mechanism implements it:

// discovery/manager.go
type Discoverer interface {
    Run(ctx context.Context, up chan<- []*targetgroup.Group)
}

When a discoverer notices that the target data has changed, it writes the data into the chan parameter of Run().
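This makes it straightforward to plug in new discovery mechanisms. Below is a minimal sketch of a custom Discoverer that publishes one fixed target; the package and type names are invented, and the import paths assume Prometheus 2.x.

// mysd is a hypothetical package; StaticDiscovery is not part of Prometheus.
package mysd

import (
    "context"
    "time"

    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/discovery/targetgroup"
)

type StaticDiscovery struct {
    address  string
    interval time.Duration
}

// Run satisfies the Discoverer interface: it pushes target groups into up
// until the context is cancelled.
func (d *StaticDiscovery) Run(ctx context.Context, up chan<- []*targetgroup.Group) {
    ticker := time.NewTicker(d.interval)
    defer ticker.Stop()
    for {
        tg := &targetgroup.Group{
            Source:  "static",
            Targets: []model.LabelSet{{model.AddressLabel: model.LabelValue(d.address)}},
        }
        select {
        case up <- []*targetgroup.Group{tg}: // consumed by m.updater()
        case <-ctx.Done():
            return
        }
        select {
        case <-ticker.C: // re-send periodically, like the built-in discoverers
        case <-ctx.Done():
            return
        }
    }
}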

4. The discoverer objects produce data

When each provider object starts:

  • one goroutine executes discoverer.Run() to discover targets;
  • one goroutine executes m.updater() to update the data and notify the Manager; both goroutines are tied to a cancellable context (sketched after the code below);
// discovery/manager.go
func (m *Manager) startProvider(ctx context.Context, p *provider) {
    ctx, cancel := context.WithCancel(ctx)
    updates := make(chan []*targetgroup.Group)
    m.discoverCancel = append(m.discoverCancel, cancel)
    // let the provider's discoverer run its discovery loop;
    // when targets change, it writes the data to updates
    go p.d.Run(ctx, updates)
    // update the targets in the Manager and notify it
    go m.updater(ctx, p, updates)
}
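Collecting every cancel function in m.discoverCancel is what lets the Manager tear down all discoverers at once, e.g. when the configuration is reloaded. A generic, standalone sketch of that pattern, with invented names:

// Hypothetical illustration of the fan-out cancellation used by startProvider().
package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    ctx := context.Background()
    var cancels []context.CancelFunc // like m.discoverCancel

    for _, name := range []string{"file/0", "consul/1"} {
        pctx, cancel := context.WithCancel(ctx)
        cancels = append(cancels, cancel)
        go func(name string) { // like go p.d.Run(pctx, updates)
            <-pctx.Done()
            fmt.Println(name, "stopped")
        }(name)
    }

    for _, cancel := range cancels { // stop every provider in one sweep
        cancel()
    }
    time.Sleep(100 * time.Millisecond) // give the goroutines time to print
}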

1) goroutine1: p.d.Run()

Taking file.Discovery as an example:

  • it watches for changes to the files' contents and uses d.refresh() to parse them:

    • file changes are detected via fsnotify;
    • a Ticker triggers a periodic refresh (5min by default);
  • d.refresh() reads the files and sends the result to the channel (an example target file is sketched after the code):
// discovery/file/file.go
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
    watcher, err := fsnotify.NewWatcher()
    d.watcher = watcher
    d.refresh(ctx, ch)
    ticker := time.NewTicker(d.interval)    // 5min by default
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case event := <-d.watcher.Events:    // fsnotify detected a file change
            d.refresh(ctx, ch)
        case <-ticker.C:
            // Setting a new watch after an update might fail. Make sure we don't lose
            // those files forever.
            d.refresh(ctx, ch)
        }
    }
}
// discovery/file/file.go
func (d *Discovery) refresh(ctx context.Context, ch chan<- []*targetgroup.Group) {
    for _, p := range d.listFiles() {
        tgroups, err := d.readFile(p)
        select {
        // send the groups to the channel
        case ch <- tgroups:
        case <-ctx.Done():
            return
        }
    }
}
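For reference, each file that readFile() parses contains a list of target groups. Below is a minimal sketch of the JSON form (YAML is also accepted), decoded into hypothetical mirror structs rather than the real targetgroup.Group, which has a custom UnmarshalJSON:

// Hypothetical illustration of the file_sd target file format (not Prometheus code).
package main

import (
    "encoding/json"
    "fmt"
)

// fileSDGroup mirrors one entry of a file_sd target file.
type fileSDGroup struct {
    Targets []string          `json:"targets"`
    Labels  map[string]string `json:"labels"`
}

const example = `[
  {"targets": ["10.0.0.1:9100", "10.0.0.2:9100"],
   "labels":  {"env": "prod"}}
]`

func main() {
    var groups []fileSDGroup
    if err := json.Unmarshal([]byte(example), &groups); err != nil {
        panic(err)
    }
    // each entry becomes one *targetgroup.Group sent to the channel
    fmt.Printf("%+v\n", groups)
}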

2) goroutine2: m.updater()

This goroutine receives and processes the data from the channel above:

  • first, it merges the received data into m.targets;
  • then, it sends a message on the m.triggerSend channel (this non-blocking send is sketched after the code);
// discovery/manager.go
func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
    for {
        select {
        case <-ctx.Done():
            return
        case tgs, ok := <-updates:
            // update the data in m.targets
            for _, s := range p.subs {
                m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
            }
            // send a message to m.triggerSend
            select {
            case m.triggerSend <- struct{}{}:
            default:
            }
        }
    }
}
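The select with an empty default is a non-blocking send: if a notification is already pending, the new one is simply dropped, so a burst of updates collapses into a single signal. A standalone sketch of this coalescing pattern:

// Hypothetical illustration of the non-blocking trigger used by updater().
package main

import "fmt"

func main() {
    trigger := make(chan struct{}, 1) // capacity 1: at most one pending signal

    notify := func() {
        select {
        case trigger <- struct{}{}: // signal if nothing is pending yet
        default: // a signal is already pending: drop this one
        }
    }

    for i := 0; i < 100; i++ {
        notify() // 100 rapid updates ...
    }
    <-trigger
    fmt.Println("... coalesced into a single signal")
}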

The Manager receives and handles the m.triggerSend messages:

  • on receiving from m.triggerSend, it sends all of the Manager's targets to m.syncCh;
  • the scrape component receives and processes m.syncCh;
// discovery/manager.go
func (m *Manager) sender() {
    ticker := time.NewTicker(m.updatert)
    defer ticker.Stop()
    for {
        select {
        case <-m.ctx.Done():
            return
        case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
            select {
            // receive the m.triggerSend message
            case <-m.triggerSend:
                select {
                // send the Manager's targets to syncCh
                case m.syncCh <- m.allGroups():
                default:
                }
            default:
            }
        }
    }
}

5. The scrape component receives the data

As described above, discovery.Manager sends all of its current targets to m.syncCh;

the scrape component receives and processes the data from this channel:

// cmd/prometheus/main.go
func main() {
    ......
    scrapeManager.Run(discoveryManagerScrape.SyncCh())
    ......
}

When scrapeManager.Run() receives data:

  • it reads the data from the channel and stores it in the scrape Manager's targetSets;
  • it sends a triggerReload message, which m.reload() will handle;
// scrape/manager.go
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
    go m.reloader()    // handle reload messages
    for {
        select {
        case ts := <-tsets:
            m.updateTsets(ts)    // update the targets held by the Manager
            select {
            case m.triggerReload <- struct{}{}:    // send a reload message
            default:
            }
        }
    }
}

m.reloader() handles the m.triggerReload messages:

  • it receives the m.triggerReload message;
  • and calls m.reload() to process it;
// scrape/manager.go
func (m *Manager) reloader() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            select {
            case <-m.triggerReload:
                m.reload()
            case <-m.graceShut:
                return
            }
        }
    }
}

m.reload() walks the new targetSets:

  • for a new job_name, it creates a scrapePool;
  • and calls Sync() on every scrapePool;
// scrape/manager.go
func (m *Manager) reload() {
    m.mtxScrape.Lock()
    var wg sync.WaitGroup
    for setName, groups := range m.targetSets {
        // a new job_name in the config: create a scrapePool
        if _, ok := m.scrapePools[setName]; !ok {
            scrapeConfig, ok := m.scrapeConfigs[setName]
            sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
            m.scrapePools[setName] = sp
        }
        wg.Add(1)
        // sync each scrapePool in parallel
        // Run the sync in parallel as these take a while and at high load can't catch up.
        go func(sp *scrapePool, groups []*targetgroup.Group) {
            sp.Sync(groups)
            wg.Done()
        }(m.scrapePools[setName], groups)
    }
    m.mtxScrape.Unlock()
    wg.Wait()
}

scrapePool.sync():

  • for each new target, it creates and runs a scrapeLoop;
  • for each target that has disappeared, it stops its scrapeLoop and removes the associated objects (this hash-keyed reconcile is sketched after the code);
// scrape/scrape.go
func (sp *scrapePool) sync(targets []*Target) {
    for _, t := range targets {
        t := t
        hash := t.hash()
        if _, ok := sp.activeTargets[hash]; !ok {
            s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
            // a new target: create a scrapeLoop
            l := sp.newLoop(scrapeLoopOptions{
                target:          t,
                scraper:         s,
                limit:           limit,
                honorLabels:     honorLabels,
                honorTimestamps: honorTimestamps,
                mrc:             mrc,
            })
            sp.activeTargets[hash] = t
            sp.loops[hash] = l
            // run the scrapeLoop
            go l.run(interval, timeout, nil)
        }
    }
    var wg sync.WaitGroup
    for hash := range sp.activeTargets {
        // a target that has disappeared
        if _, ok := uniqueTargets[hash]; !ok {
            wg.Add(1)
            go func(l loop) {
                l.stop()    // stop the scrapeLoop
                wg.Done()
            }(sp.loops[hash])
            delete(sp.loops, hash)           // remove the objects
            delete(sp.activeTargets, hash)
        }
    }
    wg.Wait()
}
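sync() is essentially a hash-keyed reconcile: start a loop for every target hash that is new, stop and delete every loop whose hash has disappeared. A generic sketch of the same pattern, with invented types:

// Hypothetical illustration of the reconcile pattern in scrapePool.sync().
package main

import "fmt"

type loop struct{ target string }

// reconcile starts a loop for every new hash and stops loops for vanished ones.
func reconcile(active map[uint64]*loop, desired map[uint64]string) {
    for h, t := range desired {
        if _, ok := active[h]; !ok { // new target: create and "run" a loop
            active[h] = &loop{target: t}
            fmt.Println("start loop for", t)
        }
    }
    for h, l := range active {
        if _, ok := desired[h]; !ok { // vanished target: stop and delete
            fmt.Println("stop loop for", l.target)
            delete(active, h)
        }
    }
}

func main() {
    active := map[uint64]*loop{}
    reconcile(active, map[uint64]string{1: "10.0.0.1:9100"})
    reconcile(active, map[uint64]string{2: "10.0.0.2:9100"}) // target 1 vanishes
}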