Prometheus has two kinds of rules:
- AlertingRule: alerting rules;
- RecordingRule: recording (expression) rules, used to produce new metrics;
1. Overall framework
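Prometheus's rule management code lives mainly in the prometheus/rules/ directory:
- At runtime, rules.Manager reads the rules/*.yaml files and loads every rule group (rules.Group) from them;
- Each rules.Group is assigned one goroutine, which periodically executes all the rules in that group;
For each Rule:
- if it is an AlertingRule, it is evaluated (eval), and if an alert fires, the alert is sent out via sendAlerts;
- if it is a RecordingRule, it is evaluated (eval) and the result is written to the TSDB;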
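The "one goroutine per group, evaluating periodically" pattern can be sketched in plain Go. This is a minimal, hypothetical model, not the Prometheus code. `runGroup` and its parameters are made up for illustration, and `eval` stands in for evaluating all of a group's rules. It evaluates once, then again on every tick, and stops when `done` is closed. The nested `select` follows the shape of `Group.run`, which is quoted in section 3.

```go
package main

import (
	"fmt"
	"time"
)

// runGroup is a hypothetical sketch of one group's goroutine: evaluate once,
// then again on every tick, until done is closed. The outer select checks
// done first so a closed done channel wins even if a tick is also pending.
func runGroup(interval time.Duration, done <-chan struct{}, eval func()) {
	tick := time.NewTicker(interval)
	defer tick.Stop()

	eval()
	for {
		select {
		case <-done:
			return
		default:
			select {
			case <-done:
				return
			case <-tick.C:
				eval()
			}
		}
	}
}

func main() {
	done := make(chan struct{})
	n := 0
	runGroup(time.Millisecond, done, func() {
		n++
		if n == 3 {
			close(done) // stop after three evaluations
		}
	})
	fmt.Println("evaluations:", n) // evaluations: 3
}
```

Here `runGroup` is called synchronously to keep the example deterministic. In the manager, each group's loop runs in its own goroutine.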
Suppose rules/xxx.yaml contains the following:
```yaml
groups:
- name: test
  rules:
  - expr: |
      100 - avg without (cpu, mode) (
        rate(node_cpu_seconds_total{job="node-exporter", mode="idle"}[1m])
      )*100
    record: instance:node_cpu_used_percent:rate1m
    labels:
      __type__: "gauge"
  - alert: Too_Many_Goroutine
    expr: |
      go_goroutines > 100
    labels:
      severity: high
    annotations:
      summary: too many goroutines
- name: cpu_used
  rules:
  - expr: |
      node_cpu_seconds_total{mode="user", cpu="0"}
    record: cpu_used_percent
    labels:
      cpu: "0"
```
This file produces 2 groups:
- group test, containing 1 recordingRule and 1 alertingRule;
- group cpu_used, containing 1 recordingRule;
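The sketch below makes the grouping concrete. It models the YAML above with hand-written structs and classifies each rule the way the loader does: a non-empty `alert` field means an alerting rule, otherwise it is a recording rule. The `ruleNode` and `ruleGroup` types are hypothetical simplifications, not the real `rulefmt` types. The sketch does no YAML parsing, so it can stay within the standard library.

```go
package main

import "fmt"

// ruleNode and ruleGroup are hypothetical structs mirroring the example file;
// they keep only the fields needed here and are not the rulefmt types.
type ruleNode struct {
	Record, Alert, Expr string
}

type ruleGroup struct {
	Name  string
	Rules []ruleNode
}

// countKinds classifies rules by the alert field: set means alerting rule,
// empty means recording rule.
func countKinds(g ruleGroup) (recording, alerting int) {
	for _, r := range g.Rules {
		if r.Alert != "" {
			alerting++
		} else {
			recording++
		}
	}
	return
}

func main() {
	groups := []ruleGroup{
		{Name: "test", Rules: []ruleNode{
			{Record: "instance:node_cpu_used_percent:rate1m",
				Expr: `100 - avg without (cpu, mode) (rate(node_cpu_seconds_total{job="node-exporter", mode="idle"}[1m]))*100`},
			{Alert: "Too_Many_Goroutine", Expr: "go_goroutines > 100"},
		}},
		{Name: "cpu_used", Rules: []ruleNode{
			{Record: "cpu_used_percent", Expr: `node_cpu_seconds_total{mode="user", cpu="0"}`},
		}},
	}
	for _, g := range groups {
		rec, al := countKinds(g)
		fmt.Printf("%s: %d recording, %d alerting\n", g.Name, rec, al)
	}
	// test: 1 recording, 1 alerting
	// cpu_used: 1 recording, 0 alerting
}
```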
2. How rules are loaded
manager.Update loads the rules and starts one goroutine per group to run them:
```go
// rules/manager.go
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels) error {
	// Load the rules
	groups, errs := m.LoadGroups(interval, externalLabels, files...)

	var wg sync.WaitGroup
	for _, newg := range groups {
		// ......
		wg.Add(1)
		go func(newg *Group) {
			// ...
			go func() {
				// ......
				// Run the rules in this group
				newg.run(m.opts.Context)
			}()
			wg.Done()
		}(newg)
	}
	// ......
	wg.Wait()
	m.groups = groups

	return nil
}
```
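Below is a stripped-down sketch of the fan-out in `Update`. It assumes a hypothetical `group` type that holds only a name and rule names. One goroutine is started per group, and the goroutines are joined with a `sync.WaitGroup`. This is a simplification of the quoted `Update`. There, each outer goroutine starts `newg.run` in a further goroutine and calls `wg.Done()` right away, so `wg.Wait()` only waits until every group has been launched. Here, each goroutine finishes its one-shot work before calling `Done`.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// group is a hypothetical, stripped-down stand-in for rules.Group.
type group struct {
	name  string
	rules []string
}

// startGroups launches one goroutine per group and waits for all of them.
// Each goroutine "evaluates" its rules once by logging group/rule.
func startGroups(groups []*group) []string {
	var (
		wg  sync.WaitGroup
		mu  sync.Mutex // protects out, which all goroutines append to
		out []string
	)
	for _, g := range groups {
		wg.Add(1)
		go func(g *group) {
			defer wg.Done()
			for _, r := range g.rules {
				mu.Lock()
				out = append(out, g.name+"/"+r)
				mu.Unlock()
			}
		}(g)
	}
	wg.Wait()
	sort.Strings(out) // goroutines finish in arbitrary order
	return out
}

func main() {
	fmt.Println(startGroups([]*group{
		{name: "test", rules: []string{"instance:node_cpu_used_percent:rate1m", "Too_Many_Goroutine"}},
		{name: "cpu_used", rules: []string{"cpu_used_percent"}},
	}))
}
```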
The logic for reading rules:
- a group may contain multiple rules;
- whether a rule is an alertingRule or a recordingRule depends on whether its alert field is set;
```go
// rules/manager.go

// LoadGroups reads groups from a list of files.
func (m *Manager) LoadGroups(
	interval time.Duration, externalLabels labels.Labels, filenames ...string,
) (map[string]*Group, []error) {
	groups := make(map[string]*Group)

	for _, fn := range filenames {
		rgs, errs := rulefmt.ParseFile(fn)

		for _, rg := range rgs.Groups {
			itv := interval
			if rg.Interval != 0 {
				itv = time.Duration(rg.Interval)
			}

			rules := make([]Rule, 0, len(rg.Rules))
			for _, r := range rg.Rules {
				expr, err := parser.ParseExpr(r.Expr.Value)

				// Alerting rule
				if r.Alert.Value != "" {
					rules = append(rules, NewAlertingRule(
						r.Alert.Value,
						expr,
						time.Duration(r.For),
						labels.FromMap(r.Labels),
						labels.FromMap(r.Annotations),
						externalLabels,
						m.restored,
						log.With(m.logger, "alert", r.Alert),
					))
					continue
				}
				// Recording (expression) rule
				rules = append(rules, NewRecordingRule(
					r.Record.Value,
					expr,
					labels.FromMap(r.Labels),
				))
			}

			groups[groupKey(fn, rg.Name)] = NewGroup(GroupOptions{
				Name:          rg.Name,
				File:          fn,
				Interval:      itv,
				Rules:         rules,
				ShouldRestore: shouldRestore,
				Opts:          m.opts,
				done:          m.done,
			})
		}
	}

	return groups, nil
}
```
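Two details of `LoadGroups` are easy to miss. A group's own `interval` overrides the global one only when it is non-zero, and groups are keyed by file plus group name, so two files may each contain a group called `test`. The sketch below shows both. The `;` separator in `groupKey` is an assumption about the Prometheus source (check `groupKey` in your version), and the file name in `main` is hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// groupKey builds the map key for a group from its file and group name.
// The ";" separator is an assumption, not confirmed by the excerpt above.
func groupKey(file, name string) string { return file + ";" + name }

// effectiveInterval mirrors the itv logic in LoadGroups: a non-zero
// group-level interval overrides the global evaluation interval.
func effectiveInterval(global, group time.Duration) time.Duration {
	if group != 0 {
		return group
	}
	return global
}

func main() {
	fmt.Println(groupKey("rules/example.yaml", "test"))           // hypothetical file name
	fmt.Println(effectiveInterval(time.Minute, 0))                // 1m0s
	fmt.Println(effectiveInterval(time.Minute, 15*time.Second))   // 15s
}
```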
3. How rules are executed
Each group uses a ticker and runs once every interval:
```go
// rules/manager.go
func (g *Group) run(ctx context.Context) {
	// Wait an initial amount to have consistently slotted intervals.
	evalTimestamp := g.evalTimestamp().Add(g.interval)
	select {
	case <-time.After(time.Until(evalTimestamp)):
	case <-g.done:
		return
	}

	iter := func() {
		// ......
		g.Eval(ctx, evalTimestamp)
	}

	tick := time.NewTicker(g.interval)
	defer tick.Stop()

	iter()
	for {
		select {
		case <-g.done:
			return
		default:
			select {
			case <-g.done:
				return
			case <-tick.C:
				// .....
				iter()
			}
		}
	}
}
```
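The first `select` in `run` waits until `g.evalTimestamp()` plus one interval. This makes evaluations land on fixed slots rather than whenever the goroutine happened to start. The sketch below reproduces that arithmetic. It assumes `evalTimestamp` rounds the current time down to an interval boundary shifted by a per-group offset. In the Prometheus source this offset appears to be derived from a hash of the group, which spreads groups across the interval; treat that as an assumption. Here `offset` is simply a hypothetical parameter, and all times are Unix nanoseconds.

```go
package main

import "fmt"

// firstEval sketches the slot alignment in Group.run (hypothetical helper).
// It rounds now down to the current slot, where slots start at
// k*interval + offset, and then adds one full interval.
// offset must lie in [0, interval); all values are Unix nanoseconds.
func firstEval(now, interval, offset int64) int64 {
	adj := now - offset
	slotStart := adj - adj%interval + offset // like g.evalTimestamp()
	return slotStart + interval              // like .Add(g.interval)
}

func main() {
	const s = int64(1e9) // one second in nanoseconds
	fmt.Println(firstEval(125*s, 60*s, 10*s) / s) // 130
}
```

With a 60s interval and a 10s offset, any start time from 70s up to (but not including) 130s gives a first evaluation at 130s. The ticker then fires roughly every 60s after that.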
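What a group does in each interval:
- iterate over every rule in the group and execute it;
- for an alerting rule, send the alert out when it fires;
- for a recording rule, write the new metrics to the TSDB (the code appends every sample a rule returns, so the ALERTS series produced by alerting rules are stored the same way);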
```go
// rules/manager.go

// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
	for i, rule := range g.rules {
		// ......
		func(i int, rule Rule) {
			// Evaluate the rule
			vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
			// Alerting rule: send the alerts that have fired
			if ar, ok := rule.(*AlertingRule); ok {
				ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
			}

			app := g.opts.Appendable.Appender()
			seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
			defer func() {
				// Flush the new metrics
				if err := app.Commit(); err != nil {
					level.Warn(g.logger).Log("msg", "Rule sample appending failed", "err", err)
					return
				}
			}()

			for _, s := range vector {
				// Append the new metric sample
				_, err := app.Add(s.Metric, s.T, s.V)
				// ....
			}
		}(i, rule)
	}
}
```
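Below is a self-contained sketch of one `Eval` cycle. It uses simplified, hypothetical types: `sample`, `appender`, `rule`, `recordingRule` and `alertingRule` are illustrative, not Prometheus APIs. Rules run sequentially. An alerting rule that produced output is recorded as "sent", and every returned sample is added to the appender and then committed. As in the quoted code, samples from alerting rules go through the same append path as recording-rule samples. Unlike the real code, which creates a new appender per rule, the sketch uses one appender and commits once per rule.

```go
package main

import "fmt"

// sample is a simplified stand-in for promql.Sample.
type sample struct {
	metric string
	ts     int64
	value  float64
}

// appender is a hypothetical stand-in for storage.Appender:
// Add buffers a sample, Commit publishes everything buffered so far.
type appender struct{ pending, committed []sample }

func (a *appender) Add(s sample) { a.pending = append(a.pending, s) }
func (a *appender) Commit() {
	a.committed = append(a.committed, a.pending...)
	a.pending = nil
}

type rule interface{ eval(ts int64) []sample }

// recordingRule always emits one sample under its new metric name.
type recordingRule struct {
	name  string
	value float64
}

func (r recordingRule) eval(ts int64) []sample { return []sample{{r.name, ts, r.value}} }

// alertingRule "fires" when value exceeds threshold and then returns an
// ALERTS sample (greatly simplified).
type alertingRule struct {
	name             string
	value, threshold float64
}

func (r alertingRule) eval(ts int64) []sample {
	if r.value > r.threshold {
		return []sample{{"ALERTS", ts, 1}}
	}
	return nil
}

// evalGroup evaluates rules in order, records which alerting rules fired,
// and appends and commits every sample each rule returns.
func evalGroup(rules []rule, ts int64, app *appender) (sent []string) {
	for _, r := range rules {
		vec := r.eval(ts)
		if ar, ok := r.(alertingRule); ok && len(vec) > 0 {
			sent = append(sent, ar.name)
		}
		for _, s := range vec {
			app.Add(s)
		}
		app.Commit()
	}
	return sent
}

func main() {
	app := &appender{}
	rules := []rule{
		recordingRule{"instance:node_cpu_used_percent:rate1m", 42},
		alertingRule{"Too_Many_Goroutine", 150, 100},
	}
	sent := evalGroup(rules, 1000, app)
	fmt.Println(sent, len(app.committed)) // [Too_Many_Goroutine] 2
}
```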
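4. How a rule's Eval works
As shown above, both alertingRule and recordingRule are executed through rule.Eval.
For alertingRule.Eval():
- it evaluates the expression with a PromQL query; if the rule is triggered, the query returns a non-empty result res;
- using the alert's hold duration (the rule's for field), it maintains the active alerts in r.active[]; the alert-sending module only sends alerts from r.active[] (alerts that are still pending are not sent);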
```go
// rules/alerting.go

// Eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly.
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
	// Run the PromQL query
	res, err := query(ctx, r.vector.String(), ts)

	var alerts = make(map[uint64]*Alert, len(res))
	for _, smpl := range res {
		// ......
		alerts[h] = &Alert{
			Labels:      lbs,
			Annotations: annotations,
			ActiveAt:    ts,
			State:       StatePending,
			Value:       smpl.V,
		}
	}

	for h, a := range alerts {
		// Check whether we already have alerting state for the identifying label set.
		// Update the last value and annotations if so, create a new alert entry otherwise.
		if alert, ok := r.active[h]; ok && alert.State != StateInactive {
			alert.Value = a.Value
			alert.Annotations = a.Annotations
			continue
		}

		r.active[h] = a // the alert is stored in r.active; alerts in r.active are sent to Alertmanager
	}
	// ...

	// We have already acquired the lock above hence using SetHealth and
	// SetLastError will deadlock.
	r.health = HealthGood
	r.lastError = err
	return vec, nil
}
```
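The bookkeeping around `r.active` amounts to a small state machine. The sketch below models it with hypothetical names. It assumes alerts are keyed by a label-set string rather than a hash, times are plain seconds, and the rule's `for` duration is `hold`. The first loop follows the excerpt above. The transitions in the second loop (pending to firing after `hold`, firing to inactive when the series disappears, silently dropping alerts that vanish while pending) are a simplified reconstruction of logic the excerpt elides with `...`, so treat them as an assumption. The retention of resolved alerts is omitted.

```go
package main

import "fmt"

type state int

const (
	inactive state = iota
	pending
	firing
)

func (s state) String() string { return [...]string{"inactive", "pending", "firing"}[s] }

type alert struct {
	state    state
	activeAt int64 // seconds
	value    float64
}

// alertingRule is a hypothetical, cut-down model of rules.AlertingRule.
type alertingRule struct {
	hold   int64             // the rule's for duration, in seconds
	active map[string]*alert // like r.active, keyed by label set
}

// eval takes the query result (label set -> value) at time ts.
func (r *alertingRule) eval(ts int64, res map[string]float64) {
	for key, v := range res {
		if a, ok := r.active[key]; ok && a.state != inactive {
			a.value = v // already tracked: only refresh the value
			continue
		}
		r.active[key] = &alert{state: pending, activeAt: ts, value: v}
	}
	for key, a := range r.active {
		if _, ok := res[key]; !ok {
			if a.state == pending {
				delete(r.active, key) // never fired: drop it
			} else {
				a.state = inactive // was firing: keep it as resolved
			}
			continue
		}
		if a.state == pending && ts-a.activeAt >= r.hold {
			a.state = firing
		}
	}
}

func main() {
	r := &alertingRule{hold: 60, active: map[string]*alert{}}
	key := `{instance="host:9100"}`
	steps := []struct {
		ts  int64
		res map[string]float64
	}{
		{0, map[string]float64{key: 150}},
		{30, map[string]float64{key: 160}},
		{60, map[string]float64{key: 170}},
		{90, map[string]float64{}},
	}
	for _, step := range steps {
		r.eval(step.ts, step.res)
		fmt.Println(step.ts, r.active[key].state)
	}
}
```

With `hold: 60`, an alert seen at t=0 stays pending at t=30, fires at t=60, and becomes inactive (resolved) at t=90 once the query no longer returns it.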
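For recordingRule.Eval():
- PromQL computes the latest metric values; each resulting sample then has its metric name replaced by the rule's record name, and the rule's labels are set on it;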
```go
// rules/recording.go

// Eval evaluates the rule and then overrides the metric names and labels accordingly.
func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL) (promql.Vector, error) {
	vector, err := query(ctx, rule.vector.String(), ts)

	// Override the metric name and labels.
	for i := range vector {
		sample := &vector[i]

		lb := labels.NewBuilder(sample.Metric)
		lb.Set(labels.MetricName, rule.name)

		for _, l := range rule.labels {
			lb.Set(l.Name, l.Value)
		}

		sample.Metric = lb.Labels()
	}
	...
	return vector, nil
}
```
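Plain maps are enough to show what `RecordingRule.Eval` does to each sample. The sketch assumes a hypothetical `labelSet` map type in place of `labels.Labels`. It copies the labels, sets `__name__` to the `record` name, then applies the rule's own labels. With the first rule from the example file, an aggregated result that carries no metric name becomes `instance:node_cpu_used_percent:rate1m` with `__type__="gauge"` added.

```go
package main

import "fmt"

// labelSet is a hypothetical, simplified stand-in for labels.Labels.
type labelSet map[string]string

// relabel copies the input labels, sets __name__ to the rule's record name,
// then applies the rule's own labels (which win on conflict).
// The input map is not modified.
func relabel(in labelSet, name string, extra labelSet) labelSet {
	out := make(labelSet, len(in)+len(extra)+1)
	for k, v := range in {
		out[k] = v
	}
	out["__name__"] = name
	for k, v := range extra {
		out[k] = v
	}
	return out
}

func main() {
	// The result of avg without (cpu, mode) (...) has no __name__ label.
	in := labelSet{"instance": "host:9100", "job": "node-exporter"}
	out := relabel(in, "instance:node_cpu_used_percent:rate1m", labelSet{"__type__": "gauge"})
	fmt.Println(out)
}
```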
All of the PromQL evaluation above goes through QueryFunc, and QueryFunc is backed by the PromQL engine:
```go
// cmd/prometheus/main.go
// Initialize the rule manager
ruleManager = rules.NewManager(&rules.ManagerOptions{
	Appendable:      fanoutStorage,
	TSDB:            localStorage,
	QueryFunc:       rules.EngineQueryFunc(queryEngine, fanoutStorage),
	NotifyFunc:      sendAlerts(notifierManager, cfg.web.ExternalURL.String()),
	Context:         ctxRule,
	ExternalURL:     cfg.web.ExternalURL,
	Registerer:      prometheus.DefaultRegisterer,
	Logger:          log.With(logger, "component", "rule manager"),
	OutageTolerance: time.Duration(cfg.outageTolerance),
	ForGracePeriod:  time.Duration(cfg.forGracePeriod),
	ResendDelay:     time.Duration(cfg.resendDelay),
})

// rules/manager.go

// EngineQueryFunc returns a new query function that executes instant queries against
// the given engine.
// It converts scalar into vector results.
func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc {
	return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
		q, err := engine.NewInstantQuery(q, qs, t)
		if err != nil {
			return nil, err
		}
		res := q.Exec(ctx)
		if res.Err != nil {
			return nil, res.Err
		}
		switch v := res.Value.(type) {
		case promql.Vector:
			return v, nil
		case promql.Scalar:
			return promql.Vector{promql.Sample{
				Point:  promql.Point(v),
				Metric: labels.Labels{},
			}}, nil
		default:
			return nil, errors.New("rule result is not a vector or scalar")
		}
	}
}
```
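The type switch at the end of `EngineQueryFunc` lets a rule whose expression evaluates to a scalar still hand a vector back to the rule code. The sketch below is a minimal model of it. It uses hypothetical stand-in types: `sample`, `vector`, `scalar` and `matrix` are not the `promql` package types. A vector passes through unchanged. A scalar is wrapped into a one-sample vector with an empty label set. Anything else, such as a range vector (matrix), is rejected with the same error message.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for promql.Sample, Vector, Scalar and Matrix.
type sample struct {
	labels map[string]string
	t      int64
	v      float64
}
type vector []sample
type scalar struct {
	t int64
	v float64
}
type matrix [][]sample

// toVector mirrors the type switch in EngineQueryFunc.
func toVector(res interface{}) (vector, error) {
	switch x := res.(type) {
	case vector:
		return x, nil
	case scalar:
		// Wrap the scalar in a single sample with no labels.
		return vector{{labels: map[string]string{}, t: x.t, v: x.v}}, nil
	default:
		return nil, errors.New("rule result is not a vector or scalar")
	}
}

func main() {
	vec, err := toVector(scalar{t: 1000, v: 42})
	fmt.Println(len(vec), vec[0].v, err)
	_, err = toVector(matrix{})
	fmt.Println(err)
}
```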