共计 6395 个字符,预计需要花费 16 分钟才能阅读完成。
prometheus 的 rule 有两类:
- AlertingRule: 告警规则;
- RecordingRule: 表达式规则,用于产生新的指标;
1. 整体框架
prometheus 的 rule 管理主要在代码目录 prometheus/rules/ 中:
- rules.Manager 在运行的时候,会读取 rules/*.yaml 文件,读取出所有的分组 rules.Group;
- 为每个 rules.Group 分配 1 个 goroutine,周期性的执行 group 下所有的 rules;
- 对每个 Rule:
  - 若是 AlertingRule,则进行 eval,若触发告警,则 sendAlerts 出去;
  - 若是 RecordingRule,则进行 eval,将结果写入 TSDB;
如果 rules/xxx.yaml 文件内容如下:
# Example rule file with two groups.
groups:
# Group "test": one recording rule and one alerting rule.
- name: test
  rules:
  # Recording rule: stores the expression result under the name given in "record".
  - expr: |
      100 - avg without (cpu, mode) (rate(node_cpu_seconds_total{job="node-exporter", mode="idle"}[1m])
      )*100
    record: instance:node_cpu_used_percent:rate1m
    labels:
      __type__: "gauge"
  # Alerting rule: identified by the presence of the "alert" field.
  - alert: Too_Many_Goroutine
    expr: |
      go_goroutines > 100
    labels:
      severity: high
    annotations:
      summary: too many goroutines
# Group "cpu_used": a single recording rule.
- name: cpu_used
  rules:
  - expr: |
      node_cpu_seconds_total{mode="user", cpu="0"}
    record: cpu_used_percent
    labels:
      cpu: "0"
那么,它将产生 2 个 group:
- group: test,内含 1 个 recordingRule 和 1 个 alertingRule;
- group: cpu_used,内含 1 个 recordingRule;
2. 规则加载的代码逻辑
manager.Update 加载规则,并为每个 group 启动 1 个 goroutine 运行规则:
// rules/manager.go
// Update loads the rule groups from files through LoadGroups and launches a
// goroutine for every loaded group that runs the group's rules via newg.run.
// It waits for the per-group launcher goroutines, then stores the new groups
// in m.groups. Lines made only of dots mark code omitted from this excerpt.
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels) error {
// Load the rules.
// NOTE(review): errs is not checked in the visible lines — presumably handled in the omitted code.
groups, errs := m.LoadGroups(interval, externalLabels, files...)
var wg sync.WaitGroup
for _, newg := range groups {
......
wg.Add(1)
go func(newg *Group) {
...
go func() {
......
// Run the rules inside the group.
newg.run(m.opts.Context)
}()
wg.Done()}(newg)
}
......
wg.Wait()
m.groups = groups
return nil
}
读取规则的逻辑:
- 1 个 group 下可能有多个 rules;
- 依据规则中是否配置 alert 字段判断,是 alertingRule or recordingRule;
// rules/manager.go
// LoadGroups reads groups from a list of files.
// Each parsed group becomes a Group stored under groupKey(fn, rg.Name). A rule
// whose alert field is non-empty becomes an AlertingRule; every other rule
// becomes a RecordingRule. A group's own interval, when non-zero, replaces the
// interval argument for that group.
// NOTE(review): errs and err are not checked and shouldRestore is not defined
// in the visible lines — this looks like a shortened excerpt; confirm against
// the full source.
func (m *Manager) LoadGroups(interval time.Duration, externalLabels labels.Labels, filenames ...string,) (map[string]*Group, []error) {groups := make(map[string]*Group)
for _, fn := range filenames {rgs, errs := rulefmt.ParseFile(fn)
for _, rg := range rgs.Groups {
// Start from the interval argument; a non-zero group interval takes precedence.
itv := interval
if rg.Interval != 0 {itv = time.Duration(rg.Interval)
}
rules := make([]Rule, 0, len(rg.Rules))
for _, r := range rg.Rules {expr, err := parser.ParseExpr(r.Expr.Value)
// Alerting rule: the alert field is set.
if r.Alert.Value != "" {
rules = append(rules, NewAlertingRule(
r.Alert.Value,
expr,
time.Duration(r.For),
labels.FromMap(r.Labels),
labels.FromMap(r.Annotations),
externalLabels,
m.restored,
log.With(m.logger, "alert", r.Alert),
))
continue
}
// Recording rule.
rules = append(rules, NewRecordingRule(
r.Record.Value,
expr,
labels.FromMap(r.Labels),
))
}
groups[groupKey(fn, rg.Name)] = NewGroup(GroupOptions{
Name: rg.Name,
File: fn,
Interval: itv,
Rules: rules,
ShouldRestore: shouldRestore,
Opts: m.opts,
done: m.done,
})
}
}
return groups, nil
}
3. 规则执行逻辑
对每个 group,利用定时器,每隔 interval 执行 1 次:
// rules/manager.go
// run evaluates the group once after an initial wait and then again on every
// tick of a ticker with period g.interval. It returns as soon as a receive on
// g.done succeeds. Lines made only of dots mark code omitted from this excerpt.
func (g *Group) run(ctx context.Context) {
// Wait an initial amount to have consistently slotted intervals.
evalTimestamp := g.evalTimestamp().Add(g.interval)
select {case <-time.After(time.Until(evalTimestamp)):
case <-g.done:
return
}
// iter performs one evaluation of the group at evalTimestamp.
iter := func() {
......
g.Eval(ctx, evalTimestamp)
}
tick := time.NewTicker(g.interval)
defer tick.Stop()
// First evaluation, right after the initial wait.
iter()
for {
select {
case <-g.done:
return
default:
// Block until either the group is stopped or the next tick arrives.
select {
case <-g.done:
return
case <-tick.C:
.....
iter()}
}
}
}
group 每个 interval 做的事情:
- 遍历 group 下的每个 rule,执行每个规则;
- 若是告警规则,当告警触发时,将 alert 发出去;
- 若是表达式规则,将新 metrics 写入 TSDB;
// rules/manager.go
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
// For each rule it calls rule.Eval, calls sendAlerts when the rule is an
// *AlertingRule, and appends the returned samples through an appender that is
// committed when the per-rule closure returns. Lines made only of dots mark
// code omitted from this excerpt.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
for i, rule := range g.rules {
......
func(i int, rule Rule) {
// Evaluate the rule.
vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
// Alerting rule: send out its alerts.
if ar, ok := rule.(*AlertingRule); ok {ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
}
app := g.opts.Appendable.Appender()
seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
defer func() {
// Commit the appended samples; a failure is only logged as a warning.
if err := app.Commit(); err != nil {level.Warn(g.logger).Log("msg", "Rule sample appending failed", "err", err)
return
}
}()
for _, s := range vector {
// Append each sample of the evaluation result.
_, err := app.Add(s.Metric, s.T, s.V);
....
}
}(i, rule)
}
}
4. 规则的 eval 代码逻辑
从上面看到,不论是 alertingRule 还是 recordingRule,都是通过 rule.Eval 执行的。
对于 alertingRule.Eval():
- 利用 PromQL 查询表达式的值,若规则被触发,则 PromQL 将输出 res;
- 利用告警持续时间,将活跃告警维护在 r.active[] 中;告警发送模块仅发送 r.active[] 中的告警;
// rules/alerting.go
// Eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly.
// Lines made only of dots mark code omitted from this excerpt.
// NOTE(review): h, lbs, annotations and vec are not defined in the visible
// lines — presumably they are built in the omitted code.
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
// Run the PromQL query to obtain the result.
res, err := query(ctx, r.vector.String(), ts)
var alerts = make(map[uint64]*Alert, len(res))
// Build one pending alert per returned sample.
for _, smpl := range res {
......
alerts[h] = &Alert{
Labels: lbs,
Annotations: annotations,
ActiveAt: ts,
State: StatePending,
Value: smpl.V,
}
}
for h, a := range alerts {
// Check whether we already have alerting state for the identifying label set.
// Update the last value and annotations if so, create a new alert entry otherwise.
if alert, ok := r.active[h]; ok && alert.State != StateInactive {
alert.Value = a.Value
alert.Annotations = a.Annotations
continue
}
r.active[h] = a // the alert is stored in r.active; the alerts in active are the ones sent to alertmanager
}
...
// We have already acquired the lock above hence using SetHealth and
// SetLastError will deadlock.
r.health = HealthGood
r.lastError = err
return vec, nil
}
对于 recordingRule.Eval()
- PromQL 计算出最新的 metrics 值;
// rules/recording.go
// Eval evaluates the rule and then overrides the metric names and labels accordingly.
// It runs the rule's expression through query and rewrites every returned
// sample in place before returning the vector.
// NOTE(review): err from query is not checked in the visible lines —
// presumably handled in code omitted from this excerpt ("..." marks an omission).
func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL) (promql.Vector, error) {vector, err := query(ctx, rule.vector.String(), ts)
// Override the metric name and labels.
for i := range vector {sample := &vector[i]
lb := labels.NewBuilder(sample.Metric)
// The metric name is set to the recording rule's name.
lb.Set(labels.MetricName, rule.name)
// Apply the labels configured on the rule.
for _, l := range rule.labels {lb.Set(l.Name, l.Value)
}
sample.Metric = lb.Labels()}
...
return vector, nil
}
上面的 PromQL 计算,均使用了 QueryFunc,QueryFunc 使用了 PromQL 的引擎:
// cmd/prometheus/main.go
// Initialize the rule manager. QueryFunc is built by rules.EngineQueryFunc
// from the query engine and the fanout storage; NotifyFunc is built by
// sendAlerts from the notifier manager and the external URL.
ruleManager = rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
TSDB: localStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
NotifyFunc: sendAlerts(notifierManager, cfg.web.ExternalURL.String()),
Context: ctxRule,
ExternalURL: cfg.web.ExternalURL,
Registerer: prometheus.DefaultRegisterer,
Logger: log.With(logger, "component", "rule manager"),
OutageTolerance: time.Duration(cfg.outageTolerance),
ForGracePeriod: time.Duration(cfg.forGracePeriod),
ResendDelay: time.Duration(cfg.resendDelay),
})
// rules/manager.go
// EngineQueryFunc builds a QueryFunc that runs instant queries against the
// given engine, reading from the queryable q.
// A vector result is returned unchanged. A scalar result is wrapped into a
// vector holding a single sample with an empty label set. Any other result
// type is reported as an error.
func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc {
	return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
		instant, err := engine.NewInstantQuery(q, qs, t)
		if err != nil {
			return nil, err
		}

		result := instant.Exec(ctx)
		if result.Err != nil {
			return nil, result.Err
		}

		if vec, ok := result.Value.(promql.Vector); ok {
			return vec, nil
		}
		if sc, ok := result.Value.(promql.Scalar); ok {
			// Convert the scalar into a one-sample vector without labels.
			sample := promql.Sample{
				Point:  promql.Point(sc),
				Metric: labels.Labels{},
			}
			return promql.Vector{sample}, nil
		}
		return nil, errors.New("rule result is not a vector or scalar")
	}
}
参考
1. 告警断定与告警发现:https://segmentfault.com/a/11…
正文完