意识RPC

  1. RPC是什么货色?
    RPC: Remote Procedure Call(近程过程调用),是一个计算机通信协议。

  2. 协定的次要内容是什么?
    该协定容许运行于一台计算机中的程序调用另一个地址空间(通常为一个凋谢网络中的一台计算机)的子程序,而程序员就像调用本地程序一样,无需额定的为这个交互作用编程(无需关注细节)。

  3. 次要用来解决什么问题?
    解决分布式系统中服务之间的调用问题。
    使近程调用像本地办法调用一样不便,暗藏底层网络通信的复杂性,使咱们更专一于业务。

  4. 一次rpc通信会波及到哪些角色?流程是什么样的?
    client: 客户端程序,调用的发起者
    client-stub: 将调用参数和办法依照约定的协定进行编码,而后传输到server
    server: 服务端程序,解决client的调用
    server-stub: 收到client的音讯后,依照约定的协定进行解码,而后调用本地办法进行解决

RPCX框架简介

rpcx是一个分布式的RPC框架,由go语言开发,反对Zookepper、etcd、consul多种服务发现形式,多种服务路由形式,是目前性能最好的RPC框架之一。

详情见官网介绍: https://books.studygolang.com...

server端源码分析

从入口开始,咱们启动一个rpc服务时,须要通过NewServer办法去创立一个Server对象,源码如下:

// NewServer returns a server.func NewServer(options ...OptionFn) *Server {    s := &Server{ // 创立一个Server对象,给一些字段赋默认值        Plugins:    &pluginContainer{},        options:    make(map[string]interface{}),        activeConn: make(map[net.Conn]struct{}),        doneChan:   make(chan struct{}),        serviceMap: make(map[string]*service),        router:     make(map[string]Handler),        AsyncWrite: false, // 除非你想benchmark或者极致优化,否则倡议你设置为false    }    for _, op := range options {        op(s)    }    // 设置tcp连贯的keepAlive参数    if s.options["TCPKeepAlivePeriod"] == nil {        s.options["TCPKeepAlivePeriod"] = 3 * time.Minute    }    return s}

Server构造如下:

type Handler func(ctx *Context) error// Server is rpcx server that use TCP or UDP.type Server struct {    ln                 net.Listener // 监听器,用来监听服务端的端口    readTimeout        time.Duration // 读取client申请数据包的超时工夫    writeTimeout       time.Duration // 给client写响应数据包的超时工夫    gatewayHTTPServer  *http.Server // http网管    DisableHTTPGateway bool // should disable http invoke or not.    DisableJSONRPC     bool // should disable json rpc or not.    AsyncWrite         bool // set true if your server only serves few clients    serviceMapMu sync.RWMutex // 读写锁,爱护sericeMap的并发读写    serviceMap   map[string]*service // 服务记录表    router map[string]Handler    mu         sync.RWMutex // 读写锁,爱护activeConn的并发读写    activeConn map[net.Conn]struct{} // 沉闷连贯记录表    doneChan   chan struct{}    seq        uint64 // server端id    inShutdown int32    onShutdown []func(s *Server)    onRestart  []func(s *Server)    // TLSConfig for creating tls tcp connection.    tlsConfig *tls.Config    // BlockCrypt for kcp.BlockCrypt    options map[string]interface{}    // CORS options 跨域    corsOptions *CORSOptions    // 插件    Plugins PluginContainer    // AuthFunc can be used to auth.    AuthFunc func(ctx context.Context, req *protocol.Message, token string) error    handlerMsgNum int32    HandleServiceError func(error)}

对象创立实现之后,须要咱们来注册对外提供的服务,通过RegisterName办法来实现:

