judge模块应用历史数据进行告警断定,其历史数据保留在内存中(仅保留最近的数据),若触发告警,则发送event至redis,由alarm模块生产解决redis的事件。
因为judge模块将最近的历史数据保留在内存,因而它是有状态的;当节点宕机时,内存中的历史数据将失落。然而,因为监控数据会源源不断的上报,它会上报到其它judge节点,在其它节点从新进行告警断定。
judge模块的职责:
- 接管transfer转发的数据,并仅存储最近的几个点,以进行告警断定;配置remain=11,也就是依据最新的10个点数据,断定是否触发了告警;
- 从hbs同步告警策略,hbs缓存了用户配置的策略列表,judge应用1个rpc长连贯查问策略列表;
- 判断数据是否达到阈值产生告警事件:依据历史数据和策略表达式,判断是否达到阈值;
判断告警事件是否应该写入redis:
- 告警事件不肯定写入redis,须要依据配置的最大报警次数来确定是否写入redis;
- 比方配置最大报警次数=3,当第4次产生报警事件的时候,就不会写入redis;
整体架构:
1. 接收数据:大Map
judge接管transfer转发的数据,将数据存入本地内存:
// modules/judge/rpc/receiver.gofunc (this *Judge) Send(items []*model.JudgeItem, resp *model.SimpleRpcResponse) error { remain := g.Config().Remain now := time.Now().Unix() for _, item := range items { ....... pk := item.PrimaryKey() store.HistoryBigMap[pk[0:2]].PushFrontAndMaintain(pk, item, remain, now) } return nil}
本地的store.HistoryBigMap是个大Map: 为了加重大Map的并发读写压力,对itemKey的md5的前2个字符进行了拆分,分成了16*16=256个小Map,每个小Map内并发读写加锁,升高了锁粒度:
// modules/judge/store/history.govar HistoryBigMap = make(map[string]*JudgeItemMap)func InitHistoryBigMap() { arr := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"} for i := 0; i < 16; i++ { for j := 0; j < 16; j++ { HistoryBigMap[arr[i]+arr[j]] = NewJudgeItemMap() } }}type JudgeItemMap struct { sync.RWMutex M map[string]*SafeLinkedList}
小map中:map[string]*SafeLinkedList,key=itemKey,value=最近的11个点,当有新数据时,会把老的数据从list中删掉;
magicNumber=11(最近的11个点)在配置文件中指定,是一个教训数据,一般来讲依据最近的11个点来断定告警触发已足够;
小map读写时应用Lock进行并发管制;
// modules/judge/store/linkedlist.gofunc (this *SafeLinkedList) PushFrontAndMaintain(v *model.JudgeItem, maxCount int) bool { this.Lock() defer this.Unlock() sz := this.L.Len() // 新数据push进list this.L.PushFront(v) sz++ if sz <= maxCount { return true } // 超过了11个点,删掉老数据 del := sz - maxCount for i := 0; i < del; i++ { this.L.Remove(this.L.Back()) } return true}
2. 告警断定
告警断定时,会解析如下的逻辑表达式:
- all(#3) > 80、max(#3) > 80、min(#3) > 80、sum(#3) > 80、avg(#3) > 80;
针对最新的几个点:
- all(#3):示意最近的3个点都满足阈值;
- max(#3): 标识最近3个点的最大值满足阈值;
- min(#3): 标识最近3个点的最小值满足阈值;
- sum(#3): 标识最近3个点的和满足阈值;
- avg(#3): 标识最近3个点的平均值满足阈值;
告警断定的入口:
// modules/judge/store/judge.gofunc Judge(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) { CheckStrategy(L, firstItem, now) CheckExpression(L, firstItem, now)}
先断定该条数据是否有关联的告警策略:也可能关联多个告警策略
// modules/judge/store/judge.gofunc CheckStrategy(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) { key := fmt.Sprintf("%s/%s", firstItem.Endpoint, firstItem.Metric) strategyMap := g.StrategyMap.Get() strategies, exists := strategyMap[key] if !exists { return } for _, s := range strategies { ... judgeItemWithStrategy(L, s, firstItem, now) }}
断定时,先解析断定函数fn;再看是否满足断定要求的点数,若点数不够则间接返回;最初依据fn.Compute()的后果,决定是否将告警事件发送redis:
// modules/judge/store/judge.gofunc judgeItemWithStrategy(L *SafeLinkedList, strategy model.Strategy, firstItem *model.JudgeItem, now int64) { fn, err := ParseFuncFromString(strategy.Func, strategy.Operator, strategy.RightValue) historyData, leftValue, isTriggered, isEnough := fn.Compute(L) // 以后的数据点太少,不足以做告警断定 if !isEnough { return } // 触发阈值,产生告警事件 event := &model.Event{ Id: fmt.Sprintf("s_%d_%s", strategy.Id, firstItem.PrimaryKey()), Strategy: &strategy, Endpoint: firstItem.Endpoint, LeftValue: leftValue, EventTime: firstItem.Timestamp, PushedTags: firstItem.Tags, } sendEventIfNeed(historyData, isTriggered, now, event, strategy.MaxStep)}
告警事件是否发送redis,跟上次的事件相干,跟最大告警次数也相干:
若本次触发触发阈值:
- 若上次未触发或上次是OK,那么本次产生告警事件;
- 若已超过最大报警次数,则返回;
- 最初,将本次事件的告警次数+1,发送告警事件;
若本次未触发阈值:
- 若上次产生了Problem事件,则将其复原,产生复原的告警事件;
// modules/judge/store/judge.gofunc sendEventIfNeed(historyData []*model.HistoryData, isTriggered bool, now int64, event *model.Event, maxStep int) { lastEvent, exists := g.LastEvents.Get(event.Id) if isTriggered { event.Status = "PROBLEM" // 上次未触发或者上次是Ok,那么本次产生告警事件 if !exists || lastEvent.Status[0] == 'O' { event.CurrentStep = 1 sendEvent(event) return } // 已超过最大报警次数 if lastEvent.CurrentStep >= maxStep { return } event.CurrentStep = lastEvent.CurrentStep + 1 sendEvent(event) } else { // 本次未触发,如果lastEvent是Problem,则本次将其复原 if exists && lastEvent.Status[0] == 'P' { event.Status = "OK" event.CurrentStep = 1 sendEvent(event) } }}
3. 告警事件写入redis
告警事件通过lpush命令写入redis的队列,不同的告警等级写入不同的队列,其队列名称=event:p{level},告警事件最终被alarm组件生产:
// modules/judge/store/judge.gofunc sendEvent(event *model.Event) { // update last event g.LastEvents.Set(event.Id, event) bs, err := json.Marshal(event) // "event:p%v" redisKey := fmt.Sprintf(g.Config().Alarm.QueuePattern, event.Priority()) rc := g.RedisConnPool.Get() defer rc.Close() rc.Do("LPUSH", redisKey, string(bs))}