微服务系列一-Go-RPC-源码解读

28次阅读

共计 10045 个字符,预计需要花费 26 分钟才能阅读完成。

RPC 框架在微服务中重要的一部分,熟悉和了解其原理是非常有必要的。Go 语言中源码自带实现了 RPC 功能,虽然官方已经宣布不再更新,但是因它实现简单,代码量不大,很多地方值得学习和借鉴,是阅读 RPC 源码的一个非常好的开始。

源码地址: https://github.com/golang/go/tree/master/src/net/rpc

1. 基本使用

先来看看调用的官方例子:

  1. 服务器部分代码:
// content of server.go
package main

import(
    "net"
    "net/rpc"
    "net/http"
    "errors"
    "log"
)

type Args struct {A, B int}

type Quotient struct {Quo, Rem int}

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
    *reply = args.A * args.B
    return nil
}

func (t *Arith) Divide(args *Args, quo *Quotient) error {
    if args.B == 0 {return errors.New("divide by zero")
    }
    quo.Quo = args.A / args.B
    quo.Rem = args.A % args.B
    return nil
}

func listenTCP(addr string) (net.Listener, string) {l, e := net.Listen("tcp", addr)
    if e != nil {log.Fatalf("net.Listen tcp :0: %v", e)
    }
    return l, l.Addr().String()
}

func main() {rpc.Register(new(Arith)) // 注册服务
    var l net.Listener
    tcpAddr := "127.0.0.1:8080"
    l, serverAddr := listenTCP(tcpAddr) // 监听 TCP 连接
    log.Println("RPC server listening on", serverAddr)
    go rpc.Accept(l)

    rpc.HandleHTTP() // 监听 HTTP 连接
    httpAddr := "127.0.0.1:8081"
    l, serverAddr = listenTCP(httpAddr)
    log.Println("RPC server listening on", serverAddr)
    go http.Serve(l, nil)

    select{}}

rpc 调用的功能就是 Arith 实现了一个 Multiply 和 Divide 方法。
看 main 函数,rpc 实现了一个注册 rpc.Register(new(Arith)) 方法,然后启动监听listenTCP(tcpAddr),这个是通过 net 包中的 Listen 方法,监听的对象可以是 TCP 连接 rpc.Accept(l),也可以试 HTTP 连接http.Serve(l, nil),这个是借助 net/http 包启动 HTTPServer.

  1. 客户端部分代码
// content of client.go
package main

import(
    "net/rpc"
    "log"
    "fmt"
)

type Args struct {A, B int}

type Quotient struct {Quo, Rem int}

