rpc相干介绍见上一篇:https://segmentfault.com/a/11...
client端源码分析
首先创立连接池:
// NewXClientPool creates a fixed size XClient pool.func NewXClientPool(count int, servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) *XClientPool { pool := &XClientPool{ // 连接池对象 count: uint64(count), xclients: make([]XClient, count), servicePath: servicePath, failMode: failMode, selectMode: selectMode, discovery: discovery, option: option, } for i := 0; i < count; i++ {// 创立client,数量跟count雷同 xclient := NewXClient(servicePath, failMode, selectMode, discovery, option) pool.xclients[i] = xclient } return pool}// XClientPool is a xclient pool with fixed size.// It uses roundrobin algorithm to call its xclients.// All xclients share the same configurations such as ServiceDiscovery and serverMessageChan.type XClientPool struct { count uint64 // 池子中保留的连贯数量 index uint64 // 从池子里获取连贯时,用来定位获取哪个连贯 xclients []XClient // 客户端对象 servicePath string // rpc服务名称 failMode FailMode // rpc调用失败后的解决形式 selectMode SelectMode // 路由策略 discovery ServiceDiscovery // 服务发现 option Option serverMessageChan chan<- *protocol.Message}// NewXClient creates a XClient that supports service discovery and service governance.func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient { client := &xClient{ // 创立client对象 failMode: failMode, selectMode: selectMode, discovery: discovery, servicePath: servicePath, // rpc服务名称,一个裸露进去的服务会提供多个办法 cachedClient: make(map[string]RPCClient), option: option, } // 获取注册的服务配置 pairs := discovery.GetServices() servers := make(map[string]string, len(pairs)) for _, p := range pairs { servers[p.Key] = p.Value } filterByStateAndGroup(client.option.Group, servers) client.servers = servers if selectMode != Closest && selectMode != SelectByUser { // 设置路由形式 client.selector = newSelector(selectMode, servers) } client.Plugins = &pluginContainer{} // 目前固定返回nil ch := client.discovery.WatchService() if ch != nil { client.ch = ch go client.watch(ch) } return client}
创立完连接池之后,等到用的时候,间接从池子里拿一个就行:
// 从连接池中获取一个连贯// Get returns a xclient.// It does not remove this xclient from its cache so you don't need to put it back.// Don't close this xclient because maybe other goroutines are using this xclient.func (p *XClientPool) Get() XClient { // 每次获取时,index自增 i := atomic.AddUint64(&p.index, 1) // 取模 picked := int(i % p.count) return p.xclients[picked]}
拿到一个客户端实例后,就能够开始调用rpc办法了:
// Call invokes the named function, waits for it to complete, and returns its error status.// It handles errors base on FailMode.func (c *xClient) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error { if c.isShutdown { // 异样解决 return ErrXClientShutdown } if c.auth != "" {// 如果设置了认证信息 ...... } var err error k, client, err := c.selectClient(ctx, c.servicePath, serviceMethod, args) if err != nil { if c.failMode == Failfast { return err } } var e error switch c.failMode {// 以下是不同失败模式的解决逻辑 case Failtry: // 在这种模式下, rpcx如果调用一个节点的服务呈现谬误, 它也会尝试,然而还是抉择这个节点进行重试, 直到节点失常返回数据或者达到最大重试次数。 ...... case Failover: // 在这种模式下, rpcx如果遇到谬误,它会尝试调用另外一个节点, 直到服务节点能失常返回信息,或者达到最大的重试次数 retries := c.option.Retries for retries >= 0 { retries-- if client != nil { // TODO 结构申请,发送数据,还没看懂是在哪里给reply赋值的??? err = c.wrapCall(ctx, client, serviceMethod, args, reply) if err == nil { return nil } if _, ok := err.(ServiceError); ok { return err } } if uncoverError(err) { c.removeClient(k, client) } //select another server k, client, e = c.selectClient(ctx, c.servicePath, serviceMethod, args) } if err == nil { err = e } return err case Failbackup: ...... default: //Failfast err = c.wrapCall(ctx, client, serviceMethod, args, reply) if err != nil { if uncoverError(err) { c.removeClient(k, client) } } return err }}// selects a client from candidates base on c.selectModefunc (c *xClient) selectClient(ctx context.Context, servicePath, serviceMethod string, args interface{}) (string, RPCClient, error) { c.mu.Lock() // 依据咱们配置的路由模式,抉择正确的Select函数,此处是interface,相似运行时多态 var fn = c.selector.Select if c.Plugins != nil { // 插件解决 fn = c.Plugins.DoWrapSelect(fn) } // 调用所选路由模式的Select函数,返回一个rpc服务地址 k := fn(ctx, servicePath, serviceMethod, args) c.mu.Unlock() if k == "" { return "", nil, ErrXClientNoServer } // 通过k去获取一个rpc客户端,有缓存解决,如果曾经存在,则间接返回,不会创立新的客户端 // 新连贯的创立逻辑也在这里 client, err := c.getCachedClient(k) return k, client, err}