关于监控工具:Openfalcon-judge源码分析

39次阅读

共计 4171 个字符,预计需要花费 11 分钟才能阅读完成。

judge 模块应用历史数据进行告警断定,其历史数据保留在内存中(仅保留最近的数据),若触发告警,则发送 event 至 redis,由 alarm 模块生产解决 redis 的事件。

因为 judge 模块将最近的历史数据保留在内存,因而它是有状态的;当节点宕机时,内存中的历史数据将失落。然而,因为监控数据会源源不断的上报,它会上报到其它 judge 节点,在其它节点从新进行告警断定。

judge 模块的职责:

  • 接管 transfer 转发的数据,并仅存储最近的几个点,以进行告警断定;配置 remain=11,也就是依据最新的 10 个点数据,断定是否触发了告警;
  • 从 hbs 同步告警策略,hbs 缓存了用户配置的策略列表,judge 应用 1 个 rpc 长连贯查问策略列表;
  • 判断数据是否达到阈值产生告警事件:依据历史数据和策略表达式,判断是否达到阈值;
  • 判断告警事件是否应该写入 redis:

    • 告警事件不肯定写入 redis,须要依据配置的最大报警次数来确定是否写入 redis;
    • 比方配置最大报警次数 =3,当第 4 次产生报警事件的时候,就不会写入 redis;

整体架构:

1. 接收数据:大 Map

judge 接管 transfer 转发的数据,将数据存入本地内存:

// modules/judge/rpc/receiver.go
func (this *Judge) Send(items []*model.JudgeItem, resp *model.SimpleRpcResponse) error {remain := g.Config().Remain
    now := time.Now().Unix()
    for _, item := range items {
        .......
        pk := item.PrimaryKey()
        store.HistoryBigMap[pk[0:2]].PushFrontAndMaintain(pk, item, remain, now)
    }
    return nil
}

本地的 store.HistoryBigMap 是个大 Map: 为了加重大 Map 的并发读写压力,对 itemKey 的 md5 的前 2 个字符进行了拆分,分成了 16*16=256 个小 Map,每个小 Map 内并发读写加锁,升高了锁粒度:

// modules/judge/store/history.go
var HistoryBigMap = make(map[string]*JudgeItemMap)

func InitHistoryBigMap() {arr := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"}
    for i := 0; i < 16; i++ {
        for j := 0; j < 16; j++ {HistoryBigMap[arr[i]+arr[j]] = NewJudgeItemMap()}
    }
}

type JudgeItemMap struct {
    sync.RWMutex
    M map[string]*SafeLinkedList
}

小 map 中:map[string]*SafeLinkedList,key=itemKey,value= 最近的 11 个点,当有新数据时,会把老的数据从 list 中删掉;
magicNumber=11(最近的 11 个点)在配置文件中指定,是一个教训数据,一般来讲依据最近的 11 个点来断定告警触发已足够;
小 map 读写时应用 Lock 进行并发管制;

// modules/judge/store/linkedlist.go
func (this *SafeLinkedList) PushFrontAndMaintain(v *model.JudgeItem, maxCount int) bool {this.Lock()
    defer this.Unlock()
    
    sz := this.L.Len()
    // 新数据 push 进 list
    this.L.PushFront(v)

    sz++
    if sz <= maxCount {return true}
    // 超过了 11 个点,删掉老数据
    del := sz - maxCount
    for i := 0; i < del; i++ {this.L.Remove(this.L.Back())
    }

    return true
}

2. 告警断定

