共计 11856 个字符,预计需要花费 30 分钟才能阅读完成。
Balancer
gRPC balancer
背景
接着上篇《gRPC 插件式编程之 Resolver》,gRPC 将 target
解析为 resolver.Target
后,通过 resolver.Builder.Build
办法调用resolver.ClientConn.UpdateState(State) error
办法,该办法做了哪些事件呢,咱们本篇接着看源码往下走。
UpdateState
UpdateState 的调用会调用 grpc.ClientConn.updateResolverState
办法,该办法次要做了如下工作:
- ServiceConfig 解决
- BalancerWrapper 创立
- 调用
balancer.updateClientConnState
办法 执行负载平衡逻辑更新
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
...
cc.maybeApplyDefaultServiceConfig(s.Addresses)
...
cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
...
// reference: balancer_conn_wrappers.go:164
// bw.updateClientConnState -> ccBalancerWrapper.updateClientConnState
bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
...
}
舒适提醒
这里先以搞懂 gRPC 主流程思路为主,不扣太细节的货色,比方一些
GRPCLB
解决、error 解决,ServiceConfigSelector 解决等能够查看源码。
bw.updateClientConnState
调用实质是 ccBalancerWrapper.updateClientConnState
而 ccBalancerWrapper.updateClientConnState
就做了一件事件,调用 balancer.Balancer.UpdateClientConnState
办法
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {ccb.balancerMu.Lock()
defer ccb.balancerMu.Unlock()
return ccb.balancer.UpdateClientConnState(*ccs)
}
到这里,咱们想看 balancer
源码逻辑有两种路径
- 本人实现的
balancer.Balancer
- gRPC 提供的
balancer
为了浏览源码,咱们先去浏览 gRPC 提供的几个 balancer
中的一个进行流程了解,后续再介绍如何自定义一个 balancer
gRPC Balancer
gRPC 提供了几个负载平衡解决,如下:
- grpclb
- rls
- roundrobin
- weightroundrobin
- weighttarget
为了好了解,咱们挑一个简略的负载均衡器 roundrobin
持续浏览。
负载平衡从哪里获取?通过后面 cc.maybeApplyDefaultServiceConfig(s.Addresses)
办法中的源码可知,balancer.Balancer
由 balancer.Builder
提供,咱们看一下 balancer.Builder
接口
// Builder creates a balancer.
type Builder interface {
// Build creates a new balancer with the ClientConn.
Build(cc ClientConn, opts BuildOptions) Balancer
// Name returns the name of balancers built by this builder.
// It will be used to pick balancers (for example in service config).
Name() string}
roundrobin
roundrobin 是 gRPC 内置的负载均衡器,其和 resolver
一样都是通过插件式编程提供扩大,在源码中,咱们可知,
roundrobin 在 init
函数中对 balancer.Builder
进行了注册,其中 baseBuilder
是 balancer.Builder
的实现,
上文咱们得悉,balancer.Balancer
由 balancer.Builder.Build
提供,通过 baseBuilder.Build
办法咱们晓得 gRPC 的balancer
底层是由 baseBalancer
实现,局部源码如下:
roundrobin.go
// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {return base.NewBalancerBuilder(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
}
func init() {balancer.Register(newBuilder())
}
balancer.go
func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
bal := &baseBalancer{
cc: cc,
pickerBuilder: bb.pickerBuilder,
subConns: resolver.NewAddressMap(),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
config: bb.config,
}
bal.picker = NewErrPicker(balancer.ErrNoSubConnAvailable)
return bal
}
沿着 UpdateState
环节最初一个办法 ccb.balancer.UpdateClientConnState(*ccs)
调用浏览,其实最终来到了baseBalancer.UpdateClientConnState
办法,咱们查看一下源码:
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
...
addrsSet := resolver.NewAddressMap()
for _, a := range s.ResolverState.Addresses {addrsSet.Set(a, nil)
if _, ok := b.subConns.Get(a); !ok {sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
if err != nil {logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
}
b.subConns.Set(a, sc)
b.scStates[sc] = connectivity.Idle
b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
sc.Connect()}
}
for _, a := range b.subConns.Keys() {sci, _ := b.subConns.Get(a)
sc := sci.(balancer.SubConn)
if _, ok := addrsSet.Get(a); !ok {b.cc.RemoveSubConn(sc)
b.subConns.Delete(a)
}
}
if len(s.ResolverState.Addresses) == 0 {b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
return nil
}
从源码得悉,该办法做了以下几件事:
- 对新的 endpoint
NewSubConn
并且Connect
- 移出旧的曾经不存在的
endpoint
及其Conn
信息
总的来说就是更新负载均衡器内可用的链接信息。
balancer.ClientConn.NewSubConn
balancer.ClientConn
是一个接口,其代表 gRPC 的一个链接,而 ccBalancerWrapper
就为其实现类,先看看该接口的申明:
type ClientConn interface {
// NewSubConn 平衡器调用 NewSubConn 来创立一个新的 SubConn,它不会阻塞并期待建设连贯,// SubConn 的行为能够通过 NewSubConnOptions 来管制。NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
// RemoveSubConn 从 ClientConn 中删除 SubConn。SubConn 将敞开。RemoveSubConn(SubConn)
// UpdateAddresses 更新传入的 SubConn 中应用的地址,gRPC 查看以后连贯的地址是否仍在新列表中。如果存在,将放弃连贯,// 否则,连贯将失常敞开,并创立一个新连贯。// 这将触发 SubConn 的状态转换。UpdateAddresses(SubConn, []resolver.Address)
// UpdateState 告诉 gRPC 平衡器的外部状态已更改。// gRPC 将更新 ClientConn 的连贯状态,并在新的 Picker 上调用 Pick 来抉择新的 SubConn。UpdateState(State)
// 平衡器调用 ResolveNow 以告诉 gRPC 进行名称解析。ResolveNow(resolver.ResolveNowOptions)
// Target 返回此 ClientConn 的拨号指标。// 已弃用:改用 BuildOptions 中的 Target 字段
Target() string}
再看一下 ccBalancerWrapper
的创立:
func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
ccb := &ccBalancerWrapper{
cc: cc,
updateCh: buffer.NewUnbounded(),
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
subConns: make(map[*acBalancerWrapper]struct{}),
}
go ccb.watcher()
ccb.balancer = b.Build(ccb, bopts)
_, ccb.hasExitIdle = ccb.balancer.(balancer.ExitIdler)
return ccb
}
留神
记住
go ccb.watcher()
这一行代码,后续还会回到这个办法来。
baseBalancer.UpdateClientConnState
中对新退出的 endpoint
进行 NewSubConn
和 Connect
解决,咱们先来看看 NewSubConn
办法做了哪些事件,
来到 ccBalancerWrapper.NewSubConn
办法中:
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {if len(addrs) <= 0 {return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
}
ccb.mu.Lock()
defer ccb.mu.Unlock()
if ccb.subConns == nil {return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
}
ac, err := ccb.cc.newAddrConn(addrs, opts)
if err != nil {return nil, err}
acbw := &acBalancerWrapper{ac: ac}
acbw.ac.mu.Lock()
ac.acbw = acbw
acbw.ac.mu.Unlock()
ccb.subConns[acbw] = struct{}{}
return acbw, nil
}
从该办法可知,次要是通过 gprc.ClientConn.newAddrConn
创立一个 addrConn
对象,并且创立一个balancer.SubConn
的实现类对象 acBalancerWrapper
,将其退出到 ccBalancerWrapper.subConns
中进行治理。
阐明
由此可知,
baseBalancer.UpdateClientConnState
判断地址变更后的 address 是否为新退出的就由ccBalancerWrapper.subConns
来比照即可得悉。
接着咱们持续看看 Connect
做了什么事件,下面曾经通过 acBalancerWrapper
创立了一个 balancer.SubConn
的实现对象,接着利用该对象进行了Connect
办法调用,咱们来到 acBalancerWrapper.Connect()
办法中:
func (acbw *acBalancerWrapper) Connect() {acbw.mu.Lock()
defer acbw.mu.Unlock()
go acbw.ac.connect()}
func (ac *addrConn) connect() error {ac.mu.Lock()
if ac.state == connectivity.Shutdown {ac.mu.Unlock()
return errConnClosing
}
if ac.state != connectivity.Idle {ac.mu.Unlock()
return nil
}
ac.updateConnectivityState(connectivity.Connecting, nil)
ac.mu.Unlock()
ac.resetTransport()
return nil
}
ac.updateConnectivityState
更新链接状态,ac.resetTransport
次要工作内容就是从 resolver.Address
列表中依照去创立链接并同样调用 ac.updateConnectivityState
更新状态,具体源码可自行浏览,
咱们接着 ac.updateConnectivityState
办法往下走,其实该办法调用了 grpc.ClientConn.handleSubConnStateChange
办法,最终又回到了 ccBalancerWrapper.handleSubConnStateChange
办法中,其办法调用链如下:
ac.updateConnectivityState
-> grpc.ClientConn.handleSubConnStateChange
-> ccBalancerWrapper.handleSubConnStateChange
来看一下最初一个办法 ccBalancerWrapper.handleSubConnStateChange
的源码:
func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
if sc == nil {return}
ccb.updateCh.Put(&scStateUpdate{
sc: sc,
state: s,
err: err,
})
}
该办法把一个 balancer.SubConn
和 connectivity.State
丢进了一个切片,而后通过一个 channel 管制另一个 goroutine 取数据
func (b *Unbounded) Put(t interface{}) {b.mu.Lock()
if len(b.backlog) == 0 {
select {
case b.c <- t:
b.mu.Unlock()
return
default:
}
}
b.backlog = append(b.backlog, t)
b.mu.Unlock()}
这里的数据写入后,在哪里读取,这就回到上文须要大家重点记住的一个 goroutine 调用了,还记得吗,试着回顾一下,没错就是 go ccb.watcher()
咱们来看看 watcher
办法,由上文可知,咱们写如的数据是 scStateUpdate
对象,因而如下源码就仅看获取该对象的 case 即可,省略了临时不须要关注的代码:
func (ccb *ccBalancerWrapper) watcher() {
for {
select {case t := <-ccb.updateCh.Get():
ccb.updateCh.Load()
if ccb.closed.HasFired() {break}
switch u := t.(type) {
case *scStateUpdate:
ccb.balancerMu.Lock()
ccb.balancer.UpdateSubConnState(u.sc, balancer.SubConnState{ConnectivityState: u.state, ConnectionError: u.err})
ccb.balancerMu.Unlock()
case ...:
...
default:
logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t)
}
case <-ccb.closed.Done():}
...
}
}
由源码得悉,其最终调用了 balancer.Balancer.UpdateSubConnState
办法,咱们以 roundrobin
负载均衡器来查看,由上文知,gRPC 的 balancer
最终实现类是baseBalancer
,因而 balancer.Balancer.UpdateSubConnState
最终落到了 baseBalancer.UpdateSubConnState
办法上,
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
...
oldS, ok := b.scStates[sc]
if !ok {
...
return
}
if oldS == connectivity.TransientFailure &&
(s == connectivity.Connecting || s == connectivity.Idle) {
if s == connectivity.Idle {sc.Connect()
}
return
}
b.scStates[sc] = s
switch s {
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
case connectivity.TransientFailure:
// Save error to be reported via picker.
b.connErr = state.ConnectionError
}
b.state = b.csEvltr.RecordTransition(oldS, s)
...
if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
b.state == connectivity.TransientFailure {b.regeneratePicker()
}
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}
该办法中最终只会有状态 connectivity.Ready
的 SubConn
往下走,其余的状态要么被从新发动 Connect
,要么被移出
最初一行代码发动 balancer.ClientConn.UpdateState
调用,因为 ccBalancerWrapper
为 balancer.ClientConn
的实现,因而来到balancer.ClientConn.UpdateState
下,该办法做了两件事件:
- 更新
balancer.Picker
- 调用
grpc.connectivityStateManager.updateState
办法,该办法开释一个 channel 信号,告诉 goroutine 进行信息处理,该 goroutine 咱们后续再讲。
上文讲了这么多,那么负载算法在哪里,又何时调用呢?
由上文可知,baseBalancer.UpdateSubConnState
更新了一个 picker
,这个 picker
来自哪里?追溯一下源码联合 roundrobin
负载均衡器可知,该 picker
是在 balancer.Builder
的实现类调用 base.NewBalancerBuilder
创立实例时传入的 base.PickBuilder
实现类 rrPickerBuilder
结构进去的,看一下 rrPickerBuilder
的源码可知 Pick
办法中就是对 SubConn
进行负载算法的具体逻辑了。
func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {p.mu.Lock()
sc := p.subConns[p.next]
p.next = (p.next + 1) % len(p.subConns)
p.mu.Unlock()
return balancer.PickResult{SubConn: sc}, nil
}
那么该办法什么时候调用呢?这里间接给出答案,在 grpc.ClientConn
发动 Invoke
办法调用时会通过调用链调用到,咱们后续源码浏览到那里在来剖析。
自定义负载均衡器
自定义负载均衡器首先须要理解 gRPC 的插件式编程,这部分内容能够自行 google。
环境
etcd
go
负载平衡指标
随机抉择
-
实现
balancer.Builder
咱们就不一一实现其办法了,因为负载均衡器的重点在负载平衡算法,即实现base.PickerBuilder
,咱们间接用 gRPC 提供的base.NewBalancerBuilder
来创立balancer.Builder
const Name = "random" func init() {balancer.Register(newBuilder()) } func newBuilder() balancer.Builder {return base.NewBalancerBuilder(Name, &randomPickerBuilder{}, base.Config{HealthCheck: true}) }
-
实现
base.PickerBuilder
func (r *randomPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {if len(info.ReadySCs) == 0 {return base.NewErrPicker(balancer.ErrNoSubConnAvailable) } readyScs := make([]Conn, 0, len(info.ReadySCs)) for sc, info := range info.ReadySCs { readyScs = append(readyScs, Conn{ SubConn: sc, SubConnInfo: info, }) } return &randomPicker{ subConns: readyScs, r: rand.New(rand.NewSource(time.Now().UnixNano())), } }
- 实现
balancer.Picker
balancer.Picker
才是咱们须要扩大的逻辑,即依照本人想要的负载平衡算法从SunConn
列表中抉择一个可用的SubConn
创立链接。
func (r *randomPicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) {next := r.r.Int() % len(r.subConns)
sc := r.subConns[next]
fmt.Printf("picked: %+v\n", sc.SubConnInfo.Address.Addr)
return balancer.PickResult{SubConn: sc.SubConn,}, nil
}
-
应用自定义负载均衡器
r := resolverBuilder.NewCustomBuilder(resolverBuilder.Scheme) options := []grpc.DialOption{grpc.WithInsecure(), grpc.WithResolvers(r), grpc.WithBalancerName(builder.Name)} conn, err := grpc.Dial(resolverBuilder.Format("grpc-server"), options...)
演示成果
-
启动多个 server 实例,我这里启动了三个
$ go run server.go -addr localhost:8888
$ go run server.go -addr localhost:8889
$ go run server.go -addr localhost:8890
-
屡次启动 client,察看 Pick 的日志输入
go run client.go endpoints: [localhost:8888 localhost:8889 localhost:8888 localhost:8889 localhost:8890] picked: localhost:8888 output: hi
go run client.go
endpoints: [localhost:8888 localhost:8889 localhost:8888 localhost:8889 localhost:8890]
picked: localhost:8890
output: hi
go run client.go
endpoints: [localhost:8888 localhost:8889 localhost:8888 localhost:8889 localhost:8890]
picked: localhost:8889
output: hi
…
总结
grpc 通过服务发现或者直连模式获取到 gRPC server 的实例的 endpoints,而后告诉负载均衡器进行 SubConn
更新,对于新退出的 endpoint 进行实例创立,移出废除的 endpoint,
最初通过状态更新将状态为 Ready
的 SubConn
进行治理,gRPC 在调用 Invoke
(即 client 发动申请)时,则会通过负载均衡器中的 Picker
去依照某一个负载平衡算法抉择一个 SubConn
创立链接,如果创立胜利则不再进行其余 SubConn
的尝试,否则会依照肯定的退却算法进行重试,直到退却失败或者创立链接胜利为止。
自定义负载均衡器的外围逻辑在于对 Picker
的实现,从 SubConn
列表中依照负载平衡算法抉择一个 SubConn
创立链接,自定义负载均衡器和 Resolver
一样都是用到了插件式编程提供了扩大能力。
本次源码浏览只是为了了解 gRPC 的调用流程,其中有很多细节在源码正文中有阐明,其能够加深咱们对 gRPC 的了解,因而在了解本文介绍后,能够再次浏览源码加深一下了解。
源码
https://github.com/anqiansong…