关于golang:alertmanager-源码分析一-告警的处理

上篇说到告警曾经写入到内存构造中 AlertsProvider 中,并且 Dispatcher 通过订阅 AlertsProvider 获取一个 chan,可能实时读到新写入的 alert

Dispatcher 的作用就是把 alert 分派给正确的 route 来解决告警,route 是一个树状构造,每个节点有 Matchers 用于判断和以后正在解决的 alert 是否匹配,匹配就对这个 alert 利用这个 route 上的规定,先看看 route 构造:

type Route struct {
    parent *Route
    Routes []*Route
    RouteOpts RouteOpts
    
// alert 就是由 alert.LabelSet 和 Route.Matchers 两个是否匹配来决定是否应用这个 Route 来解决
    Matchers labels.Matchers

    // 以后层呈现一个 route 匹配 alert 后,是否在同级上持续应用这个 alert 尝试匹配其它 route
    Continue bool
}
// RouteOpts 保留了跟以后 Route 绑定的一些规定,alert 匹配后就会应用这些规定解决
type RouteOpts struct {
    Receiver string

    GroupBy map[model.LabelName]struct{}

    GroupByAll bool

    GroupWait      time.Duration
    GroupInterval  time.Duration
    RepeatInterval time.Duration

    MuteTimeIntervals []string
}

对树状构造数据,源码中应用了递归来遍历节点

// lset 就是传入的告警的 label 汇合
func (r *Route) Match(lset model.LabelSet) []*Route {

    // 以后节点匹配
    if !r.Matchers.Matches(lset) {
        return nil
    }
    var all []*Route

    for _, cr := range r.Routes {
        matches := cr.Match(lset) // 深度优先

        all = append(all, matches...)

        // 如果曾经匹配到同级的一个, 且 Continue=false 就完结以后层的匹配
        if matches != nil && !cr.Continue {
            break
        }
    }

    // 如果以后 route 的子 route.Routes 为空或者 route.Routes 中没有匹配 lset 时, 那么以后 route 就是匹配的
    if len(all) == 0 {
        all = append(all, r)
    }

    return all
}

找到了所有跟以后告警匹配的 route, 接下来就是解决告警了,看看 processAlert,

func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {

    // 获取以后 route 上用于分组的标签 kv, 计算一个组的 fp
    // 如果 route 的 groupBy 是 ["a", "b"], 那么 a=1,b="s" 的告警和 a=1,b="e" 的告警就会在一个 route 下的不同分组内
    groupLabels := getGroupLabels(alert, route)
    fp := groupLabels.Fingerprint()

    d.mtx.Lock()
    defer d.mtx.Unlock()
    
    // 没 routeGroups 就建一个, route 对应的是 routeGroups, fp 对应的是 aggrGroup
    routeGroups, ok := d.aggrGroupsPerRoute[route]
    if !ok {
        routeGroups = map[model.Fingerprint]*aggrGroup{}
        d.aggrGroupsPerRoute[route] = routeGroups
    }

    ag, ok := routeGroups[fp]
    if ok {
        // groupBy 中的 label 呈现了一个新的取值组合, 间接写入
        ag.insert(alert)
        return
    }

    ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
    routeGroups[fp] = ag
    d.aggrGroupsNum++
    d.metrics.aggrGroups.Inc()

    // 终于, 一个新的告警被 Dispatcher 分派到了一个 routeGroup 下的一个 aggrGroup 中
    ag.insert(alert)

    // 每个 aggrGroup 会有个独立的解决以后分组告警的 groutine, 这里传给 ag.run 的匿名函数最终会是真正解决每个告警的函数
    go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
        // 处理函数就是应用 Dispatcher 的 stage 来解决
        // stage 是在 Dispatcher 新建时编排的流水线, 外面都是 stage 接口类型的对象, stage 接口只有一个 Exec 办法
        _, _, err := d.stage.Exec(ctx, d.logger, alerts...)
        // ...
        return err == nil
    })
}

所以 processAlert 就是把告警分派给某个 route 下的某个 aggrGroup, 而每个 aggrGroup 会有个独立的 groutine 专门解决本人 aggrGroup 内的告警,在持续看每个 aggrGroup 是怎么解决本人组内的告警之前,先理解一下 aggrGroup 的构造

