Prometheus source code analysis: the rules module


Prometheus has two kinds of rules:

  • AlertingRule: alerting rules;
  • RecordingRule: recording rules, which evaluate an expression to produce new metrics;
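
Both types satisfy the same Rule contract defined in rules/manager.go. Below is a simplified sketch of that contract (the real interface carries additional methods for health, last error, and evaluation timing), just to frame the rest of the walkthrough:

// Simplified sketch of the contract both rule types satisfy (rules/manager.go);
// the actual Rule interface has more methods than shown here.
type QueryFunc func(ctx context.Context, q string, t time.Time) (promql.Vector, error)

type Rule interface {
    Name() string
    Labels() labels.Labels
    // Eval evaluates the rule at ts and returns the resulting samples:
    // alert series for an AlertingRule, new series for a RecordingRule.
    Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error)
}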

1. Overall architecture

Prometheus' rule management lives mainly in the prometheus/rules/ directory:

  • at runtime, rules.Manager reads the configured rule files (e.g. rules/*.yaml) and parses them into rules.Group objects;
  • each rules.Group gets its own goroutine, which periodically evaluates all the rules in that group;
  • for each rule:

    • if it is an AlertingRule, it is evaluated and, when the alert fires, sent out via sendAlerts;
    • if it is a RecordingRule, it is evaluated and the result is written to the TSDB;

Suppose a rules/xxx.yaml file has the following content:

groups:
- name: test
  rules:
  - expr: |
      100 - avg without (cpu, mode) (rate(node_cpu_seconds_total{job="node-exporter", mode="idle"}[1m])) * 100
    record: instance:node_cpu_used_percent:rate1m
    labels:
      __type__: "gauge"
  - alert: Too_Many_Goroutine
    expr: |
      go_goroutines > 100
    labels:
      severity: high
    annotations:
      summary: too many goroutines
- name: cpu_used
  rules:
  - expr: |
      node_cpu_seconds_total{mode="user", cpu="0"}
    record: cpu_used_percent
    labels:
      cpu: "0"

This file produces 2 groups:

  • group test, containing 1 RecordingRule and 1 AlertingRule;
  • group cpu_used, containing 1 RecordingRule;

2. Rule-loading logic

manager.Update loads the rules and starts one goroutine per group to run them:

// rules/manager.go
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels) error {
    // Load the rule groups from the given files.
    groups, errs := m.LoadGroups(interval, externalLabels, files...)

    var wg sync.WaitGroup
    for _, newg := range groups {
        ......
        wg.Add(1)
        go func(newg *Group) {
            ...
            go func() {
                ......
                // Run the rules in this group.
                newg.run(m.opts.Context)
            }()
            wg.Done()
        }(newg)
    }
    ......
    wg.Wait()
    m.groups = groups

    return nil
}
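
Update itself is driven by the configuration (re)load path in cmd/prometheus/main.go: the files matching the rule_files patterns are globbed and handed to the rule manager together with the global evaluation interval and external labels. A simplified sketch of that reload hook follows; the name reloadRules and the surrounding reloader plumbing are illustrative, and the exact wiring differs slightly between versions:

// Simplified sketch of the rules reload hook in cmd/prometheus/main.go
// (reloadRules is a name chosen here for illustration).
reloadRules := func(cfg *config.Config) error {
    // Collect all rule files matching the rule_files patterns.
    var files []string
    for _, pat := range cfg.RuleFiles {
        fs, err := filepath.Glob(pat)
        if err != nil {
            // The only possible error is a bad pattern.
            return err
        }
        files = append(files, fs...)
    }
    return ruleManager.Update(
        time.Duration(cfg.GlobalConfig.EvaluationInterval),
        files,
        cfg.GlobalConfig.ExternalLabels,
    )
}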

The loading logic:

  • one group may contain multiple rules;
  • a rule is treated as an AlertingRule if its alert field is set, and as a RecordingRule otherwise;
// rules/manager.go
// LoadGroups reads groups from a list of files.
func (m *Manager) LoadGroups(interval time.Duration, externalLabels labels.Labels, filenames ...string) (map[string]*Group, []error) {
    groups := make(map[string]*Group)

    shouldRestore := !m.restored

    for _, fn := range filenames {
        rgs, errs := rulefmt.ParseFile(fn)
        for _, rg := range rgs.Groups {
            itv := interval
            if rg.Interval != 0 {
                itv = time.Duration(rg.Interval)
            }
            rules := make([]Rule, 0, len(rg.Rules))
            for _, r := range rg.Rules {
                expr, err := parser.ParseExpr(r.Expr.Value)
                // Alerting rule.
                if r.Alert.Value != "" {
                    rules = append(rules, NewAlertingRule(
                        r.Alert.Value,
                        expr,
                        time.Duration(r.For),
                        labels.FromMap(r.Labels),
                        labels.FromMap(r.Annotations),
                        externalLabels,
                        m.restored,
                        log.With(m.logger, "alert", r.Alert),
                    ))
                    continue
                }
                // Recording rule.
                rules = append(rules, NewRecordingRule(
                    r.Record.Value,
                    expr,
                    labels.FromMap(r.Labels),
                ))
            }
            groups[groupKey(fn, rg.Name)] = NewGroup(GroupOptions{
                Name:          rg.Name,
                File:          fn,
                Interval:      itv,
                Rules:         rules,
                ShouldRestore: shouldRestore,
                Opts:          m.opts,
                done:          m.done,
            })
        }
    }

    return groups, nil
}

3. Rule-execution logic

Each group uses a ticker to evaluate its rules once every interval:

// rules/manager.go
func (g *Group) run(ctx context.Context) {
    // Wait an initial amount to have consistently slotted intervals.
    evalTimestamp := g.evalTimestamp().Add(g.interval)
    select {
    case <-time.After(time.Until(evalTimestamp)):
    case <-g.done:
        return
    }

    iter := func() {
        ......
        g.Eval(ctx, evalTimestamp)
    }
    tick := time.NewTicker(g.interval)
    defer tick.Stop()

    iter()

    for {
        select {
        case <-g.done:
            return
        default:
            select {
            case <-g.done:
                return
            case <-tick.C:
                .....
                iter()
            }
        }
    }
}
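
The initial evalTimestamp is "slotted" so that different groups are spread evenly across the interval instead of all evaluating at the same instant. A simplified sketch of Group.evalTimestamp (rules/manager.go), which derives a stable offset from the group's hash:

// Simplified sketch of Group.evalTimestamp: the group's hash yields a stable
// offset within the interval, so every group gets its own evaluation slot.
func (g *Group) evalTimestamp() time.Time {
    var (
        offset = int64(g.hash() % uint64(g.interval))
        now    = time.Now().UnixNano()
        adjNow = now - offset                          // shift into this group's slot
        base   = adjNow - (adjNow % int64(g.interval)) // align to the interval
    )
    return time.Unix(0, base+offset).UTC()
}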

On every interval, a group:

  • iterates over every rule in the group and evaluates it;
  • for an alerting rule, sends the alert out when it fires;
  • for a recording rule, writes the new metrics into the TSDB;
// rules/manager.go
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
    for i, rule := range g.rules {
        ......
        func(i int, rule Rule) {
            // Evaluate the rule.
            vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)

            // For alerting rules, send out alerts that need sending.
            if ar, ok := rule.(*AlertingRule); ok {
                ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
            }

            app := g.opts.Appendable.Appender()
            seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
            defer func() {
                // Commit (flush) the new samples.
                if err := app.Commit(); err != nil {
                    level.Warn(g.logger).Log("msg", "Rule sample appending failed", "err", err)
                    return
                }
            }()
            for _, s := range vector {
                // Append the new samples.
                _, err := app.Add(s.Metric, s.T, s.V)
                ....
            }
        }(i, rule)
    }
}
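
The seriesReturned map records the label sets produced in this cycle (inside the elided part of the loop); after the loop, series that existed in the previous evaluation but are missing from this one get a staleness marker, so they disappear cleanly from queries. Roughly (a simplified sketch of the part elided above):

// Simplified sketch of the staleness handling elided in Group.Eval above.
for metric, lset := range g.seriesInPreviousEval[i] {
    if _, ok := seriesReturned[metric]; !ok {
        // The series was produced last time but not this time: append a stale marker.
        if _, err := app.Add(lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)); err != nil {
            level.Warn(g.logger).Log("msg", "adding stale sample failed", "sample", metric, "err", err)
        }
    }
}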

4. Rule Eval logic

As shown above, both AlertingRule and RecordingRule are executed through rule.Eval.
For AlertingRule.Eval():

  • the rule expression is evaluated via PromQL; if the rule condition is met, the query returns a result vector res;
  • using the alert's hold duration (the for field), active alerts are maintained in r.active[]; the alert-sending path only sends alerts that are present in r.active[];
// rules/alerting.go
// Eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly.
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
    // Run the PromQL query.
    res, err := query(ctx, r.vector.String(), ts)

    var alerts = make(map[uint64]*Alert, len(res))
    for _, smpl := range res {
        ......
        alerts[h] = &Alert{
            Labels:      lbs,
            Annotations: annotations,
            ActiveAt:    ts,
            State:       StatePending,
            Value:       smpl.V,
        }
    }
    for h, a := range alerts {
        // Check whether we already have alerting state for the identifying label set.
        // Update the last value and annotations if so, create a new alert entry otherwise.
        if alert, ok := r.active[h]; ok && alert.State != StateInactive {
            alert.Value = a.Value
            alert.Annotations = a.Annotations
            continue
        }
        r.active[h] = a // the alert is kept in r.active; alerts in r.active are what get sent to Alertmanager
    }
    ...
    // We have already acquired the lock above hence using SetHealth and
    // SetLastError will deadlock.
    r.health = HealthGood
    r.lastError = err
    return vec, nil
}
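
In the elided part of Eval, alerts that have stayed pending for longer than the rule's for duration are promoted to firing, and alerts whose label set no longer appears in the query result are marked resolved. sendAlerts then walks r.active and hands every alert that needs (re)sending to the NotifyFunc; a simplified sketch of AlertingRule.sendAlerts (rules/alerting.go):

// Simplified sketch of AlertingRule.sendAlerts: only active alerts that need
// (re)sending are copied and passed to the notify function.
func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay, interval time.Duration, notifyFunc NotifyFunc) {
    alerts := []*Alert{}
    r.ForEachActiveAlert(func(alert *Alert) {
        if alert.needsSending(ts, resendDelay) {
            alert.LastSentAt = ts
            // Keep the alert valid long enough to survive a couple of missed resends.
            delta := resendDelay
            if interval > resendDelay {
                delta = interval
            }
            alert.ValidUntil = ts.Add(4 * delta)
            anew := *alert
            alerts = append(alerts, &anew)
        }
    })
    notifyFunc(ctx, r.vector.String(), alerts...)
}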

For RecordingRule.Eval():

  • PromQL computes the latest sample values; each sample's metric name and labels are then overridden with the rule's record name and labels (e.g. for the cpu_used group above, a node_cpu_seconds_total{mode="user", cpu="0"} sample becomes cpu_used_percent{cpu="0", mode="user", ...} before being appended);
// rules/recording.go
// Eval evaluates the rule and then overrides the metric names and labels accordingly.
func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL) (promql.Vector, error) {
    vector, err := query(ctx, rule.vector.String(), ts)

    // Override the metric name and labels.
    for i := range vector {
        sample := &vector[i]
        lb := labels.NewBuilder(sample.Metric)
        lb.Set(labels.MetricName, rule.name)
        for _, l := range rule.labels {
            lb.Set(l.Name, l.Value)
        }
        sample.Metric = lb.Labels()
    }
    ...
    return vector, nil
}

Both evaluations above run their PromQL through QueryFunc, which is backed by the PromQL engine: EngineQueryFunc executes an instant query against the engine and converts scalar results into single-sample vectors, so rule code always deals with vectors.

// cmd/prometheus/main.go
// Initialize the rule manager.
ruleManager = rules.NewManager(&rules.ManagerOptions{
    Appendable:      fanoutStorage,
    TSDB:            localStorage,
    QueryFunc:       rules.EngineQueryFunc(queryEngine, fanoutStorage),
    NotifyFunc:      sendAlerts(notifierManager, cfg.web.ExternalURL.String()),
    Context:         ctxRule,
    ExternalURL:     cfg.web.ExternalURL,
    Registerer:      prometheus.DefaultRegisterer,
    Logger:          log.With(logger, "component", "rule manager"),
    OutageTolerance: time.Duration(cfg.outageTolerance),
    ForGracePeriod:  time.Duration(cfg.forGracePeriod),
    ResendDelay:     time.Duration(cfg.resendDelay),
})

// rules/manager.go
// EngineQueryFunc returns a new query function that executes instant queries against
// the given engine.
// It converts scalar into vector results.
func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc {
    return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
        q, err := engine.NewInstantQuery(q, qs, t)
        if err != nil {
            return nil, err
        }
        res := q.Exec(ctx)
        if res.Err != nil {
            return nil, res.Err
        }
        switch v := res.Value.(type) {
        case promql.Vector:
            return v, nil
        case promql.Scalar:
            return promql.Vector{promql.Sample{
                Point:  promql.Point(v),
                Metric: labels.Labels{},
            }}, nil
        default:
            return nil, errors.New("rule result is not a vector or scalar")
        }
    }
}
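
The NotifyFunc wired in above is the sendAlerts closure from cmd/prometheus/main.go: it converts the rules package's Alert objects into notifier.Alert and forwards them to the notifier manager, which delivers them to Alertmanager. A simplified sketch of that closure:

// cmd/prometheus/main.go (simplified sketch)
// sendAlerts builds the rules.NotifyFunc used by the rule manager; s is the
// notifier manager, i.e. anything with a Send(alerts ...*notifier.Alert) method.
func sendAlerts(s sender, externalURL string) rules.NotifyFunc {
    return func(ctx context.Context, expr string, alerts ...*rules.Alert) {
        var res []*notifier.Alert
        for _, alert := range alerts {
            a := &notifier.Alert{
                StartsAt:     alert.FiredAt,
                Labels:       alert.Labels,
                Annotations:  alert.Annotations,
                GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
            }
            if !alert.ResolvedAt.IsZero() {
                a.EndsAt = alert.ResolvedAt
            } else {
                a.EndsAt = alert.ValidUntil
            }
            res = append(res, a)
        }
        if len(alerts) > 0 {
            s.Send(res...)
        }
    }
}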