告警断定时,会解析如下的逻辑表达式:

  • all(#3) > 80、max(#3) > 80、min(#3) > 80、sum(#3) > 80、avg(#3) > 80;
  • 针对最新的几个点:

    • all(#3):示意最近的 3 个点都满足阈值;
    • max(#3): 标识最近 3 个点的最大值满足阈值;
    • min(#3): 标识最近 3 个点的最小值满足阈值;
    • sum(#3): 标识最近 3 个点的和满足阈值;
    • avg(#3): 标识最近 3 个点的平均值满足阈值;

    告警断定的入口:

// modules/judge/store/judge.go
func Judge(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {CheckStrategy(L, firstItem, now)
    CheckExpression(L, firstItem, now)
}

先断定该条数据是否有关联的告警策略:也可能关联多个告警策略

// modules/judge/store/judge.go
func CheckStrategy(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {key := fmt.Sprintf("%s/%s", firstItem.Endpoint, firstItem.Metric)
    strategyMap := g.StrategyMap.Get()
    strategies, exists := strategyMap[key]
    if !exists {return}
    for _, s := range strategies {
        ...
        judgeItemWithStrategy(L, s, firstItem, now)
    }
}    

断定时,先解析断定函数 fn;再看是否满足断定要求的点数,若点数不够则间接返回;最初依据 fn.Compute()的后果,决定是否将告警事件发送 redis:

// modules/judge/store/judge.go
func judgeItemWithStrategy(L *SafeLinkedList, strategy model.Strategy, firstItem *model.JudgeItem, now int64) {fn, err := ParseFuncFromString(strategy.Func, strategy.Operator, strategy.RightValue)    
    historyData, leftValue, isTriggered, isEnough := fn.Compute(L)
    // 以后的数据点太少,不足以做告警断定
    if !isEnough {return}
    // 触发阈值,产生告警事件
    event := &model.Event{Id:         fmt.Sprintf("s_%d_%s", strategy.Id, firstItem.PrimaryKey()),
        Strategy:   &strategy,
        Endpoint:   firstItem.Endpoint,
        LeftValue:  leftValue,
        EventTime:  firstItem.Timestamp,
        PushedTags: firstItem.Tags,
    }
    sendEventIfNeed(historyData, isTriggered, now, event, strategy.MaxStep)
}

告警事件是否发送 redis,跟上次的事件相干,跟最大告警次数也相干:

  • 若本次触发触发阈值:

    • 若上次未触发或上次是 OK,那么本次产生告警事件;
    • 若已超过最大报警次数,则返回;
    • 最初,将本次事件的告警次数 +1,发送告警事件;
  • 若本次未触发阈值:

    • 若上次产生了 Problem 事件,则将其复原,产生复原的告警事件;
// modules/judge/store/judge.go
func sendEventIfNeed(historyData []*model.HistoryData, isTriggered bool, now int64, event *model.Event, maxStep int) {lastEvent, exists := g.LastEvents.Get(event.Id)
    if isTriggered {
        event.Status = "PROBLEM"
        // 上次未触发或者上次是 Ok,那么本次产生告警事件
        if !exists || lastEvent.Status[0] == 'O' {
            event.CurrentStep = 1
            sendEvent(event)
            return
        }
        // 已超过最大报警次数
        if lastEvent.CurrentStep >= maxStep {return}
        event.CurrentStep = lastEvent.CurrentStep + 1
        sendEvent(event)
    } else {
        // 本次未触发,如果 lastEvent 是 Problem,则本次将其复原
        if exists && lastEvent.Status[0] == 'P' {
            event.Status = "OK"
            event.CurrentStep = 1
            sendEvent(event)
        }
    }
}

3. 告警事件写入 redis

告警事件通过 lpush 命令写入 redis 的队列,不同的告警等级写入不同的队列,其队列名称 =event:p{level},告警事件最终被 alarm 组件生产:

// modules/judge/store/judge.go
func sendEvent(event *model.Event) {
    // update last event
    g.LastEvents.Set(event.Id, event)

    bs, err := json.Marshal(event)
    // "event:p%v"
    redisKey := fmt.Sprintf(g.Config().Alarm.QueuePattern, event.Priority())
    rc := g.RedisConnPool.Get()
    defer rc.Close()
    rc.Do("LPUSH", redisKey, string(bs))
}    

正文完
 0