Prometheus: alert evaluation and sending alerts to Alertmanager

Prometheus is configured with alerting rules (expressions) and evaluates them periodically. When an expression crosses its threshold, Prometheus sends the resulting alerts to Alertmanager over HTTP.

# cat node_alerts.yml
groups:
- name: cpu_alerts
  rules:
  - alert: CPUAlert
    expr: 100 - avg(irate(node_cpu_seconds_total{mode="idle"}[1m])) by (instance) * 100 > 60
    for: 3m
    labels:
      severity: warning
    annotations:
      summary: High Node CPU
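
For context, the rule file above has to be loaded by Prometheus, and Prometheus has to be told where Alertmanager lives. A minimal prometheus.yml sketch could look like the following; the localhost:9093 address and the 15s intervals are illustrative assumptions, not values taken from this setup:

# prometheus.yml (minimal sketch; address and intervals are assumptions)
global:
  scrape_interval: 15s
  evaluation_interval: 15s      # how often alerting rules are evaluated

rule_files:
  - node_alerts.yml             # the rule file shown above

alerting:
  alertmanagers:
    - static_configs:
        - targets: ["localhost:9093"]   # assumed Alertmanager address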

Prometheus alert states

An alert goes through three states:

  • When the expression first crosses the threshold, an alert object is created with state = Pending;
  • Once the alert has stayed Pending for the configured duration (e.g. for: 3m), its state becomes Firing;
  • Prometheus then sends the alert to Alertmanager via HTTP POST;
  • When the expression no longer crosses the threshold, the alert's state becomes Inactive;
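
These states map to the AlertState constants defined in rules/alerting.go; condensed, and with explanatory comments added here, they look roughly like this:

//rules/alerting.go (condensed)
type AlertState int

const (
    // StateInactive: the expression currently returns no result for this alert.
    StateInactive AlertState = iota
    // StatePending: the threshold is crossed but the `for` duration has not elapsed yet.
    StatePending
    // StateFiring: the threshold has been crossed for at least the `for` duration.
    StateFiring
)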

Prometheus alert evaluation

First, the alerting expression is used to query the latest data; if the query returns results, the threshold has been crossed and new alert objects are created.
These newly triggered alerts are then compared against the currently active alerts, and the alert lifecycle is managed according to the different cases.

The process of managing an alert's lifecycle:

  • No existing alert, threshold crossed this time:

    • create a new alert object with state = Pending and add it to r.active;
  • Existing alert in Pending, threshold crossed this time:

    • holdDuration not reached yet: leave it unchanged;
    • holdDuration reached: update state = Firing, firing time FiredAt = ts;
  • Existing alert in Pending, threshold no longer crossed:

    • remove it from r.active;
    • change the alert's state to Inactive and set ResolvedAt = ts;
  • Existing alert in Firing, threshold crossed this time:

    • leave it unchanged;
  • Existing alert in Firing, threshold no longer crossed:

    • set state = Inactive, ResolvedAt = ts;
    • once ResolvedAt is more than 15 min in the past, remove it from r.active; (the retention lets Alertmanager see that the alert has resolved)
//rules/alerting.go
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
    res, err := query(ctx, r.vector.String(), ts)    // evaluate the alerting expression; any returned sample means the threshold is crossed

    // label-set hashes of the alerts triggered in this evaluation
    resultFPs := map[uint64]struct{}{}

    var vec promql.Vector
    var alerts = make(map[uint64]*Alert, len(res))
    for _, smpl := range res {
        // ...
        lbs := lb.Labels()
        h := lbs.Hash()
        resultFPs[h] = struct{}{}
        alerts[h] = &Alert{       // create the alert object
            Labels:      lbs,
            Annotations: annotations,
            ActiveAt:    ts,
            State:       StatePending,    // new alerts start out as Pending
            Value:       smpl.V,
        }
    }

    for h, a := range alerts {
        // already active: only update value and annotations
        if alert, ok := r.active[h]; ok && alert.State != StateInactive {
            alert.Value = a.Value
            alert.Annotations = a.Annotations
            continue
        }

        r.active[h] = a        // r.active holds all Pending & Firing alerts
    }

    // Check if any pending alerts should be removed or fire now. Write out alert timeseries.
    for fp, a := range r.active {
        // alerts that were active before but no longer cross the threshold
        if _, ok := resultFPs[fp]; !ok {
            // If the alert was previously firing, keep it around for a given
            // retention time so it is reported as resolved to the AlertManager.
            if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) {    // resolvedRetention = 15min
                delete(r.active, fp)
            }
            // no longer crossing the threshold: set state to Inactive and record ResolvedAt
            if a.State != StateInactive {
                a.State = StateInactive
                a.ResolvedAt = ts
            }
            continue
        }
        // Pending alerts whose holdDuration has elapsed are promoted to Firing
        if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration {
            a.State = StateFiring
            a.FiredAt = ts
        }
    }
    return vec, nil
}

There is one special detail here:
when an alert that was Firing no longer crosses the threshold, it becomes Inactive and alert.ResolvedAt is set to the evaluation timestamp.
However, the alert object is only removed from r.active[] after resolvedRetention (15 min) has passed; as the comment in the source puts it:

// If the alert was previously firing, keep it around for a given
// retention time so it is reported as resolved to the AlertManager.

In other words, the alert is kept around for another resolvedRetention (15 min), during which it is in the Inactive, i.e. resolved, state.
Since Prometheus sends the alerts held in r.active[] to Alertmanager, this means that for the 15 minutes after an alert turns Inactive, Prometheus keeps re-sending the resolved notification to Alertmanager, throttled to the resend interval described below (see the sketch after this paragraph).
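
To make that concrete, here is a small self-contained sketch, not Prometheus source: the field names and the 1m/15m values simply mirror the resendDelay and resolvedRetention defaults mentioned above, and the 15s evaluation interval is an assumption. It replays the resend decision for a resolved alert at every evaluation within the retention window:

package main

import (
    "fmt"
    "time"
)

// toy model of the two fields the resend decision looks at
type alert struct {
    resolvedAt time.Time
    lastSentAt time.Time
}

// same shape as Prometheus' needsSending for a non-Pending alert
func (a *alert) needsSending(ts time.Time, resendDelay time.Duration) bool {
    if a.resolvedAt.After(a.lastSentAt) {
        return true // resolved since the last send: resend immediately
    }
    return a.lastSentAt.Add(resendDelay).Before(ts)
}

func main() {
    const (
        resendDelay       = 1 * time.Minute  // assumed default
        resolvedRetention = 15 * time.Minute // assumed default
        evalInterval      = 15 * time.Second // assumed rule evaluation interval
    )
    start := time.Now()
    a := &alert{resolvedAt: start, lastSentAt: start.Add(-resendDelay)}

    sends := 0
    // while the resolved alert is still retained in r.active
    for ts := start; ts.Sub(a.resolvedAt) <= resolvedRetention; ts = ts.Add(evalInterval) {
        if a.needsSending(ts, resendDelay) {
            a.lastSentAt = ts
            sends++
        }
    }
    fmt.Printf("resolved notification sent %d times within %v\n", sends, resolvedRetention)
}

Run as-is this prints roughly a dozen sends over the 15-minute window, i.e. about one resolved notification per resendDelay until the alert is finally dropped from r.active.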

Prometheus alert sending

Checks before sending

  • Only alerts in r.active[] with alert.State != Pending are sent, i.e. only Firing (alert) and Inactive (resolved) alerts;
  • The send rate is throttled: an alert is re-sent at most once per minute;
//rules/alerting.go
func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
    alerts := []*Alert{}
    r.ForEachActiveAlert(func(alert *Alert) {
        if alert.needsSending(ts, resendDelay) {
            alert.LastSentAt = ts
            anew := *alert
            alerts = append(alerts, &anew)
        }
    })
    notifyFunc(ctx, r.vector.String(), alerts...)
}

Only the alerts held in r.active[] are sent:

func (r *AlertingRule) ForEachActiveAlert(f func(*Alert)) {
    r.mtx.Lock()
    defer r.mtx.Unlock()
    // only the alerts held in r.active are iterated and offered for sending
    for _, a := range r.active {
        f(a)
    }
}

Only Firing (alert) and Inactive (resolved) alerts are sent, and each is re-sent at most once per minute:

func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool {
    if a.State == StatePending {    // Pending alerts are never sent
        return false
    }
    // if an alert has been resolved since the last send, resend it
    if a.ResolvedAt.After(a.LastSentAt) {
        return true
    }
    return a.LastSentAt.Add(resendDelay).Before(ts)    // resendDelay = 1min, throttling re-sends to once per minute
}

The sending process

  • All current alerts are JSON-serialized into the payload (the v1 API is used) and sent to every configured Alertmanager (a sketch of the payload shape follows the sendOne code below);
  • On send failure there is no retry; the error is only logged and counted in metrics;
//notifier/notifier.go
// sendAll sends the alerts to all configured Alertmanagers concurrently.
// It returns true if the alerts could be sent successfully to at least one Alertmanager.
func (n *Manager) sendAll(alerts ...*Alert) bool {
    begin := time.Now()
    // payload sent to Alertmanager; the v1 API is used here
    var v1Payload, v2Payload []byte
    amSets := n.alertmanagers        // all configured Alertmanager sets

    for _, ams := range amSets {
        switch ams.cfg.APIVersion {
        case config.AlertmanagerAPIVersionV1:
            {
                if v1Payload == nil {
                    v1Payload, err = json.Marshal(alerts)
                    // ...
                }
                payload = v1Payload        // r.active[] serialized into the payload
            }
            // ...
        }
        for _, am := range ams.ams {
            wg.Add(1)
            ctx, cancel := context.WithTimeout(n.ctx, time.Duration(ams.cfg.Timeout))
            defer cancel()
            go func(client *http.Client, url string) {
                if err := n.sendOne(ctx, client, url, payload); err != nil {    // sendOne does the actual send
                    level.Error(n.logger).Log("alertmanager", url, "count", len(alerts), "msg", "Error sending alert", "err", err)
                    n.metrics.errors.WithLabelValues(url).Inc()
                }
                // ...
                wg.Done()
            }(ams.client, am.url().String())
        }
    }
    wg.Wait()
    return numSuccess > 0
}

The alerts are POSTed to each Alertmanager's /api/v1/alerts endpoint:

//notifier/notifier.go
// sendOne POSTs the serialized alerts to a single Alertmanager
func (n *Manager) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error {
    req, err := http.NewRequest("POST", url, bytes.NewReader(b))
    if err != nil {
        return err
    }
    req.Header.Set("User-Agent", userAgent)
    req.Header.Set("Content-Type", contentTypeJSON)
    resp, err := n.opts.Do(ctx, c, req)
    if err != nil {
        return err
    }
    defer func() {
        io.Copy(ioutil.Discard, resp.Body)
        resp.Body.Close()
    }()

    // Any HTTP status 2xx is OK.
    if resp.StatusCode/100 != 2 {
        return errors.Errorf("bad response status %s", resp.Status)
    }
    return nil
}
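
For reference, the body of that POST is a JSON array of notifier Alert objects. Condensed, the struct in notifier/notifier.go looks roughly like this; note that for a resolved alert EndsAt is populated (from ResolvedAt), which is how Alertmanager recognizes the recovery:

//notifier/notifier.go (condensed)
// Alert is the wire format that gets marshalled into the v1 payload.
type Alert struct {
    Labels      labels.Labels `json:"labels"`      // identity labels, including alertname
    Annotations labels.Labels `json:"annotations"` // extra information, e.g. summary

    // Known time range of the alert. For a resolved alert, EndsAt is set
    // (from ResolvedAt), which lets Alertmanager mark it as resolved.
    StartsAt     time.Time `json:"startsAt,omitempty"`
    EndsAt       time.Time `json:"endsAt,omitempty"`
    GeneratorURL string    `json:"generatorURL,omitempty"`
}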
