aggregator组件负责聚合,即依据定义的指标表达式,聚合该组下所有host的指标值,生成新的指标发送到agent。
agent会将数据发送给transfer,transfer发送给graph,最终由graph存入TSDB。
值得注意的是,aggregator实例对分布式部署的反对无限,因为它每个实例都执行所有的聚合工作,没有进行聚合工作在不同节点的均分,也没有锁机制保障执行1个实例运行。
当然,部署多份实例也不会呈现逻辑谬误,只是运行了多份雷同的工作而已。
1. aggregator的聚合表达式
指标表达式分为分子和分母两局部,每局部都是1个表达式,别离计算出2局部的值,做商即得后果。
# 计算cpu.used.percent的平均值分子:$(cpu.used.percent)分母:$## 计算磁盘吞吐量的总量分子:$(disk.io.read_bytes)分母:1
此外,还能进行更简单的汇合语义:
# 计算disk.io.util大于等于40%的机器个数分子:$(disk.io.util)>=40分母:1# 计算集群diso.io.util大于40%的比率分子:$(disk.io.util)>40分母:$#
2. aggregator的聚合模型
聚合模型体现在Cluster构造中:
type Cluster struct { ..... GroupId int64 //聚合组id Numerator string //分子表达式 Denominator string //分母表达式 Endpoint string //指标endpoint Metric string //指标metric Tags string //指标tags DsType string //指标metric类型 Step int //指标metric的step大小 ....}
其中:
- GroupId: 聚合组id,依据该id能够查问失去该组下所有host;
- Numberator: 分子表达式,比方$(cpu.used.percent);
- Denominator: 分母表达式,比方$#;
- Endpoint: 聚合后的endpoint;
- Metric: 聚合后的metric;
- Tags: 聚合后的tags;
- DsType: 聚合后的metric类型,GAUGE或COUNTER;
- Step: 聚合后的metric的step大小;
以聚合组内所有host的cpu.used.percent平均值为例:
GroupId = 900Numerator = $(cpu.used.percent)Denominator = $#Endpoint = 84aba056-6e6c-4c53-bfed-4de0729421efMetric = cpu.used.percentTags = nullDsType = GAUGEStep = 60
聚合组内所有host的cpu.used.percent,计算平均值,而后生成endpoint=84aba056-6e6c-4c53-bfed-4de0729421ef,metric=cpu.used.percent, dsType=GAUGE, Step=60的指标。
3. aggregator的聚合流程
- 所有要聚合的信息存储在db中,每条记录就是下面的一个cluster构造;
对每个聚合记录,启动1个goroutine定期进行聚合;
- 聚合时,先查问聚合组下所有的host列表
- 再查问每个host,表达式内相干指标的的最新数据点,比方要聚合cpu.used.percent,就查问所有host最新的cpu.used.percent的最新数据点;
- 利用最新的数据点,别离计算分子表达式和分母表达式的值,做商失去最终的指标值;
- 聚合的指标值存储在本地Queue中,有1个goroutine默默的pop进去,HTTP发送给agent;
聚合的入口代码:定期检查新monitorItem和删除老的monitorItem
// modules/aggregator/cron/updater.gofunc UpdateItems() { for { updateItems() d := time.Duration(g.Config().Database.Interval) * time.Second time.Sleep(d) }}func updateItems() { items, err := db.ReadClusterMonitorItems() if err != nil { return } deleteNoUseWorker(items) createWorkerIfNeed(items)}
对于新的monitorItem,会给它启动一个worker:
//modules/aggregator/cron/worker.gofunc createWorkerIfNeed(m map[string]*g.Cluster) { for key, item := range m { //若是新的monitorItem if _, ok := Workers[key]; !ok { if item.Step <= 0 { log.Println("[W] invalid cluster(step <= 0):", item) continue } worker := NewWorker(item) Workers[key] = worker worker.Start() } }}
新的worker启动了1个goroutine,定期进行聚合:
// modules/aggregator/cron/worker.gofunc (this Worker) Start() { go func() { for { select { case <-this.Ticker.C: WorkerRun(this.ClusterItem) case <-this.Quit: this.Ticker.Stop() return } } }()}
具体的聚合过程比较复杂,联合正文剖析一下:
- 首先别离解析分子表达式和分母表达式,失去所有相干的metric及其计算形式;
- 而后查看聚合组下所有的hosts列表;
- 而后查问hosts列所有相干metric的最近的指标点,存储在valueMap;
对所有hosts别离计算分子和分母的数值,分子/分母即是最终的聚合值;
- 两头有一个非凡解决,比方$#是算平均值,1是算sum;
// modules/aggregator/cron/run.gofunc WorkerRun(item *g.Cluster) { ..... // 解析分子表达式 //比方 numberatorStr=$(cpu.used.percent) //返回 [cpu.used.percent], [], '' numeratorOperands, numeratorOperators, numeratorComputeMode := parse(numeratorStr, needComputeNumerator) // 解析分母表达式 // 比方 $# // 返回 [], [], '' denominatorOperands, denominatorOperators, denominatorComputeMode := parse(denominatorStr, needComputeDenominator) // 查问GroupId内的所有hosts hostnames, err := sdk.HostnamesByID(item.GroupId) // 查问hosts所有相干指标的最近指标点 valueMap, err := queryCounterLast(numeratorOperands, denominatorOperands, hostnames, now-int64(item.Step*2), now) var numerator, denominator float64 var validCount int for _, hostname := range hostnames { var numeratorVal, denominatorVal float64 var err error //计算分子的聚合值 if needComputeNumerator { numeratorVal, err = compute(numeratorOperands, numeratorOperators, numeratorComputeMode, hostname, valueMap) .... } //计算分母的聚合值 if needComputeDenominator { denominatorVal, err = compute(denominatorOperands, denominatorOperators, denominatorComputeMode, hostname, valueMap) ..... } numerator += numeratorVal denominator += denominatorVal validCount += 1 } // $#是算平均值,1是算sum if !needComputeDenominator { if denominatorStr == "$#" { denominator = float64(validCount) } else { denominator, err = strconv.ParseFloat(denominatorStr, 64) } } // 最终指标值=分子/分母,放入本地Queue sender.Push(item.Endpoint, item.Metric, item.Tags, numerator/denominator, item.DsType, int64(item.Step))}
sender.Push()将聚合后果放入本地Queue:
// common/sdk/sender/make.gofunc Push(endpoint, metric, tags string, val interface{}, counterType string, step_and_ts ...int64) { md := MakeMetaData(endpoint, metric, tags, val, counterType, step_and_ts...) MetaDataQueue.PushFront(md)}
后端有1个goroutine默默的pop Queue,而后HTTP发送给agent:
// common/sdk/sender/sender.gofunc startSender() { for { L := MetaDataQueue.PopBack(LIMIT) if len(L) == 0 { time.Sleep(time.Millisecond * 200) continue } err := PostPush(L) ... }}