乐趣区

关于监控工具:Openfalcon-transfer源码解读

transfer 能够了解为直达模块,它接管 agent 上报的指标,而后转发给后端的 graph 和 judge 实例。

transfer 接管到 agent 上报的指标后,先存储到内存 queue,而后再由 goroutine 默默的将 queue 的数据 Pop 进去,转发给 graph 和 judge。

transfer 后端接多个 graph 和 judge 实例,如何保障某一个指标稳固的转发到某个实例,同时还能保障多个 graph 间放弃平衡,不会呈现某个 graph 承当过多的指标而产生数据歪斜?transfer 应用了一致性 hash 算法来做到这一点。

整体架构:

1. transfer 接管 agent 上报的指标数据

transfer 通过 TCP RPC 接管 agent 的数据:

// modules/transfer/receiver/rpc/rpc.go
func StartRpc() {listener, err := net.ListenTCP("tcp", tcpAddr)
    server := rpc.NewServer()
    server.Register(new(Transfer))
    for {conn, err := listener.AcceptTCP()
        go server.ServeCodec(jsonrpc.NewServerCodec(conn))
    }
}

transfer 的 RPC 办法:Transfer.Update,负责接收数据

//modules/transfer/receiver/rpc/rpc_transfer.go
type Transfer int
func (t *Transfer) Update(args []*cmodel.MetricValue, reply *cmodel.TransferResponse) error {return RecvMetricValues(args, reply, "rpc")
}
func RecvMetricValues(args []*cmodel.MetricValue, reply *cmodel.TransferResponse, from string) error {items := []*cmodel.MetaData{}
    for _, v := range args {
        fv := &cmodel.MetaData{
            Metric:      v.Metric,
            Endpoint:    v.Endpoint,
            Timestamp:   v.Timestamp,
            Step:        v.Step,
            CounterType: v.Type,
            Tags:        cutils.DictedTagstring(v.Tags), 
        }
        .......
        items = append(items, fv)
    }
    if cfg.Graph.Enabled {sender.Push2GraphSendQueue(items)
    }
    if cfg.Judge.Enabled {sender.Push2JudgeSendQueue(items)
    }
}

能够看到,transfer 间接将 items 放入 graph/judge 中的 Queue 就返回了,并不会间接发送;这样做有以下益处:

  • 更快的响应 agent;
  • 把零散的数据做成恒定大小的批次,再发送给后端,加重对后端实例的冲击;
  • 将数据缓存当前,能够从容的解决发送超时等异常情况;

以 graph 为例,剖析如何确定某个 item 放入那个 graph Queue:

// modules/transfer/sender/sender.go
func Push2GraphSendQueue(items []*cmodel.MetaData) {
    for _, item := range items {pk := item.PK()
        // 依据 item 的 key 确定放入那个 graph Queue
        node, err := GraphNodeRing.GetNode(pk)
        
        // 将数据 Push 进 queue
        for _, addr := range cnode.Addrs {Q := GraphQueues[node+addr]
            if !Q.PushFront(graphItem) {errCnt += 1}
        }
    }
}

能够看出,依据 item 的 key 确定 graphQueue,而 key 是将 endpoint/metric/tags 信息拼成了一个字符串:

func (t *MetaData) PK() string {return MUtils.PK(t.Endpoint, t.Metric, t.Tags)
}
func PK(endpoint, metric string, tags map[string]string) string {ret := bufferPool.Get().(*bytes.Buffer)
    ret.Reset()
    defer bufferPool.Put(ret)

    if tags == nil || len(tags) == 0 {ret.WriteString(endpoint)
        ret.WriteString("/")
        ret.WriteString(metric)

        return ret.String()}
    ret.WriteString(endpoint)
    ret.WriteString("/")
    ret.WriteString(metric)
    ret.WriteString("/")
    ret.WriteString(SortedTags(tags))
    return ret.String()}

2. 一致性 hash 保障 graph/judge 间的数据平衡

itemKey 是个 string,如何确定将该 item 放入哪个 graphQueue 呢?

答案是一致性 hash 算法,依据 itemKey 通过一致性 hash 确定一个 node,而后每个 node 对应 1 个 graphQueue。

这里重点关注一致性 hash 如何应用:

