意识RPC
- RPC是什么货色?
RPC: Remote Procedure Call(近程过程调用),是一个计算机通信协议。 - 协定的次要内容是什么?
该协定容许运行于一台计算机中的程序调用另一个地址空间(通常为一个凋谢网络中的一台计算机)的子程序,而程序员就像调用本地程序一样,无需额定的为这个交互作用编程(无需关注细节)。 - 次要用来解决什么问题?
解决分布式系统中服务之间的调用问题。
使近程调用像本地办法调用一样不便,暗藏底层网络通信的复杂性,使咱们更专一于业务。 一次rpc通信会波及到哪些角色?流程是什么样的?
client: 客户端程序,调用的发起者
client-stub: 将调用参数和办法依照约定的协定进行编码,而后传输到server
server: 服务端程序,解决client的调用
server-stub: 收到client的音讯后,依照约定的协定进行解码,而后调用本地办法进行解决
RPCX框架简介
rpcx是一个分布式的RPC框架,由go语言开发,反对Zookepper、etcd、consul多种服务发现形式,多种服务路由形式,是目前性能最好的RPC框架之一。
详情见官网介绍: https://books.studygolang.com...
server端源码分析
从入口开始,咱们启动一个rpc服务时,须要通过NewServer办法去创立一个Server对象,源码如下:
// NewServer returns a server.func NewServer(options ...OptionFn) *Server { s := &Server{ // 创立一个Server对象,给一些字段赋默认值 Plugins: &pluginContainer{}, options: make(map[string]interface{}), activeConn: make(map[net.Conn]struct{}), doneChan: make(chan struct{}), serviceMap: make(map[string]*service), router: make(map[string]Handler), AsyncWrite: false, // 除非你想benchmark或者极致优化,否则倡议你设置为false } for _, op := range options { op(s) } // 设置tcp连贯的keepAlive参数 if s.options["TCPKeepAlivePeriod"] == nil { s.options["TCPKeepAlivePeriod"] = 3 * time.Minute } return s}
Server构造如下:
type Handler func(ctx *Context) error// Server is rpcx server that use TCP or UDP.type Server struct { ln net.Listener // 监听器,用来监听服务端的端口 readTimeout time.Duration // 读取client申请数据包的超时工夫 writeTimeout time.Duration // 给client写响应数据包的超时工夫 gatewayHTTPServer *http.Server // http网管 DisableHTTPGateway bool // should disable http invoke or not. DisableJSONRPC bool // should disable json rpc or not. AsyncWrite bool // set true if your server only serves few clients serviceMapMu sync.RWMutex // 读写锁,爱护sericeMap的并发读写 serviceMap map[string]*service // 服务记录表 router map[string]Handler mu sync.RWMutex // 读写锁,爱护activeConn的并发读写 activeConn map[net.Conn]struct{} // 沉闷连贯记录表 doneChan chan struct{} seq uint64 // server端id inShutdown int32 onShutdown []func(s *Server) onRestart []func(s *Server) // TLSConfig for creating tls tcp connection. tlsConfig *tls.Config // BlockCrypt for kcp.BlockCrypt options map[string]interface{} // CORS options 跨域 corsOptions *CORSOptions // 插件 Plugins PluginContainer // AuthFunc can be used to auth. AuthFunc func(ctx context.Context, req *protocol.Message, token string) error handlerMsgNum int32 HandleServiceError func(error)}
对象创立实现之后,须要咱们来注册对外提供的服务,通过RegisterName办法来实现:
/* @name: 对外提供的服务名称 @rcvr: 对外提供的服务对象的实例*/func (s *Server) RegisterName(name string, rcvr interface{}, metadata string) error { _, err := s.register(rcvr, name, true) // 创立service对象,存储到对象表中 if err != nil { return err } if s.Plugins == nil { s.Plugins = &pluginContainer{} } return s.Plugins.DoRegister(name, rcvr, metadata)}// 注册servicefunc (s *Server) register(rcvr interface{}, name string, useName bool) (string, error) { // 加写锁 s.serviceMapMu.Lock() defer s.serviceMapMu.Unlock() // 结构service对象 service := new(service) service.typ = reflect.TypeOf(rcvr) service.rcvr = reflect.ValueOf(rcvr) sname := reflect.Indirect(service.rcvr).Type().Name() // Type if useName { sname = name } if sname == "" { errorStr := "rpcx.Register: no service name for type " + service.typ.String() log.Error(errorStr) return sname, errors.New(errorStr) } if !useName && !isExported(sname) { errorStr := "rpcx.Register: type " + sname + " is not exported" log.Error(errorStr) return sname, errors.New(errorStr) } service.name = sname // Install the methods // 将rcvr对象的所有办法查找进去 service.method = suitableMethods(service.typ, true) if len(service.method) == 0 { var errorStr string // To help the user, see if a pointer receiver would work. method := suitableMethods(reflect.PtrTo(service.typ), false) if len(method) != 0 { errorStr = "rpcx.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)" } else { errorStr = "rpcx.Register: type " + sname + " has no exported methods of suitable type" } log.Error(errorStr) return sname, errors.New(errorStr) } // 更新服务记录表 s.serviceMap[service.name] = service return sname, nil}// DoRegister invokes DoRegister plugin.func (p *pluginContainer) DoRegister(name string, rcvr interface{}, metadata string) error { var es []error for _, rp := range p.plugins { // 如果是RegisterPlugin类型的插件,会调用它的Register函数进行注册 if plugin, ok := rp.(RegisterPlugin); ok { err := plugin.Register(name, rcvr, metadata) if err != nil { es = append(es, err) } } } if len(es) > 0 { return errors.NewMultiError(es) } return nil}
服务注册结束之后,就开始启动服务了:
// Serve starts and listens RPC requests.// It is blocked until receiving connections from clients.func (s *Server) Serve(network, address string) (err error) { var ln net.Listener // 设置监听器,监听地址中配置的端口 ln, err = s.makeListener(network, address) if err != nil { return } // 如果抉择http协定,走这里 if network == "http" { s.serveByHTTP(ln, "") return nil } // 抉择ws/wss协定走这里 if network == "ws" || network == "wss" { s.serveByWS(ln, "") return nil } // try to start gateway // 尝试启动网关,如果是tcp协定,则会通过goroutine启动一个网关服务 ln = s.startGateway(network, ln) // tcp协定走这里,开始解决监听端口处的数据 return s.serveListener(ln)}// 获取端口数据并启动goroutine解决func (s *Server) serveListener(ln net.Listener) error { var tempDelay time.Duration s.mu.Lock() s.ln = ln s.mu.Unlock() for {// 死循环 // 有申请数据达到时,channel会返回数据 conn, e := ln.Accept() if e != nil { // 错误处理 select { case <-s.getDoneChan(): // 连贯敞开,报错,跳出循环 return ErrServerClosed default: } if ne, ok := e.(net.Error); ok && ne.Temporary() { // 短暂谬误 if tempDelay == 0 { tempDelay = 5 * time.Millisecond } else { tempDelay *= 2 } if max := 1 * time.Second; tempDelay > max { tempDelay = max } log.Errorf("rpcx: Accept error: %v; retrying in %v", e, tempDelay) time.Sleep(tempDelay) continue } if strings.Contains(e.Error(), "listener closed") { return ErrServerClosed } return e } tempDelay = 0 // 如果是tcp连贯,设置keepAlive相干参数 if tc, ok := conn.(*net.TCPConn); ok { period := s.options["TCPKeepAlivePeriod"] if period != nil { tc.SetKeepAlive(true) tc.SetKeepAlivePeriod(period.(time.Duration)) tc.SetLinger(10) } } // 有PostConnAcceptPlugin类型插件时,进行解决 conn, ok := s.Plugins.DoPostConnAccept(conn) if !ok { conn.Close() continue } s.mu.Lock() // 记录沉闷连贯 s.activeConn[conn] = struct{}{} s.mu.Unlock() if share.Trace { log.Debugf("server accepted an conn: %v", conn.RemoteAddr().String()) } // 启动goroutine异步解决申请,持续开始下一次循环 go s.serveConn(conn) }}
监听到申请后,会启动一个goroutine进行异步解决,接下来看看申请的解决逻辑:
func (s *Server) serveConn(conn net.Conn) { if s.isShutdown() { // 异样解决,敞开连贯 s.closeConn(conn) return } defer func() {// 捕捉异样,打印谬误堆栈信息,敞开连贯 if err := recover(); err != nil { const size = 64 << 10 buf := make([]byte, size) ss := runtime.Stack(buf, false) if ss > size { ss = size } buf = buf[:ss] log.Errorf("serving %s panic error: %s, stack:\n %s", conn.RemoteAddr(), err, buf) } if share.Trace { log.Debugf("server closed conn: %v", conn.RemoteAddr().String()) } s.closeConn(conn) }() // TLS解决 if tlsConn, ok := conn.(*tls.Conn); ok { if d := s.readTimeout; d != 0 { conn.SetReadDeadline(time.Now().Add(d)) } if d := s.writeTimeout; d != 0 { conn.SetWriteDeadline(time.Now().Add(d)) } if err := tlsConn.Handshake(); err != nil { log.Errorf("rpcx: TLS handshake error from %s: %v", conn.RemoteAddr(), err) return } } // 申请空间,寄存申请数据 r := bufio.NewReaderSize(conn, ReaderBuffsize) var writeCh chan *[]byte if s.AsyncWrite { // 如果设置了同步写,走这里 writeCh = make(chan *[]byte, 1) defer close(writeCh) go s.serveAsyncWrite(conn, writeCh) } for { if s.isShutdown() { return } t0 := time.Now() if s.readTimeout != 0 { // 设置超时工夫 conn.SetReadDeadline(t0.Add(s.readTimeout)) } // 创立上下文 ctx := share.WithValue(context.Background(), RemoteConnContextKey, conn) // 获取申请数据 req, err := s.readRequest(ctx, r) if err != nil { // 获取失败的解决逻辑 protocol.FreeMsg(req) if err == io.EOF { log.Infof("client has closed this connection: %s", conn.RemoteAddr().String()) } else if strings.Contains(err.Error(), "use of closed network connection") { log.Infof("rpcx: connection %s is closed", conn.RemoteAddr().String()) } else if errors.Is(err, ErrReqReachLimit) { if !req.IsOneway() { res := req.Clone() res.SetMessageType(protocol.Response) if len(res.Payload) > 1024 && req.CompressType() != protocol.None { res.SetCompressType(req.CompressType()) } handleError(res, err) s.Plugins.DoPreWriteResponse(ctx, req, res, err) data := res.EncodeSlicePointer() if s.AsyncWrite { writeCh <- data } else { conn.Write(*data) protocol.PutData(data) } s.Plugins.DoPostWriteResponse(ctx, req, res, err) protocol.FreeMsg(res) } else { s.Plugins.DoPreWriteResponse(ctx, req, nil, err) } protocol.FreeMsg(req) continue } else { log.Warnf("rpcx: failed to read request: %v", err) } return } if s.writeTimeout != 0 { conn.SetWriteDeadline(t0.Add(s.writeTimeout)) } if share.Trace { log.Debugf("server received an request %+v from conn: %v", req, conn.RemoteAddr().String()) } // 更新上下文的值 ctx = share.WithLocalValue(ctx, StartRequestContextKey, time.Now().UnixNano()) closeConn := false if !req.IsHeartbeat() { // 不是心跳包 err = s.auth(ctx, req) // 如果设置有auth防备,执行auth逻辑 closeConn = err != nil } if err != nil { // auth失败的解决逻辑 if !req.IsOneway() { res := req.Clone() res.SetMessageType(protocol.Response) if len(res.Payload) > 1024 && req.CompressType() != protocol.None { res.SetCompressType(req.CompressType()) } handleError(res, err) s.Plugins.DoPreWriteResponse(ctx, req, res, err) data := res.EncodeSlicePointer() if s.AsyncWrite { writeCh <- data } else { conn.Write(*data) protocol.PutData(data) } s.Plugins.DoPostWriteResponse(ctx, req, res, err) protocol.FreeMsg(res) } else { s.Plugins.DoPreWriteResponse(ctx, req, nil, err) } protocol.FreeMsg(req) // auth failed, closed the connection if closeConn { log.Infof("auth failed for conn %s: %v", conn.RemoteAddr().String(), err) return } continue } go func() {//启动goroutine解决 defer func() { // 解决完结,捕捉异样 if r := recover(); r != nil { // maybe panic because the writeCh is closed. log.Errorf("failed to handle request: %v", r) } }() // 原子操作,记录服务解决的音讯数量 atomic.AddInt32(&s.handlerMsgNum, 1) defer atomic.AddInt32(&s.handlerMsgNum, -1) if req.IsHeartbeat() { // 心跳包的解决逻辑 s.Plugins.DoHeartbeatRequest(ctx, req) req.SetMessageType(protocol.Response) data := req.EncodeSlicePointer() if s.AsyncWrite { writeCh <- data } else { conn.Write(*data) protocol.PutData(data) } protocol.FreeMsg(req) return } resMetadata := make(map[string]string) ctx = share.WithLocalValue(share.WithLocalValue(ctx, share.ReqMetaDataKey, req.Metadata), share.ResMetaDataKey, resMetadata) // 如果服务通过metadata设置了超时工夫,则创立一个超时上下文 cancelFunc := parseServerTimeout(ctx, req) if cancelFunc != nil { defer cancelFunc() } // 插件解决 s.Plugins.DoPreHandleRequest(ctx, req) if share.Trace { log.Debugf("server handle request %+v from conn: %v", req, conn.RemoteAddr().String()) } // first use handler if handler, ok := s.router[req.ServicePath+"."+req.ServiceMethod]; ok { sctx := NewContext(ctx, conn, req, writeCh) err := handler(sctx) if err != nil { log.Errorf("[handler internal error]: servicepath: %s, servicemethod, err: %v", req.ServicePath, req.ServiceMethod, err) } return } // 解决申请 res, err := s.handleRequest(ctx, req) if err != nil { if s.HandleServiceError != nil { s.HandleServiceError(err) } else { log.Warnf("rpcx: failed to handle request: %v", err) } } s.Plugins.DoPreWriteResponse(ctx, req, res, err) if !req.IsOneway() { if len(resMetadata) > 0 { // copy meta in context to request meta := res.Metadata if meta == nil { res.Metadata = resMetadata } else { for k, v := range resMetadata { if meta[k] == "" { meta[k] = v } } } } if len(res.Payload) > 1024 && req.CompressType() != protocol.None { res.SetCompressType(req.CompressType()) } data := res.EncodeSlicePointer() if s.AsyncWrite { writeCh <- data } else { conn.Write(*data) protocol.PutData(data) } } s.Plugins.DoPostWriteResponse(ctx, req, res, err) if share.Trace { log.Debugf("server write response %+v for an request %+v from conn: %v", res, req, conn.RemoteAddr().String()) } // 开释对象,放回对象池 protocol.FreeMsg(req) protocol.FreeMsg(res) }() }}
未完待续……