transfer能够了解为直达模块,它接管agent上报的指标,而后转发给后端的graph和judge实例。

transfer接管到agent上报的指标后,先存储到内存queue,而后再由goroutine默默的将queue的数据Pop进去,转发给graph和judge。

transfer后端接多个graph和judge实例,如何保障某一个指标稳固的转发到某个实例,同时还能保障多个graph间放弃平衡,不会呈现某个graph承当过多的指标而产生数据歪斜?transfer应用了一致性hash算法来做到这一点。

整体架构:

1. transfer接管agent上报的指标数据

transfer通过TCP RPC接管agent的数据:

// modules/transfer/receiver/rpc/rpc.gofunc StartRpc() {    listener, err := net.ListenTCP("tcp", tcpAddr)    server := rpc.NewServer()    server.Register(new(Transfer))    for {        conn, err := listener.AcceptTCP()        go server.ServeCodec(jsonrpc.NewServerCodec(conn))    }}

transfer的RPC办法:Transfer.Update,负责接收数据

//modules/transfer/receiver/rpc/rpc_transfer.gotype Transfer intfunc (t *Transfer) Update(args []*cmodel.MetricValue, reply *cmodel.TransferResponse) error {    return RecvMetricValues(args, reply, "rpc")}func RecvMetricValues(args []*cmodel.MetricValue, reply *cmodel.TransferResponse, from string) error {    items := []*cmodel.MetaData{}    for _, v := range args {        fv := &cmodel.MetaData{            Metric:      v.Metric,            Endpoint:    v.Endpoint,            Timestamp:   v.Timestamp,            Step:        v.Step,            CounterType: v.Type,            Tags:        cutils.DictedTagstring(v.Tags),         }        .......        items = append(items, fv)    }    if cfg.Graph.Enabled {        sender.Push2GraphSendQueue(items)    }    if cfg.Judge.Enabled {        sender.Push2JudgeSendQueue(items)    }}

能够看到,transfer间接将items放入graph/judge中的Queue就返回了,并不会间接发送;这样做有以下益处:

  • 更快的响应agent;
  • 把零散的数据做成恒定大小的批次,再发送给后端,加重对后端实例的冲击;
  • 将数据缓存当前,能够从容的解决发送超时等异常情况;

以graph为例,剖析如何确定某个item放入那个graph Queue:

// modules/transfer/sender/sender.gofunc Push2GraphSendQueue(items []*cmodel.MetaData) {    for _, item := range items {        pk := item.PK()        //依据item的key确定放入那个graph Queue        node, err := GraphNodeRing.GetNode(pk)                //将数据Push进queue        for _, addr := range cnode.Addrs {            Q := GraphQueues[node+addr]            if !Q.PushFront(graphItem) {                errCnt += 1            }        }    }}

能够看出,依据item的key确定graphQueue,而key是将endpoint/metric/tags信息拼成了一个字符串:

func (t *MetaData) PK() string {    return MUtils.PK(t.Endpoint, t.Metric, t.Tags)}func PK(endpoint, metric string, tags map[string]string) string {    ret := bufferPool.Get().(*bytes.Buffer)    ret.Reset()    defer bufferPool.Put(ret)    if tags == nil || len(tags) == 0 {        ret.WriteString(endpoint)        ret.WriteString("/")        ret.WriteString(metric)        return ret.String()    }    ret.WriteString(endpoint)    ret.WriteString("/")    ret.WriteString(metric)    ret.WriteString("/")    ret.WriteString(SortedTags(tags))    return ret.String()}

2. 一致性hash保障graph/judge间的数据平衡

itemKey是个string,如何确定将该item放入哪个graphQueue呢?

答案是一致性hash算法,依据itemKey通过一致性hash确定一个node,而后每个node对应1个graphQueue。

这里重点关注一致性hash如何应用:

//依据pk抉择nodenode, err := GraphNodeRing.GetNode(pk)`//创立Graph节点的hash环GraphNodeRing = rings.NewConsistentHashNodesRing(int32(cfg.Graph.Replicas), cutils.KeysOfMap(cfg.Graph.Cluster))

应用graph节点创立graph节点的hash环,每个节点有replica个虚构节点,以保证数据平衡;

这里的一致性hash应用了github.com/toolkits/consistent/rings的开源实现:

