共计 10463 个字符,预计需要花费 27 分钟才能阅读完成。
意识 RPC
- RPC 是什么货色?
RPC: Remote Procedure Call(近程过程调用),是一个计算机通信协议。 - 协定的次要内容是什么?
该协定容许运行于一台计算机中的程序调用另一个地址空间(通常为一个凋谢网络中的一台计算机)的子程序,而程序员就像调用本地程序一样,无需额定的为这个交互作用编程(无需关注细节)。 - 次要用来解决什么问题?
解决分布式系统中服务之间的调用问题。
使近程调用像本地办法调用一样不便,暗藏底层网络通信的复杂性,使咱们更专一于业务。 -
一次 rpc 通信会波及到哪些角色?流程是什么样的?
client: 客户端程序,调用的发起者
client-stub: 将调用参数和办法依照约定的协定进行编码,而后传输到 server
server: 服务端程序,解决 client 的调用
server-stub: 收到 client 的音讯后,依照约定的协定进行解码,而后调用本地办法进行解决
RPCX 框架简介
rpcx 是一个分布式的 RPC 框架,由 go 语言开发,反对 Zookepper、etcd、consul 多种服务发现形式,多种服务路由形式,是目前性能最好的 RPC 框架之一。
详情见官网介绍: https://books.studygolang.com…
server 端源码分析
从入口开始,咱们启动一个 rpc 服务时,须要通过 NewServer 办法去创立一个 Server 对象,源码如下:
// NewServer returns a server.
func NewServer(options ...OptionFn) *Server {
s := &Server{ // 创立一个 Server 对象,给一些字段赋默认值
Plugins: &pluginContainer{},
options: make(map[string]interface{}),
activeConn: make(map[net.Conn]struct{}),
doneChan: make(chan struct{}),
serviceMap: make(map[string]*service),
router: make(map[string]Handler),
AsyncWrite: false, // 除非你想 benchmark 或者极致优化,否则倡议你设置为 false
}
for _, op := range options {op(s)
}
// 设置 tcp 连贯的 keepAlive 参数
if s.options["TCPKeepAlivePeriod"] == nil {s.options["TCPKeepAlivePeriod"] = 3 * time.Minute
}
return s
}
Server 构造如下:
type Handler func(ctx *Context) error
// Server is rpcx server that use TCP or UDP.
type Server struct {
ln net.Listener // 监听器,用来监听服务端的端口
readTimeout time.Duration // 读取 client 申请数据包的超时工夫
writeTimeout time.Duration // 给 client 写响应数据包的超时工夫
gatewayHTTPServer *http.Server // http 网管
DisableHTTPGateway bool // should disable http invoke or not.
DisableJSONRPC bool // should disable json rpc or not.
AsyncWrite bool // set true if your server only serves few clients
serviceMapMu sync.RWMutex // 读写锁,爱护 sericeMap 的并发读写
serviceMap map[string]*service // 服务记录表
router map[string]Handler
mu sync.RWMutex // 读写锁,爱护 activeConn 的并发读写
activeConn map[net.Conn]struct{} // 沉闷连贯记录表
doneChan chan struct{}
seq uint64 // server 端 id
inShutdown int32
onShutdown []func(s *Server)
onRestart []func(s *Server)
// TLSConfig for creating tls tcp connection.
tlsConfig *tls.Config
// BlockCrypt for kcp.BlockCrypt
options map[string]interface{}
// CORS options 跨域
corsOptions *CORSOptions
// 插件
Plugins PluginContainer
// AuthFunc can be used to auth.
AuthFunc func(ctx context.Context, req *protocol.Message, token string) error
handlerMsgNum int32
HandleServiceError func(error)
}
对象创立实现之后,须要咱们来注册对外提供的服务,通过 RegisterName 办法来实现:
/*
@name: 对外提供的服务名称
@rcvr: 对外提供的服务对象的实例
*/
func (s *Server) RegisterName(name string, rcvr interface{}, metadata string) error {_, err := s.register(rcvr, name, true) // 创立 service 对象,存储到对象表中
if err != nil {return err}
if s.Plugins == nil {s.Plugins = &pluginContainer{}
}
return s.Plugins.DoRegister(name, rcvr, metadata)
}
// 注册 service
func (s *Server) register(rcvr interface{}, name string, useName bool) (string, error) {
// 加写锁
s.serviceMapMu.Lock()
defer s.serviceMapMu.Unlock()
// 结构 service 对象
service := new(service)
service.typ = reflect.TypeOf(rcvr)
service.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(service.rcvr).Type().Name() // Type
if useName {sname = name}
if sname == "" {errorStr := "rpcx.Register: no service name for type" + service.typ.String()
log.Error(errorStr)
return sname, errors.New(errorStr)
}
if !useName && !isExported(sname) {
errorStr := "rpcx.Register: type" + sname + "is not exported"
log.Error(errorStr)
return sname, errors.New(errorStr)
}
service.name = sname
// Install the methods
// 将 rcvr 对象的所有办法查找进去
service.method = suitableMethods(service.typ, true)
if len(service.method) == 0 {
var errorStr string
// To help the user, see if a pointer receiver would work.
method := suitableMethods(reflect.PtrTo(service.typ), false)
if len(method) != 0 {errorStr = "rpcx.Register: type" + sname + "has no exported methods of suitable type (hint: pass a pointer to value of that type)"
} else {errorStr = "rpcx.Register: type" + sname + "has no exported methods of suitable type"}
log.Error(errorStr)
return sname, errors.New(errorStr)
}
// 更新服务记录表
s.serviceMap[service.name] = service
return sname, nil
}
// DoRegister invokes DoRegister plugin.
func (p *pluginContainer) DoRegister(name string, rcvr interface{}, metadata string) error {var es []error
for _, rp := range p.plugins {
// 如果是 RegisterPlugin 类型的插件,会调用它的 Register 函数进行注册
if plugin, ok := rp.(RegisterPlugin); ok {err := plugin.Register(name, rcvr, metadata)
if err != nil {es = append(es, err)
}
}
}
if len(es) > 0 {return errors.NewMultiError(es)
}
return nil
}
服务注册结束之后,就开始启动服务了:
// Serve starts and listens RPC requests.
// It is blocked until receiving connections from clients.
func (s *Server) Serve(network, address string) (err error) {
var ln net.Listener
// 设置监听器,监听地址中配置的端口
ln, err = s.makeListener(network, address)
if err != nil {return}
// 如果抉择 http 协定,走这里
if network == "http" {s.serveByHTTP(ln, "")
return nil
}
// 抉择 ws/wss 协定走这里
if network == "ws" || network == "wss" {s.serveByWS(ln, "")
return nil
}
// try to start gateway
// 尝试启动网关,如果是 tcp 协定,则会通过 goroutine 启动一个网关服务
ln = s.startGateway(network, ln)
// tcp 协定走这里,开始解决监听端口处的数据
return s.serveListener(ln)
}
// 获取端口数据并启动 goroutine 解决
func (s *Server) serveListener(ln net.Listener) error {
var tempDelay time.Duration
s.mu.Lock()
s.ln = ln
s.mu.Unlock()
for {// 死循环
// 有申请数据达到时,channel 会返回数据
conn, e := ln.Accept()
if e != nil { // 错误处理
select {case <-s.getDoneChan(): // 连贯敞开,报错,跳出循环
return ErrServerClosed
default:
}
if ne, ok := e.(net.Error); ok && ne.Temporary() { // 短暂谬误
if tempDelay == 0 {tempDelay = 5 * time.Millisecond} else {tempDelay *= 2}
if max := 1 * time.Second; tempDelay > max {tempDelay = max}
log.Errorf("rpcx: Accept error: %v; retrying in %v", e, tempDelay)
time.Sleep(tempDelay)
continue
}
if strings.Contains(e.Error(), "listener closed") {return ErrServerClosed}
return e
}
tempDelay = 0
// 如果是 tcp 连贯,设置 keepAlive 相干参数
if tc, ok := conn.(*net.TCPConn); ok {period := s.options["TCPKeepAlivePeriod"]
if period != nil {tc.SetKeepAlive(true)
tc.SetKeepAlivePeriod(period.(time.Duration))
tc.SetLinger(10)
}
}
// 有 PostConnAcceptPlugin 类型插件时,进行解决
conn, ok := s.Plugins.DoPostConnAccept(conn)
if !ok {conn.Close()
continue
}
s.mu.Lock()
// 记录沉闷连贯
s.activeConn[conn] = struct{}{}
s.mu.Unlock()
if share.Trace {log.Debugf("server accepted an conn: %v", conn.RemoteAddr().String())
}
// 启动 goroutine 异步解决申请,持续开始下一次循环
go s.serveConn(conn)
}
}
监听到申请后,会启动一个 goroutine 进行异步解决,接下来看看申请的解决逻辑:
func (s *Server) serveConn(conn net.Conn) {if s.isShutdown() { // 异样解决,敞开连贯
s.closeConn(conn)
return
}
defer func() {// 捕捉异样,打印谬误堆栈信息,敞开连贯
if err := recover(); err != nil {
const size = 64 << 10
buf := make([]byte, size)
ss := runtime.Stack(buf, false)
if ss > size {ss = size}
buf = buf[:ss]
log.Errorf("serving %s panic error: %s, stack:\n %s", conn.RemoteAddr(), err, buf)
}
if share.Trace {log.Debugf("server closed conn: %v", conn.RemoteAddr().String())
}
s.closeConn(conn)
}()
// TLS 解决
if tlsConn, ok := conn.(*tls.Conn); ok {
if d := s.readTimeout; d != 0 {conn.SetReadDeadline(time.Now().Add(d))
}
if d := s.writeTimeout; d != 0 {conn.SetWriteDeadline(time.Now().Add(d))
}
if err := tlsConn.Handshake(); err != nil {log.Errorf("rpcx: TLS handshake error from %s: %v", conn.RemoteAddr(), err)
return
}
}
// 申请空间,寄存申请数据
r := bufio.NewReaderSize(conn, ReaderBuffsize)
var writeCh chan *[]byte
if s.AsyncWrite { // 如果设置了同步写,走这里
writeCh = make(chan *[]byte, 1)
defer close(writeCh)
go s.serveAsyncWrite(conn, writeCh)
}
for {if s.isShutdown() {return}
t0 := time.Now()
if s.readTimeout != 0 { // 设置超时工夫
conn.SetReadDeadline(t0.Add(s.readTimeout))
}
// 创立上下文
ctx := share.WithValue(context.Background(), RemoteConnContextKey, conn)
// 获取申请数据
req, err := s.readRequest(ctx, r)
if err != nil { // 获取失败的解决逻辑
protocol.FreeMsg(req)
if err == io.EOF {log.Infof("client has closed this connection: %s", conn.RemoteAddr().String())
} else if strings.Contains(err.Error(), "use of closed network connection") {log.Infof("rpcx: connection %s is closed", conn.RemoteAddr().String())
} else if errors.Is(err, ErrReqReachLimit) {if !req.IsOneway() {res := req.Clone()
res.SetMessageType(protocol.Response)
if len(res.Payload) > 1024 && req.CompressType() != protocol.None {res.SetCompressType(req.CompressType())
}
handleError(res, err)
s.Plugins.DoPreWriteResponse(ctx, req, res, err)
data := res.EncodeSlicePointer()
if s.AsyncWrite {writeCh <- data} else {conn.Write(*data)
protocol.PutData(data)
}
s.Plugins.DoPostWriteResponse(ctx, req, res, err)
protocol.FreeMsg(res)
} else {s.Plugins.DoPreWriteResponse(ctx, req, nil, err)
}
protocol.FreeMsg(req)
continue
} else {log.Warnf("rpcx: failed to read request: %v", err)
}
return
}
if s.writeTimeout != 0 {conn.SetWriteDeadline(t0.Add(s.writeTimeout))
}
if share.Trace {log.Debugf("server received an request %+v from conn: %v", req, conn.RemoteAddr().String())
}
// 更新上下文的值
ctx = share.WithLocalValue(ctx, StartRequestContextKey, time.Now().UnixNano())
closeConn := false
if !req.IsHeartbeat() { // 不是心跳包
err = s.auth(ctx, req) // 如果设置有 auth 防备,执行 auth 逻辑
closeConn = err != nil
}
if err != nil { // auth 失败的解决逻辑
if !req.IsOneway() {res := req.Clone()
res.SetMessageType(protocol.Response)
if len(res.Payload) > 1024 && req.CompressType() != protocol.None {res.SetCompressType(req.CompressType())
}
handleError(res, err)
s.Plugins.DoPreWriteResponse(ctx, req, res, err)
data := res.EncodeSlicePointer()
if s.AsyncWrite {writeCh <- data} else {conn.Write(*data)
protocol.PutData(data)
}
s.Plugins.DoPostWriteResponse(ctx, req, res, err)
protocol.FreeMsg(res)
} else {s.Plugins.DoPreWriteResponse(ctx, req, nil, err)
}
protocol.FreeMsg(req)
// auth failed, closed the connection
if closeConn {log.Infof("auth failed for conn %s: %v", conn.RemoteAddr().String(), err)
return
}
continue
}
go func() {// 启动 goroutine 解决
defer func() { // 解决完结,捕捉异样
if r := recover(); r != nil {
// maybe panic because the writeCh is closed.
log.Errorf("failed to handle request: %v", r)
}
}()
// 原子操作,记录服务解决的音讯数量
atomic.AddInt32(&s.handlerMsgNum, 1)
defer atomic.AddInt32(&s.handlerMsgNum, -1)
if req.IsHeartbeat() { // 心跳包的解决逻辑
s.Plugins.DoHeartbeatRequest(ctx, req)
req.SetMessageType(protocol.Response)
data := req.EncodeSlicePointer()
if s.AsyncWrite {writeCh <- data} else {conn.Write(*data)
protocol.PutData(data)
}
protocol.FreeMsg(req)
return
}
resMetadata := make(map[string]string)
ctx = share.WithLocalValue(share.WithLocalValue(ctx, share.ReqMetaDataKey, req.Metadata),
share.ResMetaDataKey, resMetadata)
// 如果服务通过 metadata 设置了超时工夫,则创立一个超时上下文
cancelFunc := parseServerTimeout(ctx, req)
if cancelFunc != nil {defer cancelFunc()
}
// 插件解决
s.Plugins.DoPreHandleRequest(ctx, req)
if share.Trace {log.Debugf("server handle request %+v from conn: %v", req, conn.RemoteAddr().String())
}
// first use handler
if handler, ok := s.router[req.ServicePath+"."+req.ServiceMethod]; ok {sctx := NewContext(ctx, conn, req, writeCh)
err := handler(sctx)
if err != nil {log.Errorf("[handler internal error]: servicepath: %s, servicemethod, err: %v", req.ServicePath, req.ServiceMethod, err)
}
return
}
// 解决申请
res, err := s.handleRequest(ctx, req)
if err != nil {
if s.HandleServiceError != nil {s.HandleServiceError(err)
} else {log.Warnf("rpcx: failed to handle request: %v", err)
}
}
s.Plugins.DoPreWriteResponse(ctx, req, res, err)
if !req.IsOneway() {if len(resMetadata) > 0 { // copy meta in context to request
meta := res.Metadata
if meta == nil {res.Metadata = resMetadata} else {
for k, v := range resMetadata {if meta[k] == "" {meta[k] = v
}
}
}
}
if len(res.Payload) > 1024 && req.CompressType() != protocol.None {res.SetCompressType(req.CompressType())
}
data := res.EncodeSlicePointer()
if s.AsyncWrite {writeCh <- data} else {conn.Write(*data)
protocol.PutData(data)
}
}
s.Plugins.DoPostWriteResponse(ctx, req, res, err)
if share.Trace {log.Debugf("server write response %+v for an request %+v from conn: %v", res, req, conn.RemoteAddr().String())
}
// 开释对象,放回对象池
protocol.FreeMsg(req)
protocol.FreeMsg(res)
}()}
}
未完待续……
正文完