The agent is the metric-collection module. It focuses only on monitoring metrics of the Linux host itself, and is mainly responsible for:

  • periodically collecting metrics and reporting them to Transfer via RPC;
  • sending heartbeats to hbs, and fetching from hbs the processes and ports to monitor and the plugins to execute;
  • periodically executing plugins and sending each plugin's metric output to Transfer;

Overall architecture: the agent runs on each host, collecting metrics, heartbeating hbs, running plugins, and pushing everything to Transfer.

1. Metric collection

The code entry point:

```go
func main() {
    ......
    cron.Collect()
    ......
}

// modules/agent/cron/collector.go
func Collect() {
    .....
    for _, v := range funcs.Mappers {
        go collect(int64(v.Interval), v.Fs)
    }
}
```

Here funcs.Mappers is the set of collector functions. The agent starts one goroutine per category of collectors, so there are as many goroutines as there are categories:

```go
// modules/agent/funcs/funcs.go
var Mappers []FuncsAndInterval

func BuildMappers() {
    interval := g.Config().Transfer.Interval
    Mappers = []FuncsAndInterval{
        {
            Fs: []func() []*model.MetricValue{
                AgentMetrics,
                CpuMetrics,
                NetMetrics,
                KernelMetrics,
                LoadAvgMetrics,
                ......
            },
            Interval: interval,
        },
        {
            Fs: []func() []*model.MetricValue{
                DeviceMetrics,
            },
            Interval: interval,
        },
        ......
    }
}
```
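The FuncsAndInterval type itself isn't shown in the post; judging from the usage above and in Collect(), it is presumably just a batch of collector functions paired with the period at which the batch runs. A minimal sketch (field names inferred, not taken from the source):

```go
// Hypothetical sketch of FuncsAndInterval, inferred from how BuildMappers
// and Collect use it: a batch of collector functions plus the interval
// (in seconds) at which the whole batch is executed.
type FuncsAndInterval struct {
    Fs       []func() []*model.MetricValue
    Interval int
}
```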

The collection loop itself runs each collector function, gathers the collected metrics together, and sends them to transfer:

```go
// modules/agent/cron/collector.go
func collect(sec int64, fns []func() []*model.MetricValue) {
    t := time.NewTicker(time.Second * time.Duration(sec))
    defer t.Stop()
    for {
        <-t.C

        hostname, err := g.Hostname()
        if err != nil {
            continue
        }

        mvs := []*model.MetricValue{}
        ignoreMetrics := g.Config().IgnoreMetrics

        for _, fn := range fns {
            items := fn()
            for _, mv := range items {
                mvs = append(mvs, mv)
                ......
            }
        }
        ......
        g.SendToTransfer(mvs)
    }
}
```

2. Reporting metrics to Transfer

The agent talks to Transfer over a TCP RPC channel. The agent is configured with multiple transfer addresses; on each report it tries them in random order and stops as soon as one report succeeds, without trying the remaining transfer addresses:

```go
func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {
    rand.Seed(time.Now().UnixNano())
    for _, i := range rand.Perm(len(Config().Transfer.Addrs)) {
        addr := Config().Transfer.Addrs[i]

        c := getTransferClient(addr)
        if c == nil {
            c = initTransferClient(addr)
        }

        if updateMetrics(c, metrics, resp) {
            break
        }
    }
}
```
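getTransferClient and initTransferClient aren't shown in the post; presumably they maintain a per-address cache of SingleConnRpcClient instances behind a lock, roughly like this sketch (names and locking granularity assumed, not taken from the source):

```go
// Hypothetical sketch of the per-address client cache: one long-lived
// SingleConnRpcClient per transfer address, created lazily and reused
// across reports.
var (
    clientsLock sync.RWMutex
    clients     = map[string]*SingleConnRpcClient{}
)

func getTransferClient(addr string) *SingleConnRpcClient {
    clientsLock.RLock()
    defer clientsLock.RUnlock()
    return clients[addr] // nil if no client has been created for addr yet
}

func initTransferClient(addr string) *SingleConnRpcClient {
    c := &SingleConnRpcClient{
        RpcServer: addr,
        // assumed: timeout taken from the transfer section of the config
        Timeout: time.Duration(Config().Transfer.Timeout) * time.Millisecond,
    }
    clientsLock.Lock()
    defer clientsLock.Unlock()
    clients[addr] = c
    return c
}
```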

The agent keeps one long-lived TCP connection to the transfer, maintained by SingleConnRpcClient:

```go
type SingleConnRpcClient struct {
    sync.Mutex
    rpcClient *rpc.Client
    RpcServer string
    Timeout   time.Duration
}
```

SingleConnRpcClient wraps rpc.Client and ultimately invokes rpc.Client.Call("Transfer.Update") to perform the method call:

  • it first ensures there is one long-lived TCP RPC connection to Transfer;
  • it then calls rpc.Client.Call() to make the actual RPC call;
  • the RPC call runs inside a goroutine, while the outer select implements the timeout check;
```go
func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error {
    this.Lock()
    defer this.Unlock()

    err := this.serverConn()
    if err != nil {
        return err
    }

    timeout := time.Duration(10 * time.Second)
    done := make(chan error, 1)

    go func() {
        err := this.rpcClient.Call(method, args, reply)
        done <- err
    }()

    select {
    case <-time.After(timeout):
        this.close()
        return errors.New(this.RpcServer + " rpc call timeout")
    case err := <-done:
        if err != nil {
            this.close()
            return err
        }
    }

    return nil
}
```
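serverConn() isn't shown in the post. A minimal sketch of what it plausibly does (the dialing and codec details are assumptions, not taken from the source): lazily establish the connection on first use, and reuse it afterwards until close() drops it after an error or timeout:

```go
// Hypothetical sketch of serverConn: if the cached rpc.Client is gone
// (first call, or dropped by close() after an error/timeout), dial the
// transfer address again; otherwise reuse the long-lived connection.
func (this *SingleConnRpcClient) serverConn() error {
    if this.rpcClient != nil {
        return nil
    }
    conn, err := net.DialTimeout("tcp", this.RpcServer, this.Timeout)
    if err != nil {
        return err
    }
    this.rpcClient = jsonrpc.NewClient(conn) // assuming JSON-RPC over TCP
    return nil
}
```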

3. Metric types: GAUGE and COUNTER

The GAUGE and COUNTER types have the same semantics as the Prometheus types of the same name:

  • GAUGE is an instantaneous value that is meaningful on its own, e.g. memory usage;
  • COUNTER is a monotonically increasing count, e.g. the accumulated CPU time;

The agent derives cpu.user.percent (a GAUGE) by dividing the delta of the user-mode CPU counter by the delta of the total counter across all modes. Let's walk through the code.

The agent uses one goroutine to periodically sample the per-mode CPU counters from /proc/stat:

```go
// modules/agent/cron/collector.go
func InitDataHistory() {
    for {
        funcs.UpdateCpuStat()
        time.Sleep(g.COLLECT_INTERVAL)
    }
}
```

Because it needs the delta over the sampling interval, two snapshots are kept: the previous sample and the current one:

```go
// modules/agent/funcs/cpustat.go
const (
    historyCount int = 2
)

func UpdateCpuStat() error {
    ps, err := nux.CurrentProcStat()
    if err != nil {
        return err
    }

    psLock.Lock()
    defer psLock.Unlock()
    for i := historyCount - 1; i > 0; i-- {
        procStatHistory[i] = procStatHistory[i-1]
    }
    procStatHistory[0] = ps
    return nil
}
```

The raw CPU counters come from reading the /proc/stat file:

```go
// github.com/toolkits/nux/cpustat.go
func CurrentProcStat() (*ProcStat, error) {
    f := Root() + "/proc/stat"
    bs, err := ioutil.ReadFile(f)
    if err != nil {
        return nil, err
    }

    ps := &ProcStat{Cpus: make([]*CpuUsage, NumCpu())}
    reader := bufio.NewReader(bytes.NewBuffer(bs))

    for {
        line, err := file.ReadLine(reader)
        if err == io.EOF {
            err = nil
            break
        } else if err != nil {
            return ps, err
        }
        parseLine(line, ps)
    }

    return ps, nil
}
```

Finally, let's see how the cpu.user.percent metric is computed. The formula is:

cpu.user.percent = 100 * (cpu.user2 - cpu.user1) / (cpu.total2 - cpu.total1)
```go
// modules/agent/funcs/cpustat.go
func CpuMetrics() []*model.MetricValue {
    user := GaugeValue("cpu.user", CpuUser())
    return []*model.MetricValue{user, ....}
}

func CpuUser() float64 {
    psLock.RLock()
    defer psLock.RUnlock()

    dt := deltaTotal()
    if dt == 0 {
        return 0.0
    }

    invQuotient := 100.00 / float64(dt)
    return float64(procStatHistory[0].Cpu.User-procStatHistory[1].Cpu.User) * invQuotient
}

func deltaTotal() uint64 {
    if procStatHistory[1] == nil {
        return 0
    }
    return procStatHistory[0].Cpu.Total - procStatHistory[1].Cpu.Total
}
```
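A quick worked example with made-up numbers: if between two samples the user counter advances from 11000 to 11200 jiffies while the total counter advances from 40000 to 41000, then cpu.user.percent = 100 * (11200 - 11000) / (41000 - 40000) = 20.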

4. agent and hbs

The agent periodically sends a heartbeat to hbs by calling hbs's RPC method Agent.ReportStatus:

```go
func reportAgentStatus(interval time.Duration) {
    for {
        hostname, err := g.Hostname()
        req := model.AgentReportRequest{
            Hostname:      hostname,
            IP:            g.IP(),
            AgentVersion:  g.VersionMsg(),
            PluginVersion: g.GetCurrPluginVersion(),
        }

        var resp model.SimpleRpcResponse
        err = g.HbsClient.Call("Agent.ReportStatus", req, &resp)

        time.Sleep(interval)
    }
}
```

The agent also periodically asks hbs which plugins it should run (plugin names and version info), via hbs's RPC method Agent.MinePlugins:

```go
// modules/agent/cron/plugin.go
func syncMinePlugins() {
    duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second
    for {
        time.Sleep(duration)

        hostname, err := g.Hostname()
        req := model.AgentHeartbeatRequest{
            Hostname: hostname,
        }
        var resp model.AgentPluginsResponse
        err = g.HbsClient.Call("Agent.MinePlugins", req, &resp)
        ......
    }
}
```

5. Executing plugins

A plugin is a collection script, typically written in shell or Python. When executed it prints metrics in a specific format; the agent reads that output and sends it to transfer. For example:

```
[root@host01:/path/to/plugins/plugin/sys/ntp]# ./600_ntp.py
[{"endpoint": "host01", "tags": "", "timestamp": 1431349763, "metric": "sys.ntp.offset", "value": 0.73699999999999999, "counterType": "GAUGE", "step": 600}]
```

As described in the previous step, the agent fetches its plugin list from hbs and diffs it against the locally scheduled plugin list; any new (or modified) plugin gets scheduled:

```go
// modules/agent/plugins/plugins.go
func AddNewPlugins(newPlugins map[string]*Plugin) {
    for fpath, newPlugin := range newPlugins {
        if _, ok := Plugins[fpath]; ok && newPlugin.MTime == Plugins[fpath].MTime {
            continue
        }

        Plugins[fpath] = newPlugin
        sch := NewPluginScheduler(newPlugin)
        PluginsWithScheduler[fpath] = sch
        sch.Schedule()
    }
}
```

Each plugin is executed periodically:

```go
func (this *PluginScheduler) Schedule() {
    go func() {
        for {
            select {
            case <-this.Ticker.C:
                PluginRun(this.Plugin)
            case <-this.Quit:
                this.Ticker.Stop()
                return
            }
        }
    }()
}
```
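NewPluginScheduler isn't shown in the post. Presumably it builds the ticker from the plugin's cycle, which by convention is the numeric prefix of the script name (600_ntp.py above reports with step 600). A minimal sketch under that assumption, with field names guessed from the Schedule code:

```go
// Hypothetical sketch: a PluginScheduler pairs a plugin with a ticker
// firing every Cycle seconds, plus a quit channel used to stop it.
type PluginScheduler struct {
    Ticker *time.Ticker
    Plugin *Plugin
    Quit   chan struct{}
}

func NewPluginScheduler(p *Plugin) *PluginScheduler {
    return &PluginScheduler{
        Plugin: p,
        Ticker: time.NewTicker(time.Duration(p.Cycle) * time.Second),
        Quit:   make(chan struct{}),
    }
}
```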

Running a plugin means executing it as an external command, then reading the command's output, parsing it, and sending the result to transfer:

```go
func PluginRun(plugin *Plugin) {
    // fpath, args, timeout and stdout are derived from the plugin's path,
    // arguments and cycle; that setup is elided in this excerpt
    var cmd *exec.Cmd
    if args == "" {
        cmd = exec.Command(fpath)
    } else {
        arg_list := PluginArgsParse(args)
        cmd = exec.Command(fpath, arg_list...)
    }

    // run the command, enforcing a timeout
    err, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Millisecond)

    // exec successfully
    data := stdout.Bytes()

    // parse the JSON output and send it to transfer
    var metrics []*model.MetricValue
    err = json.Unmarshal(data, &metrics)
    g.SendToTransfer(metrics)
}
```

Plugin execution carries a risk: the goroutine running the command blocks in a syscall, which means its G and its M block together. Since each M corresponds to an OS thread, many simultaneously blocked plugin executions force the runtime to create many blocked Ms (threads), which degrades system performance.
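One common mitigation, sketched below, is to bound the number of concurrently executing plugin commands with a counting semaphore, which caps how many Ms can be blocked in exec at once. This is not something the agent implements; PluginRunBounded and maxConcurrentPlugins are assumed names, not open-falcon code or config:

```go
// Hypothetical mitigation sketch: a buffered channel used as a counting
// semaphore limits concurrent plugin executions, and therefore the number
// of OS threads that can be blocked in syscalls at any moment.
const maxConcurrentPlugins = 16 // assumed cap, not an open-falcon option

var pluginSlots = make(chan struct{}, maxConcurrentPlugins)

func PluginRunBounded(plugin *Plugin) {
    pluginSlots <- struct{}{}        // acquire a slot; blocks while all slots are busy
    defer func() { <-pluginSlots }() // release the slot when the command finishes
    PluginRun(plugin)
}
```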

References:
1. Open-falcon docs: https://book.open-falcon.org/...