乐趣区

关于监控工具:Openfalcon-agent源码解读

agent 是指标采集模块,仅关注 linux 自身的监控指标,次要负责:

  • 定期进行指标采集,而后通过 RPC 上报给 Transfer;
  • 向 hbs 发送 heartbeat,同时从 hbs 获取要监听的 process、port 和要执行的 plugin 信息;
  • 定期执行 plugin,将 plugin 的指标后果发送给 Transfer;

整体架构:

1. 指标采集

代码入口:

func main() {
    ......
    cron.Collect()
    ......
}

//modules/agent/cron/collector.go
func Collect() {
    .....
    for _, v := range funcs.Mappers {go collect(int64(v.Interval), v.Fs)
    }
}

其中 funcs.Mappers 是采集函数的汇合,agent 为每一类采集启动了一个 goroutine,有几种分类就有几个 goroutine:

//modules/agent/funcs/funcs.go
var Mappers []FuncsAndInterval
func BuildMappers() {interval := g.Config().Transfer.Interval
    Mappers = []FuncsAndInterval{
        {Fs: []func() []*model.MetricValue{
                AgentMetrics,
                CpuMetrics,
                NetMetrics,
                KernelMetrics,
                LoadAvgMetrics,
                ......
            },
            Interval: interval,
        },
        {Fs: []func() []*model.MetricValue{DeviceMetrics,},
            Interval: interval,
        },
        ......
    }
}

具体的采集过程,执行每个采集函数,将采集的指标会集起来,发送给 transfer:

// modules/agent/cron/collector.go
func collect(sec int64, fns []func() []*model.MetricValue) {t := time.NewTicker(time.Second * time.Duration(sec))
    defer t.Stop()
    for {
        <-t.C

        hostname, err := g.Hostname()
        if err != nil {continue}

        mvs := []*model.MetricValue{}
        ignoreMetrics := g.Config().IgnoreMetrics

        for _, fn := range fns {items := fn()            
            for _, mv := range items {mvs = append(mvs, mv)
                ......
            }
        }
        ......
        g.SendToTransfer(mvs)

    }
}

2. 指标上报 Transfer

agent 与 Transfer 之间是 TCP RPC 通道,agent 配置了多个 transfer 的地址,上报时随机选一个地址,只有上报胜利就退出,不再尝试其余的 transfer 地址:

func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {rand.Seed(time.Now().UnixNano())
    for _, i := range rand.Perm(len(Config().Transfer.Addrs)) {addr := Config().Transfer.Addrs[i]

        c := getTransferClient(addr)
        if c == nil {c = initTransferClient(addr)
        }
        if updateMetrics(c, metrics, resp) {break}
    }
}

agent 与 transfer 之间维持一个 TCP 长连贯,由 SingleConnRpcClient 来保护:

type SingleConnRpcClient struct {
    sync.Mutex
    rpcClient *rpc.Client
    RpcServer string
    Timeout   time.Duration
}

SingleConnRpcClient 包装了 rpc.Client,调用 rpc.Client.Call(“Transfer.Update”)实现最终的办法调用;

  • 首先确保有 1 个 TCP rpc 长连贯到 Transfer;
  • 调用 rpc.Client.Call()办法进行理论的 rpc 调用;
  • rpc 调用放在 goroutine 内执行,里面应用 select 进行超时判断;
func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error {this.Lock()
    defer this.Unlock()

    err := this.serverConn()
    if err != nil {return err}

    timeout := time.Duration(10 * time.Second)
    done := make(chan error, 1)

    go func() {err := this.rpcClient.Call(method, args, reply)
        done <- err
    }()

    select {case <-time.After(timeout):
        this.close()
        return errors.New(this.RpcServer + "rpc call timeout")
    case err := <-done:
        if err != nil {this.close()
            return err
        }
    }

    return nil
}

3. 指标类型:GUAGE 与 COUNTER

GAUGE 与 COUNTER 类型,与 Prometheus 的类型语义雷同:

  • GAUGE 示意有实际意义的立刻数,比方内存用量;
  • COUNTER 示意递增的计数,比方 cpu 应用工夫的计数;

agent 用 cpu 在 user 模式下的应用计数差值 / 总模式的计数差值,失去 cpu.user.percent(GAUGE 类型),具体来看一下代码;

agent 应用 1 个 goroutine 来定期采集 /proc/stat 下各种 cpu 的应用计数:

//modules/agent/cron/collector.go
func InitDataHistory() {
    for {funcs.UpdateCpuStat()        
        time.Sleep(g.COLLECT_INTERVAL)
    }
}

因为要计算统计工夫距离内的差值,故保留了 2 份数据,上一次统计和本次统计的:

//modules/agent/funcs/cpustat.go
const (historyCount int = 2)
func UpdateCpuStat() error {ps, err := nux.CurrentProcStat()
    if err != nil {return err}

    psLock.Lock()
    defer psLock.Unlock()
    for i := historyCount - 1; i > 0; i-- {procStatHistory[i] = procStatHistory[i-1]
    }

    procStatHistory[0] = ps
    return nil
}

