grpc server源码学习
如何实现一个最简略的grpc server
// 创立listener lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } // 创立server示例 s := grpc.NewServer() // 注册服务 pb.RegisterGreeterServer(s, &server{}) reflection.Register(s) // 启动服务端监听 if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) }
grpc.NewServer()
grpc.NewServer()会返回一个grpc.Server,它的构造如下:
// Server is a gRPC server to serve RPC requests.type Server struct { opts serverOptions mu sync.Mutex // guards following lis map[net.Listener]bool conns map[transport.ServerTransport]bool serve bool drain bool cv *sync.Cond // signaled when connections close for GracefulStop m map[string]*service // service name -> service info events trace.EventLog quit *grpcsync.Event done *grpcsync.Event channelzRemoveOnce sync.Once serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop channelzID int64 // channelz unique identification number czData *channelzData}
由这个构造,咱们能够略知一二,它应用了一个容器conns用来保留以后的所有连贯;也有和优雅退出的waitgroup,猜想应该是须要期待所有申请解决完后退出;cond猜想是用来告诉所有以后的连贯,服务将被进行了;其余字段的用处临时无奈很显著的猜想进去,咱们将在前面持续剖析。
pb.RegisterGreeterServer
func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) { s.RegisterService(&_Greeter_serviceDesc, srv)}
最终是调用了grpc.Server的RegisterService
进行注册,第一个参数是pb生成代码生成的_Greeter_serviceDesc
,它形容了rpc service的一些属性信息,内容如下:
var _Greeter_serviceDesc = grpc.ServiceDesc{ ServiceName: "helloworld.Greeter", HandlerType: (*GreeterServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "SayHello", Handler: _Greeter_SayHello_Handler, }, }, Streams: []grpc.StreamDesc{}, Metadata: "demo.proto",}
这外面形容了
- 服务名
- HandlerType(这个是做什么的?存疑)
- 办法列表,办法列表包含办法名和一个Handler(Handler做什么的?存疑)
- Streams,这个应该是只有stream类型的rpc service才会领有具体的值
- Metadata,这个代表生成这个go文件的原始pb文件的文件名
办法列表里,每一项是一个MethodDesc构造体,它的定义如下:
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)// MethodDesc represents an RPC service's method specification.type MethodDesc struct { MethodName string Handler methodHandler}
能够发现Handler其实是一个函数;
那么实在的_Greeter_SayHello_Handler
蕴含了哪些信息呢?蕴含的信息如下:
func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) in := new(HelloRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(GreeterServer).SayHello(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: "/helloworld.Greeter/SayHello", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest)) } return interceptor(ctx, in, info, handler)}
通过这段代码能够看出,它其实是真正调用解决rpc定义的接口办法的入口。
- 有一个参数dec,这个首先会将其解码到request构造体上
- 如果没有设置拦截器,那么间接调用srv实现的SayHello办法进行解决并返回。
- 如果设置了拦截器,会通过传入的拦截器做一些非凡的解决。
能够猜想,在grpc server实在调用时,会将具体实现了rpc service的构造体作为第一个参数传入,将通过解码的request数据作为第三个参数传入,由实现了rpc service接口的对象来进行解决。
注册服务
func (s *Server) register(sd *ServiceDesc, ss interface{}) { s.mu.Lock() defer s.mu.Unlock() s.printf("RegisterService(%q)", sd.ServiceName) if s.serve { grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName) } if _, ok := s.m[sd.ServiceName]; ok { grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) } srv := &service{ server: ss, md: make(map[string]*MethodDesc), sd: make(map[string]*StreamDesc), mdata: sd.Metadata, } for i := range sd.Methods { d := &sd.Methods[i] srv.md[d.MethodName] = d } for i := range sd.Streams { d := &sd.Streams[i] srv.sd[d.StreamName] = d } s.m[sd.ServiceName] = srv}
能够依据之前形容的server构造体能够看出,server有一个属性s,这个汇合能够存储多个pb定义的service。key为服务名,value为具体的服务构造体。意思是一个grpc server启动时,能够有多个pb定义的rpc service被注册到其中。并且同一个rpc service不能被注册两次。
注册服务时,将之前提到的pb生成的_Greeter_serviceDesc
相干信息注册到了service汇合中。把service和method都注册到了server中。
grpcServer.Serve
当初服务能够正式启动起来了,首先看一下官网对Serve办法的正文:
// Serve accepts incoming connections on the listener lis, creating a new// ServerTransport and service goroutine for each. The service goroutines// read gRPC requests and then call the registered handlers to reply to them.// Serve returns when lis.Accept fails with fatal errors. lis will be closed when// this method returns.// Serve will return a non-nil error unless Stop or GracefulStop is called.
大抵翻译一下:
Serve对每一个listener监听到的连贯创立一个新的ServerTransport和协程,这个协程读取grpc申请而后调用被注册的handler来响应这些申请。Serve办法在lis.Accept失败时会返回error,当这个办法返回时,lis会被敞开。当应用失常退出或者优雅退出时,Serve办法不会返回error。
正文说得十分明确,也能看到,的确是调用了被注册的handler来响应申请。
Serve办法的大抵工作逻辑:
for { rawConn, err := lis.Accept() if err != nil { //错误处理 //... } s.serveWG.Add(1) go func() { s.handleRawConn(rawConn) s.serveWG.Done() }() }
整个Serve办法在一个大的for循环中一直获取listener监听到的申请,而后对每一个监听到的申请开了一个协程去解决。
也就是说外围其实是handleRawConn
办法
// handleRawConn forks a goroutine to handle a just-accepted connection that// has not had any I/O performed on it yet.func (s *Server) handleRawConn(rawConn net.Conn) { if s.quit.HasFired() { rawConn.Close() return } //设置连贯超时,若没有的话会有默认的超时工夫,默认两分钟rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout)) //查看证书认证相干信息 conn, authInfo, err := s.useTransportAuthenticator(rawConn) if err != nil { // ErrConnDispatched means that the connection was dispatched away from // gRPC; those connections should be left open. if err != credentials.ErrConnDispatched { s.mu.Lock() s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err) s.mu.Unlock() grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err) rawConn.Close() } rawConn.SetDeadline(time.Time{}) return } //进行http2的传输 // Finish handshaking (HTTP2) st := s.newHTTP2Transport(conn, authInfo)//获取到传输st if st == nil { return } rawConn.SetDeadline(time.Time{}) //把http2传输连贯缓存到server的conns构造中 //为何要缓存呢? if !s.addConn(st) {//如果server的conns构造为空时,会返回false,并敞开掉传输st return } //间接go一个协程,异步解决申请 go func() { s.serveStreams(st) //最初从连贯缓存conns构造中删除掉传输st //并通过cond,发动一个播送 s.removeConn(st) }()
//这里能够看到,起了一个协程去解决这个http2的stream,这是因为http2是长连贯,能够复用这个流一直接收数据,所以须要起一个协程来做这件事。
这样看来,实际上咱们应该重点关注下serveStreams
办法,它做了什么呢?
func (s *Server) serveStreams(st transport.ServerTransport) { defer st.Close() var wg sync.WaitGroup st.HandleStreams(func(stream *transport.Stream) { wg.Add(1) go func() { defer wg.Done() s.handleStream(st, stream, s.traceInfo(st, stream)) }() }, func(ctx context.Context, method string) context.Context { if !EnableTracing { return ctx } tr := trace.New("grpc.Recv."+methodFamily(method), method) return trace.NewContext(ctx, tr) }) wg.Wait()}
能够看到,这里实际上的解决办法是:
func(stream *transport.Stream) { wg.Add(1) go func() { defer wg.Done() s.handleStream(st, stream, s.traceInfo(st, stream)) }() }
即HandleStream
,为何这里要应用waitGroup,并期待这个实现,而不是同步的形式呢?。这是因为,咱们要理解一个前提,grpc是能够反对长连贯的,因而severStreams
办法解决的其实并不仅仅是单个申请,而是一个源源不断的http2流。所以咱们应该查看st的HandleStream
干了什么事件。
st是transport.ServerTransport
类型的interface,而实际上这里的对象是http2Server
。
// HandleStreams receives incoming streams using the given handler. This is// typically run in a separate goroutine.// traceCtx attaches trace to ctx and returns the new context.func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) { defer close(t.readerDone) for { t.controlBuf.throttle() frame, err := t.framer.fr.ReadFrame() atomic.StoreInt64(&t.lastRead, time.Now().UnixNano()) if err != nil { //错误处理 //... } switch frame := frame.(type) { case *http2.MetaHeadersFrame: if t.operateHeaders(frame, handle, traceCtx) { t.Close() break } case *http2.DataFrame: t.handleData(frame) case *http2.RSTStreamFrame: t.handleRSTStream(frame) case *http2.SettingsFrame: t.handleSettings(frame) case *http2.PingFrame: t.handlePing(frame) case *http2.WindowUpdateFrame: t.handleWindowUpdate(frame) case *http2.GoAwayFrame: // TODO: Handle GoAway from the client appropriately. default: errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame) } }}
因而,整个办法是在一个for循环中,一直解决二进制流,所以对每次获取到的数据都应用一个协程去解决,这就是为何handle参数要应用waitgroup。
handleStream
做了什么事呢?这里只截取要害代码:
sm := stream.Method() service := sm[:pos] method := sm[pos+1:] srv, knownService := s.m[service] if knownService { if md, ok := srv.md[method]; ok { s.processUnaryRPC(t, stream, srv, md, trInfo) return } if sd, ok := srv.sd[method]; ok { s.processStreamingRPC(t, stream, srv, sd, trInfo) return } }
总结起来是:
- 从stream里取出method,service信息
- 如果service与之前注册胜利的rpc service名字匹配,则查看method是否与注册胜利的rpc service的method相匹配,若匹配的话,间接调用
processUnaryRPC
或processStreamingRPC
进行解决。
接着看processUnaryRPC
,发现了咱们最开始提到的pb注册的Handler: _Greeter_SayHello_Handler
,它是这样被调用的:
df := func(v interface{}) error { if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil { return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) } if sh != nil { sh.HandleRPC(stream.Context(), &stats.InPayload{ RecvTime: time.Now(), Payload: v, WireLength: payInfo.wireLength, Data: d, Length: len(d), }) } if binlog != nil { binlog.Log(&binarylog.ClientMessage{ Message: d, }) } if trInfo != nil { trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) } return nil } ctx := NewContextWithServerTransportStream(stream.Context(), stream) reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
最初解决实现后,会将resp写回:
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil { ...}
至此,整个grpc就解决完了一条申请,并返回了响应。
优雅退出是如何实现的?
之前咱们有一个疑难,为什么要缓存所有的conns呢?这就是因为在优雅退出时,咱们须要告诉/期待所有以后的连贯都开释再退出。
看一下优雅退出是如何实现的:
- 第一阶段
s.mu.Lock() if s.conns == nil { s.mu.Unlock() return } for lis := range s.lis { lis.Close() } s.lis = nil if !s.drain { for st := range s.conns { st.Drain() } s.drain = true } // Wait for serving threads to be ready to exit. Only then can we be sure no // new conns will be created. s.mu.Unlock()
第一阶段次要是首先将所有tcp listener敞开掉,并且若server状态不处于 进行接管申请的状态,那么就告诉以后所有的连贯进行再接管申请(Drain
),最初将状态drain置为true。
- 第二阶段
s.serveWG.Wait() s.mu.Lock() for len(s.conns) != 0 { s.cv.Wait() } s.conns = nil if s.events != nil { s.events.Finish() s.events = nil } s.mu.Unlock()
首先期待所有的申请都解决完,而后加锁,期待缓存的conns连贯都被敞开掉,若都被敞开掉,会有协程发动Broadcase进行告诉,告诉结束后Wait()不会再阻塞,能够接着往下走,最初发送finish event,解锁即可。
什么时候会发动Broadcast呢?有两个中央:
- 解决完一个流,会将流从conns中删除
go func() { s.serveStreams(st) s.removeConn(st)}()
而删除时,则会将st从conns中删除掉,并且发一个Broadcast
func (s *Server) removeConn(st transport.ServerTransport) { s.mu.Lock() defer s.mu.Unlock() if s.conns != nil { delete(s.conns, st) s.cv.Broadcast() }}
依据优雅退出代码,尽管发动了播送,s.cv.Wait
不再阻塞,然而若还有别的流没有开释,那么s.conns
依然不为0,因而又会进入到s.cv.Wait
的阻塞中。
- server失常退出(
Stop
)
s.mu.Lock() listeners := s.lis s.lis = nil st := s.conns s.conns = nil // interrupt GracefulStop if Stop and GracefulStop are called concurrently. s.cv.Broadcast() s.mu.Unlock() for lis := range listeners { lis.Close() } for c := range st { c.Close() } s.mu.Lock() if s.events != nil { s.events.Finish() s.events = nil } s.mu.Unlock()
能够看到,在失常退出server时,间接将conns置为了nil,而后发送了Broadcasr,因而优雅退出的Wait也不会再期待了。