总结:aggregator聚合器就是从falcon_portal.cluster表中取出用户在页面上配置的表达式,然后解析后,通过api拿到对应机器组的所有机器,通过api查询graph数据算出一个值重新打回transfer作为一个新的点。

  • 定时从db中拿出所有的聚合器配置放到一个map中
  • 第一次启动时遍历聚合器map生成workers map 这两个map的key都是id+updatetime
  • 同时下一次拿出db生成map 对workers这个map进行增量更新 和删除操作删除是通过 worker.Quit chan通信的
  • workers这个map 通过 ticker跑cron 运行WorkerRun这个方法
  • WorkerRun这个方法解析分子分母的配置
  • 调用api 根据grp_id拿出所有机器列表
  • 调用graph的last接口拿出所有endpoint的counter 的值然后进行计算
  • 计算后重新打回 一个线程安全的双向链表队列
  • 另外一个goroutine异步pop队列中的值发生给 transfer的http接口(不是给agent用的rpc接口)
  • 机器量很多时获取机器列表和查询最新的值都是瓶颈
  • 我在想如果直接在transfer中直接做数据的聚合速度上不存在瓶颈

下面我们来看下代码:

  1. main.go中核心的两个地方
    //查询db 调api算值 push 到push的队列中    go cron.UpdateItems()    //从push队列push到transfer    sender.StartSender()

2.看下go cron.UpdateItems()