cpu 指标数据的起源,是读取 /proc/stats 文件:

//github.com/toolkits/nux/cpustat.go
func CurrentProcStat() (*ProcStat, error) {f := Root() + "/proc/stat"
    bs, err := ioutil.ReadFile(f)
    if err != nil {return nil, err}

    ps := &ProcStat{Cpus: make([]*CpuUsage, NumCpu())}
    reader := bufio.NewReader(bytes.NewBuffer(bs))

    for {line, err := file.ReadLine(reader)
        if err == io.EOF {
            err = nil
            break
        } else if err != nil {return ps, err}
        parseLine(line, ps)
    }

    return ps, nil
}

最初看下 cpu.user.percent 指标如何计算的,其计算公式为:

(cpu.user2 - cpu.use1) / (cpu.total2 - cpu.total1)
//modules/agent/funcs/cpustat.go
func CpuMetrics() []*model.MetricValue {user := GaugeValue("cpu.user", CpuUser())
    return []*model.MetricValue{user, ....}
}
func CpuUser() float64 {psLock.RLock()
    defer psLock.RUnlock()
    dt := deltaTotal()
    if dt == 0 {return 0.0}
    invQuotient := 100.00 / float64(dt)
    return float64(procStatHistory[0].Cpu.User-procStatHistory[1].Cpu.User) * invQuotient
}
func deltaTotal() uint64 {if procStatHistory[1] == nil {return 0}
    return procStatHistory[0].Cpu.Total - procStatHistory[1].Cpu.Total
}

4. agent 与 hbs

agent 定期向 hbs 发送 heartbeat,上报时调用 hbs 的 RPC 办法 (Agent.ReportStatus) 实现的:

func reportAgentStatus(interval time.Duration) {
    for {hostname, err := g.Hostname()
        req := model.AgentReportRequest{
            Hostname:      hostname,
            IP:            g.IP(),
            AgentVersion:  g.VersionMsg(),
            PluginVersion: g.GetCurrPluginVersion(),}

        var resp model.SimpleRpcResponse
        err = g.HbsClient.Call("Agent.ReportStatus", req, &resp)
        
        time.Sleep(interval)
    }
}

agent 还会定期向 hbs 查问本人要执行的 plugin 名称及版本信息,通过调用 hbs 的 RPC 办法 (Agent.MinePlugins) 实现:

// modules/agent/cron/plugin.go
func syncMinePlugins() {duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second
    for {time.Sleep(duration)
        hostname, err := g.Hostname()
        req := model.AgentHeartbeatRequest{Hostname: hostname,}

        var resp model.AgentPluginsResponse
        err = g.HbsClient.Call("Agent.MinePlugins", req, &resp)
        ......
    }
}

5. 执行 plugin

plugin 即采集的插件或脚本,通常是 shell 或 py,该脚本执行后通常会输入特定格局的指标信息,agent 读取该执行后果,发送到 transfer;比方:

[root@host01:/path/to/plugins/plugin/sys/ntp]#./600_ntp.py
[{"endpoint": "host01", "tags": "","timestamp": 1431349763,"metric":"sys.ntp.offset","value": 0.73699999999999999,"counterType":"GAUGE","step": 600}]

上一步讲到,agent 向 hbs 查问要执行的 plugin list,跟本地执行的 plugin list 进行比对,若有新的 plugin 则执行:

//modules/agent/plugins/plugins.go
func AddNewPlugins(newPlugins map[string]*Plugin) {
    for fpath, newPlugin := range newPlugins {if _, ok := Plugins[fpath]; ok && newPlugin.MTime == Plugins[fpath].MTime {continue}

        Plugins[fpath] = newPlugin
        sch := NewPluginScheduler(newPlugin)
        PluginsWithScheduler[fpath] = sch
        sch.Schedule()}
}

每个 plugin 是定期被执行的:

func (this *PluginScheduler) Schedule() {go func() {
        for {
            select {
            case <-this.Ticker.C:
                PluginRun(this.Plugin)
            case <-this.Quit:
                this.Ticker.Stop()
                return
            }
        }
    }()}

plugin 的执行,实际上是执行 cmd 命令,而后读取 cmd 命令的输入后果,解析后发送给 transfer:

func PluginRun(plugin *Plugin) {
    var cmd *exec.Cmd
    if args == "" {cmd = exec.Command(fpath)
    } else {arg_list := PluginArgsParse(args)
        cmd = exec.Command(fpath, arg_list...)
    }
    // 执行 cmd 命令
    err, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Millisecond)
    // exec successfully
    data := stdout.Bytes()
    
    // 发送到 transfer
    var metrics []*model.MetricValue
    err = json.Unmarshal(data, &metrics)
    g.SendToTransfer(metrics)
}

plugin 的执行有个危险,因为 goroutine 执行 cmd 会进入 SYSCALL 阻塞,也就是其对应的 G 和 M 一起被阻塞,若阻塞较多的话,会有较多的 M 被阻塞,M 是对应零碎线程,较多的 M 被创立并阻塞,会导致系统的性能降落。

参考:
1.Open-falcon docs: https://book.open-falcon.org/…

退出移动版