/*    @name: 对外提供的服务名称    @rcvr: 对外提供的服务对象的实例*/func (s *Server) RegisterName(name string, rcvr interface{}, metadata string) error {    _, err := s.register(rcvr, name, true) // 创立service对象,存储到对象表中    if err != nil {        return err    }    if s.Plugins == nil {        s.Plugins = &pluginContainer{}    }    return s.Plugins.DoRegister(name, rcvr, metadata)}// 注册servicefunc (s *Server) register(rcvr interface{}, name string, useName bool) (string, error) {    // 加写锁    s.serviceMapMu.Lock()    defer s.serviceMapMu.Unlock()    // 结构service对象    service := new(service)    service.typ = reflect.TypeOf(rcvr)    service.rcvr = reflect.ValueOf(rcvr)    sname := reflect.Indirect(service.rcvr).Type().Name() // Type    if useName {        sname = name    }    if sname == "" {        errorStr := "rpcx.Register: no service name for type " + service.typ.String()        log.Error(errorStr)        return sname, errors.New(errorStr)    }    if !useName && !isExported(sname) {        errorStr := "rpcx.Register: type " + sname + " is not exported"        log.Error(errorStr)        return sname, errors.New(errorStr)    }    service.name = sname    // Install the methods    // 将rcvr对象的所有办法查找进去    service.method = suitableMethods(service.typ, true)    if len(service.method) == 0 {        var errorStr string        // To help the user, see if a pointer receiver would work.        method := suitableMethods(reflect.PtrTo(service.typ), false)        if len(method) != 0 {            errorStr = "rpcx.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"        } else {            errorStr = "rpcx.Register: type " + sname + " has no exported methods of suitable type"        }        log.Error(errorStr)        return sname, errors.New(errorStr)    }    // 更新服务记录表    s.serviceMap[service.name] = service    return sname, nil}// DoRegister invokes DoRegister plugin.func (p *pluginContainer) DoRegister(name string, rcvr interface{}, metadata string) error {    var es []error    for _, rp := range p.plugins {        // 如果是RegisterPlugin类型的插件,会调用它的Register函数进行注册        if plugin, ok := rp.(RegisterPlugin); ok {            err := plugin.Register(name, rcvr, metadata)            if err != nil {                es = append(es, err)            }        }    }    if len(es) > 0 {        return errors.NewMultiError(es)    }    return nil}

服务注册结束之后,就开始启动服务了:

// Serve starts and listens RPC requests.// It is blocked until receiving connections from clients.func (s *Server) Serve(network, address string) (err error) {    var ln net.Listener    // 设置监听器,监听地址中配置的端口    ln, err = s.makeListener(network, address)    if err != nil {        return    }    // 如果抉择http协定,走这里    if network == "http" {        s.serveByHTTP(ln, "")        return nil    }    // 抉择ws/wss协定走这里    if network == "ws" || network == "wss" {        s.serveByWS(ln, "")        return nil    }    // try to start gateway    // 尝试启动网关,如果是tcp协定,则会通过goroutine启动一个网关服务    ln = s.startGateway(network, ln)    // tcp协定走这里,开始解决监听端口处的数据    return s.serveListener(ln)}// 获取端口数据并启动goroutine解决func (s *Server) serveListener(ln net.Listener) error {    var tempDelay time.Duration    s.mu.Lock()    s.ln = ln    s.mu.Unlock()    for {// 死循环        // 有申请数据达到时,channel会返回数据        conn, e := ln.Accept()        if e != nil { // 错误处理            select {            case <-s.getDoneChan(): // 连贯敞开,报错,跳出循环                return ErrServerClosed            default:            }            if ne, ok := e.(net.Error); ok && ne.Temporary() { // 短暂谬误                if tempDelay == 0 {                    tempDelay = 5 * time.Millisecond                } else {                    tempDelay *= 2                }                if max := 1 * time.Second; tempDelay > max {                    tempDelay = max                }                log.Errorf("rpcx: Accept error: %v; retrying in %v", e, tempDelay)                time.Sleep(tempDelay)                continue            }            if strings.Contains(e.Error(), "listener closed") {                return ErrServerClosed            }            return e        }        tempDelay = 0        // 如果是tcp连贯,设置keepAlive相干参数        if tc, ok := conn.(*net.TCPConn); ok {            period := s.options["TCPKeepAlivePeriod"]            if period != nil {                tc.SetKeepAlive(true)                tc.SetKeepAlivePeriod(period.(time.Duration))                tc.SetLinger(10)            }        }        // 有PostConnAcceptPlugin类型插件时,进行解决        conn, ok := s.Plugins.DoPostConnAccept(conn)        if !ok {            conn.Close()            continue        }        s.mu.Lock()        // 记录沉闷连贯        s.activeConn[conn] = struct{}{}        s.mu.Unlock()        if share.Trace {            log.Debugf("server accepted an conn: %v", conn.RemoteAddr().String())        }        // 启动goroutine异步解决申请,持续开始下一次循环        go s.serveConn(conn)    }}

监听到申请后,会启动一个goroutine进行异步解决,接下来看看申请的解决逻辑:

func (s *Server) serveConn(conn net.Conn) {    if s.isShutdown() { // 异样解决,敞开连贯        s.closeConn(conn)        return    }    defer func() {// 捕捉异样,打印谬误堆栈信息,敞开连贯        if err := recover(); err != nil {            const size = 64 << 10            buf := make([]byte, size)            ss := runtime.Stack(buf, false)            if ss > size {                ss = size            }            buf = buf[:ss]            log.Errorf("serving %s panic error: %s, stack:\n %s", conn.RemoteAddr(), err, buf)        }        if share.Trace {            log.Debugf("server closed conn: %v", conn.RemoteAddr().String())        }        s.closeConn(conn)    }()    // TLS解决    if tlsConn, ok := conn.(*tls.Conn); ok {        if d := s.readTimeout; d != 0 {            conn.SetReadDeadline(time.Now().Add(d))        }        if d := s.writeTimeout; d != 0 {            conn.SetWriteDeadline(time.Now().Add(d))        }        if err := tlsConn.Handshake(); err != nil {            log.Errorf("rpcx: TLS handshake error from %s: %v", conn.RemoteAddr(), err)            return        }    }    // 申请空间,寄存申请数据    r := bufio.NewReaderSize(conn, ReaderBuffsize)    var writeCh chan *[]byte    if s.AsyncWrite { // 如果设置了同步写,走这里        writeCh = make(chan *[]byte, 1)        defer close(writeCh)        go s.serveAsyncWrite(conn, writeCh)    }    for {        if s.isShutdown() {            return        }        t0 := time.Now()        if s.readTimeout != 0 { // 设置超时工夫            conn.SetReadDeadline(t0.Add(s.readTimeout))        }        // 创立上下文        ctx := share.WithValue(context.Background(), RemoteConnContextKey, conn)        // 获取申请数据        req, err := s.readRequest(ctx, r)        if err != nil { // 获取失败的解决逻辑            protocol.FreeMsg(req)            if err == io.EOF {                log.Infof("client has closed this connection: %s", conn.RemoteAddr().String())            } else if strings.Contains(err.Error(), "use of closed network connection") {                log.Infof("rpcx: connection %s is closed", conn.RemoteAddr().String())            } else if errors.Is(err, ErrReqReachLimit) {                if !req.IsOneway() {                    res := req.Clone()                    res.SetMessageType(protocol.Response)                    if len(res.Payload) > 1024 && req.CompressType() != protocol.None {                        res.SetCompressType(req.CompressType())                    }                    handleError(res, err)                    s.Plugins.DoPreWriteResponse(ctx, req, res, err)                    data := res.EncodeSlicePointer()                    if s.AsyncWrite {                        writeCh <- data                    } else {                        conn.Write(*data)                        protocol.PutData(data)                    }                    s.Plugins.DoPostWriteResponse(ctx, req, res, err)                    protocol.FreeMsg(res)                } else {                    s.Plugins.DoPreWriteResponse(ctx, req, nil, err)                }                protocol.FreeMsg(req)                continue            } else {                log.Warnf("rpcx: failed to read request: %v", err)            }            return        }        if s.writeTimeout != 0 {            conn.SetWriteDeadline(t0.Add(s.writeTimeout))        }        if share.Trace {            log.Debugf("server received an request %+v from conn: %v", req, conn.RemoteAddr().String())        }        // 更新上下文的值        ctx = share.WithLocalValue(ctx, StartRequestContextKey, time.Now().UnixNano())        closeConn := false        if !req.IsHeartbeat() { // 不是心跳包            err = s.auth(ctx, req) // 如果设置有auth防备,执行auth逻辑            closeConn = err != nil        }        if err != nil { // auth失败的解决逻辑            if !req.IsOneway() {                res := req.Clone()                res.SetMessageType(protocol.Response)                if len(res.Payload) > 1024 && req.CompressType() != protocol.None {                    res.SetCompressType(req.CompressType())                }                handleError(res, err)                s.Plugins.DoPreWriteResponse(ctx, req, res, err)                data := res.EncodeSlicePointer()                if s.AsyncWrite {                    writeCh <- data                } else {                    conn.Write(*data)                    protocol.PutData(data)                }                s.Plugins.DoPostWriteResponse(ctx, req, res, err)                protocol.FreeMsg(res)            } else {                s.Plugins.DoPreWriteResponse(ctx, req, nil, err)            }            protocol.FreeMsg(req)            // auth failed, closed the connection            if closeConn {                log.Infof("auth failed for conn %s: %v", conn.RemoteAddr().String(), err)                return            }            continue        }        go func() {//启动goroutine解决            defer func() { // 解决完结,捕捉异样                if r := recover(); r != nil {                    // maybe panic because the writeCh is closed.                    log.Errorf("failed to handle request: %v", r)                }            }()            // 原子操作,记录服务解决的音讯数量            atomic.AddInt32(&s.handlerMsgNum, 1)            defer atomic.AddInt32(&s.handlerMsgNum, -1)            if req.IsHeartbeat() { // 心跳包的解决逻辑                s.Plugins.DoHeartbeatRequest(ctx, req)                req.SetMessageType(protocol.Response)                data := req.EncodeSlicePointer()                if s.AsyncWrite {                    writeCh <- data                } else {                    conn.Write(*data)                    protocol.PutData(data)                }                protocol.FreeMsg(req)                return            }            resMetadata := make(map[string]string)            ctx = share.WithLocalValue(share.WithLocalValue(ctx, share.ReqMetaDataKey, req.Metadata),                share.ResMetaDataKey, resMetadata)            // 如果服务通过metadata设置了超时工夫,则创立一个超时上下文            cancelFunc := parseServerTimeout(ctx, req)            if cancelFunc != nil {                defer cancelFunc()            }            // 插件解决            s.Plugins.DoPreHandleRequest(ctx, req)            if share.Trace {                log.Debugf("server handle request %+v from conn: %v", req, conn.RemoteAddr().String())            }            // first use handler            if handler, ok := s.router[req.ServicePath+"."+req.ServiceMethod]; ok {                sctx := NewContext(ctx, conn, req, writeCh)                err := handler(sctx)                if err != nil {                    log.Errorf("[handler internal error]: servicepath: %s, servicemethod, err: %v", req.ServicePath, req.ServiceMethod, err)                }                return            }            // 解决申请            res, err := s.handleRequest(ctx, req)            if err != nil {                if s.HandleServiceError != nil {                    s.HandleServiceError(err)                } else {                    log.Warnf("rpcx: failed to handle request: %v", err)                }            }            s.Plugins.DoPreWriteResponse(ctx, req, res, err)            if !req.IsOneway() {                if len(resMetadata) > 0 { // copy meta in context to request                    meta := res.Metadata                    if meta == nil {                        res.Metadata = resMetadata                    } else {                        for k, v := range resMetadata {                            if meta[k] == "" {                                meta[k] = v                            }                        }                    }                }                if len(res.Payload) > 1024 && req.CompressType() != protocol.None {                    res.SetCompressType(req.CompressType())                }                data := res.EncodeSlicePointer()                if s.AsyncWrite {                    writeCh <- data                } else {                    conn.Write(*data)                    protocol.PutData(data)                }            }            s.Plugins.DoPostWriteResponse(ctx, req, res, err)            if share.Trace {                log.Debugf("server write response %+v for an request %+v from conn: %v", res, req, conn.RemoteAddr().String())            }            // 开释对象,放回对象池            protocol.FreeMsg(req)            protocol.FreeMsg(res)        }()    }}

未完待续……