openfalcon-聚合器aggregator代码解析

2次阅读

共计 5301 个字符,预计需要花费 14 分钟才能阅读完成。

总结:aggregator 聚合器就是从 falcon_portal.cluster 表中取出用户在页面上配置的表达式, 然后解析后,通过 api 拿到对应机器组的所有机器,通过 api 查询 graph 数据算出一个值重新打回 transfer 作为一个新的点。

  • 定时从 db 中拿出所有的聚合器配置放到一个 map 中
  • 第一次启动时遍历聚合器 map 生成 workers map 这两个 map 的 key 都是 id+updatetime
  • 同时下一次拿出 db 生成 map 对 workers 这个 map 进行增量更新 和删除操作删除是通过 worker.Quit chan 通信的
  • workers 这个 map 通过 ticker 跑 cron 运行 WorkerRun 这个方法
  • WorkerRun 这个方法解析分子分母的配置
  • 调用 api 根据 grp_id 拿出所有机器列表
  • 调用 graph 的 last 接口拿出所有 endpoint 的 counter 的值然后进行计算
  • 计算后重新打回 一个线程安全的双向链表队列
  • 另外一个 goroutine 异步 pop 队列中的值发生给 transfer 的 http 接口 (不是给 agent 用的 rpc 接口)
  • 机器量很多时获取机器列表和查询最新的值都是瓶颈
  • 我在想如果直接在 transfer 中直接做数据的聚合速度上不存在瓶颈

下面我们来看下代码:

  1. main.go 中核心的两个地方
    // 查询 db 调 api 算值 push 到 push 的队列中
    go cron.UpdateItems()
    // 从 push 队列 push 到 transfer
    sender.StartSender()

2. 看下 go cron.UpdateItems()

func updateItems() {
        // 从 db 中查询出结果
    items, err := db.ReadClusterMonitorItems()
    if err != nil {return}
        // 对比 key(id+uptime), 将已经变更的项删除 
    deleteNoUseWorker(items)
    // 启动新的 worker
    createWorkerIfNeed(items)
}
// 看下这个读 db 的 func
func ReadClusterMonitorItems() (M map[string]*g.Cluster, err error){
   ......
   /* 看到这个 funcreturn 的是个 map key 是 每个聚合项的 id 和他更新时间的字符串
   value 就是 Cluster 结构体指针
   type Cluster struct {
    Id          int64
    GroupId     int64
    Numerator   string
    Denominator string
    Endpoint    string
    Metric      string
    Tags        string
    DsType      string
    Step        int
    LastUpdate  time.Time
   }
   */
   M[fmt.Sprintf("%d%v", c.Id, c.LastUpdate)] = &c
   return M, err
}

3. 看下 deleteNoUseWorker 和 createWorkerIfNeed 这两个 func 都是围绕 Worker 这个 struct 的进行增删

func deleteNoUseWorker(m map[string]*g.Cluster) {del := []string{}
    for key, worker := range Workers {
            // 遍历已经创建的 work, 如果 key 在新的 map 中没有了说明这条记录在 db 中被更改或删除了
        // 所以删掉它 给 Workers 这个 map 缩容
        if _, ok := m[key]; !ok {
               // 将 worker 中的 Quit chan 关闭 会调用 ticker.stop 真正关闭 
            worker.Drop()
            del = append(del, key)
        }
    }

    for _, key := range del {delete(Workers, key)
    }
}

func createWorkerIfNeed(m map[string]*g.Cluster) {
 
    for key, item := range m {if _, ok := Workers[key]; !ok {
                // 如果配置中 step 小于 0 丢弃这条
            if item.Step <= 0 {log.Println("[W] invalid cluster(step <= 0):", item)
                continue
            }
                        // 初始化 worker     
            worker := NewWorker(item)
            Workers[key] = worker
            worker.Start()}
    }
}

4. 看下 Worker 这个结构体包含三个域

  • ticker 作为一个计时器实现类似 cron 的功能每隔一段时间执行一次 Start 中的 func
  • ClusterItem 作为每个聚合器的配置
  • Quit 是一个 chan 用来外部关闭 key 在新的 map 中没有了说明这条记录在 db 中被更改或删除了
type Worker struct {
    Ticker      *time.Ticker
    ClusterItem *g.Cluster
    Quit        chan struct{}}

func NewWorker(ci *g.Cluster) Worker {w := Worker{}
    w.Ticker = time.NewTicker(time.Duration(ci.Step) * time.Second)
    w.Quit = make(chan struct{})
    w.ClusterItem = ci
    return w
}

func (this Worker) Start() {go func() {
        for {
            select {
            case <-this.Ticker.C:
                WorkerRun(this.ClusterItem)
            case <-this.Quit:
                if g.Config().Debug {log.Println("[I] drop worker", this.ClusterItem)
                }
                this.Ticker.Stop()
                return
            }
        }
    }()}

func (this Worker) Drop() {close(this.Quit)
}

var Workers = make(map[string]Worker)