// 依据 pk 抉择 node
node, err := GraphNodeRing.GetNode(pk)`

// 创立 Graph 节点的 hash 环
GraphNodeRing = rings.NewConsistentHashNodesRing(int32(cfg.Graph.Replicas), cutils.KeysOfMap(cfg.Graph.Cluster))

应用 graph 节点创立 graph 节点的 hash 环,每个节点有 replica 个虚构节点,以保证数据平衡;

这里的一致性 hash 应用了 github.com/toolkits/consistent/rings 的开源实现:

// 创立 hash 环
func NewConsistentHashNodesRing(numberOfReplicas int32, nodes []string) *ConsistentHashNodeRing {ret := &ConsistentHashNodeRing{ring: consistent.New()}
    ret.SetNumberOfReplicas(numberOfReplicas)
    ret.SetNodes(nodes)
    return ret
}
// 依据 pk, 获取 node 节点. chash(pk) -> node
func (this *ConsistentHashNodeRing) GetNode(pk string) (string, error) {return this.ring.Get(pk)
}

item 选到 node 当前,就被 push 到该 node 对应的 graphQueue,最初 graphQueue 的数据被 RPC 发送给 graph 节点:

GraphQueues  = make(map[string]*nlist.SafeListLimited)

Q := GraphQueues[node+addr]
if !Q.PushFront(graphItem) {errCnt += 1}

其中 nlist.SafeListLimited 是用 list 封装的一个 queue 构造:

// SafeList with Limited Size
type SafeListLimited struct {
    maxSize int
    SL      *SafeList
}

type SafeList struct {
    sync.RWMutex
    L *list.List
}

3. queue 中的数据转发给 graph/judge

每个 graph 节点对应一个 graphQueue,须要被 TCP RPC 发送给 graph 节点,这由后盾的 goroutine 来执行的:

// modules/transfer/sender/send_tasks.go
func startSendTasks() {
    ......
    for node, nitem := range cfg.Graph.ClusterList {
        for _, addr := range nitem.Addrs {queue := GraphQueues[node+addr]
            go forward2GraphTask(queue, node, addr, graphConcurrent)
        }
    }
    ......
}

func forward2GraphTask(Q *list.SafeListLimited, node string, addr string, concurrent int) {batch := g.Config().Graph.Batch 
    sema := nsema.NewSemaphore(concurrent)
    for {items := Q.PopBackBy(batch)
        count := len(items)
        if count == 0 {time.Sleep(DefaultSendTaskSleepInterval)
            continue
        }
        sema.Acquire()
        go func(addr string, graphItems []*cmodel.GraphItem, count int) {defer sema.Release()
            err = GraphConnPools.Call(addr, "Graph.Send", graphItems, resp)
        }(addr, graphItems, count)
    }    
}

每个 graph 节点启动 1 个 goroutine 进行发送;发送过程中,每来一个 batch 就启动 1 个 goroutine 进行发送,为管制发送的 goroutine 数量,应用 semaphore(channel 实现) 管制并发。

具体的发送过程:为每个 graph 创立了 1 个 rpc 连接池,发送时先从池中 Fetch 一个连贯,而后应用这个 conn 调用 rpcClient.Call() 实现发送:

// common/backend_pool/rpc_backends.go
func (this *SafeRpcConnPools) Call(addr, method string, args interface{}, resp interface{}) error {connPool, exists := this.Get(addr)
    // 先获取一个连贯
    conn, err := connPool.Fetch()
    rpcClient := conn.(*rpcpool.RpcClient)
    done := make(chan error, 1)
    go func() {
        // 具体的 rpc 发送
        done <- rpcClient.Call(method, args, resp)
    }()
    // select timeout 进行超时管制
    select {case <-time.After(callTimeout):
        connPool.ForceClose(conn)
        return fmt.Errorf("%s, call timeout", addr)
    case err = <-done:
        connPool.Release(conn)
        return err
    }
}    

connPool 的实现也很简略,外部保护了一个 maxConns 和 maxIdle 个数;每次 fetch 的时候,判断是否有闲暇连贯,若有闲暇则间接返回,否则 new 一个新的:

// common/backend_pool/rpc_backends.go
func createOneRpcPool(name string, address string, connTimeout time.Duration, maxConns int, maxIdle int) *connp.ConnPool {p := connp.NewConnPool(name, address, int32(maxConns), int32(maxIdle))
    p.New = func(connName string) (connp.NConn, error) {_, err := net.ResolveTCPAddr("tcp", p.Address)
        if err != nil {return nil, err}
        conn, err := net.DialTimeout("tcp", p.Address, connTimeout)
        if err != nil {return nil, err}
        return rpcpool.NewRpcClient(rpc.NewClient(conn), connName), nil
    }
    return p
}

ConnPool 实现在 github.com/toolkits/conn_pool 中:

//github.com/toolkits/conn_pool/conn_pool.go
func (this *ConnPool) Fetch() (NConn, error) {this.Lock()
    defer this.Unlock()

    // get from free
    conn := this.fetchFree()
    if conn != nil {return conn, nil}

    if this.overMax() {return nil, ErrMaxConn}

    // create new conn
    conn, err := this.newConn()
    if err != nil {return nil, err}

    this.increActive()
    return conn, nil
}
退出移动版