共计 3003 个字符,预计需要花费 8 分钟才能阅读完成。
hbs 负责周期性地读取 db 中的内容,缓存到本地 cache,然后提供 RPC 接口,供 agent 和 judge 两个组件查询调用。
// modules/hbs/rpc/rpc.go
// Start creates an RPC server, registers the Agent and Hbs receivers on it,
// listens on TCP at addr, and serves each accepted connection with the
// JSON-RPC server codec in its own goroutine.
// NOTE(review): addr is not defined in this excerpt, and the "......" line
// stands for code omitted from the excerpt (presumably the handling of e and
// err — confirm against the full file).
func Start() {server := rpc.NewServer()
server.Register(new(Agent))
server.Register(new(Hbs))
l, e := net.Listen("tcp", addr)
// Accept loop; no exit condition is visible in this excerpt.
for {conn, err := l.Accept()
......
// Serve the connection concurrently so the loop can accept the next one.
go server.ServeCodec(jsonrpc.NewServerCodec(conn))
}
}
hbs 对接 agent:
- 处理 agent heartbeat 请求;
- 处理 agent plugin 的查询请求;
- 处理 agent 监控哪些进程、哪些端口的查询请求;
hbs 对接 judge:
- 查询当前配置的所有告警策略;
- 查询当前配置的所有告警表达式;
整体流程图:
1. hbs 对接 agent
agent 查询需要执行哪些 plugin:
// modules/hbs/rpc/agent.go
// MinePlugins answers an agent's plugin query. When the request carries a
// hostname, reply.Plugins is filled from the local cache and reply.Timestamp
// is set to the current Unix time; an empty hostname leaves reply untouched.
// It always returns nil.
func (t *Agent) MinePlugins(args model.AgentHeartbeatRequest, reply *model.AgentPluginsResponse) error {
	if args.Hostname != "" {
		reply.Plugins = cache.GetPlugins(args.Hostname)
		reply.Timestamp = time.Now().Unix()
	}
	return nil
}
agent 查询需要监控哪些进程、哪些端口:
// modules/hbs/rpc/agent.go
// BuiltinMetrics answers an agent's query for its builtin metrics.
//
// The metrics for args.Hostname are read from the local cache and digested
// into a checksum (the empty string when there are no metrics). If the
// checksum sent by the agent matches, an empty metric list is returned;
// otherwise the full list is sent. reply.Checksum and reply.Timestamp are
// always set on this path.
//
// An empty hostname or a cache lookup error leaves reply untouched. The
// method returns nil in every case.
func (t *Agent) BuiltinMetrics(args *model.AgentHeartbeatRequest, reply *model.BuiltinMetricResponse) error {
	if args.Hostname == "" {
		return nil
	}

	builtin, err := cache.GetBuiltinMetrics(args.Hostname)
	if err != nil {
		return nil
	}

	var digest string
	if len(builtin) > 0 {
		digest = DigestBuiltinMetrics(builtin)
	}

	// Default to the full list; send an empty one when the agent is already
	// up to date.
	reply.Metrics = builtin
	if args.Checksum == digest {
		reply.Metrics = []*model.BuiltinMetric{}
	}

	reply.Checksum = digest
	reply.Timestamp = time.Now().Unix()
	return nil
}
可以看到,上面的 rpc 接口操作的都是 cache;hbs 会定期地从 db 中查询数据,然后缓存在本地 cache,以供 agent 查询:
// modules/hbs/cache/cache.go
// LoopInit loops forever: each round sleeps for one minute and then calls
// GroupPlugins.Init.
// NOTE(review): the "...." line stands for further calls omitted from this
// excerpt.
func LoopInit() {
for {
// One-minute period between rounds.
time.Sleep(time.Minute)
GroupPlugins.Init()
....
}
}
从 db 读取数据,然后保存在本地;为了避免并发读写 data,这里加了锁
// modules/hbs/cache/plugins.go
// GroupPlugins is the package-level plugin cache. It starts with an empty
// map[int][]string in M.
// NOTE(review): keys are presumably host-group ids and values plugin paths —
// confirm against db.QueryPlugins.
var GroupPlugins = &SafeGroupPlugins{M: make(map[int][]string)}
// Init reloads the plugin cache from the database. The query runs outside the
// lock; the lock is held only while M is replaced with the new result. When
// the query fails, the existing M is left unchanged.
func (this *SafeGroupPlugins) Init() {
	plugins, err := db.QueryPlugins()
	if err != nil {
		return
	}

	this.Lock()
	this.M = plugins
	this.Unlock()
}
2. hbs 对接 judge
judge 查询告警表达式:
// modules/hbs/rpc/hbs.go
// GetExpressions copies the result of cache.ExpressionCache.Get into
// reply.Expressions. The request argument is not used, and the method always
// returns nil.
func (t *Hbs) GetExpressions(req model.NullRpcRequest, reply *model.ExpressionResponse) error {
	exprs := cache.ExpressionCache.Get()
	reply.Expressions = exprs
	return nil
}
judge 查询告警策略:因为关联多个 db table,这里进行了比较复杂的拼装
// modules/hbs/rpc/hbs.go
// GetStrategies builds the per-host strategy list and stores it in
// reply.HostStrategies.
//
// It reads four caches (host-id → template-ids, monitored hosts, templates,
// strategies), indexes the strategies by template via Tpl2Strategies, and for
// every host id resolves the strategies through CalcInheritStrategies. The
// request argument is not used, and the method always returns nil;
// reply.HostStrategies is never nil.
//
// NOTE(review): assumes cache.MonitoredHosts.Get returns a map keyed by host
// id whose values expose Name — confirm against the cache package.
func (t *Hbs) GetStrategies(req model.NullRpcRequest, reply *model.StrategiesResponse) error {
	reply.HostStrategies = []*model.HostStrategy{}

	hidTids := cache.HostTemplateIds.GetMap()
	hosts := cache.MonitoredHosts.Get()
	tpls := cache.TemplateCache.GetMap()
	strategies := cache.Strategies.GetMap()

	// Index strategies by template so each lookup below is by template id.
	tpl2Strategies := Tpl2Strategies(strategies)

	// At most one entry per host id, so len(hidTids) bounds the result.
	hostStrategies := make([]*model.HostStrategy, 0, len(hidTids))
	for hostId, tplIds := range hidTids {
		// The reply carries the hostname, so host ids that are not in the
		// monitored-hosts cache are skipped.
		h, ok := hosts[hostId]
		if !ok {
			continue
		}

		ss := CalcInheritStrategies(tpls, tplIds, tpl2Strategies)
		hs := model.HostStrategy{
			Hostname:   h.Name,
			Strategies: ss,
		}
		hostStrategies = append(hostStrategies, &hs)
	}

	reply.HostStrategies = hostStrategies
	return nil
}
同样地,上面的 rpc 接口操作的都是 cache;hbs 会定期地从 db 中查询数据,然后缓存在本地 cache,以供 judge 查询:
// modules/hbs/cache/cache.go
// LoopInit loops forever: each round sleeps for one minute and then calls the
// Init method of each cache listed below, in this order.
// NOTE(review): the "......" line stands for calls omitted from this excerpt.
func LoopInit() {
// One round per minute.
for {time.Sleep(time.Minute)
......
GroupTemplates.Init()
HostGroupsMap.Init()
HostMap.Init()
TemplateCache.Init()
// Strategies.Init is given the template map as reloaded by the call above.
Strategies.Init(TemplateCache.GetMap())
HostTemplateIds.Init()
ExpressionCache.Init()}
}
cache 的数据保存在 map 中,然后定期读取 db 中的数据,覆盖掉 map
// modules/hbs/cache/templates.go
// SafeTemplateCache pairs a template map with an embedded read-write mutex.
type SafeTemplateCache struct {
	// RWMutex is embedded so Lock/Unlock/RLock/RUnlock are callable directly
	// on the cache value.
	sync.RWMutex

	// M maps an int key to its template.
	M map[int]*model.Template
}
// TemplateCache is the package-level template cache. It starts with an empty
// map[int]*model.Template in M.
var TemplateCache = &SafeTemplateCache{M: make(map[int]*model.Template)}
// Init reloads the template cache from the database. The query runs outside
// the lock; the write lock is held only while M is replaced with the new
// result. When the query fails, the existing M is left unchanged.
func (this *SafeTemplateCache) Init() {
	templates, err := db.QueryTemplates()
	if err != nil {
		return
	}

	this.Lock()
	this.M = templates
	this.Unlock()
}
正文完