共计 5233 个字符,预计需要花费 14 分钟才能阅读完成。
prometheus 配置告警表达式,定期检查是否触发阈值,若触发阈值,则通过 HTTP 发送告警给 alertManager。
# cat node_alerts.yml
groups:
- name: cpu_alerts
rules:
- alert: CPUAlert
expr: 100 - avg(irate(node_cpu_seconds_total{mode="idle"}[1m])) by (instance) * 100 > 60
for: 3m
labels:
severity: warning
annotations:
summary: High Node CPU
prometheus 告警状态
告警有三种状态:
- 初始触发阈值,生成 alert 并置 alert 状态 =pending;
- 当该 alert 在 pending 维持肯定工夫 (如 for 3m),alert 状态 =Firing;
- prometheus 通过 HTTP POST 发送 alert 给 alertManager;
- 当 alert 不再触发阈值,则 alert 状态 =Inactive;
prometheus 告警断定
首先,用告警表达式查问最新的数据,若查问到后果,则阐明触发了阈值,生成新的 alert 对象。
而后,将本次触发阈值的 alert 与以后 active 的 alert 列表进行比对,依据不同的场景治理 alert 的生命周期。
治理 alert 生命周期的过程:
-
原来没有 alert,本次合乎阈值:
- 新建一个 alert 对象,状态 =pending,放入 r.active 数组;
-
原来有 alert=pending,本次合乎阈值:
- 没有达到 holdDuration:放弃不变;
- 达到 holdDuration:更新状态 =Firing,触发工夫 fireAt=ts;
-
原来有 alert=pending,本次不合乎阈值:
- 将其从 r.active 中删除;
- alert 状态批改为 Inacvie,ResolvedAt=ts:
-
原来有 alert=firing,本次合乎阈值:
- 放弃不变;
-
原来有 alert=firing,本次不合乎阈值:
- alert 状态 =Inactive,ResolvedAt=ts;
- 若 resolveAt 已过 15min,那么将其从 r.active 中删除;(便于 alertmanager 判断其复原)
//rules/alerting.go
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {res, err := query(ctx, r.vector.String(), ts) // 用告警表达式去查问数据,若查问到后果,则触发了阈值
// 本次触发的告警 lables.hash() 汇合
resultFPs := map[uint64]struct{}{}
var vec promql.Vector
var alerts = make(map[uint64]*Alert, len(res))
for _, smpl := range res {
......
lbs := lb.Labels()
h := lbs.Hash()
resultFPs[h] = struct{}{}
alerts[h] = &Alert{ // 生成告警对象
Labels: lbs,
Annotations: annotations,
ActiveAt: ts,
State: StatePending, // 触发了 pending
Value: smpl.V,
}
}
for h, a := range alerts {
// 曾经是 active,更新 value 和 annotations
if alert, ok := r.active[h]; ok && alert.State != StateInactive {
alert.Value = a.Value
alert.Annotations = a.Annotations
continue
}
r.active[h] = a // []active 寄存所有 pending&Firing 的告警}
// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
for fp, a := range r.active {
// 针对之前已有的告警,当初不再满足阈值了
if _, ok := resultFPs[fp]; !ok {
// If the alert was previously firing, keep it around for a given
// retention time so it is reported as resolved to the AlertManager.
if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) { // resolvedRetention=15min
delete(r.active, fp)
}
// 不再触发阈值了,将其 state 批改为 Inactive,ResolvedAt 有值
if a.State != StateInactive {
a.State = StateInactive
a.ResolvedAt = ts
}
continue
}
// 针对之前的 pending,当初 holdDuration 已到,更新其状态为 Firing
if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration {
a.State = StateFiring
a.FiredAt = ts
}
}
return vec, nil
}
这里有个特地的中央:
原来 Firing 的告警,本次不再触发阈值,那么它将变成 Inactive,同时 alert.ResolvedAt=now()。
同时,该 alert 对象要等 resolvedRetention(15min) 后,才从 r.actvie[] 中删除,依照正文的说法:
// If the alert was previously firing, keep it around for a given
// retention time so it is reported as resolved to the AlertManager.
也就是持续保留 resolvedRetention(15min),保留过程中 alert 是 Inactive 且复原的状态。
因为 prometheus 会发送 r.active[] 中的告警对象给 alertManager,也就是说,Inactive 后的 15min 内,prometheus 会不停的向 alertManager 发送复原。
prometheus 告警发送
发送前的判断
- 仅发送 r.active[] 且 alert.State!=Pending,即仅发送:Firing[ 告警]/Inactive[复原];
- 发送前要管制发送的频率,1min 发送一次;
//rules/alerting.go
func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {alerts := []*Alert{}
r.ForEachActiveAlert(func(alert *Alert) {if alert.needsSending(ts, resendDelay) {
alert.LastSentAt = ts
anew := *alert
alerts = append(alerts, &anew)
}
})
notifyFunc(ctx, r.vector.String(), alerts...)
}
仅发送 r.active[] 数组中的告警:
func (r *AlertingRule) ForEachActiveAlert(f func(*Alert)) {r.mtx.Lock()
defer r.mtx.Unlock()
// 仅发送 r.active[] 数组中的告警
for _, a := range r.active {f(a)
}
}
仅发送 Firing(告警) 和 Inactive(resolved 复原) 的告警,并且 1min 发送一次:
func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool {
if a.State == StatePending { // 不会发送 Pending 状态的 alert
return false
}
// if an alert has been resolved since the last send, resend it
if a.ResolvedAt.After(a.LastSentAt) {return true}
return a.LastSentAt.Add(resendDelay).Before(ts) // resendDlay=1min,这里保障 1min 发送一次
}
发送的过程
- 将以后所有告警 json 序列化生成 payload(应用 V1),发送给每个 alertmanager;
- 发送失败时,没有 retry,仅记录日志和 metrics 计数;
//notifier/notifier.go
// sendAll sends the alerts to all configured Alertmanagers concurrently.
// It returns true if the alerts could be sent successfully to at least one Alertmanager.
func (n *Manager) sendAll(alerts ...*Alert) bool {begin := time.Now()
// 发往 AlertManager 的 payload,目前应用 V1
var v1Payload, v2Payload []byte
amSets := n.alertmanagers ## 所有的 alertmanagers 对象
for _, ams := range amSets {
switch ams.cfg.APIVersion {
case config.AlertmanagerAPIVersionV1:
{
if v1Payload == nil {v1Payload, err = json.Marshal(alerts)
......
}
payload = v1Payload // r.active[] 序列化组成 payload}
...
}
for _, am := range ams.ams {wg.Add(1)
ctx, cancel := context.WithTimeout(n.ctx, time.Duration(ams.cfg.Timeout))
defer cancel()
go func(client *http.Client, url string) {if err := n.sendOne(ctx, client, url, payload); err != nil { // sendOne 负责具体发送
level.Error(n.logger).Log("alertmanager", url, "count", len(alerts), "msg", "Error sending alert", "err", err)
n.metrics.errors.WithLabelValues(url).Inc()}
......
wg.Done()}(ams.client, am.url().String())
}
}
wg.Wait()
return numSuccess > 0
}
向 alertManager 发送 POST /api/v1/alerts:
//notifier/notifier.go
// HTTP 将 alert[] 发送到单个 alertmanager
func (n *Manager) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error {req, err := http.NewRequest("POST", url, bytes.NewReader(b))
if err != nil {return err}
req.Header.Set("User-Agent", userAgent)
req.Header.Set("Content-Type", contentTypeJSON)
resp, err := n.opts.Do(ctx, c, req)
if err != nil {return err}
defer func() {io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()}()
// Any HTTP status 2xx is OK.
if resp.StatusCode/100 != 2 {return errors.Errorf("bad response status %s", resp.Status)
}
return nil
}