共计 4306 个字符,预计需要花费 11 分钟才能阅读完成。
aggregator 组件负责聚合,即依据定义的指标表达式,聚合该组下所有 host 的指标值,生成新的指标发送到 agent。
agent 会将数据发送给 transfer,transfer 发送给 graph,最终由 graph 存入 TSDB。
值得注意的是,aggregator 实例对分布式部署的反对无限,因为它每个实例都执行所有的聚合工作,没有进行聚合工作在不同节点的均分,也没有锁机制保障执行 1 个实例运行。
当然,部署多份实例也不会呈现逻辑谬误,只是运行了多份雷同的工作而已。
1. aggregator 的聚合表达式
指标表达式分为分子和分母两局部,每局部都是 1 个表达式,别离计算出 2 局部的值,做商即得后果。
# 计算 cpu.used.percent 的平均值
分子:$(cpu.used.percent)
分母:$#
# 计算磁盘吞吐量的总量
分子:$(disk.io.read_bytes)
分母:1
此外,还能进行更简单的汇合语义:
# 计算 disk.io.util 大于等于 40% 的机器个数
分子:$(disk.io.util)>=40
分母:1
# 计算集群 diso.io.util 大于 40% 的比率
分子:$(disk.io.util)>40
分母:$#
2. aggregator 的聚合模型
聚合模型体现在 Cluster 构造中:
type Cluster struct {
.....
GroupId int64 // 聚合组 id
Numerator string // 分子表达式
Denominator string // 分母表达式
Endpoint string // 指标 endpoint
Metric string // 指标 metric
Tags string // 指标 tags
DsType string // 指标 metric 类型
Step int // 指标 metric 的 step 大小
....
}
其中:
- GroupId: 聚合组 id,依据该 id 能够查问失去该组下所有 host;
- Numberator: 分子表达式,比方 $(cpu.used.percent);
- Denominator: 分母表达式,比方 $#;
- Endpoint: 聚合后的 endpoint;
- Metric: 聚合后的 metric;
- Tags: 聚合后的 tags;
- DsType: 聚合后的 metric 类型,GAUGE 或 COUNTER;
- Step: 聚合后的 metric 的 step 大小;
以聚合组内所有 host 的 cpu.used.percent 平均值为例:
GroupId = 900
Numerator = $(cpu.used.percent)
Denominator = $#
Endpoint = 84aba056-6e6c-4c53-bfed-4de0729421ef
Metric = cpu.used.percent
Tags = null
DsType = GAUGE
Step = 60
聚合组内所有 host 的 cpu.used.percent,计算平均值,而后生成 endpoint=84aba056-6e6c-4c53-bfed-4de0729421ef,metric=cpu.used.percent, dsType=GAUGE, Step=60 的指标。
3. aggregator 的聚合流程
- 所有要聚合的信息存储在 db 中,每条记录就是下面的一个 cluster 构造;
-
对每个聚合记录,启动 1 个 goroutine 定期进行聚合;
- 聚合时,先查问聚合组下所有的 host 列表
- 再查问每个 host,表达式内相干指标的的最新数据点,比方要聚合 cpu.used.percent,就查问所有 host 最新的 cpu.used.percent 的最新数据点;
- 利用最新的数据点,别离计算分子表达式和分母表达式的值,做商失去最终的指标值;
- 聚合的指标值存储在本地 Queue 中,有 1 个 goroutine 默默的 pop 进去,HTTP 发送给 agent;
聚合的入口代码:定期检查新 monitorItem 和删除老的 monitorItem
// modules/aggregator/cron/updater.go
func UpdateItems() {
for {updateItems()
d := time.Duration(g.Config().Database.Interval) * time.Second
time.Sleep(d)
}
}
func updateItems() {items, err := db.ReadClusterMonitorItems()
if err != nil {return}
deleteNoUseWorker(items)
createWorkerIfNeed(items)
}
对于新的 monitorItem,会给它启动一个 worker:
//modules/aggregator/cron/worker.go
func createWorkerIfNeed(m map[string]*g.Cluster) {
for key, item := range m {
// 若是新的 monitorItem
if _, ok := Workers[key]; !ok {
if item.Step <= 0 {log.Println("[W] invalid cluster(step <= 0):", item)
continue
}
worker := NewWorker(item)
Workers[key] = worker
worker.Start()}
}
}
新的 worker 启动了 1 个 goroutine,定期进行聚合:
// modules/aggregator/cron/worker.go
func (this Worker) Start() {go func() {
for {
select {
case <-this.Ticker.C:
WorkerRun(this.ClusterItem)
case <-this.Quit:
this.Ticker.Stop()
return
}
}
}()}
具体的聚合过程比较复杂,联合正文剖析一下:
- 首先别离解析分子表达式和分母表达式,失去所有相干的 metric 及其计算形式;
- 而后查看聚合组下所有的 hosts 列表;
- 而后查问 hosts 列所有相干 metric 的最近的指标点,存储在 valueMap;
-
对所有 hosts 别离计算分子和分母的数值,分子 / 分母即是最终的聚合值;
- 两头有一个非凡解决,比方 $# 是算平均值,1 是算 sum;
// modules/aggregator/cron/run.go
func WorkerRun(item *g.Cluster) {
.....
// 解析分子表达式
// 比方 numberatorStr=$(cpu.used.percent)
// 返回 [cpu.used.percent], [], ''
numeratorOperands, numeratorOperators, numeratorComputeMode := parse(numeratorStr, needComputeNumerator)
// 解析分母表达式
// 比方 $#
// 返回 [], [], ''
denominatorOperands, denominatorOperators, denominatorComputeMode := parse(denominatorStr, needComputeDenominator)
// 查问 GroupId 内的所有 hosts
hostnames, err := sdk.HostnamesByID(item.GroupId)
// 查问 hosts 所有相干指标的最近指标点
valueMap, err := queryCounterLast(numeratorOperands, denominatorOperands, hostnames, now-int64(item.Step*2), now)
var numerator, denominator float64
var validCount int
for _, hostname := range hostnames {
var numeratorVal, denominatorVal float64
var err error
// 计算分子的聚合值
if needComputeNumerator {numeratorVal, err = compute(numeratorOperands, numeratorOperators, numeratorComputeMode, hostname, valueMap)
....
}
// 计算分母的聚合值
if needComputeDenominator {denominatorVal, err = compute(denominatorOperands, denominatorOperators, denominatorComputeMode, hostname, valueMap)
.....
}
numerator += numeratorVal
denominator += denominatorVal
validCount += 1
}
// $# 是算平均值,1 是算 sum
if !needComputeDenominator {
if denominatorStr == "$#" {denominator = float64(validCount)
} else {denominator, err = strconv.ParseFloat(denominatorStr, 64)
}
}
// 最终指标值 = 分子 / 分母,放入本地 Queue
sender.Push(item.Endpoint, item.Metric, item.Tags, numerator/denominator, item.DsType, int64(item.Step))
}
sender.Push() 将聚合后果放入本地 Queue:
// common/sdk/sender/make.go
func Push(endpoint, metric, tags string, val interface{}, counterType string, step_and_ts ...int64) {md := MakeMetaData(endpoint, metric, tags, val, counterType, step_and_ts...)
MetaDataQueue.PushFront(md)
}
后端有 1 个 goroutine 默默的 pop Queue,而后 HTTP 发送给 agent:
// common/sdk/sender/sender.go
func startSender() {
for {L := MetaDataQueue.PopBack(LIMIT)
if len(L) == 0 {time.Sleep(time.Millisecond * 200)
continue
}
err := PostPush(L)
...
}
}