监控告警个别是作为一个整体,包含从采集数据、存储、展现、规定计算、告警音讯解决等等。 Alertmanager(以下简称 am 了) 是一个告警音讯治理组件,包含音讯路由、静默、克制、去重等性能,总之其它负责规定计算的组件能够把音讯无脑发给 am, 由它来对音讯进行解决, 尽可能收回高质量的告警音讯。

先看个概览图,这个是我基于原开源库外面的架构图画的,原仓库中的架构图有很多跟理论源码出入的中央,所以这个图比原来的更丰盛,更精确。

这篇先说第一局部:告警的写入

告警的写入到最终被解决能够形象成生产-生产模型,生产侧就是 api 接管告警,生产侧就是图中的 dispatcher,两头的 provider.Alerts 作为缓冲区。

上面是写入时的逻辑,次要就是判断告警状态,am 的告警状态是由 alert.StartsAtalert.EndsAt 来判断,而后续还有很多须要这个属性的逻辑,所以这个地位须要把起止工夫确认。

func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*types.Alert) {    now := time.Now()        api.mtx.RLock()    resolveTimeout := time.Duration(api.config.Global.ResolveTimeout)    api.mtx.RUnlock()    // 确定一个告警音讯的起止工夫    // 须要依据起止工夫来定义告警的状态, 如果止工夫在以后之前就是 Resolved    for _, alert := range alerts {        // 新收到的告警标记接管工夫, 这样如果有两个告警 label 统一, 能够判断出哪个是最新收到的        alert.UpdatedAt = now        // Ensure StartsAt is set.        if alert.StartsAt.IsZero() {            if alert.EndsAt.IsZero() {                alert.StartsAt = now            } else {                alert.StartsAt = alert.EndsAt            }        }        // 止工夫如果没有就须要应用 resolveTimeout 计算一个        if alert.EndsAt.IsZero() {            alert.Timeout = true            alert.EndsAt = now.Add(resolveTimeout)        }        if alert.EndsAt.After(time.Now()) {            api.m.Firing().Inc()        } else {            api.m.Resolved().Inc()        }    }    // Make a best effort to insert all alerts that are valid.    var (        validAlerts    = make([]*types.Alert, 0, len(alerts))        validationErrs = &types.MultiError{}    )    // 校验alert, 比方清理空值的 label, 起止工夫, 至多一个label, label中的kv命名规定 等等    for _, a := range alerts {        removeEmptyLabels(a.Labels)        if err := a.Validate(); err != nil {            validationErrs.Add(err)            api.m.Invalid().Inc()            continue        }        validAlerts = append(validAlerts, a)    }    // 写入 alertsProvider, 这一端相当于生产者    if err := api.alerts.Put(validAlerts...); err != nil {        api.respondError(w, apiError{            typ: errorInternal,            err: err,        }, nil)        return    }}

provider.Alerts 是一个interface

// 所有办法都要 groutine-safetype Alerts interface {    Subscribe() AlertIterator    GetPending() AlertIterator    Get(model.Fingerprint) (*types.Alert, error)    Put(...*types.Alert) error}

源码中给出了一个基于内存的实现,所以所有告警的接管都会先写入这个构造,其它过程都从这里获取本人须要的告警,前面会称这个基于内存的实现称为 AlertsProvider

// Alerts 治理构造, 就是架构图中的 Alerts 应用的构造type Alerts struct {    cancel context.CancelFunc    mtx       sync.Mutex    alerts    *store.Alerts                // 存储 map[fingerprint]*Alert    listeners map[int]listeningAlerts    // 所有监听者    next      int                        // 监听者计数    callback AlertStoreCallback    logger log.Logger}

先看看 AlertsProviderPut 看看告警是如何写入的

func (a *Alerts) Put(alerts ...*types.Alert) error {    for _, alert := range alerts {        // 制作惟一ID, 基于 LabelSets 中的 label name 和 label value        fp := alert.Fingerprint()        existing := false        // 如果曾经存在雷同的alert, 就是labelSets雷同        if old, err := a.alerts.Get(fp); err == nil {            existing = true            // 新旧告警区间有重叠的, 合并, 依照肯定的策略应用较新的告警内容            if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) ||                (alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) {                alert = old.Merge(alert)            }        }        // 这个 Set 办法就把以后这个 alert 应用下面制作的 fp 写入 map[fp]*Alert        if err := a.alerts.Set(alert); err != nil {            level.Error(a.logger).Log("msg", "error on set alert", "err", err)            continue        }                // 程序中其它模块会通过调用 Subscribe 来注册一个 listener 到 AlertsProvider        // AlertsProvider 每次胜利存储一个 alert 都会对所有 listener 播送        // 这个过程上锁保障所有的 listeners 收到统一的播送        a.mtx.Lock()        for _, l := range a.listeners {            select {            case l.alerts <- alert:            case <-l.done:            }        }        a.mtx.Unlock()    }    return nil}

程序中其它局部(dispatcher, Inhibitor)都是通过调用 Subscribe 来监听新写入的告警音讯

func (a *Alerts) Subscribe() provider.AlertIterator {    // groutine-safe    a.mtx.Lock()    defer a.mtx.Unlock()    var (        done   = make(chan struct{})        alerts = a.alerts.List()                                               // 获取所有的alerts        ch     = make(chan *types.Alert, max(len(alerts), alertChannelLength)) // 创立一个 buffer chan, 保障容量要么亏损, 要么恰好    )    // 把调用时曾经存在的 alerts 写入一个 buffered chan    // 实际上从 Alerts 开始接管到其它组件subscribe都是在程序启动期间就实现    // 这两头接管到告警的可能性很小,即便接管到也不会很多    for _, a := range alerts {        ch <- a    }    // 为 AlertsProvider 新建一个 listener, 构造就是 buffered chan 和一个敞开信号 chan    // 很显著 buffered chan 就是调用方获取告警的, 敞开信号 chan 就是调用方用来监听完结信号的    // 应用 next 作为计数, 以后共有 next 个 listener    a.listeners[a.next] = listeningAlerts{alerts: ch, done: done}    a.next++        // 这里把 buffered chan 和敞开信号 chan 从新打包成了 alertIterator 返回给调用方    return provider.NewAlertIterator(ch, done, nil)}

所以调用方就要应用 alertIterator 提供的一些办法了

type alertIterator struct {    ch   <-chan *types.Alert    done chan struct{}    err  error}func (ai alertIterator) Next() <-chan *types.Alert { return ai.ch }func (ai alertIterator) Err() error { return ai.err }func (ai alertIterator) Close()     { close(ai.done) }

alertIterator 的实现相似于迭代器协定,让调用方应用 for 循环的形式来应用,先看一下Dispatcher 如何应用的

// 首先由 main.go 实例化 Dispatcher 并启动go disp.Run()// 其次, Dispatcher 中可导出的 Run 就从 AlertsProvider 通过调用 Subscribe 获取了一个 alertIteratorfunc (d *Dispatcher) Run() {    d.done = make(chan struct{})    d.mtx.Lock()    d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}    d.aggrGroupsNum = 0    d.metrics.aggrGroups.Set(0)    d.ctx, d.cancel = context.WithCancel(context.Background())    d.mtx.Unlock()    d.run(d.alerts.Subscribe())    close(d.done)}// 最初, Dispatcher 的不可导出的 run, // 就是一个 fo-select 构造同时监听 AlertsProvider 中新收到的 alert, gc 信号和退出信号func (d *Dispatcher) run(it provider.AlertIterator) {    cleanup := time.NewTicker(30 * time.Second)    defer cleanup.Stop()    defer it.Close()    for {        select {        // alertIterator 的 Next() 办法返回一个 chan, 也就是下面 AlertsProvider 中提供的 buffered chan        case alert, ok := <-it.Next():            // 接下来就是 Dispatcher 在接管到新的告警该如何解决的问题了            if !ok {                // Iterator exhausted for some reason.                if err := it.Err(); err != nil {                    level.Error(d.logger).Log("msg", "Error on alert update", "err", err)                }                return            }            level.Debug(d.logger).Log("msg", "Received alert", "alert", alert)            // Log errors but keep trying.            if err := it.Err(); err != nil {                level.Error(d.logger).Log("msg", "Error on alert update", "err", err)                continue            }                        // 从 Dispatcher 中找到哪些 router 跟这个 alert 匹配, 可能有多个 router 匹配            // 应用每个 router 来解决这个 alert            now := time.Now()            for _, r := range d.route.Match(alert.Labels) {                d.processAlert(alert, r)            }            d.metrics.processingDuration.Observe(time.Since(now).Seconds())        case <-cleanup.C:            // Dispatcher 会有个 gc 过程,次要清理一些不必的内存容器            d.mtx.Lock()            for _, groups := range d.aggrGroupsPerRoute {                for _, ag := range groups {                    if ag.empty() {                        ag.stop()                        delete(groups, ag.fingerprint())                        d.aggrGroupsNum--                        d.metrics.aggrGroups.Dec()                    }                }            }            d.mtx.Unlock()        case <-d.ctx.Done():            return        }    }}

综合 Put 中的播送过程、Subscribe 办法、alertIterator 的设计还有 Dispatcher 的监听,能够看出这个公布-订阅模式: 订阅者订阅时获取了一个 buffered chan, 同时这个 buffered chan 中曾经有了订阅之前发布者曾经有的音讯,这个 buffered chan 会被发布者记录到 listeners 中,订阅者监听本人订阅时返回的 buffered chan,发布者每次收到音讯把listeners 中所有的 buffered chan 都播送,这样每个订阅者都会收到音讯。

当初告警曾经被写入 AlertsProvider, 其它模块通过订阅可能监听到最新的告警,接下来就到了 Dispatcher.processAlert 看看告警是如何被解决的