rpc相干介绍见上一篇:https://segmentfault.com/a/11...

client端源码分析

首先创立连接池:

// NewXClientPool creates a fixed size XClient pool.func NewXClientPool(count int, servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) *XClientPool {    pool := &XClientPool{ // 连接池对象        count:       uint64(count),        xclients:    make([]XClient, count),        servicePath: servicePath,        failMode:    failMode,        selectMode:  selectMode,        discovery:   discovery,        option:      option,    }    for i := 0; i < count; i++ {// 创立client,数量跟count雷同        xclient := NewXClient(servicePath, failMode, selectMode, discovery, option)        pool.xclients[i] = xclient    }    return pool}// XClientPool is a xclient pool with fixed size.// It uses roundrobin algorithm to call its xclients.// All xclients share the same configurations such as ServiceDiscovery and serverMessageChan.type XClientPool struct {    count    uint64 // 池子中保留的连贯数量    index    uint64 // 从池子里获取连贯时,用来定位获取哪个连贯    xclients []XClient // 客户端对象    servicePath       string // rpc服务名称    failMode          FailMode // rpc调用失败后的解决形式    selectMode        SelectMode // 路由策略    discovery         ServiceDiscovery // 服务发现    option            Option    serverMessageChan chan<- *protocol.Message}// NewXClient creates a XClient that supports service discovery and service governance.func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient {    client := &xClient{ // 创立client对象        failMode:     failMode,        selectMode:   selectMode,        discovery:    discovery,        servicePath:  servicePath, // rpc服务名称,一个裸露进去的服务会提供多个办法        cachedClient: make(map[string]RPCClient),        option:       option,    }    // 获取注册的服务配置    pairs := discovery.GetServices()    servers := make(map[string]string, len(pairs))    for _, p := range pairs {        servers[p.Key] = p.Value    }    filterByStateAndGroup(client.option.Group, servers)    client.servers = servers    if selectMode != Closest && selectMode != SelectByUser {        // 设置路由形式        client.selector = newSelector(selectMode, servers)    }    client.Plugins = &pluginContainer{}    // 目前固定返回nil    ch := client.discovery.WatchService()    if ch != nil {        client.ch = ch        go client.watch(ch)    }    return client}

创立完连接池之后,等到用的时候,间接从池子里拿一个就行:

// 从连接池中获取一个连贯// Get returns a xclient.// It does not remove this xclient from its cache so you don't need to put it back.// Don't close this xclient because maybe other goroutines are using this xclient.func (p *XClientPool) Get() XClient {    // 每次获取时,index自增    i := atomic.AddUint64(&p.index, 1)    // 取模    picked := int(i % p.count)    return p.xclients[picked]}

拿到一个客户端实例后,就能够开始调用rpc办法了:

// Call invokes the named function, waits for it to complete, and returns its error status.// It handles errors base on FailMode.func (c *xClient) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error {    if c.isShutdown { // 异样解决        return ErrXClientShutdown    }    if c.auth != "" {// 如果设置了认证信息        ......    }    var err error    k, client, err := c.selectClient(ctx, c.servicePath, serviceMethod, args)    if err != nil {        if c.failMode == Failfast {            return err        }    }    var e error    switch c.failMode {// 以下是不同失败模式的解决逻辑    case Failtry: // 在这种模式下, rpcx如果调用一个节点的服务呈现谬误, 它也会尝试,然而还是抉择这个节点进行重试, 直到节点失常返回数据或者达到最大重试次数。        ......    case Failover: // 在这种模式下, rpcx如果遇到谬误,它会尝试调用另外一个节点, 直到服务节点能失常返回信息,或者达到最大的重试次数        retries := c.option.Retries        for retries >= 0 {            retries--            if client != nil {                // TODO 结构申请,发送数据,还没看懂是在哪里给reply赋值的???                err = c.wrapCall(ctx, client, serviceMethod, args, reply)                if err == nil {                    return nil                }                if _, ok := err.(ServiceError); ok {                    return err                }            }            if uncoverError(err) {                c.removeClient(k, client)            }            //select another server            k, client, e = c.selectClient(ctx, c.servicePath, serviceMethod, args)        }        if err == nil {            err = e        }        return err    case Failbackup:        ......    default: //Failfast        err = c.wrapCall(ctx, client, serviceMethod, args, reply)        if err != nil {            if uncoverError(err) {                c.removeClient(k, client)            }        }        return err    }}// selects a client from candidates base on c.selectModefunc (c *xClient) selectClient(ctx context.Context, servicePath, serviceMethod string, args interface{}) (string, RPCClient, error) {    c.mu.Lock()    // 依据咱们配置的路由模式,抉择正确的Select函数,此处是interface,相似运行时多态    var fn = c.selector.Select    if c.Plugins != nil { // 插件解决        fn = c.Plugins.DoWrapSelect(fn)    }    // 调用所选路由模式的Select函数,返回一个rpc服务地址    k := fn(ctx, servicePath, serviceMethod, args)    c.mu.Unlock()    if k == "" {        return "", nil, ErrXClientNoServer    }    // 通过k去获取一个rpc客户端,有缓存解决,如果曾经存在,则间接返回,不会创立新的客户端    // 新连贯的创立逻辑也在这里    client, err := c.getCachedClient(k)    return k, client, err}