关于golang:alertmanager-源码分析一-告警的处理

10次阅读

共计 5627 个字符,预计需要花费 15 分钟才能阅读完成。

上篇说到告警曾经写入到内存构造中 AlertsProvider 中, 并且 Dispatcher 通过订阅 AlertsProvider 获取一个 chan,可能实时读到新写入的 alert

Dispatcher 的作用就是把 alert 分派给正确的 route 来解决告警,route 是一个树状构造,每个节点有 Matchers 用于判断和以后正在解决的 alert 是否匹配,匹配就对这个 alert 利用这个 route 上的规定,先看看 route 构造:

type Route struct {
    parent *Route
    Routes []*Route
    RouteOpts RouteOpts
    
// alert 就是由 alert.LabelSet 和 Route.Matchers 两个是否匹配来决定是否应用这个 Route 来解决
    Matchers labels.Matchers

    // 以后层呈现一个 route 匹配 alert 后,是否在同级上持续应用这个 alert 尝试匹配其它 route
    Continue bool
}
// RouteOpts 保留了跟以后 Route 绑定的一些规定,alert 匹配后就会应用这些规定解决
type RouteOpts struct {
    Receiver string

    GroupBy map[model.LabelName]struct{}

    GroupByAll bool

    GroupWait      time.Duration
    GroupInterval  time.Duration
    RepeatInterval time.Duration

    MuteTimeIntervals []string}

对树状构造数据,源码中应用了递归来遍历节点

// lset 就是传入的告警的 label 汇合
func (r *Route) Match(lset model.LabelSet) []*Route {

    // 以后节点匹配
    if !r.Matchers.Matches(lset) {return nil}
    var all []*Route

    for _, cr := range r.Routes {matches := cr.Match(lset) // 深度优先

        all = append(all, matches...)

        // 如果曾经匹配到同级的一个, 且 Continue=false 就完结以后层的匹配
        if matches != nil && !cr.Continue {break}
    }

    // 如果以后 route 的子 route.Routes 为空或者 route.Routes 中没有匹配 lset 时, 那么以后 route 就是匹配的
    if len(all) == 0 {all = append(all, r)
    }

    return all
}

找到了所有跟以后告警匹配的 route, 接下来就是解决告警了,看看 processAlert,

func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {

    // 获取以后 route 上用于分组的标签 kv, 计算一个组的 fp
    // 如果 route 的 groupBy 是 ["a", "b"], 那么 a=1,b="s" 的告警和 a=1,b="e" 的告警就会在一个 route 下的不同分组内
    groupLabels := getGroupLabels(alert, route)
    fp := groupLabels.Fingerprint()

    d.mtx.Lock()
    defer d.mtx.Unlock()
    
    // 没 routeGroups 就建一个, route 对应的是 routeGroups, fp 对应的是 aggrGroup
    routeGroups, ok := d.aggrGroupsPerRoute[route]
    if !ok {routeGroups = map[model.Fingerprint]*aggrGroup{}
        d.aggrGroupsPerRoute[route] = routeGroups
    }

    ag, ok := routeGroups[fp]
    if ok {
        // groupBy 中的 label 呈现了一个新的取值组合, 间接写入
        ag.insert(alert)
        return
    }

    ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
    routeGroups[fp] = ag
    d.aggrGroupsNum++
    d.metrics.aggrGroups.Inc()

    // 终于, 一个新的告警被 Dispatcher 分派到了一个 routeGroup 下的一个 aggrGroup 中
    ag.insert(alert)

    // 每个 aggrGroup 会有个独立的解决以后分组告警的 groutine, 这里传给 ag.run 的匿名函数最终会是真正解决每个告警的函数
    go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
        // 处理函数就是应用 Dispatcher 的 stage 来解决
        // stage 是在 Dispatcher 新建时编排的流水线, 外面都是 stage 接口类型的对象, stage 接口只有一个 Exec 办法
        _, _, err := d.stage.Exec(ctx, d.logger, alerts...)
        // ...
        return err == nil
    })
}

所以 processAlert 就是把告警分派给某个 route 下的某个 aggrGroup, 而每个 aggrGroup 会有个独立的 groutine 专门解决本人 aggrGroup 内的告警,在持续看每个 aggrGroup 是怎么解决本人组内的告警之前,先理解一下 aggrGroup 的构造