到这里我们已经看明白聚合器的流程了:

  • 定时从 db 中拿出所有的聚合器配置放到一个 map 中
  • 第一次启动时遍历聚合器 map 生成 workers map 这两个 map 的 key 都是 id+updatetime
  • 同时下一次拿出 db 生成 map 对 workers 这个 map 进行增量更新 和删除操作删除是通过 worker.Quit chan 通信的
  • workers 这个 map 通过 ticker 跑 cron 运行 WorkerRun 这个方法

5. 下面看下最重要的方法 WorkerRun

func WorkerRun(item *g.Cluster) {debug := g.Config().Debug
    /*
    Numerator 代表分子    例如 $(cpu.user)+$(cpu.system) 代表求 cpu.user 和 cpu.system 的和
    Denominator 代表分母  例如 $# 代表所有机器
    */
        //cleanParam 去除 \r 等字符
    numeratorStr := cleanParam(item.Numerator)
    denominatorStr := cleanParam(item.Denominator)
        // 判断分子分母是否合法
    if !expressionValid(numeratorStr) || !expressionValid(denominatorStr) {log.Println("[W] invalid numerator or denominator", item)
        return
    }
        // 判断分子分母是否需要计算
      needComputeNumerator := needCompute(numeratorStr)
    needComputeDenominator := needCompute(denominatorStr)
    // 如果分子分母都不需要计算就不需要用到聚合器了
    if !needComputeNumerator && !needComputeDenominator {log.Println("[W] no need compute", item)
        return
    }
        // 比如分子是这样的: "($(cpu.busy)+$(cpu.idle)-$(cpu.nice))>80"
    // 那么 parse 的返回值为 [cpu.busy cpu.idle cpu.nice] [+ -] >80
    numeratorOperands, numeratorOperators, numeratorComputeMode := parse(numeratorStr, needComputeNumerator)
    denominatorOperands, denominatorOperators, denominatorComputeMode := parse(denominatorStr, needComputeDenominator)

    if !operatorsValid(numeratorOperators) || !operatorsValid(denominatorOperators) {log.Println("[W] operators invalid", item)
        return
    }
    /*add retry for gethostname bygid
    这里源码是动过 sdk 根据 group_id 查找组里面机器列表
    这里我进行了两点优化:
    1.sdk 调用时没有加重试,http 失败导致这次没有 get 到机器所以这个点就不算了导致断点
    2. 原来的接口在机器量超过 1k 时就效率就会很慢 2w+ 机器需要 8s, 看了代码是用 orm 进行了多次查询而且附带了很多别的信息
    这里我只需要 group_id 对应 endpoint_list 所以我写了一个新的接口用一条 raw_sql 进行查询
    测试 2w+ 的机器 0.2s 就能返回
    */
    retry_limit :=3
    r_s :=0
    var hostnames []string
    for r_s <retry_limit{hostnames_tmp, err_tmp := sdk.HostnamesByID(item.GroupId)
        if err_tmp != nil {log.Println("[E] get hostlist err",err_tmp)
            r_s+=1
            time.Sleep(time.Second)
        }else{
            hostnames = hostnames_tmp
            break
        }
    }
    // 没有机器当然不用算了
    if len(hostnames)==0{log.Println("[E] get 0 record hostname item:",item)
        return
    }

    now := time.Now().Unix()

    /* 这里是调用 graph/lastpoint 这个 api 查询最近一个点的数据
    1. 机器是上面查到的主机列表
    2.counter 这里做了合并 把所有要查的 metirc 都放在一个请求里面查询了
    3. 查询的时候在 api 那边做了 for 循环 逐个 item 查询 估计这里也会拖慢速度
    4. 查完之后计算下值推到发送队列
    */
    valueMap, err := queryCounterLast(numeratorOperands, denominatorOperands, hostnames, now-int64(item.Step*2), now)
    if err != nil {log.Println("[E] get queryCounterLast", err, item)
        return
    }

    ..........
    sender.Push(item.Endpoint, item.Metric, item.Tags, numerator/denominator, item.DsType, int64(item.Step))
}

6. 最后看下发送的代码

  • MetaDataQueue 是个线程安全的双向链表
  • 上面说的 WorkerRun 方法中会将转化好的监控项数据 PushFront 入链表
  • startSender 这个 goroutine 每 200 毫秒会将队列中的数据取出发送到 transfer 的 http 接口
func Push(endpoint, metric, tags string, val interface{}, counterType string, step_and_ts ...int64) {md := MakeMetaData(endpoint, metric, tags, val, counterType, step_and_ts...)
    MetaDataQueue.PushFront(md)
}

const LIMIT = 200

var MetaDataQueue = NewSafeLinkedList()
var PostPushUrl string
var Debug bool

func StartSender() {go startSender()
}

func startSender() {
    for {L := MetaDataQueue.PopBack(LIMIT)
        if len(L) == 0 {time.Sleep(time.Millisecond * 200)
            continue
        }

        err := PostPush(L)
        if err != nil {log.Println("[E] push to transfer fail", err)
        }
    }
}
正文完
 0