乐趣区

关于监控工具:Openfalcon-transfer发送judge的流程优化

transfer 组件通过一致性 hash 调配,应用 RPC 接口将数据发送给 judge 组件,由 judge 组件做告警断定。

在 transfer 中,对于没有配置告警策略的指标,没有必要存储到 judgeQueue,也没有必要发送给 judge 节点。

Open-falcon 的解决是由 transfer 把所有的指标无脑发给 judge,由 judge 判断该指标是否有告警规定关联。

nightingale 作为升级版的 Open-falcon,它的解决较为奇妙,它在 transfer 组件中就做了判断,若指标没有告警规定关联,则就无需存入 judgeQueue,也不会发送到 judge 了,大大减少了网络发送数据量。

基于此,可对 Open-falcon 的 transfer–>judge 做肯定的流程优化。

1. Open-falcon 的 transfer–>judge

transfer 将所有的指标无脑发送给 judge,由 judge 做告警规定的断定;
看一下 judge 的代码,g.FilterMap 中存储了 metric 及其告警规定信息,g.FilterMap 的信息由 judge 申请 hbs 取得:

//modules/judge/rpc/receiver.go
func (this *Judge) Send(items []*model.JudgeItem, resp *model.SimpleRpcResponse) error {remain := g.Config().Remain
    now := time.Now().Unix()
    for _, item := range items {exists := g.FilterMap.Exists(item.Metric)    // 过滤有 metric 的数据
        if !exists {continue}
        pk := item.PrimaryKey()
        store.HistoryBigMap[pk[0:2]].PushFrontAndMaintain(pk, item, remain, now)
    }
    return nil
}

流程如下图所示:

2. nightingale 的 transfer–>judge

nightingale 在 transfer 这一层就把无关告警规定的 metric 都过滤掉了,不再发送给 judge。

Transfer 接管 agent 指标的代码入口:

//agent 发送数据到 transfer,transfer 存储到 queue
// src/modules/transfer/rpc/push.go
func (t *Transfer) Push(args []*dataobj.MetricValue, reply *dataobj.TransferResp) error {start := time.Now()

    items := make([]*dataobj.MetricValue, 0)
    for _, v := range args {if err := v.CheckValidity(start.Unix()); err != nil {
            ......
            continue
        }
        items = append(items, v)
    }
    ......
    if backend.Config.Enabled {backend.Push2JudgeSendQueue(items)
    }

    reply.Total = len(args)
    reply.Latency = (time.Now().UnixNano() - start.UnixNano()) / 1000000
    return nil
}

筛选 metric 到 judgeQueue 的代码:

  • 先比对 endpoint/metric 是否关联告警策略;
  • 再比照 tag 是否匹配;
  • 若两者都匹配,则放入 judgeQueue;
// src/modules/transfer/backend/judge.go
func Push2JudgeSendQueue(items []*dataobj.MetricValue) {
    errCnt := 0
    for _, item := range items {
        // 判断是否有关联的 endpoint/metric
        key := str.PK(item.Metric, item.Endpoint)
        stras := cache.StraMap.GetByKey(key)

        for _, stra := range stras {
            // 是否 tag 匹配
            if !TagMatch(stra.Tags, item.TagsMap) {continue}
            judgeItem := &dataobj.JudgeItem{
                Endpoint:  item.Endpoint,
                Metric:    item.Metric,
                Value:     item.Value,
                Timestamp: item.Timestamp,
                DsType:    item.CounterType,
                Tags:      item.Tags,
                TagsMap:   item.TagsMap,
                Step:      int(item.Step),
                Sid:       stra.Id,
                Extra:     item.Extra,
            }

            q, exists := JudgeQueues.Get(stra.JudgeInstance)
            q.PushFront(judgeItem)
        }
    }
    stats.Counter.Set("judge.queue.err", errCnt)
}

cache.StraMap 是 transfer 定期从 mon-api 组件查问失去的;

流程图如下:

3. 对 Open-falcon 的优化及其优缺点

优化点:

  • 在 transfer 中,向 judgeQueue 存入数据之前,查问该 endpoint/metric/tags 是否关联告警规定,仅有关联的告警规定时才会入队;
  • endpoint/metric/tags 关联的告警规定,须要向 hbs 查问取得;
  • 该优化大大减少了 transfer 中 judgeQueue 的内存压力,也大大减少了向 judge 节点发送的网络数据量;

毛病:

  • transfer 中须要引入对 hbs 的依赖,以保留以后的 endpoint/metric/tags 关联的告警规定信息;

参考:
1. 夜莺监控零碎:https://github.com/didi/night…

退出移动版