乐趣区

关于prometheus:n9e告警可高用的实现机制分析

n9e=nightingale

n9e 监控告警框架,提供了监控绘图、监控告警以及告诉等一体的监控运维体系,在云原生时代,能够认为是 Open-falcon 的升级版。

一. 告警的数据流

指标存储:应用 push 模式

  • categraf 采集后 push 给 n9e-server;
  • n9e-server 将指标值 push 给时序库 prometheus;

指标告警:应用 pull 模式

  • 由 n9e-server 应用 PromQL 向 prometheus 进行查问;
  • 若查问到后果,示意告警被触发;

二. 告警实例的高可用

告警配置被存储在 MySQL 中,n9e-server 从 MySQL 中读出,而后查问其中的 PromQL,断定告警是否被触发。

1. 问题

相似于 prometheus,当部署多个 n9e-server 实例后,若每个 n9e-server 都进行下面的告警断定过程,会导致每个 n9e-server 上都有告警产生。

prometheus 是通过 alertmanager 进行告警去重的,那 n9e-server 是如何实现的呢?

2. 解决

n9e-server 通过 heartbeat+ 一致性 hash,对告警规定进行分片,每个 n9e-server 只负责本人实例的告警规定的断定。

  • 当 n9e-server 有实例 退出 时,已有的 n9e-server 实例会进行 rehash,让新节点负责一部分告警规定;
  • 当 n9e-server 有实例 退出 时,所有的 n9e-server 实例会进行 rehash,退出节点的告警规定被分给存活的节点;

Heartbeat:

  • n9e-server 实例启动一个 heartbeat 线程,定期的上报本人的状态并存储在 MySQL 中;
  • 若发现 过程内保留的实例列表 MySQL 的实例列表 不同,阐明产生了节点变动,则进行 rehash;

一致性 hash:

  • 对某个 datasource,比方 prometheus-a1,有 N 个 n9e-server 注册进来,则将 N 个 n9e-server 组成一致性 hash 环;
  • 对某个告警规定,应用 rule.id 进行一致性 hash,命中到哪个 n9e-server 实例,就由其进行告警断定;

三. 源码剖析