func main() {client, err := rpc.DialHTTP("tcp", "127.0.0.1:8081")
    if err != nil {log.Fatal("dialing:", err)
    }

    // Synchronous call
    args := &Args{7,8}
    var reply int
    err = client.Call("Arith.Multiply", args, &reply)
    if err != nil {log.Fatal("arith error:", err)
    }
    fmt.Printf("Arith: %d*%d=%d\n", args.A, args.B, reply)

    // Asynchronous call
    clientTCP, err := rpc.Dial("tcp", "127.0.0.1:8080")
    if err != nil {log.Fatal("dialing:", err)
    }
    
    quotient := new(Quotient)
    divCall := clientTCP.Go("Arith.Divide", args, quotient, nil)
    replyCall := <-divCall.Done    // will be equal to divCall
    if replyCall.Error != nil {fmt.Println(replyCall.Error)
    } else {fmt.Printf("Arith: %d/%d=%d...%d\n", args.A, args.B, quotient.Quo, quotient.Rem)
    }

客户端代码 rpc 提供了两个方法 rpc.DialHTTPrpc.Dial 分别提供监听 HTTP 和 Tcp 连接。然后通过 Call 或者 Go 来调用服务器的方法,二者的区别是一个是同步调用,Go是异步调用。

运行结果:

// server.go
➜  server ./serve
2019/06/23 15:56:15 Test RPC server listening on 127.0.0.1:8080
2019/06/23 15:56:15 Test RPC server listening on 127.0.0.1:8081
// client.go
➜  client ./client
Arith: 7*8=56
Arith: 7/8=0...7

2.client.go 源码分析

先来看看客户端的源码,先上一张图了解一下客户端代码的主要逻辑:

  1. Dial and DialHTTP
// Dial connects to an RPC server at the specified network address.
func Dial(network, address string) (*Client, error) {conn, err := net.Dial(network, address)
    if err != nil {return nil, err}
    return NewClient(conn), nil
}

Dial 建立在 net.Dial 上,返回一个 client 对象,DialHTTPDial 类似,只不过多了一些 HTTP 的处理,最终都是返回 NewClient(conn)。

  1. NewClient
func NewClient(conn io.ReadWriteCloser) *Client {encBuf := bufio.NewWriter(conn)
    client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
    return NewClientWithCodec(client)
}

// NewClientWithCodec is like NewClient but uses the specified
// codec to encode requests and decode responses.
func NewClientWithCodec(codec ClientCodec) *Client {
    client := &Client{
        codec:   codec,
        pending: make(map[uint64]*Call),
    }
    go client.input()
    return client
}

NewClient 里做了 2 件事,第一件事是生成 client 结构体对象,包括序列化方式,初始化其中对象等等,Go Rpc 默认采用的是 gob 序列化,但也可以用 json 或者 protobuf。第二件事是启动一个 goroutine 协程,调用了 input 方法,这个 client 的核心部分,下面再讲。

  1. Call and Go

上面例子中,生成 client 对象后,会显式的调用CallGo,表示同步调用和异步调用。下面来看看源码:

func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {call := new(Call)
    call.ServiceMethod = serviceMethod
    call.Args = args
    call.Reply = reply
    if done == nil {done = make(chan *Call, 10) // buffered.
    } else {if cap(done) == 0 {log.Panic("rpc: done channel is unbuffered")
        }
    }
    call.Done = done
    client.send(call)
    return call
}

// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
    return call.Error
}

可以看到,client.Call 方法其实也是调用 client.Go,只不过通过chan 进行阻塞。

生成一个 Call 的结构体,将服务器的调用方法、参数、返回参数,调用结束标记进行组装,然后调用 client.send 的方法,将 call 结构体发给服务器。服务器拿到这些参数后,会通过反射出具体的方法,然后执行对应的函数。
下面是 Call 结构体的定义:

// Call 
type Call struct {
    ServiceMethod string      // The name of the service and method to call. 服务方法名
    Args          interface{} // The argument to the function (*struct). 请求参数
    Reply         interface{} // The reply from the function (*struct). 返回参数
    Error         error       // After completion, the error status. 错误状态
    Done          chan *Call  // Strobes when call is complete. 
}
  1. client.send
func (client *Client) send(call *Call) {client.reqMutex.Lock()
    defer client.reqMutex.Unlock()
    // Register this call.
    client.mutex.Lock()
    if client.shutdown || client.closing {client.mutex.Unlock()
        call.Error = ErrShutdown
        call.done()
        return
    }
    seq := client.seq
    client.seq++
    client.pending[seq] = call
    client.mutex.Unlock()
    // Encode and send the request.
    client.request.Seq = seq
    client.request.ServiceMethod = call.ServiceMethod
    err := client.codec.WriteRequest(&client.request, call.Args)
    if err != nil {client.mutex.Lock()
        call = client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()
        if call != nil {
            call.Error = err
            call.done()}
    }
}

send 方法是将刚才的 call 结构体中的信息发给服务器,首先数据不是直接发给服务器的,而是将请求参数和服务器的方法先赋值给 client 结构体中的 Request 结构体,同时在赋值的过程需要加锁。然后再调用 Gob 的 WriteRequest 方法,将数据刷到缓存区。

  1. client.input

