GRPC源码分析-1代码结构以及通用组件解析

27次阅读

共计 4422 个字符,预计需要花费 12 分钟才能阅读完成。

base in https://github.com/grpc/grpc-…


  • 目录概览
  • 可选项(Opts
  • 包装(Wrapper
  • 组件式编程(Builder & Regist
  • 一次性事件(grpcsync.Event
  • 无界 channel(UnboundedBuffer

目录概览

摘要了一部分文件目录,用来描述在 grpc 中不同目录层级的主要作用。

grpc
├── 顶层目录(package grpc, 主要包含一些 grpc 提供的接口文件和涉及到具体实现的一些包装器文件
├── clientconn.go // grpc 接口文件,主要提供 Dial 接口。├── balancer_conn_wrappers.go // 各种包装器 *_wrappers
├── resolver_conn_wrapper.go
├── balancer
│   ├── balancer.go
├── resolver    // 次级目录(主要用于描述接口
│   └── resolver.go     //resolver 的接口文件
├── internal    // 内部目录(主要提供各种具体实现
│   ├── backoff
│   │   └── backoff.go // 退避策略的具体实现
│   ├── buffer
│   │   ├── unbounded.go    // 内部提供的一些组件
│   ├── resolver
│   │   ├── dns
│   │   │   ├── dns_resolver.go //dns_resolver 的实现 *_resolver.go
文件层级
顶层目录 主要提供 grpc 接口以及各种包装器文件 grpc.Dial() *_wrapper.go
次级目录 这里主要是提供 grpc 的一些功能组件定义,通常是接口文件 type Resolver interface {}
内部目录 这里主要提供功能组件的具体实现 dns_resolver.go

可选项(Opts

在 grpc 中我们会看到很多类似如下这种代码, 一般后面会需要接收参数 opts …Option, 这种接口方式被称为选项模式(options-pattern,主要是为了构建接口提供灵活的可选项

下面我们用自己的伪代码模拟一次这种逻辑(摘自 https://github.com/pojol/brai…

// 配置项
type config struct {Tracing       bool}

// 配置 Option 的包装函数
type Option func(*Server)

// 添加开启 tracing 的可选项
func WithTracing() Option {return func(r *Server) {r.cfg.Tracing = true}
}

// 使用可选项进行构建
func New(name string, opts ...Option) IServer {
    const (defaultTracing       = false)

    server = &Server{
        cfg: config{Tracing:       defaultTracing,  // 进行默认的初始化赋值},
    }

    // 查看是否有可选项,如果有则使用可选项将默认值覆盖。for _, opt := range opts {opt(server)
    }
}

总结 通过这种 options 模式,可以不必每次定义所有的选项,只需选择自己想要的改动即可。

包装(Wrapper

grpc 中使用 Wrapper 把接口的实现和其依赖的对象聚合到一起,通过水平组合的方式完成一些接口的实现。

type ccResolverWrapper struct {
    cc         *ClientConn      // 包含了 ClientConn
    resolverMu sync.Mutex
    resolver   resolver.Resolver    // 包含了 Resolver interface 
    done       *grpcsync.Event  // 完成事件(这个下面有详细解释
    curState   resolver.State   // 状态

    pollingMu sync.Mutex       // 轮询锁
    polling   chan struct{}    // 一个 channel 主要用于判断是否处于轮询中}

上面是一个 Wrapper 的结构,它主要包含了 ClientConn 的指针,以及 Resolver 接口,另外还包含了一些自身逻辑需要的状态和锁

它主要实现了 resolver.ClientConn interface, 使用这个包装器主要是为了 聚合 前面的那些组件,完成一些需要相互依赖调度的逻辑。不过这未必是值得借鉴的,这里先简单路过一下。

插件式编程模式

如上图所示,我们使用了 Resolver 来展示 grpc 是如何使用插件式编程方式组织代码的。

  • 接口定义文件 resolver.go
// Resolver 构建器的定义
type Builder interface {Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
    Scheme() string}

// 名字解析 Resolver 提供的接口定义
type Resolver interface {}

// 注册不同的 resolver 实现
func Register(b Builder) {}

// 通过 scheme 获取相关的 resolver 实现
func Get(scheme string) Builder {}
  • 实现文件 internal/resolver/dns/dns_resolver.go
// 通过 init 函数,将实现注册到 resolver
func init() { resolver.Register(NewBuilder()) }

// 实现 resolver.Builder 接口的 Build 函数(在这里进行真正的构建操作
func Build() {}
// 返回当前 resolver 解决的解析样式
func Scheme() string { return "dns"}
  • 应用 resolver clientconn.go
// 通过解析用户传入的 target 获得 scheme
cc.parsedTarget = grpcutil.ParseTarget(cc.target)

// 通过 target 的 scheme 获取对应的 resolver.Builder
func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
    for _, rb := range cc.dopts.resolvers {if scheme == rb.Scheme() {return rb}
    }
    return resolver.Get(scheme)
}

总结 通过以上的关键代码,我们知道了组件是如何完成 接口定义 以及 实现 使用
在 grpc 中有不少的代码是使用这种插件式的方式进行编程,这种编码方式可以方便的 隔离实现 ,使用户专注在自己的实现上。另外也支持用户 编写自己的实现 注册到 grpc 中。

可以阅读 策略模式 & 开闭原则 加深对这种编码形式的理解。

一次性事件(grpcsync.Event

主要用于在异步逻辑中判断一次性事件(开关)线程安全,在 grpc 中很多模块的退出逻辑都依赖于这个 Event

实现来自 /internal/grpcsync/event.go

type Event struct {
    fired int32         // 用于标记是否被触发
    c     chan struct{} // 用于发送触发信号
    o     sync.Once     // 保证只被执行一次
}

func (e *Event) Fire() bool {} // 触发事件
func (e *Event) Done() <-chan struct{} {}   // 被触发信号
func (e *Event) HasFired() bool {} // 是否被触发
// 构建 Event
func NewEvent() *Event {return &Event{c: make(chan struct{})}
}

// 模拟使用, 创建一个服务,然后这个服务会开启一个 goroutine 从管道中接收消息来处理业务
// 如下的话可以是一些新节点信息,然后通过 done 来处理退出的逻辑,当外部关闭这个 balancer,会立即通知到这个 goroutine 然后退出。func newBalancer() {
    b := Balancer{done : NewEvent(),  // 构建
    }

    // watcher
    go func() {
        for {
            select {
                case <- otherCh:
                    //todo
                case <- b.done.Done(): // 监听到终止信号, 退出 goroutine。return
            }
        }
    }()}

func (b *Balancer)close() {b.done.Fire() // 触发信号
}

无界 channel(UnboundedBuffer

前面有说到 grpcsync.Event 是用来控制退出逻辑,这里的 unbounded 则用于多个 goroutine 之间的消息传递。
这是一个非常不错的 channel 实践,它不用考虑 channel 的各种阻塞情况(这里主要是 channel 溢出的情况。方便了 channel 的应用。

实现来自
/internal/buffer/unbounded.go Unbounded
/internal/transport/transport.go recvBuffer
这两者的实现逻辑是一样的,只是 Unbounded 包装的 interface{},而 recvBuffer 会被高频调用所以使用了具体的类型 recvMsg

type Unbounded struct {c       chan interface{}
    backlog []interface{}
    sync.Mutex
}

func NewUnbounded() *Unbounded {return &Unbounded{c: make(chan interface{}, 1)}
}

// 往管道中写入消息(生产端
func (b *Unbounded) Put(t interface{}) {b.Lock()
    // 判断是否有积压消息,如果没有则直接写入管道后退出
    // 如果有,则写入到积压队列中(先进先出队列
    if len(b.backlog) == 0 {    
        select {
        case b.c <- t:
            b.Unlock()
            return
        default:
        }
    }
    b.backlog = append(b.backlog, t)
    b.Unlock()}

func (b *Unbounded) Load() {b.Lock()
    // 这里主要是判断积压队列是否有消息,如果有则左移一位
    // 并将移出的消息,写入 channel 中。if len(b.backlog) > 0 { 
        select {case b.c <- b.backlog[0]:
            b.backlog[0] = nil
            b.backlog = b.backlog[1:]
        default:
        }
    }
    b.Unlock()}

// 管道的读信号(消费端
func (b *Unbounded) Get() <-chan interface{} {return b.c}

最后宣传一下我的开源框架 https://github.com/pojol/braid 一个轻量的微服务框架

目标是帮助用户可以更容易的使用和理解微服务架构。

正文完
 0