关于golang:RPCX源码学习client端

6次阅读

共计 3848 个字符,预计需要花费 10 分钟才能阅读完成。

rpc 相干介绍见上一篇:https://segmentfault.com/a/11…

client 端源码分析

首先创立连接池:

// NewXClientPool creates a fixed size XClient pool.
func NewXClientPool(count int, servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) *XClientPool {
    pool := &XClientPool{ // 连接池对象
        count:       uint64(count),
        xclients:    make([]XClient, count),
        servicePath: servicePath,
        failMode:    failMode,
        selectMode:  selectMode,
        discovery:   discovery,
        option:      option,
    }

    for i := 0; i < count; i++ {// 创立 client,数量跟 count 雷同
        xclient := NewXClient(servicePath, failMode, selectMode, discovery, option)
        pool.xclients[i] = xclient
    }
    return pool
}

// XClientPool is a xclient pool with fixed size.
// It uses roundrobin algorithm to call its xclients.
// All xclients share the same configurations such as ServiceDiscovery and serverMessageChan.
type XClientPool struct {
    count    uint64 // 池子中保留的连贯数量
    index    uint64 // 从池子里获取连贯时,用来定位获取哪个连贯
    xclients []XClient // 客户端对象

    servicePath       string // rpc 服务名称
    failMode          FailMode // rpc 调用失败后的解决形式
    selectMode        SelectMode // 路由策略
    discovery         ServiceDiscovery // 服务发现
    option            Option
    serverMessageChan chan<- *protocol.Message
}

// NewXClient creates a XClient that supports service discovery and service governance.
func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient {
    client := &xClient{ // 创立 client 对象
        failMode:     failMode,
        selectMode:   selectMode,
        discovery:    discovery,
        servicePath:  servicePath, // rpc 服务名称,一个裸露进去的服务会提供多个办法
        cachedClient: make(map[string]RPCClient),
        option:       option,
    }

    // 获取注册的服务配置
    pairs := discovery.GetServices()
    servers := make(map[string]string, len(pairs))
    for _, p := range pairs {servers[p.Key] = p.Value
    }
    filterByStateAndGroup(client.option.Group, servers)

    client.servers = servers
    if selectMode != Closest && selectMode != SelectByUser {
        // 设置路由形式
        client.selector = newSelector(selectMode, servers)
    }

    client.Plugins = &pluginContainer{}
    // 目前固定返回 nil
    ch := client.discovery.WatchService()
    if ch != nil {
        client.ch = ch
        go client.watch(ch)
    }

    return client
}

创立完连接池之后,等到用的时候,间接从池子里拿一个就行:

// 从连接池中获取一个连贯
// Get returns a xclient.
// It does not remove this xclient from its cache so you don't need to put it back.
// Don't close this xclient because maybe other goroutines are using this xclient.
func (p *XClientPool) Get() XClient {
    // 每次获取时,index 自增
    i := atomic.AddUint64(&p.index, 1)
    // 取模
    picked := int(i % p.count)
    return p.xclients[picked]
}

拿到一个客户端实例后,就能够开始调用 rpc 办法了:

// Call invokes the named function, waits for it to complete, and returns its error status.
// It handles errors base on FailMode.
func (c *xClient) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error {
    if c.isShutdown { // 异样解决
        return ErrXClientShutdown
    }

    if c.auth != "" {// 如果设置了认证信息
        ......
    }

    var err error
    k, client, err := c.selectClient(ctx, c.servicePath, serviceMethod, args)
    if err != nil {
        if c.failMode == Failfast {return err}
    }

    var e error
    switch c.failMode {// 以下是不同失败模式的解决逻辑
    case Failtry: // 在这种模式下,rpcx 如果调用一个节点的服务呈现谬误,它也会尝试,然而还是抉择这个节点进行重试,直到节点失常返回数据或者达到最大重试次数。......
    case Failover: // 在这种模式下, rpcx 如果遇到谬误,它会尝试调用另外一个节点,直到服务节点能失常返回信息,或者达到最大的重试次数
        retries := c.option.Retries
        for retries >= 0 {
            retries--

            if client != nil {
                // TODO 结构申请,发送数据,还没看懂是在哪里给 reply 赋值的???err = c.wrapCall(ctx, client, serviceMethod, args, reply)
                if err == nil {return nil}
                if _, ok := err.(ServiceError); ok {return err}
            }

            if uncoverError(err) {c.removeClient(k, client)
            }
            //select another server
            k, client, e = c.selectClient(ctx, c.servicePath, serviceMethod, args)
        }

        if err == nil {err = e}
        return err
    case Failbackup:
        ......
    default: //Failfast
        err = c.wrapCall(ctx, client, serviceMethod, args, reply)
        if err != nil {if uncoverError(err) {c.removeClient(k, client)
            }
        }

        return err
    }
}

// selects a client from candidates base on c.selectMode
func (c *xClient) selectClient(ctx context.Context, servicePath, serviceMethod string, args interface{}) (string, RPCClient, error) {c.mu.Lock()
    // 依据咱们配置的路由模式,抉择正确的 Select 函数,此处是 interface,相似运行时多态
    var fn = c.selector.Select
    if c.Plugins != nil { // 插件解决
        fn = c.Plugins.DoWrapSelect(fn)
    }
    // 调用所选路由模式的 Select 函数,返回一个 rpc 服务地址
    k := fn(ctx, servicePath, serviceMethod, args)
    c.mu.Unlock()
    if k == "" {return "", nil, ErrXClientNoServer}
    // 通过 k 去获取一个 rpc 客户端,有缓存解决,如果曾经存在,则间接返回,不会创立新的客户端
    // 新连贯的创立逻辑也在这里
    client, err := c.getCachedClient(k)
    return k, client, err
}
正文完
 0