hbs periodically reads data from the DB, caches it locally, and exposes RPC interfaces that the agent and judge components call to query that data.

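// modules/hbs/rpc/rpc.go
func Start() {
    server := rpc.NewServer()
    server.Register(new(Agent))
    server.Register(new(Hbs))

    // addr is the listen address (its definition is not shown)
    l, e := net.Listen("tcp", addr)
    if e != nil {
        log.Fatalln("listen error:", e)
    }
    for {
        conn, err := l.Accept()
        if err != nil {
            // ......
            continue
        }
        // each connection is served in its own goroutine with the JSON-RPC codec
        go server.ServeCodec(jsonrpc.NewServerCodec(conn))
    }
}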
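To make the RPC wiring concrete, here is a minimal, self-contained sketch of the same pattern: a `net/rpc` server with a registered service, served over the JSON-RPC codec, plus a client that calls it the way agent or judge would. The `Ping` method, its `PingArgs`/`PingReply` types, and the local listen address are hypothetical and not part of hbs.

```go
package main

import (
	"fmt"
	"log"
	"net"
	"net/rpc"
	"net/rpc/jsonrpc"
)

// PingArgs and PingReply are hypothetical request/response types.
type PingArgs struct {
	Hostname string
}

type PingReply struct {
	Message string
}

// Agent is a hypothetical service; exported methods with the
// (args, *reply) error signature are exposed as "Agent.<Method>".
type Agent struct{}

func (t *Agent) Ping(args PingArgs, reply *PingReply) error {
	reply.Message = "pong from hbs to " + args.Hostname
	return nil
}

// startServer registers the service and serves each accepted
// connection with the JSON-RPC codec, like hbs's Start().
func startServer() (net.Listener, error) {
	server := rpc.NewServer()
	if err := server.Register(new(Agent)); err != nil {
		return nil, err
	}
	l, err := net.Listen("tcp", "127.0.0.1:0") // OS-assigned local port
	if err != nil {
		return nil, err
	}
	go func() {
		for {
			conn, err := l.Accept()
			if err != nil {
				return // listener closed
			}
			go server.ServeCodec(jsonrpc.NewServerCodec(conn))
		}
	}()
	return l, nil
}

// ping dials the server and calls a method by its "Service.Method" name.
func ping(addr, hostname string) (string, error) {
	client, err := jsonrpc.Dial("tcp", addr)
	if err != nil {
		return "", err
	}
	defer client.Close()
	var reply PingReply
	if err := client.Call("Agent.Ping", PingArgs{Hostname: hostname}, &reply); err != nil {
		return "", err
	}
	return reply.Message, nil
}

func main() {
	l, err := startServer()
	if err != nil {
		log.Fatal(err)
	}
	defer l.Close()
	msg, err := ping(l.Addr().String(), "host-01")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(msg) // pong from hbs to host-01
}
```

Because `net/rpc` addresses methods as `"Service.Method"`, a client reaches the hbs handlers below under names such as `Agent.MinePlugins` or `Hbs.GetStrategies`.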

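On the agent side, hbs:

  • handles heartbeat requests from the agent;
  • handles the agent's queries for which plugins to run;
  • handles the agent's queries for which processes and ports to monitor.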

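On the judge side, hbs:

  • answers queries for all currently configured alert strategies;
  • answers queries for all currently configured alert expressions.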

Overall flow diagram:

1. hbs and the agent

The agent queries which plugins to run:

// modules/hbs/rpc/agent.go
func (t *Agent) MinePlugins(args model.AgentHeartbeatRequest, reply *model.AgentPluginsResponse) error {
    if args.Hostname == "" {
        return nil
    }
    reply.Plugins = cache.GetPlugins(args.Hostname)
    reply.Timestamp = time.Now().Unix()
    return nil
}

The agent queries which processes and ports to monitor:

// modules/hbs/rpc/agent.go
func (t *Agent) BuiltinMetrics(args *model.AgentHeartbeatRequest, reply *model.BuiltinMetricResponse) error {
    if args.Hostname == "" {
        return nil
    }
    metrics, err := cache.GetBuiltinMetrics(args.Hostname)
    if err != nil {
        // the error is swallowed: the agent receives an empty reply
        return nil
    }
    checksum := ""
    if len(metrics) > 0 {
        checksum = DigestBuiltinMetrics(metrics)
    }
    // If the agent already holds the current checksum, the list is unchanged,
    // so an empty list is sent instead of the full one.
    if args.Checksum == checksum {
        reply.Metrics = []*model.BuiltinMetric{}
    } else {
        reply.Metrics = metrics
    }
    reply.Checksum = checksum
    reply.Timestamp = time.Now().Unix()
    return nil
}
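`DigestBuiltinMetrics` is not shown here. The sketch below illustrates one plausible way such a checksum could work, and how the handler uses it to avoid resending an unchanged list: render each metric to a string, sort so the result does not depend on order, and hash with MD5. The `BuiltinMetric` fields, `digest`, and `buildReply` are simplified assumptions, not the hbs implementation.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// BuiltinMetric is a simplified, hypothetical stand-in for model.BuiltinMetric.
type BuiltinMetric struct {
	Metric string
	Tags   string
}

// digest returns an order-independent checksum: entries are rendered to
// strings, sorted, joined, and hashed with MD5. Empty input yields "".
func digest(metrics []*BuiltinMetric) string {
	if len(metrics) == 0 {
		return ""
	}
	keys := make([]string, 0, len(metrics))
	for _, m := range metrics {
		keys = append(keys, m.Metric+"/"+m.Tags)
	}
	sort.Strings(keys)
	sum := md5.Sum([]byte(strings.Join(keys, "\n")))
	return hex.EncodeToString(sum[:])
}

// buildReply mirrors the handler's logic: if the client's checksum matches,
// return an empty list; otherwise return the full list. Both return the checksum.
func buildReply(clientChecksum string, metrics []*BuiltinMetric) ([]*BuiltinMetric, string) {
	checksum := digest(metrics)
	if clientChecksum == checksum {
		return []*BuiltinMetric{}, checksum
	}
	return metrics, checksum
}

func main() {
	ms := []*BuiltinMetric{
		{Metric: "net.port.listen", Tags: "port=8080"},
		{Metric: "proc.num", Tags: "name=nginx"},
	}
	full, sum := buildReply("", ms)       // first call: the agent has no checksum yet
	again, _ := buildReply(sum, ms)       // later call: checksum matches
	fmt.Println(len(full), len(again))    // 2 0
}
```

The agent stores the returned checksum and sends it with its next request, so an unchanged configuration costs only a few bytes per round trip.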

As you can see, the RPC interfaces above all operate on the cache: hbs periodically queries the DB and caches the results locally for the agent to query:

// modules/hbs/cache/cache.go
func LoopInit() {
    for {
        // refresh every 1 minute
        time.Sleep(time.Minute)
        GroupPlugins.Init()
        // ....
    }
}

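Init reads the data from the DB and stores it locally. The map is replaced while holding a lock, which prevents concurrent writes to the data (and races with goroutines reading it):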

// modules/hbs/cache/plugins.go
var GroupPlugins = &SafeGroupPlugins{M: make(map[int][]string)}

func (this *SafeGroupPlugins) Init() {
    m, err := db.QueryPlugins()
    if err != nil {
        return
    }
    this.Lock()
    defer this.Unlock()
    this.M = m
}
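The lock only protects readers if they take it too. The read path used by the RPC handlers is not shown, so the following is a minimal sketch, assuming the cache embeds `sync.RWMutex` as `SafeTemplateCache` does below: the writer takes `Lock` to swap the map, while readers take `RLock`, so many concurrent RPC handlers can read at the same time. `SafePlugins`, `Reload`, and `Get` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// SafePlugins is a hypothetical, simplified SafeGroupPlugins:
// group ID -> plugin directories, guarded by an RWMutex.
type SafePlugins struct {
	sync.RWMutex
	M map[int][]string
}

// Reload replaces the whole map under the write lock, as Init does.
func (s *SafePlugins) Reload(m map[int][]string) {
	s.Lock()
	defer s.Unlock()
	s.M = m
}

// Get takes only the read lock, so readers do not block each other.
func (s *SafePlugins) Get(gid int) []string {
	s.RLock()
	defer s.RUnlock()
	return s.M[gid]
}

func main() {
	c := &SafePlugins{M: make(map[int][]string)}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = c.Get(1) // concurrent readers racing with Reload are safe
		}()
	}
	c.Reload(map[int][]string{1: {"sys/ntp", "net/port"}})
	wg.Wait()
	fmt.Println(c.Get(1)) // [sys/ntp net/port]
}
```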

2. hbs and judge

judge queries the alert expressions:

// modules/hbs/rpc/hbs.go
func (t *Hbs) GetExpressions(req model.NullRpcRequest, reply *model.ExpressionResponse) error {
    reply.Expressions = cache.ExpressionCache.Get()
    return nil
}

judge queries the alert strategies. Because the data spans several DB tables, the reply is assembled in a fairly involved way:

// modules/hbs/rpc/hbs.go
func (t *Hbs) GetStrategies(req model.NullRpcRequest, reply *model.StrategiesResponse) error {
    reply.HostStrategies = []*model.HostStrategy{}
    // host ID -> IDs of the templates bound to that host
    hidTids := cache.HostTemplateIds.GetMap()
    sz := len(hidTids)
    hosts := cache.MonitoredHosts.Get()
    tpls := cache.TemplateCache.GetMap()
    strategies := cache.Strategies.GetMap()
    // index: template ID -> strategies
    tpl2Strategies := Tpl2Strategies(strategies)
    hostStrategies := make([]*model.HostStrategy, 0, sz)
    for hostId, tplIds := range hidTids {
        // judge needs hostnames: look up the host record, skipping hosts not monitored
        h, exists := hosts[hostId]
        if !exists {
            continue
        }
        ss := CalcInheritStrategies(tpls, tplIds, tpl2Strategies)
        hs := model.HostStrategy{
            Hostname:   h.Name,
            Strategies: ss,
        }
        hostStrategies = append(hostStrategies, &hs)
    }
    reply.HostStrategies = hostStrategies
    return nil
}
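Judging by its name and how its result is used, `Tpl2Strategies` presumably builds an index from template ID to strategies, so each host's template IDs can be resolved without scanning every strategy. The sketch below shows that indexing step plus a simple per-host lookup; `Strategy` is a simplified stand-in and `strategiesFor` is a hypothetical helper. The name `CalcInheritStrategies` suggests the real function also resolves strategies inherited through templates, which this sketch does not attempt.

```go
package main

import (
	"fmt"
	"sort"
)

// Strategy is a hypothetical, simplified stand-in for model.Strategy.
type Strategy struct {
	ID     int
	TplID  int
	Metric string
}

// tpl2Strategies groups strategies by the template they belong to.
func tpl2Strategies(strategies map[int]*Strategy) map[int][]*Strategy {
	idx := make(map[int][]*Strategy)
	for _, s := range strategies {
		idx[s.TplID] = append(idx[s.TplID], s)
	}
	return idx
}

// strategiesFor collects the strategies attached directly to a host's templates.
func strategiesFor(tplIDs []int, idx map[int][]*Strategy) []*Strategy {
	var out []*Strategy
	for _, tid := range tplIDs {
		out = append(out, idx[tid]...)
	}
	return out
}

func main() {
	strategies := map[int]*Strategy{
		1: {ID: 1, TplID: 10, Metric: "cpu.idle"},
		2: {ID: 2, TplID: 10, Metric: "mem.memfree"},
		3: {ID: 3, TplID: 20, Metric: "df.bytes.free"},
	}
	idx := tpl2Strategies(strategies)
	ss := strategiesFor([]int{10, 20}, idx)
	ids := make([]int, 0, len(ss))
	for _, s := range ss {
		ids = append(ids, s.ID)
	}
	sort.Ints(ids)   // map iteration order is random, so sort for stable output
	fmt.Println(ids) // [1 2 3]
}
```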

Likewise, the RPC interfaces above all operate on the cache: hbs periodically queries the DB and caches the results locally for judge to query:

// modules/hbs/cache/cache.go
func LoopInit() {
    for {
        time.Sleep(time.Minute)
        // ......
        GroupTemplates.Init()
        HostGroupsMap.Init()
        HostMap.Init()
        TemplateCache.Init()
        // Strategies is built from the template map, so it is refreshed after TemplateCache
        Strategies.Init(TemplateCache.GetMap())
        HostTemplateIds.Init()
        ExpressionCache.Init()
    }
}

The cached data is held in maps; hbs periodically reads the DB and overwrites each map with the fresh result:

// modules/hbs/cache/templates.go
type SafeTemplateCache struct {
    sync.RWMutex
    M map[int]*model.Template
}

var TemplateCache = &SafeTemplateCache{M: make(map[int]*model.Template)}

func (this *SafeTemplateCache) Init() {
    ts, err := db.QueryTemplates()
    if err != nil {
        return
    }
    this.Lock()
    defer this.Unlock()
    this.M = ts
}