共计 5760 个字符,预计需要花费 15 分钟才能阅读完成。
监控告警个别是作为一个整体,包含从采集数据、存储、展现、规定计算、告警音讯解决等等。Alertmanager(以下简称 am 了) 是一个告警音讯治理组件,包含音讯路由、静默、克制、去重等性能,总之其它负责规定计算的组件能够把音讯无脑发给 am, 由它来对音讯进行解决, 尽可能收回高质量的告警音讯。
先看个概览图,这个是我基于原开源库外面的架构图画的,原仓库中的架构图有很多跟理论源码出入的中央,所以这个图比原来的更丰盛,更精确。
这篇先说第一局部:告警的写入
告警的写入到最终被解决能够形象成生产 - 生产模型,生产侧就是 api
接管告警,生产侧就是图中的 dispatcher
,两头的 provider.Alerts
作为缓冲区。
上面是写入时的逻辑,次要就是判断告警状态,am 的告警状态是由 alert.StartsAt
和 alert.EndsAt
来判断,而后续还有很多须要这个属性的逻辑,所以这个地位须要把起止工夫确认。
func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*types.Alert) {now := time.Now()
api.mtx.RLock()
resolveTimeout := time.Duration(api.config.Global.ResolveTimeout)
api.mtx.RUnlock()
// 确定一个告警音讯的起止工夫
// 须要依据起止工夫来定义告警的状态, 如果止工夫在以后之前就是 Resolved
for _, alert := range alerts {
// 新收到的告警标记接管工夫, 这样如果有两个告警 label 统一, 能够判断出哪个是最新收到的
alert.UpdatedAt = now
// Ensure StartsAt is set.
if alert.StartsAt.IsZero() {if alert.EndsAt.IsZero() {alert.StartsAt = now} else {alert.StartsAt = alert.EndsAt}
}
// 止工夫如果没有就须要应用 resolveTimeout 计算一个
if alert.EndsAt.IsZero() {
alert.Timeout = true
alert.EndsAt = now.Add(resolveTimeout)
}
if alert.EndsAt.After(time.Now()) {api.m.Firing().Inc()} else {api.m.Resolved().Inc()}
}
// Make a best effort to insert all alerts that are valid.
var (validAlerts = make([]*types.Alert, 0, len(alerts))
validationErrs = &types.MultiError{})
// 校验 alert, 比方清理空值的 label, 起止工夫, 至多一个 label, label 中的 kv 命名规定 等等
for _, a := range alerts {removeEmptyLabels(a.Labels)
if err := a.Validate(); err != nil {validationErrs.Add(err)
api.m.Invalid().Inc()
continue
}
validAlerts = append(validAlerts, a)
}
// 写入 alertsProvider, 这一端相当于生产者
if err := api.alerts.Put(validAlerts...); err != nil {
api.respondError(w, apiError{
typ: errorInternal,
err: err,
}, nil)
return
}
}
而 provider.Alerts
是一个interface
// 所有办法都要 groutine-safe
type Alerts interface {Subscribe() AlertIterator
GetPending() AlertIterator
Get(model.Fingerprint) (*types.Alert, error)
Put(...*types.Alert) error
}
源码中给出了一个基于内存的实现,所以所有告警的接管都会先写入这个构造,其它过程都从这里获取本人须要的告警,前面会称这个基于内存的实现称为 AlertsProvider
// Alerts 治理构造, 就是架构图中的 Alerts 应用的构造
type Alerts struct {
cancel context.CancelFunc
mtx sync.Mutex
alerts *store.Alerts // 存储 map[fingerprint]*Alert
listeners map[int]listeningAlerts // 所有监听者
next int // 监听者计数
callback AlertStoreCallback
logger log.Logger
}
先看看 AlertsProvider
的 Put
看看告警是如何写入的
func (a *Alerts) Put(alerts ...*types.Alert) error {
for _, alert := range alerts {
// 制作惟一 ID, 基于 LabelSets 中的 label name 和 label value
fp := alert.Fingerprint()
existing := false
// 如果曾经存在雷同的 alert, 就是 labelSets 雷同
if old, err := a.alerts.Get(fp); err == nil {
existing = true
// 新旧告警区间有重叠的, 合并, 依照肯定的策略应用较新的告警内容
if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) ||
(alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) {alert = old.Merge(alert)
}
}
// 这个 Set 办法就把以后这个 alert 应用下面制作的 fp 写入 map[fp]*Alert
if err := a.alerts.Set(alert); err != nil {level.Error(a.logger).Log("msg", "error on set alert", "err", err)
continue
}
// 程序中其它模块会通过调用 Subscribe 来注册一个 listener 到 AlertsProvider
// AlertsProvider 每次胜利存储一个 alert 都会对所有 listener 播送
// 这个过程上锁保障所有的 listeners 收到统一的播送
a.mtx.Lock()
for _, l := range a.listeners {
select {
case l.alerts <- alert:
case <-l.done:
}
}
a.mtx.Unlock()}
return nil
}
程序中其它局部 (dispatcher, Inhibitor
) 都是通过调用 Subscribe
来监听新写入的告警音讯
func (a *Alerts) Subscribe() provider.AlertIterator {
// groutine-safe
a.mtx.Lock()
defer a.mtx.Unlock()
var (done = make(chan struct{})
alerts = a.alerts.List() // 获取所有的 alerts
ch = make(chan *types.Alert, max(len(alerts), alertChannelLength)) // 创立一个 buffer chan, 保障容量要么亏损, 要么恰好
)
// 把调用时曾经存在的 alerts 写入一个 buffered chan
// 实际上从 Alerts 开始接管到其它组件 subscribe 都是在程序启动期间就实现
// 这两头接管到告警的可能性很小,即便接管到也不会很多
for _, a := range alerts {ch <- a}
// 为 AlertsProvider 新建一个 listener, 构造就是 buffered chan 和一个敞开信号 chan
// 很显著 buffered chan 就是调用方获取告警的, 敞开信号 chan 就是调用方用来监听完结信号的
// 应用 next 作为计数, 以后共有 next 个 listener
a.listeners[a.next] = listeningAlerts{alerts: ch, done: done}
a.next++
// 这里把 buffered chan 和敞开信号 chan 从新打包成了 alertIterator 返回给调用方
return provider.NewAlertIterator(ch, done, nil)
}
所以调用方就要应用 alertIterator
提供的一些办法了
type alertIterator struct {
ch <-chan *types.Alert
done chan struct{}
err error
}
func (ai alertIterator) Next() <-chan *types.Alert { return ai.ch}
func (ai alertIterator) Err() error { return ai.err}
func (ai alertIterator) Close() { close(ai.done) }
alertIterator
的实现相似于迭代器协定,让调用方应用 for
循环的形式来应用,先看一下Dispatcher
如何应用的
// 首先由 main.go 实例化 Dispatcher 并启动
go disp.Run()
// 其次, Dispatcher 中可导出的 Run 就从 AlertsProvider 通过调用 Subscribe 获取了一个 alertIterator
func (d *Dispatcher) Run() {d.done = make(chan struct{})
d.mtx.Lock()
d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
d.aggrGroupsNum = 0
d.metrics.aggrGroups.Set(0)
d.ctx, d.cancel = context.WithCancel(context.Background())
d.mtx.Unlock()
d.run(d.alerts.Subscribe())
close(d.done)
}
// 最初, Dispatcher 的不可导出的 run,
// 就是一个 fo-select 构造同时监听 AlertsProvider 中新收到的 alert, gc 信号和退出信号
func (d *Dispatcher) run(it provider.AlertIterator) {cleanup := time.NewTicker(30 * time.Second)
defer cleanup.Stop()
defer it.Close()
for {
select {// alertIterator 的 Next() 办法返回一个 chan, 也就是下面 AlertsProvider 中提供的 buffered chan
case alert, ok := <-it.Next():
// 接下来就是 Dispatcher 在接管到新的告警该如何解决的问题了
if !ok {
// Iterator exhausted for some reason.
if err := it.Err(); err != nil {level.Error(d.logger).Log("msg", "Error on alert update", "err", err)
}
return
}
level.Debug(d.logger).Log("msg", "Received alert", "alert", alert)
// Log errors but keep trying.
if err := it.Err(); err != nil {level.Error(d.logger).Log("msg", "Error on alert update", "err", err)
continue
}
// 从 Dispatcher 中找到哪些 router 跟这个 alert 匹配, 可能有多个 router 匹配
// 应用每个 router 来解决这个 alert
now := time.Now()
for _, r := range d.route.Match(alert.Labels) {d.processAlert(alert, r)
}
d.metrics.processingDuration.Observe(time.Since(now).Seconds())
case <-cleanup.C:
// Dispatcher 会有个 gc 过程,次要清理一些不必的内存容器
d.mtx.Lock()
for _, groups := range d.aggrGroupsPerRoute {
for _, ag := range groups {if ag.empty() {ag.stop()
delete(groups, ag.fingerprint())
d.aggrGroupsNum--
d.metrics.aggrGroups.Dec()}
}
}
d.mtx.Unlock()
case <-d.ctx.Done():
return
}
}
}
综合 Put
中的播送过程、Subscribe
办法、alertIterator
的设计还有 Dispatcher
的监听,能够看出这个公布 - 订阅模式: 订阅者订阅时获取了一个 buffered chan
, 同时这个 buffered chan
中曾经有了订阅之前发布者曾经有的音讯,这个 buffered chan
会被发布者记录到 listeners
中,订阅者监听本人订阅时返回的 buffered chan
,发布者每次收到音讯把listeners
中所有的 buffered chan
都播送,这样每个订阅者都会收到音讯。
当初告警曾经被写入 AlertsProvider
, 其它模块通过订阅可能监听到最新的告警,接下来就到了 Dispatcher.processAlert
看看告警是如何被解决的