乐趣区

关于监控工具:Openfalcon-rpc长连接client

agent 通过 RCP 向 transfer 发送 metrics 时,应用 rpc 长连贯,通过 net/rpc 保护一个 tcp connection。

agent 封装了 SingleConnRpcClient 构造,来实现单连贯 rcp client。

1. SingleConnRpcClient 如何应用的?

agent 对应的后端 transfer 有多个,只有向 1 个 transfer 发送胜利就 OK;
发送时,先依据 addr 找可用的 rpcclient,找不到就 init 一个,而后调用 update 实现调用操作。

func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {rand.Seed(time.Now().UnixNano())
    for _, i := range rand.Perm(len(Config().Transfer.Addrs)) {    // 把数组的程序打乱
        addr := Config().Transfer.Addrs[i]

        c := getTransferClient(addr)        // 从 map 中找
        if c == nil {c = initTransferClient(addr)   // 没找到,进行初始化
        }

        if updateMetrics(c, metrics, resp) {    // 发送,若胜利则 break
            break
        }
    }
}

初始化 rpcclient:

func initTransferClient(addr string) *SingleConnRpcClient {
    var c *SingleConnRpcClient = &SingleConnRpcClient{
        RpcServer: addr,
        Timeout:   time.Duration(Config().Transfer.Timeout) * time.Millisecond,
    }
    ......
    TransferClients[addr] = c
    return c
}

发送的逻辑:调用 RPC 办法:Transfer.Update

func updateMetrics(c *SingleConnRpcClient, metrics []*model.MetricValue, resp *model.TransferResponse) bool {err := c.Call("Transfer.Update", metrics, resp)
    if err != nil {log.Errorln("failed to call Transfer.Update:", c, err)
        return false
    }
    return true
}

2. SingleConnRpcClient 如何实现的?

先看一下数据结构:

type SingleConnRpcClient struct {
   sync.Mutex
   rpcClient *rpc.Client
   RpcServer string
   Timeout   time.Duration
}

为防止并发调用,外部封装了 sync.Mutext;

重点是 Call() 办法:

  • 先看一下连贯在不在,不在的话要 init 一个;
  • 启动一个 goroutine 来执行 rpc 办法;
  • 在主流程中用 select 进行超时管制,只有超时就 close 连贯;
func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error {this.Lock()
   defer this.Unlock()

   err := this.serverConn()
   if err != nil {return err}

   timeout := time.Duration(10 * time.Second)
   done := make(chan error, 1)

   go func() {err := this.rpcClient.Call(method, args, reply)
      done <- err
   }()

   select {case <-time.After(timeout):
      log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer)
      this.close()
   case err := <-done:        // 失常返回 err=nil
      if err != nil {log.Printf("[ERROR] rpc call with error => %v -- %v", err, this.rpcClient)
         this.close()
         return err
      }
   }
   return nil
}

初始化连贯很简略,就是创立了一个 JsonRpCClient:

func (this *SingleConnRpcClient) serverConn() error {
   if this.rpcClient != nil {return nil}

   var err error
   var retry int = 1

   for {
      if this.rpcClient != nil {return nil}

      this.rpcClient, err = net.JsonRpcClient("tcp", this.RpcServer, this.Timeout)
      if err != nil {log.Printf("dial %s fail: %v", this.RpcServer, err)
         if retry > 3 {return err}
         time.Sleep(time.Duration(math.Pow(2.0, float64(retry))) * time.Second)
         retry++
         continue
      }
      return err
   }
}

连贯敞开:

func (this *SingleConnRpcClient) close() {
   if this.rpcClient != nil {this.rpcClient.Close()
      this.rpcClient = nil
   }
}
退出移动版