client.send方法是将数据发给 Server,而 input 则相反,获取 Server 的返回结果 Response 给客户端。

func (client *Client) input() {
    var err error
    var response Response
    for err == nil {response = Response{}
        err = client.codec.ReadResponseHeader(&response)
        if err != nil {break}
        seq := response.Seq
        client.mutex.Lock()
        call := client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()

        switch {
        case call == nil:
            
            err = client.codec.ReadResponseBody(nil)
            
            ....
            
            }
            call.done()}
    }
    
    ...
    
    }
}

主要逻辑是不断循环读取 TCP 上的流,把 Header 解析成 Response 对象,以及将 Body 解析到 call.Reply 对象, 解析完后触发 call 中的 done 函数。这样客户端就可以拿到 Reply 对象就是服务器返回的结果,可以打印获取其中的值。

总结:

描述完这几个方法,在回头看开始的 client.go 的流程图就清晰了,可以说是分两条线,一条线显示的调用发送请求数据,另外一条线则起协程获取服务器的返回数据。

3. server.go 源码分析

话不多说,先来一张图了解一下大概:
图片描述

整体分三部分,第一部分注册服务器定义的方法,第二部分监听客户端的请求,解析获取到客户端的请求参数。第三部分拿到请求参数执行服务器的调用函数,将返回结果返回给客户端。

整个过程其实可以对比是一次 socket 的调用过程。

  1. register

首先来看一下 server 的结构体:

type methodType struct {
    sync.Mutex // protects counters
    method     reflect.Method
    ArgType    reflect.Type
    ReplyType  reflect.Type
    numCalls   uint
}

type service struct {
    name   string                 // name of service
    rcvr   reflect.Value          // receiver of methods for the service
    typ    reflect.Type           // type of the receiver
    method map[string]*methodType // registered methods
}

type Server struct {serviceMap sync.Map   // map[string]*service
    reqLock    sync.Mutex // protects freeReq
    freeReq    *Request
    respLock   sync.Mutex // protects freeResp
    freeResp   *Response
}

看英语注释就比起清楚具体是做什么的,Server 存储服务器的 service 以及其请求的 Request 和 Response,这二个就是跟客户端约定的协议,如下:

type Request struct {
    ServiceMethod string   // format: "Service.Method"
    Seq           uint64   // sequence number chosen by client
    next          *Request // for free list in Server
}

type Response struct {
    ServiceMethod string    // echoes that of the Request
    Seq           uint64    // echoes that of the request
    Error         string    // error, if any.
    next          *Response // for free list in Server
}

service 存储服务器需要注册的方法,methodType 就是具体方法的属性。

所以要想客户端进行远程调用服务器的方法,前提是在调用之前,服务器的方法均已加载在 Server 结构体中,所以需要服务器显示的调用 register 方法,下面看一下里面核心的代码:

func (server *Server) register(rcvr interface{}, name string, useName bool) error {s := new(service)
    s.typ = reflect.TypeOf(rcvr)
    s.rcvr = reflect.ValueOf(rcvr)
    sname := reflect.Indirect(s.rcvr).Type().Name()
    ...
    s.name = sname

    // Install the methods
    s.method = suitableMethods(s.typ, true)

    ...
    
    if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {...}
    ...
}


func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {methods := make(map[string]*methodType)
    for m := 0; m < typ.NumMethod(); m++ {method := typ.Method(m)
        mtype := method.Type
        mname := method.Name
        
        argType := mtype.In(1)
        
        ...
    
        replyType := mtype.In(2)
        
        ...
        
        methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
    }
    return methods
}

这段代码就是通过反射把结构体实现的方法的一些属性获取到,包括本身可执行的方法对象、名称、请求参数、返回参数。

最终存储到 server 的 serviceMap 中。客户端调用服务器的方法的结构为 struct.method,这样只需要按 . 进行分割,拿到 struct 名称和 method 名称则可以通过再 serviceMap 获取到方法,执行获得结果。