type aggrGroup struct {
    // 分组应用的 labels,比方下面例子中的 a=1,b="s"
    labels   model.LabelSet
    opts     *RouteOpts
    logger   log.Logger
    routeKey string

    // 这个构造和后面告警写入时存储的构造 AlertsProvider 底层应用的雷同
    // 实际上就是一个 mutex.Lock 守护的一个 map[fp]alert
    alerts  *store.Alerts

    // 退出构造, aggrGroup 启动后在逻辑上是属于 Dispatcher, 所以须要监听 Dispatcher 的退出
    // 这个 ctx 就是 Dispatcher 的 ctx
    ctx     context.Context
    cancel  func()
    done    chan struct{}

    // 这个 timer 新建时应用的周期就是 group_wait, 即一个分组的首次告警等待时间
    // 在这个组首次 flush 之后,会被置为 GroupInterval 即一个更长的期待周期
    next    *time.Timer
    timeout func(time.Duration) time.Duration

    mtx        sync.RWMutex
    hasFlushed bool
}
// 写入 aggrGroup 一个告警
func (ag *aggrGroup) insert(alert *types.Alert) {
    // 和 AlertsProvider 的 Put 时一样
    if err := ag.alerts.Set(alert); err != nil {
        level.Error(ag.logger).Log("msg", "error on set alert", "err", err)
    }

    ag.mtx.Lock()
    defer ag.mtx.Unlock()

    // 写入 aggrGroup 胜利后, 如果以后分组在新建后执行过 flush, 而且以后这个告警
会尝试触发一次 flush, 
    // flush 是每个 aggrGroup 用来解决本人分组内告警的函数, 稍后会说
    // 
    if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
        ag.next.Reset(0)
    }
}

接下来就是 aggrGroup 是如何解决本人组内的告警的

func (ag *aggrGroup) run(nf notifyFunc) {
    defer close(ag.done)
    defer ag.next.Stop()

    for {
        select {

        case now := <-ag.next.C:
            // 如果以后告警没有在 GroupInterval 内实现, 就立刻终止, 因为下个 flush 来了
            ctx, cancel := context.WithTimeout(ag.ctx, ag.timeout(ag.opts.GroupInterval))
            // flush 中应用的 now 都是以这个工夫为准
            ctx = notify.WithNow(ctx, now)

            // Populate context with information needed along the pipeline.
            ctx = notify.WithGroupKey(ctx, ag.GroupKey())
            ctx = notify.WithGroupLabels(ctx, ag.labels)
            ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
            ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
            ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)

            ag.mtx.Lock()
            // ag.next 这个 timer 在 aggrGroup 新建的时候应用 group_await
            // 首次触发后, 就被重置为 GroupInterval, 个别这个更长一点
            ag.next.Reset(ag.opts.GroupInterval)
            ag.hasFlushed = true
            ag.mtx.Unlock()

            ag.flush(func(alerts ...*types.Alert) bool {
                return nf(ctx, alerts...)
            })

            cancel()

        case <-ag.ctx.Done():
            return
        }
    }
}

func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
    if ag.empty() { return }
    var (
        alerts      = ag.alerts.List()
        alertsSlice = make(types.AlertSlice, 0, len(alerts))
        now         = time.Now()
    )
    for _, alert := range alerts {
        a := *alert
        // 没解决的就把完结工夫置零, 不然后续流程如果很长, 可能就解决到一半 EndsAt 就到期了
        if !a.ResolvedAt(now) {
            a.EndsAt = time.Time{}
        }
        alertsSlice = append(alertsSlice, &a)
    }
    sort.Stable(alertsSlice)

    // 这个 notify(alertsSlice...) 就是音讯解决流水线入口, 
    // if body 中的逻辑就是跑完流水线并且没有谬误再执行的逻辑
    if notify(alertsSlice...) {
        for _, a := range alertsSlice {
            fp := a.Fingerprint()
            got, err := ag.alerts.Get(fp)
            if err != nil {
                // This should never happen.
                level.Error(ag.logger).Log("msg", "failed to get alert", "err", err, "alert", a.String())
                continue
            }
            // 从以后的 aggrGroup 中清理曾经解决的告警
            if a.Resolved() && got.UpdatedAt == a.UpdatedAt {
                if err := ag.alerts.Delete(fp); err != nil {
                    level.Error(ag.logger).Log("msg", "error on delete alert", "err", err, "alert", a.String())
                }
            }
        }
    }
}

到这个地位告警曾经被解决掉了,实际上就是跑完流水线后告警的状态变了,aggrGroup 中清理告警只是清理告警的一个指针,还有其它中央的组件还存储者这个告警的指针,比方 AlertsProvider 中,不过那边就由 AlertsProvider 的 GC 函数解决了。

下面的流水线的入口函数被传递了很多层,我把它收集在一起就看的比较清楚:

// 最后是 aggrGroup 启动时定义, 传给 aggrGroup.run 函数, 实际上这个函数是一个闭包, 因为援用了 d.logger, d.stage
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
    _, _, err := d.stage.Exec(ctx, d.logger, alerts...)
    return err == nil
})
// aggrGroup.run 对它进一步封装, 把 aggrGroup.run 中计算好的 ctx 传给这个函数, 减少了闭包中的信息
func (ag *aggrGroup) run(nf notifyFunc) {
    for {
        select {
        case now := <-ag.next.C:
            ag.flush(func(alerts ...*types.Alert) bool {
                return nf(ctx, alerts...)
            })
            cancel()
        }
    }
}
// 最终是在 flush 中应用本组的 alert 调用这个函数
func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
    if notify(alertsSlice...) {}
}

总结下来,就是为了调用 d.stage.Exec(ctx, d.logger, alerts...),应用了 Dispatcher.logger, aggrGroupctxalerts

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理