//创立hash环func NewConsistentHashNodesRing(numberOfReplicas int32, nodes []string) *ConsistentHashNodeRing {    ret := &ConsistentHashNodeRing{ring: consistent.New()}    ret.SetNumberOfReplicas(numberOfReplicas)    ret.SetNodes(nodes)    return ret}
// 依据pk,获取node节点. chash(pk) -> nodefunc (this *ConsistentHashNodeRing) GetNode(pk string) (string, error) {    return this.ring.Get(pk)}

item选到node当前,就被push到该node对应的graphQueue,最初graphQueue的数据被RPC发送给graph节点:

GraphQueues  = make(map[string]*nlist.SafeListLimited)Q := GraphQueues[node+addr]if !Q.PushFront(graphItem) {    errCnt += 1}

其中nlist.SafeListLimited是用list封装的一个queue构造:

// SafeList with Limited Sizetype SafeListLimited struct {    maxSize int    SL      *SafeList}type SafeList struct {    sync.RWMutex    L *list.List}

3. queue中的数据转发给graph/judge

每个graph节点对应一个graphQueue,须要被TCP RPC发送给graph节点,这由后盾的goroutine来执行的:

// modules/transfer/sender/send_tasks.gofunc startSendTasks() {    ......    for node, nitem := range cfg.Graph.ClusterList {        for _, addr := range nitem.Addrs {            queue := GraphQueues[node+addr]            go forward2GraphTask(queue, node, addr, graphConcurrent)        }    }    ......}func forward2GraphTask(Q *list.SafeListLimited, node string, addr string, concurrent int) {    batch := g.Config().Graph.Batch     sema := nsema.NewSemaphore(concurrent)    for {        items := Q.PopBackBy(batch)        count := len(items)        if count == 0 {            time.Sleep(DefaultSendTaskSleepInterval)            continue        }        sema.Acquire()        go func(addr string, graphItems []*cmodel.GraphItem, count int) {            defer sema.Release()            err = GraphConnPools.Call(addr, "Graph.Send", graphItems, resp)        }(addr, graphItems, count)    }    }

每个graph节点启动1个goroutine进行发送;发送过程中,每来一个batch就启动1个goroutine进行发送,为管制发送的goroutine数量,应用semaphore(channel实现)管制并发。

具体的发送过程:为每个graph创立了1个rpc连接池,发送时先从池中Fetch一个连贯,而后应用这个conn调用rpcClient.Call()实现发送:

// common/backend_pool/rpc_backends.gofunc (this *SafeRpcConnPools) Call(addr, method string, args interface{}, resp interface{}) error {    connPool, exists := this.Get(addr)    //先获取一个连贯    conn, err := connPool.Fetch()    rpcClient := conn.(*rpcpool.RpcClient)    done := make(chan error, 1)    go func() {        //具体的rpc发送        done <- rpcClient.Call(method, args, resp)    }()    // select timeout进行超时管制    select {    case <-time.After(callTimeout):        connPool.ForceClose(conn)        return fmt.Errorf("%s, call timeout", addr)    case err = <-done:        connPool.Release(conn)        return err    }}    

connPool的实现也很简略,外部保护了一个maxConns和maxIdle个数;每次fetch的时候,判断是否有闲暇连贯,若有闲暇则间接返回,否则new一个新的:

// common/backend_pool/rpc_backends.gofunc createOneRpcPool(name string, address string, connTimeout time.Duration, maxConns int, maxIdle int) *connp.ConnPool {    p := connp.NewConnPool(name, address, int32(maxConns), int32(maxIdle))    p.New = func(connName string) (connp.NConn, error) {        _, err := net.ResolveTCPAddr("tcp", p.Address)        if err != nil {            return nil, err        }        conn, err := net.DialTimeout("tcp", p.Address, connTimeout)        if err != nil {            return nil, err        }        return rpcpool.NewRpcClient(rpc.NewClient(conn), connName), nil    }    return p}

ConnPool实现在github.com/toolkits/conn_pool中:

//github.com/toolkits/conn_pool/conn_pool.gofunc (this *ConnPool) Fetch() (NConn, error) {    this.Lock()    defer this.Unlock()    // get from free    conn := this.fetchFree()    if conn != nil {        return conn, nil    }    if this.overMax() {        return nil, ErrMaxConn    }    // create new conn    conn, err := this.newConn()    if err != nil {        return nil, err    }    this.increActive()    return conn, nil}