grpc server 源码学习
如何实现一个最简略的 grpc server
// 创立 listener
lis, err := net.Listen("tcp", port)
if err != nil {log.Fatalf("failed to listen: %v", err)
}
// 创立 server 示例
s := grpc.NewServer()
// 注册服务
pb.RegisterGreeterServer(s, &server{})
reflection.Register(s)
// 启动服务端监听
if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)
}
grpc.NewServer()
grpc.NewServer()会返回一个 grpc.Server,它的构造如下:
// Server is a gRPC server to serve RPC requests.
type Server struct {
opts serverOptions
mu sync.Mutex // guards following
lis map[net.Listener]bool
conns map[transport.ServerTransport]bool
serve bool
drain bool
cv *sync.Cond // signaled when connections close for GracefulStop
m map[string]*service // service name -> service info
events trace.EventLog
quit *grpcsync.Event
done *grpcsync.Event
channelzRemoveOnce sync.Once
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
channelzID int64 // channelz unique identification number
czData *channelzData
}
由这个构造,咱们能够略知一二,它应用了一个容器 conns 用来保留以后的所有连贯;也有和优雅退出的 waitgroup,猜想应该是须要期待所有申请解决完后退出;cond 猜想是用来告诉所有以后的连贯,服务将被进行了;其余字段的用处临时无奈很显著的猜想进去,咱们将在前面持续剖析。
pb.RegisterGreeterServer
func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {s.RegisterService(&_Greeter_serviceDesc, srv)
}
最终是调用了 grpc.Server 的 RegisterService
进行注册,第一个参数是 pb 生成代码生成的_Greeter_serviceDesc
,它形容了 rpc service 的一些属性信息,内容如下:
var _Greeter_serviceDesc = grpc.ServiceDesc{
ServiceName: "helloworld.Greeter",
HandlerType: (*GreeterServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "SayHello",
Handler: _Greeter_SayHello_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "demo.proto",
}
这外面形容了
- 服务名
- HandlerType(这个是做什么的?存疑)
- 办法列表,办法列表包含办法名和一个 Handler(Handler 做什么的?存疑)
- Streams,这个应该是只有 stream 类型的 rpc service 才会领有具体的值
- Metadata,这个代表生成这个 go 文件的原始 pb 文件的文件名
办法列表里,每一项是一个 MethodDesc 构造体,它的定义如下:
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
MethodName string
Handler methodHandler
}
能够发现 Handler 其实是一个函数;
那么实在的 _Greeter_SayHello_Handler
蕴含了哪些信息呢?蕴含的信息如下:
func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error)
in := new(HelloRequest)
if err := dec(in); err != nil {return nil, err}
if interceptor == nil {return srv.(GreeterServer).SayHello(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/helloworld.Greeter/SayHello",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))
}
return interceptor(ctx, in, info, handler)
}
通过这段代码能够看出,它其实是真正调用解决 rpc 定义的接口办法的入口。
- 有一个参数 dec,这个首先会将其解码到 request 构造体上
- 如果没有设置拦截器,那么间接调用 srv 实现的 SayHello 办法进行解决并返回。
- 如果设置了拦截器,会通过传入的拦截器做一些非凡的解决。
能够猜想,在 grpc server 实在调用时,会将具体实现了 rpc service 的构造体作为第一个参数传入,将通过解码的 request 数据作为第三个参数传入,由实现了 rpc service 接口的对象来进行解决。
注册服务
func (s *Server) register(sd *ServiceDesc, ss interface{}) {s.mu.Lock()
defer s.mu.Unlock()
s.printf("RegisterService(%q)", sd.ServiceName)
if s.serve {grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
}
if _, ok := s.m[sd.ServiceName]; ok {grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
}
srv := &service{
server: ss,
md: make(map[string]*MethodDesc),
sd: make(map[string]*StreamDesc),
mdata: sd.Metadata,
}
for i := range sd.Methods {d := &sd.Methods[i]
srv.md[d.MethodName] = d
}
for i := range sd.Streams {d := &sd.Streams[i]
srv.sd[d.StreamName] = d
}
s.m[sd.ServiceName] = srv
}
能够依据之前形容的 server 构造体能够看出,server 有一个属性 s,这个汇合能够存储多个 pb 定义的 service。key 为服务名,value 为具体的服务构造体。意思是一个 grpc server 启动时,能够有多个 pb 定义的 rpc service 被注册到其中。并且同一个 rpc service 不能被注册两次。
注册服务时,将之前提到的 pb 生成的 _Greeter_serviceDesc
相干信息注册到了 service 汇合中。把 service 和 method 都注册到了 server 中。
grpcServer.Serve
当初服务能够正式启动起来了,首先看一下官网对 Serve 办法的正文:
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
大抵翻译一下:
Serve 对每一个 listener 监听到的连贯创立一个新的 ServerTransport 和协程,这个协程读取 grpc 申请而后调用被注册的 handler 来响应这些申请。Serve 办法在 lis.Accept 失败时会返回 error,当这个办法返回时,lis 会被敞开。当应用失常退出或者优雅退出时,Serve 办法不会返回 error。
正文说得十分明确,也能看到,的确是调用了被注册的 handler 来响应申请。
Serve 办法的大抵工作逻辑:
for {rawConn, err := lis.Accept()
if err != nil {
// 错误处理
//...
}
s.serveWG.Add(1)
go func() {s.handleRawConn(rawConn)
s.serveWG.Done()}()}
整个 Serve 办法在一个大的 for 循环中一直获取 listener 监听到的申请,而后对每一个监听到的申请开了一个协程去解决。
也就是说外围其实是 handleRawConn
办法
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {if s.quit.HasFired() {rawConn.Close()
return
}
// 设置连贯超时,若没有的话会有默认的超时工夫,默认两分钟
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
// 查看证书认证相干信息
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
if err != nil {
// ErrConnDispatched means that the connection was dispatched away from
// gRPC; those connections should be left open.
if err != credentials.ErrConnDispatched {s.mu.Lock()
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
s.mu.Unlock()
grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
rawConn.Close()}
rawConn.SetDeadline(time.Time{})
return
}
// 进行 http2 的传输
// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(conn, authInfo)// 获取到传输 st
if st == nil {return}
rawConn.SetDeadline(time.Time{})
// 把 http2 传输连贯缓存到 server 的 conns 构造中
// 为何要缓存呢?if !s.addConn(st) {// 如果 server 的 conns 构造为空时,会返回 false,并敞开掉传输 st
return
}
// 间接 go 一个协程,异步解决申请
go func() {s.serveStreams(st)
// 最初从连贯缓存 conns 构造中删除掉传输 st
// 并通过 cond,发动一个播送
s.removeConn(st)
}()
// 这里能够看到,起了一个协程去解决这个 http2 的 stream,这是因为 http2 是长连贯,能够复用这个流一直接收数据,所以须要起一个协程来做这件事。
这样看来,实际上咱们应该重点关注下 serveStreams
办法,它做了什么呢?
func (s *Server) serveStreams(st transport.ServerTransport) {defer st.Close()
var wg sync.WaitGroup
st.HandleStreams(func(stream *transport.Stream) {wg.Add(1)
go func() {defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {return ctx}
tr := trace.New("grpc.Recv."+methodFamily(method), method)
return trace.NewContext(ctx, tr)
})
wg.Wait()}
能够看到,这里实际上的解决办法是:
func(stream *transport.Stream) {wg.Add(1)
go func() {defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()}
即 HandleStream
,为何这里要应用 waitGroup,并期待这个实现,而不是同步的形式呢?。这是因为,咱们要理解一个前提,grpc 是能够反对长连贯的,因而severStreams
办法解决的其实并不仅仅是单个申请,而是一个源源不断的 http2 流。所以咱们应该查看 st 的 HandleStream
干了什么事件。
st 是 transport.ServerTransport
类型的 interface,而实际上这里的对象是http2Server
。
// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {defer close(t.readerDone)
for {t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
if err != nil {
// 错误处理
//...
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if t.operateHeaders(frame, handle, traceCtx) {t.Close()
break
}
case *http2.DataFrame:
t.handleData(frame)
case *http2.RSTStreamFrame:
t.handleRSTStream(frame)
case *http2.SettingsFrame:
t.handleSettings(frame)
case *http2.PingFrame:
t.handlePing(frame)
case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame)
case *http2.GoAwayFrame:
// TODO: Handle GoAway from the client appropriately.
default:
errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
}
}
}
因而,整个办法是在一个 for 循环中,一直解决二进制流,所以对每次获取到的数据都应用一个协程去解决,这就是为何 handle 参数要应用 waitgroup。
handleStream
做了什么事呢?这里只截取要害代码:
sm := stream.Method()
service := sm[:pos]
method := sm[pos+1:]
srv, knownService := s.m[service]
if knownService {if md, ok := srv.md[method]; ok {s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
if sd, ok := srv.sd[method]; ok {s.processStreamingRPC(t, stream, srv, sd, trInfo)
return
}
}
总结起来是:
- 从 stream 里取出 method,service 信息
- 如果 service 与之前注册胜利的 rpc service 名字匹配,则查看 method 是否与注册胜利的 rpc service 的 method 相匹配,若匹配的话,间接调用
processUnaryRPC
或processStreamingRPC
进行解决。
接着看processUnaryRPC
,发现了咱们最开始提到的 pb 注册的Handler: _Greeter_SayHello_Handler
,它是这样被调用的:
df := func(v interface{}) error {if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
}
if sh != nil {sh.HandleRPC(stream.Context(), &stats.InPayload{RecvTime: time.Now(),
Payload: v,
WireLength: payInfo.wireLength,
Data: d,
Length: len(d),
})
}
if binlog != nil {
binlog.Log(&binarylog.ClientMessage{Message: d,})
}
if trInfo != nil {trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
}
return nil
}
ctx := NewContextWithServerTransportStream(stream.Context(), stream)
reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
最初解决实现后,会将 resp 写回:
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {...}
至此,整个 grpc 就解决完了一条申请,并返回了响应。
优雅退出是如何实现的?
之前咱们有一个疑难,为什么要缓存所有的 conns 呢?这就是因为在优雅退出时,咱们须要告诉 / 期待所有以后的连贯都开释再退出。
看一下优雅退出是如何实现的:
- 第一阶段
s.mu.Lock()
if s.conns == nil {s.mu.Unlock()
return
}
for lis := range s.lis {lis.Close()
}
s.lis = nil
if !s.drain {
for st := range s.conns {st.Drain()
}
s.drain = true
}
// Wait for serving threads to be ready to exit. Only then can we be sure no
// new conns will be created.
s.mu.Unlock()
第一阶段次要是首先将所有 tcp listener 敞开掉,并且若 server 状态不处于 进行接管申请的状态,那么就告诉以后所有的连贯进行再接管申请(Drain
),最初将状态 drain 置为 true。
- 第二阶段
s.serveWG.Wait()
s.mu.Lock()
for len(s.conns) != 0 {s.cv.Wait()
}
s.conns = nil
if s.events != nil {s.events.Finish()
s.events = nil
}
s.mu.Unlock()
首先期待所有的申请都解决完,而后加锁,期待缓存的 conns 连贯都被敞开掉,若都被敞开掉,会有协程发动 Broadcase 进行告诉,告诉结束后 Wait()不会再阻塞,能够接着往下走,最初发送 finish event,解锁即可。
什么时候会发动 Broadcast 呢?有两个中央:
- 解决完一个流,会将流从 conns 中删除
go func() {s.serveStreams(st)
s.removeConn(st)
}()
而删除时,则会将 st 从 conns 中删除掉,并且发一个 Broadcast
func (s *Server) removeConn(st transport.ServerTransport) {s.mu.Lock()
defer s.mu.Unlock()
if s.conns != nil {delete(s.conns, st)
s.cv.Broadcast()}
}
依据优雅退出代码,尽管发动了播送,s.cv.Wait
不再阻塞,然而若还有别的流没有开释,那么 s.conns
依然不为 0,因而又会进入到 s.cv.Wait
的阻塞中。
- server 失常退出(
Stop
)
s.mu.Lock()
listeners := s.lis
s.lis = nil
st := s.conns
s.conns = nil
// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
s.cv.Broadcast()
s.mu.Unlock()
for lis := range listeners {lis.Close()
}
for c := range st {c.Close()
}
s.mu.Lock()
if s.events != nil {s.events.Finish()
s.events = nil
}
s.mu.Unlock()
能够看到,在失常退出 server 时,间接将 conns 置为了 nil,而后发送了 Broadcasr,因而优雅退出的 Wait 也不会再期待了。