prometheus的rule有两类:

  • AlertingRule: 告警规定;
  • RecordingRule: 表达式规定,用于产生新的指标;

1.整体框架

prometheus的rule治理次要在代码目录prometheus/rules/中:

  • rules.Manager在运行时的时候,会读取rules/*.yaml文件,读取出所有的分组rules.Group;
  • 为每个rules.Group调配1个goroutine,周期性的执行group下所有的rules;
  • 对每个Rule:

    • 若是AlertingRule,则进行eval,若触发告警,则sendAlerts进来;
    • 若是RecordingRule,则进行eval,将后果写入TSDB;

如果rules/xxx.yaml文件内容如下:

groups:- name: test  rules:  - expr: |      100 - avg without (cpu, mode) (        rate(node_cpu_seconds_total{job="node-exporter", mode="idle"}[1m])      )*100    record: instance:node_cpu_used_percent:rate1m    labels:      __type__: "gauge"  - alert: Too_Many_Goroutine    expr: |      go_goroutines > 100    labels:      serverity: high    annotations:      summary: too many goroutines- name: cpu_used  rules:  - expr: |      node_cpu_seconds_total{mode="user", cpu="0"}    record: cpu_used_percent    labels:      cpu: "0"

那么,它将产生2个group:

  • group: test,内含1个recodingRule和1个alertingRule;
  • group: cpu_used,内含1个recordingRule;

2.规定加载的代码逻辑

manager.Update加载规定,并为每个group启动1个goroutine运行规定:

// rules/manager.gofunc (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels) error {    // 加载规定    groups, errs := m.LoadGroups(interval, externalLabels, files...)        var wg sync.WaitGroup    for _, newg := range groups {                ......        wg.Add(1)        go func(newg *Group) {            ...            go func() {                ......                // 运行group内的规定                newg.run(m.opts.Context)            }()            wg.Done()        }(newg)    }    ......    wg.Wait()    m.groups = groups    return nil}

读取规定的逻辑:

  • 1个group下可能有多个rules;
  • 依据规定中是否配置alert字段判断,是alertingRule or recordingRule;
// rules/manager.go// LoadGroups reads groups from a list of files.func (m *Manager) LoadGroups(    interval time.Duration, externalLabels labels.Labels, filenames ...string,) (map[string]*Group, []error) {    groups := make(map[string]*Group)    for _, fn := range filenames {        rgs, errs := rulefmt.ParseFile(fn)                for _, rg := range rgs.Groups {            itv := interval            if rg.Interval != 0 {                itv = time.Duration(rg.Interval)            }            rules := make([]Rule, 0, len(rg.Rules))            for _, r := range rg.Rules {                expr, err := parser.ParseExpr(r.Expr.Value)                                // 告警规定                if r.Alert.Value != "" {                    rules = append(rules, NewAlertingRule(                        r.Alert.Value,                        expr,                        time.Duration(r.For),                        labels.FromMap(r.Labels),                        labels.FromMap(r.Annotations),                        externalLabels,                        m.restored,                        log.With(m.logger, "alert", r.Alert),                    ))                    continue                }                // 表达式规定                rules = append(rules, NewRecordingRule(                    r.Record.Value,                    expr,                    labels.FromMap(r.Labels),                ))            }            groups[groupKey(fn, rg.Name)] = NewGroup(GroupOptions{                Name:          rg.Name,                File:          fn,                Interval:      itv,                Rules:         rules,                ShouldRestore: shouldRestore,                Opts:          m.opts,                done:          m.done,            })        }    }    return groups, nil}

3.规定执行逻辑

对每个group,利用定时器,每隔interval执行1次:

// rules/manager.gofunc (g *Group) run(ctx context.Context) {    // Wait an initial amount to have consistently slotted intervals.    evalTimestamp := g.evalTimestamp().Add(g.interval)    select {    case <-time.After(time.Until(evalTimestamp)):    case <-g.done:        return    }        iter := func() {                ......        g.Eval(ctx, evalTimestamp)            }    tick := time.NewTicker(g.interval)    defer tick.Stop()        iter()        for {        select {        case <-g.done:            return        default:            select {            case <-g.done:                return            case <-tick.C:                .....                iter()            }        }    }}

group每个interval做的事件:

  • 遍历group下的每个rule,执行每个规定;
  • 若是告警规定,当告警触发时,将alert收回去;
  • 若是表达式规定,将新metrics写入TSDB;
// rules/manager.go// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.func (g *Group) Eval(ctx context.Context, ts time.Time) {    for i, rule := range g.rules {                ......        func(i int, rule Rule) {            // 执行规定            vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)                        // 告警规定被触发            if ar, ok := rule.(*AlertingRule); ok {                ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)            }                        app := g.opts.Appendable.Appender()            seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))            defer func() {                // flush新metrics                if err := app.Commit(); err != nil {                    level.Warn(g.logger).Log("msg", "Rule sample appending failed", "err", err)                    return                }            }()            for _, s := range vector {                // 增加新metrics                _, err := app.Add(s.Metric, s.T, s.V);                ....            }        }(i, rule)    }}

4.规定的eval代码逻辑

从下面看到,不论是alertingRule还是recordingRule,都是通过rule.Eval执行的。
对于alertingRule.Eval():

  • 利用PromQL查问表达式的值,若规定被触发,则PromQL将输入res;
  • 利用告警维持工夫,将沉闷告警保护在r.active[]中;告警发送模块仅发送r.active[]中的告警;
// rules/alerting.go// Eval evaluates the rule expression and then creates pending alerts and fires// or removes previously pending alerts accordingly.func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {    // promQL查问出后果    res, err := query(ctx, r.vector.String(), ts)            var alerts = make(map[uint64]*Alert, len(res))    for _, smpl := range res {        ......        alerts[h] = &Alert{            Labels:      lbs,            Annotations: annotations,            ActiveAt:    ts,            State:       StatePending,            Value:       smpl.V,        }    }    for h, a := range alerts {        // Check whether we already have alerting state for the identifying label set.        // Update the last value and annotations if so, create a new alert entry otherwise.        if alert, ok := r.active[h]; ok && alert.State != StateInactive {            alert.Value = a.Value            alert.Annotations = a.Annotations            continue        }        r.active[h] = a    //alert被存在r.active中,active中的alert被发送给alertmanager    }    ...    // We have already acquired the lock above hence using SetHealth and    // SetLastError will deadlock.    r.health = HealthGood    r.lastError = err    return vec, nil}

对于recordingRule.Eval()

  • PromQL计算出最新的metrics值;
// rules/recording.go// Eval evaluates the rule and then overrides the metric names and labels accordingly.func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL) (promql.Vector, error) {    vector, err := query(ctx, rule.vector.String(), ts)        // Override the metric name and labels.    for i := range vector {        sample := &vector[i]        lb := labels.NewBuilder(sample.Metric)        lb.Set(labels.MetricName, rule.name)        for _, l := range rule.labels {            lb.Set(l.Name, l.Value)        }        sample.Metric = lb.Labels()    }    ...    return vector, nil}

下面的PromQL计算,均应用了QueryFunc,QueryFunc应用了PromQL的引擎;

// cmd/prometheus/main.go// 初始化ruleManager = rules.NewManager(&rules.ManagerOptions{            Appendable:      fanoutStorage,            TSDB:            localStorage,            QueryFunc:       rules.EngineQueryFunc(queryEngine, fanoutStorage),            NotifyFunc:      sendAlerts(notifierManager, cfg.web.ExternalURL.String()),            Context:         ctxRule,            ExternalURL:     cfg.web.ExternalURL,            Registerer:      prometheus.DefaultRegisterer,            Logger:          log.With(logger, "component", "rule manager"),            OutageTolerance: time.Duration(cfg.outageTolerance),            ForGracePeriod:  time.Duration(cfg.forGracePeriod),            ResendDelay:     time.Duration(cfg.resendDelay),        })// rules/manager.go// EngineQueryFunc returns a new query function that executes instant queries against// the given engine.// It converts scalar into vector results.func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc {    return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {        q, err := engine.NewInstantQuery(q, qs, t)        if err != nil {            return nil, err        }        res := q.Exec(ctx)        if res.Err != nil {            return nil, res.Err        }        switch v := res.Value.(type) {        case promql.Vector:            return v, nil        case promql.Scalar:            return promql.Vector{promql.Sample{                Point:  promql.Point(v),                Metric: labels.Labels{},            }}, nil        default:            return nil, errors.New("rule result is not a vector or scalar")        }    }}

参考

1.告警断定与告警发现:https://segmentfault.com/a/11...