注册完方法后,接下来就是监听客户端的请求了。

  1. Accept

先来看看 Accept 的代码:

func (server *Server) Accept(lis net.Listener) {
    for {conn, err := lis.Accept()
        if err != nil {log.Print("rpc.Serve: accept:", err.Error())
            return
        }
        go server.ServeConn(conn)

通过 net 包中的监听 tcp 端口,然后起了一个协程,来看看这个协程里做了什么?

func (server *Server) ServeConn(conn io.ReadWriteCloser) {buf := bufio.NewWriter(conn)
    srv := &gobServerCodec{
        rwc:    conn,
        dec:    gob.NewDecoder(conn),
        enc:    gob.NewEncoder(buf),
        encBuf: buf,
    }
    server.ServeCodec(srv)
}

func (server *Server) ServeCodec(codec ServerCodec) {sending := new(sync.Mutex)
    wg := new(sync.WaitGroup)
    for {service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
        
       ...
       
        go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
    }
  ...
}

这段也好理解,ServeConn 将 gob 序列化的方法和 conn 保存到 gobServerCodec 结构体,然后调用了 server.ServeCodec 方法,这个方式做的事情就是将客户端传过来的包解析序列化解析,将请求参数,待返回的变量,以及是调服务器哪个方法,这些均在上面的 server.readRequest 方法处理。

func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {service, mtype, req, keepReading, err = server.readRequestHeader(codec)
    ...
}

func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
    // Grab the request header.
    req = server.getRequest()
    ...
    dot := strings.LastIndex(req.ServiceMethod, ".")
    ...
    serviceName := req.ServiceMethod[:dot]
    methodName := req.ServiceMethod[dot+1:]

    // Look up the request.
    svci, ok := server.serviceMap.Load(serviceName)
    ...
    svc = svci.(*service)
    mtype = svc.method[methodName]
    ...
    }
    return
}

核心的功能再 readRequestHeader 中,做的一件事就是将客户端传过来的 struct.method,按 . 进行分割,然后拿到 serviceName 和 methodName,然后再去 server.serviceMap 中拿到具体的服务和方法执行对象。

拿到之后,会起一个协程,调 service.call 方法,这里面做的事情就是执行服务器服务的方法,拿到返回结果,再调用 WriteReponse,将数据写回去。然后客户端的 input 方法循环获取结果。这样形成闭环。

下面看看 service.call 方法:

func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
    ...
    function := mtype.method.Func
    returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
    ...
    server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
    ...
}

实现的功能跟上面分析的一样,通过 mtype 拿到函数对象,然后调用反射的 Call 方法执行得到结果,最后调用 server.sendResponse 发送发回结果。

看到这里再回过来看上面画的 Server 代码流程图,就非常清晰了。

Go Rpc 源码解读就到这里。

4. 总结

Go RPC 源码目前官方已经没有维护,官方推荐使用 grpc,下一篇计划分析 grpc 的源码。

下面总结一下优缺点:

  • 优点:

    • 代码精简,可扩展性高。
  • 缺点:

    • 同步调用时,通过 chan 阻塞异步的 Go 方法,并没有处理超时,这样如果超时将导致大量的协程无法释放。
    • 可能存在内存泄漏的情况,因为客户端的请求数据在 Server 结构体中,如果 Server 端不返回则不会清理其中的数据,客户端的 Go 函数退出并不会清理其中的内容,所以 Server 结构体会一直存储,从而内存泄漏。

目前开源的 RPC 框架已经不是像这种简单的网络调用了,还会包括很多服务治理的功能,比如服务注册与发现、限流熔断、监控等等。这个以后接触新的 rpc 再分享,最终达到可以自己完整写一个 rpc 框架的目的。

更多关于 Go 微服务相关文章,请关注公众号: 天澄技术杂谈

正文完
 0