实现代码次要在 alert/naming/*.go。

在做告警断定时,对每个告警规定:

  • 若 rule 不归本节点负责,则跳过;
// alert/eval/alert_rule.go
func (s *Scheduler) syncAlertRules() {ids := s.alertRuleCache.GetRuleIds()
    alertRuleWorkers := make(map[string]*AlertRuleWorker)
    externalRuleWorkers := make(map[string]*process.Processor)
    for _, id := range ids {rule := s.alertRuleCache.Get(id)
        if rule == nil {continue}
        if rule.IsPrometheusRule() {datasourceIds := s.promClients.Hit(rule.DatasourceIdsJson)
            for _, dsId := range datasourceIds {
                // 断定 rule 是否归本节点负责
                if !naming.DatasourceHashRing.IsHit(dsId, fmt.Sprintf("%d", rule.Id), s.aconf.Heartbeat.Endpoint) {continue}
                ...
            }
        }
        ...
    }
    ...
}

1. 一致性 hash

一致性 hash 的 key=rule.id,依据 key 判断该 rule 是否归本节点负责:

// alert/naming/hashring.go
func (chr *DatasourceHashRingType) IsHit(datasourceId int64, pk string, currentNode string) bool {node, err := chr.GetNode(datasourceId, pk)
    ...
    return node == currentNode
}

依据 key 确定节点的过程:

  • 对每个 datasource,都有一个 hash 环:
  • 先找到 datasource 对应的 hash 环;
  • 再在 hash 环中找负责的节点;
// alert/naming/hashring.go
func (chr *DatasourceHashRingType) GetNode(datasourceId int64, pk string) (string, error) {chr.Lock()
    defer chr.Unlock()
    _, exists := chr.Rings[datasourceId]
    if !exists {...}
    return chr.Rings[datasourceId].Get(pk)
}

hash 环的结构在 heartbeat 中。

2. heartbeat

n9e-server 中开启一个 goroutine,专门进行 heartbeat:

  • 默认 1s 执行 1 次;
// alert/naming/heartbeat.go
func (n *Naming) loopHeartbeat() {interval := time.Duration(n.heartbeatConfig.Interval) * time.Millisecond
    for {time.Sleep(interval)
        if err := n.heartbeat(); err != nil {logger.Warning(err)
        }
    }
}

heartbeat 的过程:

  • 对每个 datasource,查找其关联的最新的 n9e-server 列表 =newss;
  • 跟缓存的 oldss 比照,若有变动,则对 server 进行 rehash,从新结构 hash 环;
// alert/naming/heartbeat.go
func (n *Naming) heartbeat() error {
    // 在页面上保护实例和集群的对应关系
    datasourceIds, err = models.GetDatasourceIdsByEngineName(n.ctx, n.heartbeatConfig.EngineName)

    for i := 0; i < len(datasourceIds); i++ {servers, err := n.ActiveServers(datasourceIds[i])
        ...
        sort.Strings(servers)
        newss := strings.Join(servers, " ")

        oldss, exists := localss[datasourceIds[i]]
        if exists && oldss == newss {continue            // 没有变动}

        RebuildConsistentHashRing(datasourceIds[i], servers)    // rehash
        localss[datasourceIds[i]] = newss
    }
    return nil
}

查问 datasource 关联的最新 n9e-server 的过程:

  • 实际上就查问 alerting_engines 表,找指标 datasource 对应的记录,要求 30s 内有心跳;
// alert/naming/heartbeat.go
func (n *Naming) ActiveServers(datasourceId int64) ([]string, error) {
    ...
    // 30 秒内有心跳,就认为是活的
    return models.AlertingEngineGetsInstances(n.ctx, "datasource_id = ? and clock > ?", datasourceId, time.Now().Unix()-30)
}

func AlertingEngineGetsInstances(ctx *ctx.Context, where string, args ...interface{}) ([]string, error) {var arr []string
    var err error
    session := DB(ctx).Model(new(AlertingEngines)).Order("instance")
    if where == "" {err = session.Pluck("instance", &arr).Error
    } else {err = session.Where(where, args...).Pluck("instance", &arr).Error
    }
    return arr, err
}
mysql> select * from alerting_engines;
+----+----------------------+---------------+----------------+------------+
| id | instance             | datasource_id | engine_cluster | clock      |
+----+----------------------+---------------+----------------+------------+
|  2 | 192.168.100.20:17000 |      99999999 | default        | 1690537360 |
|  3 | 192.168.100.20:17000 |             1 | default        | 1690537360 |
|  4 | 192.168.100.20:17000 |             2 | default        | 1690537360 |
+----+----------------------+---------------+----------------+------------+
3 rows in set (0.00 sec)

n9e-server 实例,在 heartbeat 的过程中,会将本人注册进去:

// alert/naming/heartbeat.go
func (n *Naming) heartbeat() error {
    ...
    for i := 0; i < len(datasourceIds); i++ {err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.heartbeatConfig.Endpoint, n.heartbeatConfig.EngineName, datasourceIds[i])
        if err != nil {logger.Warningf("heartbeat with cluster %d err:%v", datasourceIds[i], err)
        }
    }
    ...    
}

func AlertingEngineHeartbeatWithCluster(ctx *ctx.Context, instance, cluster string, datasourceId int64) error {
    ...
    var total int64
    err := DB(ctx).Model(new(AlertingEngines)).Where("instance=? and engine_cluster = ? and datasource_id=?", instance, cluster, datasourceId).Count(&total).Error
    ...
    // 没有记录,则 insert
    if total == 0 {
        // insert
        err = DB(ctx).Create(&AlertingEngines{
            Instance:      instance,
            DatasourceId:  datasourceId,
            EngineCluster: cluster,
            Clock:         time.Now().Unix(),
        }).Error
    } else { // 否则,update 最新工夫
        // updates
        fields := map[string]interface{}{"clock": time.Now().Unix()}
        err = DB(ctx).Model(new(AlertingEngines)).Where("instance=? and engine_cluster = ? and datasource_id=?", instance, cluster, datasourceId).Updates(fields).Error
    }
    return err
}

参考:

  1. https://flashcat.cloud/docs/content/flashcat-monitor/nighting…
  2. https://answer.flashcat.cloud/questions/10010000000002963/100…
退出移动版