When the agent sends metrics to transfer over RPC, it uses a long-lived RPC connection: net/rpc maintains a single TCP connection to the transfer.

The agent wraps this in its own SingleConnRpcClient struct, which implements a single-connection RPC client.
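To make the single-connection idea concrete, here is a minimal sketch using only the Go standard library: one `rpc.Client` built with `net/rpc/jsonrpc` over an in-memory `net.Pipe` (standing in for the agent's TCP connection) is reused for several calls. The `Transfer` service and the simplified `MetricValue`/`TransferResponse` types are hypothetical stand-ins, not open-falcon's actual model types. net/rpc exposes the method as `"Transfer.Update"`, i.e. receiver type name plus method name.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
	"net/rpc/jsonrpc"
)

// MetricValue and TransferResponse are simplified, hypothetical stand-ins
// for model.MetricValue and model.TransferResponse.
type MetricValue struct {
	Endpoint string
	Metric   string
	Value    float64
}

type TransferResponse struct {
	Total int
}

// Transfer is a hypothetical server-side service; net/rpc registers its
// Update method under the name "Transfer.Update".
type Transfer struct{}

func (t *Transfer) Update(args []*MetricValue, reply *TransferResponse) error {
	reply.Total = len(args)
	return nil
}

// newPipeClient serves a Transfer over one end of an in-memory connection
// and returns a JSON-RPC client bound to the other end, so every call
// made with the returned client travels over that single connection.
func newPipeClient() (*rpc.Client, error) {
	server := rpc.NewServer()
	if err := server.Register(new(Transfer)); err != nil {
		return nil, err
	}
	serverConn, clientConn := net.Pipe()
	go server.ServeCodec(jsonrpc.NewServerCodec(serverConn))
	return jsonrpc.NewClient(clientConn), nil
}

func main() {
	client, err := newPipeClient()
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Several calls reuse the same client and therefore the same connection.
	for i := 1; i <= 3; i++ {
		metrics := make([]*MetricValue, i)
		for j := range metrics {
			metrics[j] = &MetricValue{Endpoint: "host", Metric: "cpu.idle", Value: float64(j)}
		}
		var resp TransferResponse
		if err := client.Call("Transfer.Update", metrics, &resp); err != nil {
			panic(err)
		}
		fmt.Println("call", i, "total", resp.Total)
	}
}
```

In the real agent the connection is a TCP socket dialed once and kept open, so repeated sends skip the TCP handshake.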

1. How is SingleConnRpcClient used?

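The agent has several backend transfer instances, and a send only needs to succeed against one of them.
When sending, the agent first looks up an existing RPC client by addr; if there is none, it initializes one, and then calls updateMetrics to perform the call.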

```go
func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {
	rand.Seed(time.Now().UnixNano())
	for _, i := range rand.Perm(len(Config().Transfer.Addrs)) { // shuffle the order of the transfer addresses
		addr := Config().Transfer.Addrs[i]
		c := getTransferClient(addr) // look up an existing client in the map
		if c == nil {
			c = initTransferClient(addr) // not found: create one
		}
		if updateMetrics(c, metrics, resp) { // send; stop at the first success
			break
		}
	}
}
```
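The failover strategy in SendMetrics (try the transfers in a random order, stop at the first success) can be isolated into a small, testable function. This is a sketch under assumptions: `sendToAny` and its `send` callback are hypothetical names standing in for the getTransferClient/initTransferClient/updateMetrics steps. With Go 1.20 or later the global `math/rand` source is seeded automatically and `rand.Seed` is deprecated, so the sketch omits the seeding call.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// sendToAny tries the addresses in a random order and stops at the first
// successful send, mirroring the loop in SendMetrics. send is a
// hypothetical callback that returns true when the send succeeded.
func sendToAny(addrs []string, send func(addr string) bool) (string, error) {
	for _, i := range rand.Perm(len(addrs)) {
		if send(addrs[i]) {
			return addrs[i], nil
		}
	}
	return "", errors.New("all transfer addresses failed")
}

func main() {
	// Hypothetical transfer addresses.
	addrs := []string{"transfer-1:8433", "transfer-2:8433", "transfer-3:8433"}
	// Pretend only the second transfer is reachable.
	addr, err := sendToAny(addrs, func(a string) bool { return a == "transfer-2:8433" })
	fmt.Println(addr, err)
}
```

Shuffling spreads load across transfers, and each address is tried at most once per send, so a single dead transfer only delays the send instead of failing it.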

Initializing the RPC client:

```go
func initTransferClient(addr string) *SingleConnRpcClient {
	c := &SingleConnRpcClient{
		RpcServer: addr,
		Timeout:   time.Duration(Config().Transfer.Timeout) * time.Millisecond,
	}
	// ......
	TransferClients[addr] = c
	return c
}
```

The sending logic calls the RPC method Transfer.Update:

```go
func updateMetrics(c *SingleConnRpcClient, metrics []*model.MetricValue, resp *model.TransferResponse) bool {
	err := c.Call("Transfer.Update", metrics, resp)
	if err != nil {
		log.Errorln("failed to call Transfer.Update:", c, err)
		return false
	}
	return true
}
```

2. How is SingleConnRpcClient implemented?

First, the data structure:

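```go
type SingleConnRpcClient struct {
	sync.Mutex                // serializes Call, serverConn and close
	rpcClient  *rpc.Client    // the single underlying connection; nil until dialed
	RpcServer  string         // transfer address (host:port)
	Timeout    time.Duration  // dial timeout passed to net.JsonRpcClient
}
```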

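To guard against concurrent calls, the struct embeds a sync.Mutex. A net/rpc Client is itself safe for use by multiple goroutines, so in practice the lock mainly protects the rpcClient field, which Call lazily creates and close resets to nil.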

The key part is the Call() method:

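  • First check whether a connection exists; if not, initialize one (serverConn);
  • Start a goroutine to execute the RPC method;
  • In the main flow, use select to enforce a timeout; on a timeout, or if the call returns an error, close the connection;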
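```go
func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error {
	this.Lock()
	defer this.Unlock()

	// Lazily (re)connect if there is no live connection.
	err := this.serverConn()
	if err != nil {
		return err
	}

	timeout := time.Duration(10 * time.Second)
	done := make(chan error, 1) // buffered, so the goroutine never blocks after a timeout

	// Capture the client so the goroutine does not race with close() resetting the field.
	client := this.rpcClient
	go func() {
		done <- client.Call(method, args, reply)
	}()

	select {
	case <-time.After(timeout):
		log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer)
		this.close()
		// Report the timeout; returning nil here would make callers treat it as success.
		return fmt.Errorf("rpc call %s to %s timed out after %v", method, this.RpcServer, timeout)
	case err := <-done: // err == nil on a normal return
		if err != nil {
			log.Printf("[ERROR] rpc call with error => %v -- %v", err, this.rpcClient)
			this.close()
			return err
		}
	}

	return nil
}
```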

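Initializing the connection is straightforward: serverConn creates a JSON-RPC client with net.JsonRpcClient, retrying with exponential backoff if the dial fails: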

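```go
func (this *SingleConnRpcClient) serverConn() error {
	// Reuse the existing connection if there is one.
	if this.rpcClient != nil {
		return nil
	}

	var err error
	var retry int = 1

	for {
		if this.rpcClient != nil {
			return nil
		}

		// net is not the standard library here (it has no JsonRpcClient);
		// open-falcon imports github.com/toolkits/net, which dials TCP and wraps it in a JSON-RPC client.
		this.rpcClient, err = net.JsonRpcClient("tcp", this.RpcServer, this.Timeout)
		if err != nil {
			log.Printf("dial %s fail: %v", this.RpcServer, err)
			if retry > 3 {
				return err
			}
			// Exponential backoff: sleep 2s, 4s, then 8s between attempts.
			time.Sleep(time.Duration(math.Pow(2.0, float64(retry))) * time.Second)
			retry++
			continue
		}
		return err
	}
}
```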

Closing the connection:

```go
func (this *SingleConnRpcClient) close() {
	if this.rpcClient != nil {
		this.rpcClient.Close()
		this.rpcClient = nil
	}
}
```