type aggrGroup struct {
    // 分组应用的 labels,比方下面例子中的 a=1,b="s"
    labels   model.LabelSet
    opts     *RouteOpts
    logger   log.Logger
    routeKey string

    // 这个构造和后面告警写入时存储的构造 AlertsProvider 底层应用的雷同
    // 实际上就是一个 mutex.Lock 守护的一个 map[fp]alert
    alerts  *store.Alerts

    // 退出构造, aggrGroup 启动后在逻辑上是属于 Dispatcher, 所以须要监听 Dispatcher 的退出
    // 这个 ctx 就是 Dispatcher 的 ctx
    ctx     context.Context
    cancel  func()
    done    chan struct{}

    // 这个 timer 新建时应用的周期就是 group_wait, 即一个分组的首次告警等待时间
    // 在这个组首次 flush 之后,会被置为 GroupInterval 即一个更长的期待周期
    next    *time.Timer
    timeout func(time.Duration) time.Duration

    mtx        sync.RWMutex
    hasFlushed bool
}
// 写入 aggrGroup 一个告警
func (ag *aggrGroup) insert(alert *types.Alert) {
    // 和 AlertsProvider 的 Put 时一样
    if err := ag.alerts.Set(alert); err != nil {level.Error(ag.logger).Log("msg", "error on set alert", "err", err)
    }

    ag.mtx.Lock()
    defer ag.mtx.Unlock()

    // 写入 aggrGroup 胜利后, 如果以后分组在新建后执行过 flush, 而且以后这个告警
会尝试触发一次 flush, 
    // flush 是每个 aggrGroup 用来解决本人分组内告警的函数, 稍后会说
    // 
    if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {ag.next.Reset(0)
    }
}

接下来就是 aggrGroup 是如何解决本人组内的告警的

func (ag *aggrGroup) run(nf notifyFunc) {defer close(ag.done)
    defer ag.next.Stop()

    for {
        select {

        case now := <-ag.next.C:
            // 如果以后告警没有在 GroupInterval 内实现, 就立刻终止, 因为下个 flush 来了
            ctx, cancel := context.WithTimeout(ag.ctx, ag.timeout(ag.opts.GroupInterval))
            // flush 中应用的 now 都是以这个工夫为准
            ctx = notify.WithNow(ctx, now)

            // Populate context with information needed along the pipeline.
            ctx = notify.WithGroupKey(ctx, ag.GroupKey())
            ctx = notify.WithGroupLabels(ctx, ag.labels)
            ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
            ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
            ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)

            ag.mtx.Lock()
            // ag.next 这个 timer 在 aggrGroup 新建的时候应用 group_await
            // 首次触发后, 就被重置为 GroupInterval, 个别这个更长一点
            ag.next.Reset(ag.opts.GroupInterval)
            ag.hasFlushed = true
            ag.mtx.Unlock()

            ag.flush(func(alerts ...*types.Alert) bool {return nf(ctx, alerts...)
            })

            cancel()

        case <-ag.ctx.Done():
            return
        }
    }
}

func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {if ag.empty() {return}
    var (alerts      = ag.alerts.List()
        alertsSlice = make(types.AlertSlice, 0, len(alerts))
        now         = time.Now())
    for _, alert := range alerts {
        a := *alert
        // 没解决的就把完结工夫置零, 不然后续流程如果很长, 可能就解决到一半 EndsAt 就到期了
        if !a.ResolvedAt(now) {a.EndsAt = time.Time{}
        }
        alertsSlice = append(alertsSlice, &a)
    }
    sort.Stable(alertsSlice)

    // 这个 notify(alertsSlice...) 就是音讯解决流水线入口, 
    // if body 中的逻辑就是跑完流水线并且没有谬误再执行的逻辑
    if notify(alertsSlice...) {
        for _, a := range alertsSlice {fp := a.Fingerprint()
            got, err := ag.alerts.Get(fp)
            if err != nil {
                // This should never happen.
                level.Error(ag.logger).Log("msg", "failed to get alert", "err", err, "alert", a.String())
                continue
            }
            // 从以后的 aggrGroup 中清理曾经解决的告警
            if a.Resolved() && got.UpdatedAt == a.UpdatedAt {if err := ag.alerts.Delete(fp); err != nil {level.Error(ag.logger).Log("msg", "error on delete alert", "err", err, "alert", a.String())
                }
            }
        }
    }
}

到这个地位告警曾经被解决掉了,实际上就是跑完流水线后告警的状态变了,aggrGroup 中清理告警只是清理告警的一个指针,还有其它中央的组件还存储者这个告警的指针,比方 AlertsProvider 中,不过那边就由 AlertsProvider 的 GC 函数解决了。

下面的流水线的入口函数被传递了很多层,我把它收集在一起就看的比较清楚:

// 最后是 aggrGroup 启动时定义, 传给 aggrGroup.run 函数, 实际上这个函数是一个闭包, 因为援用了 d.logger, d.stage
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
    return err == nil
})
// aggrGroup.run 对它进一步封装, 把 aggrGroup.run 中计算好的 ctx 传给这个函数, 减少了闭包中的信息
func (ag *aggrGroup) run(nf notifyFunc) {
    for {
        select {
        case now := <-ag.next.C:
            ag.flush(func(alerts ...*types.Alert) bool {return nf(ctx, alerts...)
            })
            cancel()}
    }
}
// 最终是在 flush 中应用本组的 alert 调用这个函数
func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {if notify(alertsSlice...) {}}

总结下来,就是为了调用 d.stage.Exec(ctx, d.logger, alerts...),应用了 Dispatcher.logger, aggrGroupctxalerts

正文完
 0