agent 是指标采集模块,仅关注 linux 自身的监控指标,次要负责:
- 定期进行指标采集,而后通过 RPC 上报给 Transfer;
- 向 hbs 发送 heartbeat,同时从 hbs 获取要监听的 process、port 和要执行的 plugin 信息;
- 定期执行 plugin,将 plugin 的指标后果发送给 Transfer;
整体架构:
1. 指标采集
代码入口:
func main() {
......
cron.Collect()
......
}
//modules/agent/cron/collector.go
func Collect() {
.....
for _, v := range funcs.Mappers {go collect(int64(v.Interval), v.Fs)
}
}
其中 funcs.Mappers 是采集函数的汇合,agent 为每一类采集启动了一个 goroutine,有几种分类就有几个 goroutine:
//modules/agent/funcs/funcs.go
var Mappers []FuncsAndInterval
func BuildMappers() {interval := g.Config().Transfer.Interval
Mappers = []FuncsAndInterval{
{Fs: []func() []*model.MetricValue{
AgentMetrics,
CpuMetrics,
NetMetrics,
KernelMetrics,
LoadAvgMetrics,
......
},
Interval: interval,
},
{Fs: []func() []*model.MetricValue{DeviceMetrics,},
Interval: interval,
},
......
}
}
具体的采集过程,执行每个采集函数,将采集的指标会集起来,发送给 transfer:
// modules/agent/cron/collector.go
func collect(sec int64, fns []func() []*model.MetricValue) {t := time.NewTicker(time.Second * time.Duration(sec))
defer t.Stop()
for {
<-t.C
hostname, err := g.Hostname()
if err != nil {continue}
mvs := []*model.MetricValue{}
ignoreMetrics := g.Config().IgnoreMetrics
for _, fn := range fns {items := fn()
for _, mv := range items {mvs = append(mvs, mv)
......
}
}
......
g.SendToTransfer(mvs)
}
}
2. 指标上报 Transfer
agent 与 Transfer 之间是 TCP RPC 通道,agent 配置了多个 transfer 的地址,上报时随机选一个地址,只有上报胜利就退出,不再尝试其余的 transfer 地址:
func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {rand.Seed(time.Now().UnixNano())
for _, i := range rand.Perm(len(Config().Transfer.Addrs)) {addr := Config().Transfer.Addrs[i]
c := getTransferClient(addr)
if c == nil {c = initTransferClient(addr)
}
if updateMetrics(c, metrics, resp) {break}
}
}
agent 与 transfer 之间维持一个 TCP 长连贯,由 SingleConnRpcClient 来保护:
type SingleConnRpcClient struct {
sync.Mutex
rpcClient *rpc.Client
RpcServer string
Timeout time.Duration
}
SingleConnRpcClient 包装了 rpc.Client,调用 rpc.Client.Call(“Transfer.Update”)实现最终的办法调用;
- 首先确保有 1 个 TCP rpc 长连贯到 Transfer;
- 调用 rpc.Client.Call()办法进行理论的 rpc 调用;
- rpc 调用放在 goroutine 内执行,里面应用 select 进行超时判断;
func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error {this.Lock()
defer this.Unlock()
err := this.serverConn()
if err != nil {return err}
timeout := time.Duration(10 * time.Second)
done := make(chan error, 1)
go func() {err := this.rpcClient.Call(method, args, reply)
done <- err
}()
select {case <-time.After(timeout):
this.close()
return errors.New(this.RpcServer + "rpc call timeout")
case err := <-done:
if err != nil {this.close()
return err
}
}
return nil
}
3. 指标类型:GUAGE 与 COUNTER
GAUGE 与 COUNTER 类型,与 Prometheus 的类型语义雷同:
- GAUGE 示意有实际意义的立刻数,比方内存用量;
- COUNTER 示意递增的计数,比方 cpu 应用工夫的计数;
agent 用 cpu 在 user 模式下的应用计数差值 / 总模式的计数差值,失去 cpu.user.percent(GAUGE 类型),具体来看一下代码;
agent 应用 1 个 goroutine 来定期采集 /proc/stat 下各种 cpu 的应用计数:
//modules/agent/cron/collector.go
func InitDataHistory() {
for {funcs.UpdateCpuStat()
time.Sleep(g.COLLECT_INTERVAL)
}
}
因为要计算统计工夫距离内的差值,故保留了 2 份数据,上一次统计和本次统计的:
//modules/agent/funcs/cpustat.go
const (historyCount int = 2)
func UpdateCpuStat() error {ps, err := nux.CurrentProcStat()
if err != nil {return err}
psLock.Lock()
defer psLock.Unlock()
for i := historyCount - 1; i > 0; i-- {procStatHistory[i] = procStatHistory[i-1]
}
procStatHistory[0] = ps
return nil
}
cpu 指标数据的起源,是读取 /proc/stats 文件:
//github.com/toolkits/nux/cpustat.go
func CurrentProcStat() (*ProcStat, error) {f := Root() + "/proc/stat"
bs, err := ioutil.ReadFile(f)
if err != nil {return nil, err}
ps := &ProcStat{Cpus: make([]*CpuUsage, NumCpu())}
reader := bufio.NewReader(bytes.NewBuffer(bs))
for {line, err := file.ReadLine(reader)
if err == io.EOF {
err = nil
break
} else if err != nil {return ps, err}
parseLine(line, ps)
}
return ps, nil
}
最初看下 cpu.user.percent 指标如何计算的,其计算公式为:
(cpu.user2 - cpu.use1) / (cpu.total2 - cpu.total1)
//modules/agent/funcs/cpustat.go
func CpuMetrics() []*model.MetricValue {user := GaugeValue("cpu.user", CpuUser())
return []*model.MetricValue{user, ....}
}
func CpuUser() float64 {psLock.RLock()
defer psLock.RUnlock()
dt := deltaTotal()
if dt == 0 {return 0.0}
invQuotient := 100.00 / float64(dt)
return float64(procStatHistory[0].Cpu.User-procStatHistory[1].Cpu.User) * invQuotient
}
func deltaTotal() uint64 {if procStatHistory[1] == nil {return 0}
return procStatHistory[0].Cpu.Total - procStatHistory[1].Cpu.Total
}
4. agent 与 hbs
agent 定期向 hbs 发送 heartbeat,上报时调用 hbs 的 RPC 办法 (Agent.ReportStatus) 实现的:
func reportAgentStatus(interval time.Duration) {
for {hostname, err := g.Hostname()
req := model.AgentReportRequest{
Hostname: hostname,
IP: g.IP(),
AgentVersion: g.VersionMsg(),
PluginVersion: g.GetCurrPluginVersion(),}
var resp model.SimpleRpcResponse
err = g.HbsClient.Call("Agent.ReportStatus", req, &resp)
time.Sleep(interval)
}
}
agent 还会定期向 hbs 查问本人要执行的 plugin 名称及版本信息,通过调用 hbs 的 RPC 办法 (Agent.MinePlugins) 实现:
// modules/agent/cron/plugin.go
func syncMinePlugins() {duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second
for {time.Sleep(duration)
hostname, err := g.Hostname()
req := model.AgentHeartbeatRequest{Hostname: hostname,}
var resp model.AgentPluginsResponse
err = g.HbsClient.Call("Agent.MinePlugins", req, &resp)
......
}
}
5. 执行 plugin
plugin 即采集的插件或脚本,通常是 shell 或 py,该脚本执行后通常会输入特定格局的指标信息,agent 读取该执行后果,发送到 transfer;比方:
[root@host01:/path/to/plugins/plugin/sys/ntp]#./600_ntp.py
[{"endpoint": "host01", "tags": "","timestamp": 1431349763,"metric":"sys.ntp.offset","value": 0.73699999999999999,"counterType":"GAUGE","step": 600}]
上一步讲到,agent 向 hbs 查问要执行的 plugin list,跟本地执行的 plugin list 进行比对,若有新的 plugin 则执行:
//modules/agent/plugins/plugins.go
func AddNewPlugins(newPlugins map[string]*Plugin) {
for fpath, newPlugin := range newPlugins {if _, ok := Plugins[fpath]; ok && newPlugin.MTime == Plugins[fpath].MTime {continue}
Plugins[fpath] = newPlugin
sch := NewPluginScheduler(newPlugin)
PluginsWithScheduler[fpath] = sch
sch.Schedule()}
}
每个 plugin 是定期被执行的:
func (this *PluginScheduler) Schedule() {go func() {
for {
select {
case <-this.Ticker.C:
PluginRun(this.Plugin)
case <-this.Quit:
this.Ticker.Stop()
return
}
}
}()}
plugin 的执行,实际上是执行 cmd 命令,而后读取 cmd 命令的输入后果,解析后发送给 transfer:
func PluginRun(plugin *Plugin) {
var cmd *exec.Cmd
if args == "" {cmd = exec.Command(fpath)
} else {arg_list := PluginArgsParse(args)
cmd = exec.Command(fpath, arg_list...)
}
// 执行 cmd 命令
err, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Millisecond)
// exec successfully
data := stdout.Bytes()
// 发送到 transfer
var metrics []*model.MetricValue
err = json.Unmarshal(data, &metrics)
g.SendToTransfer(metrics)
}
plugin 的执行有个危险,因为 goroutine 执行 cmd 会进入 SYSCALL 阻塞,也就是其对应的 G 和 M 一起被阻塞,若阻塞较多的话,会有较多的 M 被阻塞,M 是对应零碎线程,较多的 M 被创立并阻塞,会导致系统的性能降落。
参考:
1.Open-falcon docs: https://book.open-falcon.org/…