n9e=nightingale

n9e监控告警框架,提供了监控绘图、监控告警以及告诉等一体的监控运维体系,在云原生时代,能够认为是Open-falcon的升级版。

一. 告警的数据流

指标存储:应用push模式

  • categraf采集后push给n9e-server;
  • n9e-server将指标值push给时序库prometheus;

指标告警:应用pull模式

  • 由n9e-server应用PromQL向prometheus进行查问;
  • 若查问到后果,示意告警被触发;

二. 告警实例的高可用

告警配置被存储在MySQL中,n9e-server从MySQL中读出,而后查问其中的PromQL,断定告警是否被触发。

1. 问题

相似于prometheus,当部署多个n9e-server实例后,若每个n9e-server都进行下面的告警断定过程,会导致每个n9e-server上都有告警产生。

prometheus是通过alertmanager进行告警去重的,那n9e-server是如何实现的呢?

2. 解决

n9e-server通过heartbeat+一致性hash,对告警规定进行分片,每个n9e-server只负责本人实例的告警规定的断定。

  • 当n9e-server有实例退出时,已有的n9e-server实例会进行rehash,让新节点负责一部分告警规定;
  • 当n9e-server有实例退出时,所有的n9e-server实例会进行rehash,退出节点的告警规定被分给存活的节点;

Heartbeat:

  • n9e-server实例启动一个heartbeat线程,定期的上报本人的状态并存储在MySQL中;
  • 若发现过程内保留的实例列表MySQL的实例列表不同,阐明产生了节点变动,则进行rehash;

一致性hash:

  • 对某个datasource,比方prometheus-a1,有N个n9e-server注册进来,则将N个n9e-server组成一致性hash环;
  • 对某个告警规定,应用rule.id进行一致性hash,命中到哪个n9e-server实例,就由其进行告警断定;

三. 源码剖析

