关于源码分析:alertmanager-源码分析一

29次阅读

共计 5760 个字符,预计需要花费 15 分钟才能阅读完成。

监控告警个别是作为一个整体,包含从采集数据、存储、展现、规定计算、告警音讯解决等等。Alertmanager(以下简称 am 了) 是一个告警音讯治理组件,包含音讯路由、静默、克制、去重等性能,总之其它负责规定计算的组件能够把音讯无脑发给 am, 由它来对音讯进行解决, 尽可能收回高质量的告警音讯。

先看个概览图,这个是我基于原开源库外面的架构图画的,原仓库中的架构图有很多跟理论源码出入的中央,所以这个图比原来的更丰盛,更精确。

这篇先说第一局部:告警的写入

告警的写入到最终被解决能够形象成生产 - 生产模型,生产侧就是 api 接管告警,生产侧就是图中的 dispatcher,两头的 provider.Alerts 作为缓冲区。

上面是写入时的逻辑,次要就是判断告警状态,am 的告警状态是由 alert.StartsAtalert.EndsAt 来判断,而后续还有很多须要这个属性的逻辑,所以这个地位须要把起止工夫确认。

func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*types.Alert) {now := time.Now()
    
    api.mtx.RLock()
    resolveTimeout := time.Duration(api.config.Global.ResolveTimeout)
    api.mtx.RUnlock()

    // 确定一个告警音讯的起止工夫
    // 须要依据起止工夫来定义告警的状态, 如果止工夫在以后之前就是 Resolved
    for _, alert := range alerts {

        // 新收到的告警标记接管工夫, 这样如果有两个告警 label 统一, 能够判断出哪个是最新收到的
        alert.UpdatedAt = now

        // Ensure StartsAt is set.
        if alert.StartsAt.IsZero() {if alert.EndsAt.IsZero() {alert.StartsAt = now} else {alert.StartsAt = alert.EndsAt}
        }
        // 止工夫如果没有就须要应用 resolveTimeout 计算一个
        if alert.EndsAt.IsZero() {
            alert.Timeout = true
            alert.EndsAt = now.Add(resolveTimeout)
        }
        if alert.EndsAt.After(time.Now()) {api.m.Firing().Inc()} else {api.m.Resolved().Inc()}
    }

    // Make a best effort to insert all alerts that are valid.
    var (validAlerts    = make([]*types.Alert, 0, len(alerts))
        validationErrs = &types.MultiError{})

    // 校验 alert, 比方清理空值的 label, 起止工夫, 至多一个 label, label 中的 kv 命名规定 等等
    for _, a := range alerts {removeEmptyLabels(a.Labels)

        if err := a.Validate(); err != nil {validationErrs.Add(err)
            api.m.Invalid().Inc()
            continue
        }
        validAlerts = append(validAlerts, a)
    }
    // 写入 alertsProvider, 这一端相当于生产者
    if err := api.alerts.Put(validAlerts...); err != nil {
        api.respondError(w, apiError{
            typ: errorInternal,
            err: err,
        }, nil)
        return
    }
}

provider.Alerts 是一个interface

// 所有办法都要 groutine-safe
type Alerts interface {Subscribe() AlertIterator
    GetPending() AlertIterator
    Get(model.Fingerprint) (*types.Alert, error)
    Put(...*types.Alert) error
}

源码中给出了一个基于内存的实现,所以所有告警的接管都会先写入这个构造,其它过程都从这里获取本人须要的告警,前面会称这个基于内存的实现称为 AlertsProvider

// Alerts 治理构造, 就是架构图中的 Alerts 应用的构造
type Alerts struct {
    cancel context.CancelFunc

    mtx       sync.Mutex
    alerts    *store.Alerts                // 存储 map[fingerprint]*Alert
    listeners map[int]listeningAlerts    // 所有监听者
    next      int                        // 监听者计数

    callback AlertStoreCallback
    logger log.Logger
}

先看看 AlertsProviderPut 看看告警是如何写入的

func (a *Alerts) Put(alerts ...*types.Alert) error {
    for _, alert := range alerts {
        // 制作惟一 ID, 基于 LabelSets 中的 label name 和 label value
        fp := alert.Fingerprint()

        existing := false

        // 如果曾经存在雷同的 alert, 就是 labelSets 雷同
        if old, err := a.alerts.Get(fp); err == nil {
            existing = true

            // 新旧告警区间有重叠的, 合并, 依照肯定的策略应用较新的告警内容
            if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) ||
                (alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) {alert = old.Merge(alert)
            }
        }
        // 这个 Set 办法就把以后这个 alert 应用下面制作的 fp 写入 map[fp]*Alert
        if err := a.alerts.Set(alert); err != nil {level.Error(a.logger).Log("msg", "error on set alert", "err", err)
            continue
        }
        
        // 程序中其它模块会通过调用 Subscribe 来注册一个 listener 到 AlertsProvider
        // AlertsProvider 每次胜利存储一个 alert 都会对所有 listener 播送
        // 这个过程上锁保障所有的 listeners 收到统一的播送
        a.mtx.Lock()
        for _, l := range a.listeners {
            select {
            case l.alerts <- alert:
            case <-l.done:
            }
        }
        a.mtx.Unlock()}

    return nil
}

