共计 8698 个字符,预计需要花费 22 分钟才能阅读完成。
grpc 源码学习笔记(浅显版)
本次源码浏览的目标是心愿理解到 grpc 从 client 调用到 server 端响应的流程,
而非系统性的对每个逻辑都进行剖析和详解。如果能通过本次学习到浏览 grpc 源码的
线索或者大抵理解到其工作流程,这次浏览就有价值。
本次浏览以一元 rpc 应用为导向。
原文地址
- grpc 源码学习笔记
我的主页
- https://github.com/anqiansong
概念
rpc
在分布式计算,近程过程调用(英语:Remote Procedure Call,缩写为 RPC)是一个计算机通信协议。该协定容许运行于一台计算机的程序调用另一个地址空间(通常为一个凋谢网络的一台计算机)的子程序,
而程序员就像调用本地程序一样,无需额定地为这个交互作用编程(无需关注细节)。RPC 是一种服务器 - 客户端(Client/Server)模式,经典实现是一个通过发送申请 - 承受回应进行信息交互的零碎。—— 维基百科
grpc
grpc 是 google 的开源 rpc 零碎,该零碎基于 HTTP/2 协定传输,应用 Protocol Buffers 作为接口描述语言。
在 gRPC 中,客户机应用程序能够间接调用不同机器上的服务器应用程序上的办法,就如同它是一个本地对象一样,这使得您更容易创立分布式应用程序和服务。与许多 RPC 零碎一样,gRPC 基于定义服务的思维,指定能够通过参数和返回类型近程调用的办法。在服务器端,服务器实现这个接口并运行一个 gRPC 服务器来解决客户机调用。在客户端,客户端有一个存根(在某些语言中称为客户端),它提供与服务器雷同的办法。
grpc 应用示例
1、编写 proto
syntax = "proto3";
//x:generate protoc --go_out=%DIR% --go-grpc_out=%DIR% %BASE%
package greet;
service Greeter {// Sends a greeting rpc SayHello (HelloRequest) returns (HelloReply) {}}
// The request message containing the user's name.
message HelloRequest {string name = 1;}
// The response message containing the greetings
message HelloReply {string message = 1;}
舒适提醒
这段正文在装置 intellij 插件 (CommentShell) 后能够间接运行并生成 pb.go,前提是曾经装置了
2、点击做侧边栏运行按钮生成 pb.go
3、server 端代码
type serverImplemented struct {greet.UnimplementedGreeterServer}
func (s *serverImplemented) SayHello(context.Context, *greet.Req) (*greet.Reply, error) {return &greet.Reply{}, nil}
func main() {s := grpc.NewServer() greet.RegisterGreeterServer(s, &serverImplemented{}) lis, err := net.Listen("tcp", "127.0.0.1:8888") if err != nil {log.Fatal(err) } err = s.Serve(lis) if err != nil {log.Fatal(err) }}
4、client 端代码
conn, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBlock())
if err != nil {log.Fatal(err)}
client := greet.NewGreeterClient(conn)
// 疏忽 resp
client.SayHello(context.Background(), &greet.Req{})
源码剖析
grpc 版本
v.1.36.0
纲要
client
咱们从下面的 demo 的 client 端作为入口来进入源码,从 client 应用咱们能够得悉,
该程序有两个步骤:
- Dial
- SayHello(Invoke)
grpc.Dial(clientconn.go:135)
Dial 最终会调用 grpc.DialContext
办法,这外面我集体在浏览的时候次要关注了一下几点:
-
中间件链式调用
一次执行中间件中的逻辑,造成链式调用拼装
-
insecure 解决
装置传输参数校验,其能够由
grpc.WithInsecure()
来管制是否开启平安传输,如果开启,
则会对 https 证书,密钥在这一步做参数校验。if !cc.dopts.insecure {if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil { return nil, errNoTransportSecurity} if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {return nil, errTransportCredsAndBundle} } else {if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil { return nil, errCredentialsConflict} for _, cd := range cc.dopts.copts.PerRPCCredentials {if cd.RequireTransportSecurity() {return nil, errTransportCredentialsMissing} } } ```* defaultServiceConfigRawJSON 解析 通过 json 文本模式设置 ServiceConfig,相干参数见
type jsonSC struct {// 负载平衡策略 LoadBalancingPolicy string // 负载平衡配置 LoadBalancingConfig internalserviceconfig.BalancerConfig // 每个 rpc 办法的配置 MethodConfig []jsonMC // 重试机制相干配置 RetryThrottling retryThrottlingPolicy // 衰弱检测配置 HealthCheckConfig healthCheckConfig }
`
超时管制
客户端超时管制是通过Context.WithTimeout
进行管制的,其超时时长能够通过grpc.WithTimeout(time.Duration)
管制。if cc.dopts.timeout > 0 {var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout) defer cancel()} ```* getResolver `getResolver` 优先从 DialOption 中读取,其次中全局读取,DialOption 能够通过 `grpc.WithResolvers()` 设置,全局的能够通过 `resolver.Register(Builder)` 注册, 用法可参考 `resolver/dns/dns_resolver.go`。
func (cc ClientConn) getResolver(scheme string) resolver.Builder {for _, rb := range cc.dopts.resolvers { if scheme == rb.Scheme() {return rb} } return resolver.Get(scheme) }
`
block 管制
在DialContext
文档正文中曾经阐明,block 会阻塞 dial 过程,晓得连贯状态变为Ready
或者遇到谬误,默认是非阻塞的(即在后盾建设连贯操作)。
开发者能够通过grpc.WithBlock()
来管制阻塞。
SayHello
通过调用 SayHello
办法追溯,在 pb.go 文件中能够看到其调用了 Invoke
办法:go func (c *greeterClient) SayHello(ctx context.Context, in *Req, opts ...grpc.CallOption) (*Reply, error) {out := new(Reply) err := c.cc.Invoke(ctx, "/greet.Greeter/SayHello", in, out, opts...) if err != nil {return nil, err} return out, nil }
深刻到 Invoke
办法中,找到 ClientConn.Invoke()
其实现了 ClientConnInterface
,找到 call.go
第 29 行,在该办法中做了两件事:
opts = combine(cc.dopts.callOptions, opts)
if cc.dopts.unaryInt != nil {return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...) } return invoke(ctx, method, args, reply, cc, opts...)```
* combine 组装 CallOption
* invoke 真正执行 client 调用
##### invoke(call.go:65)
在该办法中,咱们终于看到了外围的逻辑:
* newClientStream(stream.go:160)
在该办法中次要是进行初始化,其中能够重点关注一下 `Compressor` 和 `Encode` 初始化。**Compressor(stream.go:233):**
if ct := c.compressorType; ct != “” {callHdr.SendCompress = ct if ct != encoding.Identity { comp = encoding.GetCompressor(ct) if comp == nil {return nil, status.Errorf(codes.Internal, “grpc: Compressor is not installed for requested grpc-encoding %q”, ct) } } } else if cc.dopts.cp != nil {callHdr.SendCompress = cc.dopts.cp.Type() cp = cc.dopts.cp } `
Compressor
会优先从 CallOption
中去读取 Compressor
的名称,而后从全局中去查找(前提已注册),
其次从 DialOption
中去读取,开发者能够通过 grpc.WithCompressor()
执行。
自定义
Compressor
可参考gzip.go
, 这也是默认的压缩形式
Encoding(rpc_util.go:854):
if c.codec != nil {// codec was already set by a CallOption; use it. return nil} if c.contentSubtype == "" {// No codec specified in CallOptions; use proto by default. c.codec = encoding.GetCodec(proto.Name) return nil } // c.contentSubtype is already lowercased in CallContentSubtype
c.codec = encoding.GetCodec(c.contentSubtype) if c.codec == nil {return status.Errorf(codes.Internal, "no codec registered for content-subtype %s", c.contentSubtype) } return nil ``` `Encode 会优先从 `CallOption` 中读取,而后从全局中查找,如果没有则应用默认的 encoding 形式—— `proto`, 这个也是默认的 encoding 形式。> 自定义 `Encoding` 可参考 `proto.go`
* ClientStream.SendMsg(stream.go:1193)
`SendMsg` 外面有一个比拟重要的办法 `prepareMsg`,其将申请体进行 encoding 和 compress 而后失去
`hdr`、`payload`,`hdr` 中有 5 个 byte 的内容,其中第一个 byte 存储的是是否 compress 的标记位,1:压缩,0:未压缩。残余 4 个 byte 存储的是 payload 的长度。在 `prepareMsg` 实现后就进入数据传输了 `transport.ClientTransport.Write`,这里就不持续深究上来了。* ClientStream.RecvMsg(stream.go:1238)
在 `RecvMsg` 办法中其首先通过 `transport.Stream.RecvCompress` 读取 `hdr`, 判断是否须要 `UnCompress` 和 `Decoding`,而后
通过 `recv` 读取响应的内容。
if !as.decompSet {// Block until we receive headers containing received message encoding. if ct := as.s.RecvCompress(); ct != “” && ct != encoding.Identity {if as.dc == nil || as.dc.Type() != ct {// No configured decompressor, or it does not match the incoming // message encoding; attempt to find a registered compressor that does. as.dc = nil as.decomp = encoding.GetCompressor(ct) } } else {// No compression is used; disable our decompressor. as.dc = nil} // Only initialize this state once per stream. as.decompSet = true } err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp) if err != nil {if err == io.EOF { if statusErr := as.s.Status().Err(); statusErr != nil { return statusErr} return io.EOF // indicates successful end of stream. } return toRPCErr(err) } `
server
从上述 demo 中可为线索,server
端咱们关注三个阶段
- grpc.NewServer
- RegisterService
- Serve
grpc.NewServer(server.go:514)
该办法中做的事件,我认为比 client
简略很多,次要是一些初始化工作,中间件的链式调用拼装也在这里实现了:
chainUnaryServerInterceptors(s)
chainStreamServerInterceptors(s)
RegisterService(server.go:578)
其会在做简略的 interface
实现测验后调用 register(server.go:589)
办法。
s.mu.Lock()
defer s.mu.Unlock()
s.printf("RegisterService(%q)", sd.ServiceName)
if s.serve {logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)}
if _, ok := s.services[sd.ServiceName]; ok {logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)}
info := &serviceInfo{serviceImpl: ss, methods: make(map[string]*MethodDesc), streams: make(map[string]*StreamDesc), mdata: sd.Metadata,}
for i := range sd.Methods {d := &sd.Methods[i] info.methods[d.MethodName] = d}
for i := range sd.Streams {d := &sd.Streams[i] info.streams[d.StreamName] = d}
s.services[sd.ServiceName] = info
本阶段将每个 rpc 办法进行初始化,以便后续应用。
Serve(server.go:699)
接管来自 client
端的申请,为每个申请创立一个新的 goroutine
读取申请数据并解决后响应。
for {rawConn, err := lis.Accept() if err != nil {if ne, ok := err.(interface { Temporary() bool }); ok && ne.Temporary() { if tempDelay == 0 { tempDelay = 5 * time.Millisecond} else {tempDelay *= 2} if max := 1 * time.Second; tempDelay > max {tempDelay = max} s.mu.Lock() s.printf("Accept error: %v; retrying in %v", err, tempDelay) s.mu.Unlock() timer := time.NewTimer(tempDelay) select {case <-timer.C: case <-s.quit.Done(): timer.Stop() return nil} continue } s.mu.Lock() s.printf("done serving; Accept = %v", err) s.mu.Unlock()
if s.quit.HasFired() { return nil} return err } tempDelay = 0 // Start a new goroutine to deal with rawConn so we don't stall this Accept // loop goroutine. // // Make sure we account for the goroutine so GracefulStop doesn't nil out // s.conns before this conn can be added. s.serveWG.Add(1) go func() { s.handleRawConn(rawConn) s.serveWG.Done()}()}
外围解决逻辑,咱们进入 s.handleRawConn(rawConn)
办法查看一下,在该办法中实现了建设连贯和解决 stream
。
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
...
go func() {s.serveStreams(st) s.removeConn(st)}()
serveStreams(server.go:857)
...
s.handleStream(st, stream, s.traceInfo(st, stream))
...
handleStream(server.go:1510)
这里便应用到了 RegisterService
步骤中的局部信息,校验 rpc 是否存在,如果存在则持续解决,这里有一元
rpc 数据处理和流式数据处理,咱们进入一元 rpc 的数据处理 processUnaryRPC
`
go
srv, knownService := s.services[service]
if knownService {
if md, ok := srv.methods[method]; ok {s.processUnaryRPC(t, stream, srv, md, trInfo) return } if sd, ok := srv.streams[method]; ok {s.processStreamingRPC(t, stream, srv, sd, trInfo) return }}
processUnaryRPC(server.go:1062)
这里才是真正读取申请和响应申请的中央,其解决逻辑大抵为
* 获取 Compressor,解压读取到的 payload 数据
* 1、优先从 hdr 中获取,如果没有则执行第 2 步
* 2、从 ServerOption 获取是否设置 Compressor
* 读取数据、UnCompress、Decode
* sendResponse 响应数据
## 学习心得
* 插件式编程
* encoding 的注册与获取
* compressor 的注册与获取
* resolver 的注册与获取
* 初步理解一元 rpc 的调用流程
# 总结