func updateItems() {        //从db中查询出结果    items, err := db.ReadClusterMonitorItems()    if err != nil {        return    }        //对比key(id+uptime),将已经变更的项删除     deleteNoUseWorker(items)    //启动新的worker    createWorkerIfNeed(items)}//看下这个读db的funcfunc ReadClusterMonitorItems() (M map[string]*g.Cluster, err error){   ......   /*看到这个funcreturn的是个map key是 每个聚合项的id和他更新时间的字符串   value 就是Cluster结构体指针   type Cluster struct {    Id          int64    GroupId     int64    Numerator   string    Denominator string    Endpoint    string    Metric      string    Tags        string    DsType      string    Step        int    LastUpdate  time.Time   }   */   M[fmt.Sprintf("%d%v", c.Id, c.LastUpdate)] = &c   return M, err}

3.看下 deleteNoUseWorker 和createWorkerIfNeed 这两个func都是围绕 Worker这个struct的进行增删

func deleteNoUseWorker(m map[string]*g.Cluster) {    del := []string{}    for key, worker := range Workers {            //遍历已经创建的work,如果key在新的map中没有了说明这条记录在db中被更改或删除了        //所以删掉它 给Workers这个map缩容        if _, ok := m[key]; !ok {               //将worker 中的Quit chan关闭 会调用ticker.stop 真正关闭             worker.Drop()            del = append(del, key)        }    }    for _, key := range del {        delete(Workers, key)    }}func createWorkerIfNeed(m map[string]*g.Cluster) {     for key, item := range m {        if _, ok := Workers[key]; !ok {                //如果配置中step小于0 丢弃这条            if item.Step <= 0 {                log.Println("[W] invalid cluster(step <= 0):", item)                continue            }                        //初始化worker                 worker := NewWorker(item)            Workers[key] = worker            worker.Start()        }    }}

4. 看下Worker这个结构体包含三个域

  • ticker作为一个计时器实现类似cron的功能每隔一段时间执行一次Start 中的func
  • ClusterItem作为每个聚合器的配置
  • Quit是一个chan用来外部关闭 key在新的map中没有了说明这条记录在db中被更改或删除了
type Worker struct {    Ticker      *time.Ticker    ClusterItem *g.Cluster    Quit        chan struct{}}func NewWorker(ci *g.Cluster) Worker {    w := Worker{}    w.Ticker = time.NewTicker(time.Duration(ci.Step) * time.Second)    w.Quit = make(chan struct{})    w.ClusterItem = ci    return w}func (this Worker) Start() {    go func() {        for {            select {            case <-this.Ticker.C:                WorkerRun(this.ClusterItem)            case <-this.Quit:                if g.Config().Debug {                    log.Println("[I] drop worker", this.ClusterItem)                }                this.Ticker.Stop()                return            }        }    }()}func (this Worker) Drop() {    close(this.Quit)}var Workers = make(map[string]Worker)

到这里我们已经看明白聚合器的流程了:

  • 定时从db中拿出所有的聚合器配置放到一个map中
  • 第一次启动时遍历聚合器map生成workers map 这两个map的key都是id+updatetime
  • 同时下一次拿出db生成map 对workers这个map进行增量更新 和删除操作删除是通过 worker.Quit chan通信的
  • workers这个map 通过 ticker跑cron 运行WorkerRun这个方法

5.下面看下最重要的方法 WorkerRun

func WorkerRun(item *g.Cluster) {    debug := g.Config().Debug    /*    Numerator代表分子    例如 $(cpu.user)+$(cpu.system) 代表求cpu.user和cpu.system的和    Denominator代表分母  例如 $# 代表所有机器    */        //cleanParam去除\r等字符    numeratorStr := cleanParam(item.Numerator)    denominatorStr := cleanParam(item.Denominator)        //判断分子分母是否合法    if !expressionValid(numeratorStr) || !expressionValid(denominatorStr) {        log.Println("[W] invalid numerator or denominator", item)        return    }        //判断分子分母是否需要计算      needComputeNumerator := needCompute(numeratorStr)    needComputeDenominator := needCompute(denominatorStr)    //如果分子分母都不需要计算就不需要用到聚合器了    if !needComputeNumerator && !needComputeDenominator {        log.Println("[W] no need compute", item)        return    }        //比如分子是这样的: "($(cpu.busy)+$(cpu.idle)-$(cpu.nice))>80"    //那么parse的返回值为 [cpu.busy cpu.idle cpu.nice] [+ -] >80    numeratorOperands, numeratorOperators, numeratorComputeMode := parse(numeratorStr, needComputeNumerator)    denominatorOperands, denominatorOperators, denominatorComputeMode := parse(denominatorStr, needComputeDenominator)    if !operatorsValid(numeratorOperators) || !operatorsValid(denominatorOperators) {        log.Println("[W] operators invalid", item)        return    }    /*add retry for gethostname bygid    这里源码是动过sdk根据group_id查找组里面机器列表    这里我进行了两点优化:    1.sdk调用时没有加重试,http失败导致这次没有get到机器所以这个点就不算了导致断点    2.原来的接口在机器量超过1k时就效率就会很慢 2w+机器需要8s,看了代码是用orm进行了多次查询而且附带了很多别的信息    这里我只需要group_id对应endpoint_list所以我写了一个新的接口用一条raw_sql进行查询    测试2w+的机器0.2s就能返回    */    retry_limit :=3    r_s :=0    var hostnames []string    for r_s <retry_limit{        hostnames_tmp, err_tmp := sdk.HostnamesByID(item.GroupId)        if err_tmp != nil {            log.Println("[E] get hostlist err",err_tmp)            r_s+=1            time.Sleep(time.Second)        }else{            hostnames = hostnames_tmp            break        }    }    //没有机器当然不用算了    if len(hostnames)==0{        log.Println("[E] get 0 record hostname item:",item)        return    }    now := time.Now().Unix()    /*这里是调用graph/lastpoint这个api 查询最近一个点的数据    1.机器是上面查到的主机列表    2.counter这里做了合并 把所有要查的metirc都放在一个请求里面查询了    3.查询的时候在api那边做了for循环 逐个item查询 估计这里也会拖慢速度    4.查完之后计算下值推到发送队列    */    valueMap, err := queryCounterLast(numeratorOperands, denominatorOperands, hostnames, now-int64(item.Step*2), now)    if err != nil {        log.Println("[E] get queryCounterLast", err, item)        return    }    ..........    sender.Push(item.Endpoint, item.Metric, item.Tags, numerator/denominator, item.DsType, int64(item.Step))}

6.最后看下发送的代码

  • MetaDataQueue是个线程安全的双向链表
  • 上面说的WorkerRun方法中会将转化好的监控项数据PushFront入链表
  • startSender这个goroutine 每200毫秒会将队列中的数据取出发送到transfer的http接口
func Push(endpoint, metric, tags string, val interface{}, counterType string, step_and_ts ...int64) {    md := MakeMetaData(endpoint, metric, tags, val, counterType, step_and_ts...)    MetaDataQueue.PushFront(md)}const LIMIT = 200var MetaDataQueue = NewSafeLinkedList()var PostPushUrl stringvar Debug boolfunc StartSender() {    go startSender()}func startSender() {    for {        L := MetaDataQueue.PopBack(LIMIT)        if len(L) == 0 {            time.Sleep(time.Millisecond * 200)            continue        }        err := PostPush(L)        if err != nil {            log.Println("[E] push to transfer fail", err)        }    }}