程序中其它局部 (dispatcher, Inhibitor) 都是通过调用 Subscribe 来监听新写入的告警音讯

func (a *Alerts) Subscribe() provider.AlertIterator {
    // groutine-safe
    a.mtx.Lock()
    defer a.mtx.Unlock()

    var (done   = make(chan struct{})
        alerts = a.alerts.List()                                               // 获取所有的 alerts
        ch     = make(chan *types.Alert, max(len(alerts), alertChannelLength)) // 创立一个 buffer chan, 保障容量要么亏损, 要么恰好
    )
    // 把调用时曾经存在的 alerts 写入一个 buffered chan
    // 实际上从 Alerts 开始接管到其它组件 subscribe 都是在程序启动期间就实现
    // 这两头接管到告警的可能性很小,即便接管到也不会很多
    for _, a := range alerts {ch <- a}

    // 为 AlertsProvider 新建一个 listener, 构造就是 buffered chan 和一个敞开信号 chan
    // 很显著 buffered chan 就是调用方获取告警的, 敞开信号 chan 就是调用方用来监听完结信号的
    // 应用 next 作为计数, 以后共有 next 个 listener
    a.listeners[a.next] = listeningAlerts{alerts: ch, done: done}
    a.next++
    
    // 这里把 buffered chan 和敞开信号 chan 从新打包成了 alertIterator 返回给调用方
    return provider.NewAlertIterator(ch, done, nil)
}

所以调用方就要应用 alertIterator 提供的一些办法了

type alertIterator struct {
    ch   <-chan *types.Alert
    done chan struct{}
    err  error
}
func (ai alertIterator) Next() <-chan *types.Alert { return ai.ch}
func (ai alertIterator) Err() error { return ai.err}
func (ai alertIterator) Close()     { close(ai.done) }

alertIterator 的实现相似于迭代器协定,让调用方应用 for 循环的形式来应用,先看一下Dispatcher 如何应用的

// 首先由 main.go 实例化 Dispatcher 并启动
go disp.Run()

// 其次, Dispatcher 中可导出的 Run 就从 AlertsProvider 通过调用 Subscribe 获取了一个 alertIterator
func (d *Dispatcher) Run() {d.done = make(chan struct{})

    d.mtx.Lock()
    d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
    d.aggrGroupsNum = 0
    d.metrics.aggrGroups.Set(0)
    d.ctx, d.cancel = context.WithCancel(context.Background())
    d.mtx.Unlock()

    d.run(d.alerts.Subscribe())
    close(d.done)
}

// 最初, Dispatcher 的不可导出的 run, 
// 就是一个 fo-select 构造同时监听 AlertsProvider 中新收到的 alert, gc 信号和退出信号
func (d *Dispatcher) run(it provider.AlertIterator) {cleanup := time.NewTicker(30 * time.Second)
    defer cleanup.Stop()

    defer it.Close()

    for {
        select {// alertIterator 的 Next() 办法返回一个 chan, 也就是下面 AlertsProvider 中提供的 buffered chan
        case alert, ok := <-it.Next():
            // 接下来就是 Dispatcher 在接管到新的告警该如何解决的问题了
            if !ok {
                // Iterator exhausted for some reason.
                if err := it.Err(); err != nil {level.Error(d.logger).Log("msg", "Error on alert update", "err", err)
                }
                return
            }

            level.Debug(d.logger).Log("msg", "Received alert", "alert", alert)

            // Log errors but keep trying.
            if err := it.Err(); err != nil {level.Error(d.logger).Log("msg", "Error on alert update", "err", err)
                continue
            }
            
            // 从 Dispatcher 中找到哪些 router 跟这个 alert 匹配, 可能有多个 router 匹配
            // 应用每个 router 来解决这个 alert
            now := time.Now()
            for _, r := range d.route.Match(alert.Labels) {d.processAlert(alert, r)
            }
            d.metrics.processingDuration.Observe(time.Since(now).Seconds())

        case <-cleanup.C:
            // Dispatcher 会有个 gc 过程,次要清理一些不必的内存容器
            d.mtx.Lock()

            for _, groups := range d.aggrGroupsPerRoute {
                for _, ag := range groups {if ag.empty() {ag.stop()
                        delete(groups, ag.fingerprint())
                        d.aggrGroupsNum--
                        d.metrics.aggrGroups.Dec()}
                }
            }

            d.mtx.Unlock()

        case <-d.ctx.Done():
            return
        }
    }
}

综合 Put 中的播送过程、Subscribe 办法、alertIterator 的设计还有 Dispatcher 的监听,能够看出这个公布 - 订阅模式: 订阅者订阅时获取了一个 buffered chan, 同时这个 buffered chan 中曾经有了订阅之前发布者曾经有的音讯,这个 buffered chan 会被发布者记录到 listeners 中,订阅者监听本人订阅时返回的 buffered chan,发布者每次收到音讯把listeners 中所有的 buffered chan 都播送,这样每个订阅者都会收到音讯。

当初告警曾经被写入 AlertsProvider, 其它模块通过订阅可能监听到最新的告警,接下来就到了 Dispatcher.processAlert 看看告警是如何被解决的

正文完
 0