实现代码次要在alert/naming/*.go。

在做告警断定时,对每个告警规定:

  • 若rule不归本节点负责,则跳过;
// alert/eval/alert_rule.gofunc (s *Scheduler) syncAlertRules() {    ids := s.alertRuleCache.GetRuleIds()    alertRuleWorkers := make(map[string]*AlertRuleWorker)    externalRuleWorkers := make(map[string]*process.Processor)    for _, id := range ids {        rule := s.alertRuleCache.Get(id)        if rule == nil {            continue        }        if rule.IsPrometheusRule() {            datasourceIds := s.promClients.Hit(rule.DatasourceIdsJson)            for _, dsId := range datasourceIds {                // 断定rule是否归本节点负责                if !naming.DatasourceHashRing.IsHit(dsId, fmt.Sprintf("%d", rule.Id), s.aconf.Heartbeat.Endpoint) {                    continue                }                ...            }        }        ...    }    ...}

1. 一致性hash

一致性hash的key=rule.id,依据key判断该rule是否归本节点负责:

// alert/naming/hashring.gofunc (chr *DatasourceHashRingType) IsHit(datasourceId int64, pk string, currentNode string) bool {    node, err := chr.GetNode(datasourceId, pk)    ...    return node == currentNode}

依据key确定节点的过程:

  • 对每个datasource,都有一个hash环:
  • 先找到datasource对应的hash环;
  • 再在hash环中找负责的节点;
// alert/naming/hashring.gofunc (chr *DatasourceHashRingType) GetNode(datasourceId int64, pk string) (string, error) {    chr.Lock()    defer chr.Unlock()    _, exists := chr.Rings[datasourceId]    if !exists {        ...    }    return chr.Rings[datasourceId].Get(pk)}

hash环的结构在heartbeat中。

2. heartbeat

n9e-server中开启一个goroutine,专门进行heartbeat:

  • 默认1s执行1次;
// alert/naming/heartbeat.gofunc (n *Naming) loopHeartbeat() {    interval := time.Duration(n.heartbeatConfig.Interval) * time.Millisecond    for {        time.Sleep(interval)        if err := n.heartbeat(); err != nil {            logger.Warning(err)        }    }}

heartbeat的过程:

  • 对每个datasource,查找其关联的最新的n9e-server列表=newss;
  • 跟缓存的oldss比照,若有变动,则对server进行rehash,从新结构hash环;
// alert/naming/heartbeat.gofunc (n *Naming) heartbeat() error {    // 在页面上保护实例和集群的对应关系    datasourceIds, err = models.GetDatasourceIdsByEngineName(n.ctx, n.heartbeatConfig.EngineName)    for i := 0; i < len(datasourceIds); i++ {        servers, err := n.ActiveServers(datasourceIds[i])        ...        sort.Strings(servers)        newss := strings.Join(servers, " ")        oldss, exists := localss[datasourceIds[i]]        if exists && oldss == newss {            continue            // 没有变动        }        RebuildConsistentHashRing(datasourceIds[i], servers)    // rehash        localss[datasourceIds[i]] = newss    }    return nil}

查问datasource关联的最新n9e-server的过程:

  • 实际上就查问alerting_engines表,找指标datasource对应的记录,要求30s内有心跳;
// alert/naming/heartbeat.gofunc (n *Naming) ActiveServers(datasourceId int64) ([]string, error) {    ...    // 30秒内有心跳,就认为是活的    return models.AlertingEngineGetsInstances(n.ctx, "datasource_id = ? and clock > ?", datasourceId, time.Now().Unix()-30)}func AlertingEngineGetsInstances(ctx *ctx.Context, where string, args ...interface{}) ([]string, error) {    var arr []string    var err error    session := DB(ctx).Model(new(AlertingEngines)).Order("instance")    if where == "" {        err = session.Pluck("instance", &arr).Error    } else {        err = session.Where(where, args...).Pluck("instance", &arr).Error    }    return arr, err}
mysql> select * from alerting_engines;+----+----------------------+---------------+----------------+------------+| id | instance             | datasource_id | engine_cluster | clock      |+----+----------------------+---------------+----------------+------------+|  2 | 192.168.100.20:17000 |      99999999 | default        | 1690537360 ||  3 | 192.168.100.20:17000 |             1 | default        | 1690537360 ||  4 | 192.168.100.20:17000 |             2 | default        | 1690537360 |+----+----------------------+---------------+----------------+------------+3 rows in set (0.00 sec)

n9e-server实例,在heartbeat的过程中,会将本人注册进去:

// alert/naming/heartbeat.gofunc (n *Naming) heartbeat() error {    ...    for i := 0; i < len(datasourceIds); i++ {        err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.heartbeatConfig.Endpoint, n.heartbeatConfig.EngineName, datasourceIds[i])        if err != nil {            logger.Warningf("heartbeat with cluster %d err:%v", datasourceIds[i], err)        }    }    ...    }func AlertingEngineHeartbeatWithCluster(ctx *ctx.Context, instance, cluster string, datasourceId int64) error {    ...    var total int64    err := DB(ctx).Model(new(AlertingEngines)).Where("instance=? and engine_cluster = ? and datasource_id=?", instance, cluster, datasourceId).Count(&total).Error    ...    // 没有记录,则insert    if total == 0 {        // insert        err = DB(ctx).Create(&AlertingEngines{            Instance:      instance,            DatasourceId:  datasourceId,            EngineCluster: cluster,            Clock:         time.Now().Unix(),        }).Error    } else { // 否则,update最新工夫        // updates        fields := map[string]interface{}{"clock": time.Now().Unix()}        err = DB(ctx).Model(new(AlertingEngines)).Where("instance=? and engine_cluster = ? and datasource_id=?", instance, cluster, datasourceId).Updates(fields).Error    }    return err}

参考:

  1. https://flashcat.cloud/docs/content/flashcat-monitor/nighting...
  2. https://answer.flashcat.cloud/questions/10010000000002963/100...