openfalconalarm-代码分析

34次阅读

共计 5315 个字符,预计需要花费 14 分钟才能阅读完成。

总结:alarm 消费由 judge 产生的 redis 报警事件, 根据优先级高低是否做合并, 发往不同的报警通道

高优先级报警比如 p0: judge 产生报警事件 –> 写入 redis event:p0 队列 –>alarm 消费 –> 获取发送对象并处理调用回调函数 (如果有)–> 根据策略不同生成不同通道的报警(im,sms,mail,phone) 等 –> 写入 redis 各个通道的发送队列 /im /sms /mail /phone –> 发送报警的 worker 取出报警发送

低优先级报警比如 p4: judge 产生报警事件 –> 写入 redis event:p4 队列 –>alarm 消费 –> 获取发送对象并处理调用回调函数 (如果有)–> 根据策略不同生成不同通道的合并(im,sms,mail,phone) 事件写入合并队列 (来自于配置文件中的 /queue/user/im) 等 –> 由不通通道的合并函数处理, 合并报警生成 dashboard 链接调用 dashboard 的 api 写入 falcon_portal.alert_link 表中供用户日后查看原始信息 –> 写入 redis 各个通道的发送队列 /im /sms /mail /phone –> 发送报警的 worker 取出报警发送

下面具体看下代码

1.main 函数中的核心就是这几个 goroutine 了

        // 消费报警事件
     go cron.ReadHighEvent()
    go cron.ReadLowEvent()
    // 合并低优先报警
    go cron.CombineSms()
    go cron.CombineMail()
    go cron.CombineIM()
    // 发送真实报警
    go cron.ConsumeIM()
    go cron.ConsumeSms()
    go cron.ConsumeMail()
    go cron.ConsumePhone()
    go cron.CleanExpiredEvent()

2.ReadHighEvent 和 ReadLowEvent 的区别就是 consume 时分优先级

func ReadHighEvent() {queues := g.Config().Redis.HighQueues
    if len(queues) == 0 {return}

    for {
        /*brpop 多个队列的 1 条返回 event
        1. 传入的是包含多个高优先级的队列的列表比如[p0,p1,p2]
        那么总是先 pop 完 event:p0 的队列, 然后才是 p1 ,p2(这里我进行过实测)
        2. 单纯的 popevent 速度是很快的, 但是每次循环里都有下面的 consume, 如果
        consume 速度慢的话会直接影响整体的 pop 速度, 我观察过再没加 goroutine 之前
        pop 速度大概 5 条 /s , 如果报警过多会有堆积现象, 之前看到会有 4 个小时左右的延迟
        */
        event, err := popEvent(queues)
        if err != nil {time.Sleep(time.Second)
            continue
        }
        // 这里的 consume 其实和 popevent 已经没关系了, 所以异步执行, 但是可能会产生过多的 goroutine
        go consume(event, true)
    }
}

3. 消费报警事件函数 consume

func consume(event *cmodel.Event, isHigh bool) {actionId := event.ActionId()
    if actionId <= 0 {return}
        /* 这里通过 event 中的 actionid 拿到 action
        就是拿到这个 报警组的名字 是否有回调等信息
        */
    action := api.GetAction(actionId)
    if action == nil {return}
        // 有回调的话处理下 http get 调用相应的回调函数, 会把报警的信息作为参数带上
    if action.Callback == 1 {HandleCallback(event, action)
    }

    if isHigh {consumeHighEvents(event, action)
    } else {consumeLowEvents(event, action)
    }
}

4. 下面分别看下高低优先级的 consume 函数

// 高优先级的不做报警合并
func consumeHighEvents(event *cmodel.Event, action *api.Action) {
    // 如果报警没有接收组, 那么直接返回了
    if action.Uic == "" {return}

    phones, mails, ims := api.ParseTeams(action.Uic)
        log.Infof("api.ParseTeams--phones, mails, ims,action.uic",phones, mails, ims,action.Uic)
      // 生成报警内容, 这里可以为不同通道的报警做定制
    smsContent := GenerateSmsContent(event)
    mailContent := GenerateMailContent(event)
    //imContent := GenerateIMContent(event)
        phoneContent := GeneratePhoneContent(event)


    /* 这里根据报警的级别可以做通道的定制
    如 <=P2 才发送短信 =p9 电话报警等等
    下面的 redi.wirtesms 等方法就是将报警内容 lpush 到不通通道的发送队列中
    */
    if event.Priority() < 3 {redi.WriteSms(phones, smsContent)
    }
        //p9 电话报警
        if event.Priority() ==9 {redi.WriteSms(phones, smsContent)
        redi.WritePhone(phones, phoneContent)
    }
    redi.WriteIM(mails, mailContent)
    redi.WriteMail(mails, smsContent, mailContent)

}
// 低优先级的做报警合并
func consumeLowEvents(event *cmodel.Event, action *api.Action) {
    if action.Uic == "" {return}

    // <=P2 才发送短信
        //parseuser 函数将 event 转换为合并消息 写入中间队列
    if event.Priority() < 3 {ParseUserSms(event, action)
    }
        
    ParseUserIm(event, action)
    ParseUserMail(event, action)
}

