乐趣区

关于prometheus:prometheus-告警判定与告警发送alertManager

prometheus 配置告警表达式,定期检查是否触发阈值,若触发阈值,则通过 HTTP 发送告警给 alertManager。

# cat node_alerts.yml
groups:
- name: cpu_alerts
  rules:
  - alert: CPUAlert
    expr: 100 - avg(irate(node_cpu_seconds_total{mode="idle"}[1m])) by (instance) * 100 > 60
    for: 3m
    labels:
      severity: warning
    annotations:
      summary: High Node CPU

prometheus 告警状态

告警有三种状态:

  • 初始触发阈值,生成 alert 并置 alert 状态 =pending;
  • 当该 alert 在 pending 维持肯定工夫 (如 for 3m),alert 状态 =Firing;
  • prometheus 通过 HTTP POST 发送 alert 给 alertManager;
  • 当 alert 不再触发阈值,则 alert 状态 =Inactive;

prometheus 告警断定

首先,用告警表达式查问最新的数据,若查问到后果,则阐明触发了阈值,生成新的 alert 对象。
而后,将本次触发阈值的 alert 与以后 active 的 alert 列表进行比对,依据不同的场景治理 alert 的生命周期。

治理 alert 生命周期的过程:

  • 原来没有 alert,本次合乎阈值:

    • 新建一个 alert 对象,状态 =pending,放入 r.active 数组;
  • 原来有 alert=pending,本次合乎阈值:

    • 没有达到 holdDuration:放弃不变;
    • 达到 holdDuration:更新状态 =Firing,触发工夫 fireAt=ts;
  • 原来有 alert=pending,本次不合乎阈值:

    • 将其从 r.active 中删除;
    • alert 状态批改为 Inacvie,ResolvedAt=ts:
  • 原来有 alert=firing,本次合乎阈值:

    • 放弃不变;
  • 原来有 alert=firing,本次不合乎阈值:

    • alert 状态 =Inactive,ResolvedAt=ts;
    • 若 resolveAt 已过 15min,那么将其从 r.active 中删除;(便于 alertmanager 判断其复原)
//rules/alerting.go
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {res, err := query(ctx, r.vector.String(), ts)    // 用告警表达式去查问数据,若查问到后果,则触发了阈值

    // 本次触发的告警 lables.hash() 汇合
    resultFPs := map[uint64]struct{}{}

    var vec promql.Vector
    var alerts = make(map[uint64]*Alert, len(res))
    for _, smpl := range res {
        ......
        lbs := lb.Labels()
        h := lbs.Hash()
        resultFPs[h] = struct{}{}
        alerts[h] = &Alert{       // 生成告警对象
            Labels:      lbs,
            Annotations: annotations,
            ActiveAt:    ts,
            State:       StatePending,    // 触发了 pending
            Value:       smpl.V,
        }
    }

    for h, a := range alerts {
        // 曾经是 active,更新 value 和 annotations
        if alert, ok := r.active[h]; ok && alert.State != StateInactive {
            alert.Value = a.Value
            alert.Annotations = a.Annotations
            continue
        }

        r.active[h] = a        // []active 寄存所有 pending&Firing 的告警}

    // Check if any pending alerts should be removed or fire now. Write out alert timeseries.
    for fp, a := range r.active {
        // 针对之前已有的告警,当初不再满足阈值了
        if _, ok := resultFPs[fp]; !ok {        
            // If the alert was previously firing, keep it around for a given
            // retention time so it is reported as resolved to the AlertManager.
            if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) {    // resolvedRetention=15min
                delete(r.active, fp)
            }
            // 不再触发阈值了,将其 state 批改为 Inactive,ResolvedAt 有值
            if a.State != StateInactive {        
                a.State = StateInactive
                a.ResolvedAt = ts
            }
            continue
        }
        // 针对之前的 pending,当初 holdDuration 已到,更新其状态为 Firing
        if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration {    
            a.State = StateFiring
            a.FiredAt = ts
        }
    }
    return vec, nil
}

这里有个特地的中央:
原来 Firing 的告警,本次不再触发阈值,那么它将变成 Inactive,同时 alert.ResolvedAt=now()。
同时,该 alert 对象要等 resolvedRetention(15min) 后,才从 r.actvie[] 中删除,依照正文的说法:

