共计 12831 个字符,预计需要花费 33 分钟才能阅读完成。
上一篇文章一起学习了 Resolver 的原理和源码剖析,本篇持续和大家一起学习下和 Resolver 关系密切的 Balancer 的相干内容。这里说的负载平衡次要指数据中心内的负载平衡,即 RPC 间的负载平衡。
传送门 服务发现原理剖析与源码解读
基于 go-zero v1.3.5 和 grpc-go v1.47.0
负载平衡
每一个被调用服务都会有多个实例,那么服务的调用方应该将申请,发向被调用服务的哪一个服务实例,这就是负载平衡的业务场景。
负载平衡的第一个关键点是公平性,即负载平衡须要关注被调用服务实例组之间的公平性,不要呈现旱的旱死,涝的涝死的状况。
负载平衡的第二个关键点是正确性,即对于有状态的服务来说,负载平衡须要关怀申请的状态,将申请调度到能解决它的后端实例上,不要呈现不能解决和错误处理的状况。
无状态的负载平衡
无状态的负载平衡是咱们日常工作中接触比拟多的负载平衡模型,它指的是参加负载平衡的后端实例是无状态的,所有的后端实例都是对等的,一个申请不管发向哪一个实例,都会失去雷同的并且正确的处理结果,所以无状态的负载平衡策略不须要关怀申请的状态。上面介绍两种无状态负载平衡算法。
轮询
轮询的负载平衡策略非常简单,只须要将申请按程序调配给多个实例,不必再做其余的解决。例如,轮询策略会将第一个申请调配给第一个实例,而后将下一个申请调配给第二个实例,这样顺次调配上来,调配完一轮之后,再回到结尾调配给第一个实例,再顺次调配。轮询在路由时,不利用申请的状态信息,属于无状态的负载平衡策略,所以它不能用于有状态实例的负载均衡器,否则正确性会呈现问题。在公平性方面,因为轮询策略只是按程序调配申请,所以实用于申请的工作负载和实例的解决能力差异都较小的状况。
权重轮询
权重轮询的负载平衡策略是将每一个后端实例调配一个权重,调配申请的数量和实例的权重成正比轮询。例如有两个实例 A,B,假如咱们设置 A 的权重为 20,B 的权重为 80,那么负载平衡会将 20% 的申请数量调配给 A,80 % 的申请数量调配给 B。权重轮询在路由时,不利用申请的状态信息,属于无状态的负载平衡策略,所以它也不能用于有状态实例的负载均衡器,否则正确性会呈现问题。在公平性方面,因为权重策略会按实例的权重比例来调配申请数,所以,咱们能够利用它解决实例的解决能力差异的问题,认为它的公平性比轮询策略要好。
有状态负载平衡
有状态负载平衡是指,在负载平衡策略中会保留服务端的一些状态,而后依据这些状态依照肯定的算法抉择出对应的实例。
P2C+EWMA
在 go-zero 中默认应用的是 P2C 的负载平衡算法。该算法的原理比较简单,即随机从所有可用节点中抉择两个节点,而后计算这两个节点的负载状况,抉择负载较低的一个节点来服务本次申请。为了防止某些节点始终得不到抉择导致不均衡,会在超过肯定的工夫后强制抉择一次。
在该简单平衡算法中,多出采纳了 EWMA 指数挪动加权均匀的算法,示意是一段时间内的均值。该算法绝对于算数均匀来说对于忽然的网络抖动没有那么敏感,忽然的抖动不会体现在申请的 lag 中,从而能够让算法更加平衡。
go-zero/zrpc/internal/balancer/p2c/p2c.go:133
atomic.StoreUint64(&c.lag, uint64(float64(olag)*w+float64(lag)*(1-w)))
go-zero/zrpc/internal/balancer/p2c/p2c.go:139
atomic.StoreUint64(&c.success, uint64(float64(osucc)*w+float64(success)*(1-w)))
系数 w 是一个工夫衰减值,即两次申请的距离越大,则系数 w 就越小。
go-zero/zrpc/internal/balancer/p2c/p2c.go:124
w := math.Exp(float64(-td) / float64(decayTime))
节点的 load 值是通过该连贯的申请提早 lag 和以后申请数 inflight 的乘积所得,如果申请的提早越大或者以后正在解决的申请数越多表明该节点的负载越高。
go-zero/zrpc/internal/balancer/p2c/p2c.go:199
func (c *subConn) load() int64 {
// plus one to avoid multiply zero
lag := int64(math.Sqrt(float64(atomic.LoadUint64(&c.lag) + 1)))
load := lag * (atomic.LoadInt64(&c.inflight) + 1)
if load == 0 {return penalty}
return load
}
源码剖析
如下源码会波及 go-zero 和 gRPC,请依据给出的代码门路进行辨别
在 gRPC 中,Balancer 和 Resolver 一样也能够自定义,同样也是通过 Register 办法进行注册
grpc-go/balancer/balancer.go:53
func Register(b Builder) {m[strings.ToLower(b.Name())] = b
}
Register 的参数 Builder 为接口,在 Builder 接口中,Build 办法的第一个参数 ClientConn 也为接口,Build 办法的返回值 Balancer 同样也是接口,定义如下:
能够看出,要想实现自定义的 Balancer 的话,就必须要实现 balancer.Builder 接口。
在理解了 gRPC 提供的 Balancer 的注册形式之后,咱们看一下 go-zero 是在什么中央进行 Balancer 注册的
go-zero/zrpc/internal/balancer/p2c/p2c.go:36
func init() {balancer.Register(newBuilder())
}
在 go-zero 中并没有实现 balancer.Builder 接口,而是应用 gRPC 提供的 base.baseBuilder 进行注册,base.baseBuilder 实现了balancer.Builder 接口。创立 baseBuilder 的时候调用了 base.NewBalancerBuilder 办法,须要传入 PickerBuilder 参数,PickerBuilder 为接口,在 go-zero 中 p2c.p2cPickerBuilder 实现了该接口。
PickerBuilder 接口 Build 办法返回值 balancer.Picker 也是一个接口,p2c.p2cPicker 实现了该接口。
grpc-go/balancer/base/base.go:65
func NewBalancerBuilder(name string, pb PickerBuilder, config Config) balancer.Builder {
return &baseBuilder{
name: name,
pickerBuilder: pb,
config: config,
}
}
各构造之间的关系如下图所示,其中各构造模块对应的包为:
- balancer:grpc-go/balancer
- base:grpc-go/balancer/base
- p2c: go-zero/zrpc/internal/balancer/p2c
在哪里获取已注册的 Balancer?
通过下面的流程步骤,曾经晓得了如何自定义 Balancer,以及如何注册自定义的 Blancer。既然注册了必定就会获取,接下来看一下是在哪里获取曾经注册的 Balancer 的。
咱们晓得 Resolver 是通过解析 DialContext 的第二个参数 target,从而失去 Resolver 的 name,而后依据 name 获取到对应的 Resolver 的。获取 Balancer 同样也是依据名称,Balancer 的名称是在创立 gRPC Client 的时候通过配置项传入的,这里的 p2c.Name 为注册 Balancer 时指定的名称 p2c_ewma,如下:
go-zero/zrpc/internal/client.go:50
func NewClient(target string, opts ...ClientOption) (Client, error) {
var cli client
svcCfg := fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, p2c.Name)
balancerOpt := WithDialOption(grpc.WithDefaultServiceConfig(svcCfg))
opts = append([]ClientOption{balancerOpt}, opts...)
if err := cli.dial(target, opts...); err != nil {return nil, err}
return &cli, nil
}
在上一篇文章中,咱们曾经晓得当创立 gRPC 客户端的时候,会触发调用自定义 Resolver 的 Build 办法,在 Build 办法外部获取到服务地址列表后,通过 cc.UpdateState 办法进行状态更新,前面当监听到服务状态变动的时候同样也会调用 cc.UpdateState 进行状态的更新,而这里的 cc 指的就是 ccResolverWrapper 对象,这一部分如果遗记的话,能够再去回顾一下解说 Resolver 的那篇文章,以便能丝滑接入本篇:
go-zero/zrpc/resolver/internal/kubebuilder.go:51
if err := cc.UpdateState(resolver.State{Addresses: addrs,}); err != nil {logx.Error(err)
}
这里有几个重要的模块对象,如下:
- ClientConn:grpc-go/clientconn.go:464
- ccResolverWrapper:grpc-go/resolver_conn_wrapper.go:36
- ccBalancerWrapper:grpc-go/balancer_conn_wrappers.go:48
- Balancer:grpc-go/internal/balancer/gracefulswitch/gracefulswitch.go:46
- balancerWrapper:grpc-go/internal/balancer/gracefulswitch/gracefulswitch.go:247
当监听到服务状态的变更后(首次启动或者通过 Watch 监听变动)调用 ccResolverWrapper.UpdateState 触发更新状态的流程,各模块间的调用链路如下所示:
获取 Balancer 的动作是在 ccBalancerWrapper.handleSwitchTo 办法中触发的,代码如下所示:
grpc-go/balancer_conn_wrappers.go:266
builder := balancer.Get(name)
if builder == nil {channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
builder = newPickfirstBuilder()} else {channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name)
}
if err := ccb.balancer.SwitchTo(builder); err != nil {channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err)
return
}
ccb.curBalancerName = builder.Name()
而后在 Balancer.SwitchTo 办法中,调用了自定义 Balancer 的 Build 办法:
grpc-go/internal/balancer/gracefulswitch/gracefulswitch.go:121
newBalancer := builder.Build(bw, gsb.bOpts)
上文有提到 Build 办法的第一个参数为接口 balancer.ClientConn,而这里传入的为 balancerWrapper,所以 gracefulswitch.balancerWrapper 实现了该接口:
到这里咱们曾经晓得了获取自定义 Balancer 是在哪里触达的,以及在哪里获取的自定义的 Balancer,和 balancer.Builder 的 Build 办法在哪里被调用。
通过上文可知这里的 balancer.Builder 为 baseBuilder,所以调用的 Build 办法为 baseBuilder 的 Build 办法,Build 办法的定义如下:
grpc-go/balancer/base/balancer.go:39
func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
bal := &baseBalancer{
cc: cc,
pickerBuilder: bb.pickerBuilder,
subConns: resolver.NewAddressMap(),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
config: bb.config,
}
bal.picker = NewErrPicker(balancer.ErrNoSubConnAvailable)
return bal
}
Build 办法返回了 baseBalancer,能够晓得 baseBalancer 实现了 balancer.Balancer 接口:
再来回顾下这个流程,其实次要做了如下几件事:
- 在自定义的 Resolver 中监听服务状态的变更
- 通过 UpdateState 来更新状态
- 获取自定义的 Balancer
- 执行自定义 Balancer 的 Build 办法获取 Balancer
如何创立连贯?
持续回到 ClientConn 的 updateResolverState 办法,在办法的最初调用 balancerWrapper.updateClientConnState 办法更新客户端的连贯状态:
grpc-go/clientconn.go:664
uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
if ret == nil {
ret = uccsErr // prefer ErrBadResolver state since any other error is
// currently meaningless to the caller.
}
前面的调用链路如下图所示:
最终会调用 baseBalancer.UpdateClientConnState 办法:
grpc-go/balancer/base/balancer.go:94
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// .............
b.resolverErr = nil
addrsSet := resolver.NewAddressMap()
for _, a := range s.ResolverState.Addresses {addrsSet.Set(a, nil)
if _, ok := b.subConns.Get(a); !ok {sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
if err != nil {logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
}
b.subConns.Set(a, sc)
b.scStates[sc] = connectivity.Idle
b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
sc.Connect()}
}
for _, a := range b.subConns.Keys() {sci, _ := b.subConns.Get(a)
sc := sci.(balancer.SubConn)
if _, ok := addrsSet.Get(a); !ok {b.cc.RemoveSubConn(sc)
b.subConns.Delete(a)
}
}
// ................
}
当第一次触发调用 UpdateClientConnState 的时候,如下代码中 ok 为 false:
_, ok := b.subConns.Get(a);
所以会创立新的连贯:
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
这里的 b.cc 即为 balancerWrapper,遗记的盆友能够往上翻看温习一下,也就是会调用 balancerWrapper.NewSubConn 创立连贯
grpc-go/internal/balancer/gracefulswitch/gracefulswitch.go:328
func (bw *balancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
// .............
sc, err := bw.gsb.cc.NewSubConn(addrs, opts)
if err != nil {return nil, err}
// .............
bw.subconns[sc] = true
// .............
}
bw.gsb.cc 即为 ccBalancerWrapper,所以这里会调用 ccBalancerWrapper.NewSubConn 创立连贯:
grpc-go/balancer_conn_wrappers.go:299
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {if len(addrs) <= 0 {return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
}
ac, err := ccb.cc.newAddrConn(addrs, opts)
if err != nil {channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
return nil, err
}
acbw := &acBalancerWrapper{ac: ac}
acbw.ac.mu.Lock()
ac.acbw = acbw
acbw.ac.mu.Unlock()
return acbw, nil
}
最终返回的是 acBalancerWrapper 对象,acBalancerWrapper 实现了 balancer.SubConn 接口:
调用流程图如下所示:
创立连贯的默认状态为 connectivity.Idle :
grpc-go/clientconn.go:699
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
ac := &addrConn{
state: connectivity.Idle,
cc: cc,
addrs: addrs,
scopts: opts,
dopts: cc.dopts,
czData: new(channelzData),
resetBackoff: make(chan struct{}),
}
// ...........
}
在 gRPC 中为连贯定义了五种状态,别离如下:
const (
// Idle indicates the ClientConn is idle.
Idle State = iota
// Connecting indicates the ClientConn is connecting.
Connecting
// Ready indicates the ClientConn is ready for work.
Ready
// TransientFailure indicates the ClientConn has seen a failure but expects to recover.
TransientFailure
// Shutdown indicates the ClientConn has started shutting down.
Shutdown
)
在 baseBalancer 中通过 b.scStates 保留创立的连贯,初始状态也为 connectivity.Idle,之后通过 sc.Connect()进行连贯:
grpc-go/balancer/base/balancer.go:112
b.subConns.Set(a, sc)
b.scStates[sc] = connectivity.Idle
b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
sc.Connect()
这里 sc.Connetc 调用的是 acBalancerWrapper 的 Connect 办法,能够看到这里创立连贯是异步进行的:
grpc-go/balancer_conn_wrappers.go:406
func (acbw *acBalancerWrapper) Connect() {acbw.mu.Lock()
defer acbw.mu.Unlock()
go acbw.ac.connect()}
最初会调用 addrConn.connect 办法:
grpc-go/clientconn.go:786
func (ac *addrConn) connect() error {ac.mu.Lock()
if ac.state == connectivity.Shutdown {ac.mu.Unlock()
return errConnClosing
}
if ac.state != connectivity.Idle {ac.mu.Unlock()
return nil
}
ac.updateConnectivityState(connectivity.Connecting, nil)
ac.mu.Unlock()
ac.resetTransport()
return nil
}
从 connect 开始的调用链路如下所示:
在 baseBalancer 的 UpdateSubConnState 办法的最初,更新了 Picker 为自定义的 Picker:
grpc-go/balancer/base/balancer.go:221
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
在 addrConn 办法的最初会调用 ac.resetTransport()真正的进行连贯的创立:
当连贯曾经创立好,处于 Ready 状态,最初调用 baseBalancer.UpdateSubConnState 办法,此时 s ==connectivity.Ready 为 true,而 oldS == connectivity.Ready 为 false,所以会调用 b.regeneratePicker()办法:
if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
b.state == connectivity.TransientFailure {b.regeneratePicker()
}
func (b *baseBalancer) regeneratePicker() {
if b.state == connectivity.TransientFailure {b.picker = NewErrPicker(b.mergeErrors())
return
}
readySCs := make(map[balancer.SubConn]SubConnInfo)
// Filter out all ready SCs from full subConn map.
for _, addr := range b.subConns.Keys() {sci, _ := b.subConns.Get(addr)
sc := sci.(balancer.SubConn)
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {readySCs[sc] = SubConnInfo{Address: addr}
}
}
b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
}
在 regeneratePicker 中获取了处于 connectivity.Ready 状态可用的连贯,同时更新了 picker。还记得 b.pickerBuilder 吗?b.b.pickerBuilder 为在 go-zero 中自定义实现的 base.PickerBuilder 接口。
go-zero/zrpc/internal/balancer/p2c/p2c.go:42
func (b *p2cPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
readySCs := info.ReadySCs
if len(readySCs) == 0 {return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
var conns []*subConn
for conn, connInfo := range readySCs {
conns = append(conns, &subConn{
addr: connInfo.Address,
conn: conn,
success: initSuccess,
})
}
return &p2cPicker{
conns: conns,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
stamp: syncx.NewAtomicDuration(),}
}
最初把自定义的 Picker 赋值为 ClientConn.blockingpicker.picker 属性。
grpc-go/balancer_conn_wrappers.go:347
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {ccb.cc.blockingpicker.updatePicker(s.Picker)
ccb.cc.csMgr.updateState(s.ConnectivityState)
}
如何抉择已创立的连贯?
当初曾经晓得了如何创立连贯,以及连贯其实是在 baseBalancer.scStates 中治理,当连贯的状态发生变化,则会更新 baseBalancer.scStates 。那么接下来咱们来看一下 gRPC 是如何抉择一个连贯进行申请的发送的。
当 gRPC 客户端发动调用的时候,会调用 ClientConn 的 Invoke 办法,个别不会被动应用该办法进行调用,该办法的调用个别是主动生成:
grpc-go/examples/helloworld/helloworld/helloworld_grpc.pb.go:39
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {out := new(HelloReply)
err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
if err != nil {return nil, err}
return out, nil
}
如下为发动申请的调用链路,最终会调用 p2cPicker.Pick 办法获取连贯,咱们自定义的负载平衡算法个别都在 Pick 办法中实现,获取到连贯之后,通过 sendMsg 发送申请。
grpc-go/stream.go:945
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
cs := a.cs
if a.trInfo != nil {a.mu.Lock()
if a.trInfo.tr != nil {a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
}
a.mu.Unlock()}
if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
if !cs.desc.ClientStreams {return nil}
return io.EOF
}
if a.statsHandler != nil {a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
}
if channelz.IsOn() {a.t.IncrMsgSent()
}
return nil
}
源码剖析到此就完结了,因为篇幅无限没法做到八面玲珑,所以本文只列出了源码中的次要门路。
结束语
Balancer 相干的源码还是有点简单的,笔者也是读了好几遍才理清脉络,所以如果读了一两遍感觉没有脉络也不必焦急,对照文章的脉络多读几遍就肯定能搞懂。
如果有疑难能够随时找我探讨,在社区群中能够搜寻 dawn_zhou 找到我。
心愿本篇文章对你有所帮忙,你的点赞是作者继续输入的最大能源。
我的项目地址
https://github.com/zeromicro/go-zero
欢送应用 go-zero
并 star 反对咱们!
微信交换群
关注『微服务实际 』公众号并点击 交换群 获取社区群二维码。