agent通过RCP向transfer发送metrics时,应用rpc长连贯,通过net/rpc保护一个tcp connection。
agent封装了SingleConnRpcClient构造,来实现单连贯rcp client。
1. SingleConnRpcClient如何应用的?
agent对应的后端transfer有多个,只有向1个transfer发送胜利就OK;
发送时,先依据addr找可用的rpcclient,找不到就init一个,而后调用update实现调用操作。
func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) { rand.Seed(time.Now().UnixNano()) for _, i := range rand.Perm(len(Config().Transfer.Addrs)) { //把数组的程序打乱 addr := Config().Transfer.Addrs[i] c := getTransferClient(addr) //从map中找 if c == nil { c = initTransferClient(addr) //没找到,进行初始化 } if updateMetrics(c, metrics, resp) { //发送,若胜利则break break } }}
初始化rpcclient:
func initTransferClient(addr string) *SingleConnRpcClient { var c *SingleConnRpcClient = &SingleConnRpcClient{ RpcServer: addr, Timeout: time.Duration(Config().Transfer.Timeout) * time.Millisecond, } ...... TransferClients[addr] = c return c}
发送的逻辑:调用RPC办法:Transfer.Update
func updateMetrics(c *SingleConnRpcClient, metrics []*model.MetricValue, resp *model.TransferResponse) bool { err := c.Call("Transfer.Update", metrics, resp) if err != nil { log.Errorln("failed to call Transfer.Update:", c, err) return false } return true}
2. SingleConnRpcClient如何实现的?
先看一下数据结构:
type SingleConnRpcClient struct { sync.Mutex rpcClient *rpc.Client RpcServer string Timeout time.Duration}
为防止并发调用,外部封装了sync.Mutext;
重点是Call()办法:
- 先看一下连贯在不在,不在的话要init一个;
- 启动一个goroutine来执行rpc办法;
- 在主流程中用select进行超时管制,只有超时就close连贯;
func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error { this.Lock() defer this.Unlock() err := this.serverConn() if err != nil { return err } timeout := time.Duration(10 * time.Second) done := make(chan error, 1) go func() { err := this.rpcClient.Call(method, args, reply) done <- err }() select { case <-time.After(timeout): log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer) this.close() case err := <-done: //失常返回err=nil if err != nil { log.Printf("[ERROR] rpc call with error => %v -- %v", err, this.rpcClient) this.close() return err } } return nil}
初始化连贯很简略,就是创立了一个JsonRpCClient:
func (this *SingleConnRpcClient) serverConn() error { if this.rpcClient != nil { return nil } var err error var retry int = 1 for { if this.rpcClient != nil { return nil } this.rpcClient, err = net.JsonRpcClient("tcp", this.RpcServer, this.Timeout) if err != nil { log.Printf("dial %s fail: %v", this.RpcServer, err) if retry > 3 { return err } time.Sleep(time.Duration(math.Pow(2.0, float64(retry))) * time.Second) retry++ continue } return err }}
连贯敞开:
func (this *SingleConnRpcClient) close() { if this.rpcClient != nil { this.rpcClient.Close() this.rpcClient = nil }}