// If the alert was previously firing, keep it around for a given
// retention time so it is reported as resolved to the AlertManager.

也就是持续保留 resolvedRetention(15min),保留过程中 alert 是 Inactive 且复原的状态。
因为 prometheus 会发送 r.active[] 中的告警对象给 alertManager,也就是说,Inactive 后的 15min 内,prometheus 会不停的向 alertManager 发送复原。

prometheus 告警发送

发送前的判断

  • 仅发送 r.active[] 且 alert.State!=Pending,即仅发送:Firing[ 告警]/Inactive[复原];
  • 发送前要管制发送的频率,1min 发送一次;
//rules/alerting.go
func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {alerts := []*Alert{}
    r.ForEachActiveAlert(func(alert *Alert) {if alert.needsSending(ts, resendDelay) {
            alert.LastSentAt = ts
            anew := *alert
            alerts = append(alerts, &anew)
        }
    })
    notifyFunc(ctx, r.vector.String(), alerts...)
}

仅发送 r.active[] 数组中的告警:

func (r *AlertingRule) ForEachActiveAlert(f func(*Alert)) {r.mtx.Lock()
    defer r.mtx.Unlock()
    // 仅发送 r.active[] 数组中的告警
    for _, a := range r.active {f(a)
    }
}

仅发送 Firing(告警) 和 Inactive(resolved 复原) 的告警,并且 1min 发送一次:

func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool {
    if a.State == StatePending {    // 不会发送 Pending 状态的 alert
        return false
    }
    // if an alert has been resolved since the last send, resend it
    if a.ResolvedAt.After(a.LastSentAt) {return true}
    return a.LastSentAt.Add(resendDelay).Before(ts)    // resendDlay=1min,这里保障 1min 发送一次
}

发送的过程

  • 将以后所有告警 json 序列化生成 payload(应用 V1),发送给每个 alertmanager;
  • 发送失败时,没有 retry,仅记录日志和 metrics 计数;
//notifier/notifier.go
// sendAll sends the alerts to all configured Alertmanagers concurrently.
// It returns true if the alerts could be sent successfully to at least one Alertmanager.
func (n *Manager) sendAll(alerts ...*Alert) bool {begin := time.Now()
    // 发往 AlertManager 的 payload,目前应用 V1
    var v1Payload, v2Payload []byte
    amSets := n.alertmanagers        ## 所有的 alertmanagers 对象
    
    for _, ams := range amSets {
        switch ams.cfg.APIVersion {
        case config.AlertmanagerAPIVersionV1:
            {
                if v1Payload == nil {v1Payload, err = json.Marshal(alerts)    
                    ......
                }
                payload = v1Payload        // r.active[] 序列化组成 payload}
            ...
        }
        for _, am := range ams.ams {wg.Add(1)
            ctx, cancel := context.WithTimeout(n.ctx, time.Duration(ams.cfg.Timeout))
            defer cancel()
            go func(client *http.Client, url string) {if err := n.sendOne(ctx, client, url, payload); err != nil {    // sendOne 负责具体发送
                    level.Error(n.logger).Log("alertmanager", url, "count", len(alerts), "msg", "Error sending alert", "err", err)
                    n.metrics.errors.WithLabelValues(url).Inc()}
                ......
                wg.Done()}(ams.client, am.url().String())
        }
    }
    wg.Wait()
    return numSuccess > 0
}

向 alertManager 发送 POST /api/v1/alerts:

//notifier/notifier.go
// HTTP 将 alert[] 发送到单个 alertmanager
func (n *Manager) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error {req, err := http.NewRequest("POST", url, bytes.NewReader(b))
    if err != nil {return err}
    req.Header.Set("User-Agent", userAgent)
    req.Header.Set("Content-Type", contentTypeJSON)
    resp, err := n.opts.Do(ctx, c, req)
    if err != nil {return err}
    defer func() {io.Copy(ioutil.Discard, resp.Body)
        resp.Body.Close()}()

    // Any HTTP status 2xx is OK.
    if resp.StatusCode/100 != 2 {return errors.Errorf("bad response status %s", resp.Status)
    }
    return nil
}
退出移动版