下面以 ParseUserMail 为例

func ParseUserMail(event *cmodel.Event, action *api.Action) {
    //api 根据报警组获取组里人
    userMap := api.GetUsers(action.Uic)

    metric := event.Metric()
    subject := GenerateSmsContent(event)
    content := GenerateMailContent(event)
    status := event.Status
    priority := event.Priority()

    queue := g.Config().Redis.UserMailQueue

    rc := g.RedisConnPool.Get()
    defer rc.Close()
       // 遍历 usermap 生成报警中间态消息并 LPUSH 写入中间队列
    for _, user := range userMap {
        dto := MailDto{
            Priority: priority,
            Metric:   metric,
            Subject:  subject,
            Content:  content,
            Email:    user.Email,
            Status:   status,
        }
        bs, err := json.Marshal(dto)
        if err != nil {log.Error("json marshal MailDto fail:", err)
            continue
        }

        _, err = rc.Do("LPUSH", queue, string(bs))
        if err != nil {log.Error("LPUSH redis", queue, "fail:", err, "dto:", string(bs))
        }
    }
}

此时低优先级的报警存在于配置文件中的中间队列名称的 redis 队列中 /queue/user/mail

5. 报警合并函数

func CombineSms() {
    for {
        // 每分钟读取处理一次
        time.Sleep(time.Minute)
        combineSms()}
}


func combineIM() {
    // 从中间队列中 pop 出要合并的报警
    dtos := popAllImDto()
    count := len(dtos)
    if count == 0 {return}

    dtoMap := make(map[string][]*ImDto)
    for i := 0; i < count; i++ {
        // 根据报警的 metirc priority status 和接收人作为 key 合并报警为列表
        key := fmt.Sprintf("%d%s%s%s", dtos[i].Priority, dtos[i].Status, dtos[i].IM, dtos[i].Metric)
        if _, ok := dtoMap[key]; ok {dtoMap[key] = append(dtoMap[key], dtos[i])
        } else {dtoMap[key] = []*ImDto{dtos[i]}
        }
    }

    for _, arr := range dtoMap {size := len(arr)
        // 如果合并后的报警只有一条直接写入 redis 发送队列
        if size == 1 {redi.WriteIM([]string{arr[0].IM}, arr[0].Content)
            continue
        }

        // 把多个 im 内容写入数据库,只给用户提供一个链接
        contentArr := make([]string, size)
        for i := 0; i < size; i++ {contentArr[i] = arr[i].Content
        }
        content := strings.Join(contentArr, ",,")

        first := arr[0].Content
        t := strings.Split(first, "][")
        eg := ""
        if len(t) >= 3 {eg = t[2]
        }
                // 调用 dashboard 的 api 将合并后的信息写入 falcon_portal.alert_link 表
        path, err := api.LinkToSMS(content)
        chat := ""if err != nil || path =="" {chat = fmt.Sprintf("[P%d][%s] %d %s.  e.g. %s. detail in email", arr[0].Priority, arr[0].Status, size, arr[0].Metric, eg)
            log.Error("create short link fail", err)
        } else {
            // 生成一个汇总信息 展示:metric status link 的 url
            chat = fmt.Sprintf("[P%d][%s] %d %s e.g. %s %s/portal/links/%s",
                arr[0].Priority, arr[0].Status, size, arr[0].Metric, eg, g.Config().Api.Dashboard, path)
            log.Debugf("combined im is:%s", chat)
        }
        if  arr[0].IM==""{email:= fmt.Sprintf("%s@bytedance.com",arr[0].Name)
            redi.WriteIM([]string{email}, chat)
        }else{redi.WriteIM([]string{arr[0].IM}, chat)
        }

    }
}

6. 最后看下报警发送函数

func ConsumeIM() {
    for {
        //rpop 出所有的报警信息到一个 slice 中
        L := redi.PopAllIM()
        if len(L) == 0 {time.Sleep(time.Millisecond * 200)
            continue
        }
        SendIMList(L)
    }
}

func SendIMList(L []*model.IM) {
    for _, im := range L {
        /*
        1.IMWorkerChan 是带缓冲的 chan,chan 的长度意思就是同时可以多少个 send 作业
        2. 向 im 发送 workerchan 中写入 1 说明可以发送一条
        3. 如果队列没满, 是不会阻塞在这里的, 否则会阻塞
        */
        IMWorkerChan <- 1
        go SendIM(im)
    }
}

func SendIM(im *model.IM) {
    /*
    1. 这里使用 defer 的逻辑是先发送后读取 chan
    2. 因为如果先读取意味着又有一个 work 可以开始和逻辑相反
    3. 下面就是自己定制的发送方式了
    */
    defer func() {<-IMWorkerChan}()
    if im.Tos==""{log.Errorf("content_tos_empty_error %s",im.Content)
        return
    }
}

正文完
 0