关于golang:grpc-源码阅读之-balancer

4次阅读

共计 11856 个字符,预计需要花费 30 分钟才能阅读完成。

Balancer

gRPC balancer

背景

接着上篇《gRPC 插件式编程之 Resolver》,gRPC 将 target 解析为 resolver.Target 后,通过 resolver.Builder.Build 办法调用
resolver.ClientConn.UpdateState(State) error 办法,该办法做了哪些事件呢,咱们本篇接着看源码往下走。

UpdateState

UpdateState 的调用会调用 grpc.ClientConn.updateResolverState 办法,该办法次要做了如下工作:

  • ServiceConfig 解决
  • BalancerWrapper 创立
  • 调用 balancer.updateClientConnState 办法 执行负载平衡逻辑更新
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
    ...
    cc.maybeApplyDefaultServiceConfig(s.Addresses)
    ...
    cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
    ...
    // reference: balancer_conn_wrappers.go:164
    // bw.updateClientConnState -> ccBalancerWrapper.updateClientConnState
    bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
    ...
}

舒适提醒

这里先以搞懂 gRPC 主流程思路为主,不扣太细节的货色,比方一些 GRPCLB 解决、error 解决,ServiceConfigSelector 解决等能够查看源码。

bw.updateClientConnState 调用实质是 ccBalancerWrapper.updateClientConnState
ccBalancerWrapper.updateClientConnState 就做了一件事件,调用 balancer.Balancer.UpdateClientConnState 办法

func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {ccb.balancerMu.Lock()
    defer ccb.balancerMu.Unlock()
    return ccb.balancer.UpdateClientConnState(*ccs)
}

到这里,咱们想看 balancer 源码逻辑有两种路径

  • 本人实现的 balancer.Balancer
  • gRPC 提供的 balancer

为了浏览源码,咱们先去浏览 gRPC 提供的几个 balancer 中的一个进行流程了解,后续再介绍如何自定义一个 balancer

gRPC Balancer

gRPC 提供了几个负载平衡解决,如下:

  • grpclb
  • rls
  • roundrobin
  • weightroundrobin
  • weighttarget

为了好了解,咱们挑一个简略的负载均衡器 roundrobin 持续浏览。

负载平衡从哪里获取?通过后面 cc.maybeApplyDefaultServiceConfig(s.Addresses) 办法中的源码可知,balancer.Balancerbalancer.Builder
提供,咱们看一下 balancer.Builder 接口

// Builder creates a balancer.
type Builder interface {
    // Build creates a new balancer with the ClientConn.
    Build(cc ClientConn, opts BuildOptions) Balancer
    // Name returns the name of balancers built by this builder.
    // It will be used to pick balancers (for example in service config).
    Name() string}

roundrobin

roundrobin 是 gRPC 内置的负载均衡器,其和 resolver 一样都是通过插件式编程提供扩大,在源码中,咱们可知,
roundrobin 在 init 函数中对 balancer.Builder 进行了注册,其中 baseBuilderbalancer.Builder 的实现,
上文咱们得悉,balancer.Balancerbalancer.Builder.Build 提供,通过 baseBuilder.Build 办法咱们晓得 gRPC 的
balancer 底层是由 baseBalancer 实现,局部源码如下:

roundrobin.go

// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {return base.NewBalancerBuilder(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
}

func init() {balancer.Register(newBuilder())
}

balancer.go

func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
    bal := &baseBalancer{
        cc:            cc,
        pickerBuilder: bb.pickerBuilder,
    
        subConns: resolver.NewAddressMap(),
        scStates: make(map[balancer.SubConn]connectivity.State),
        csEvltr:  &balancer.ConnectivityStateEvaluator{},
        config:   bb.config,
    }
    bal.picker = NewErrPicker(balancer.ErrNoSubConnAvailable)
    return bal
}

沿着 UpdateState 环节最初一个办法 ccb.balancer.UpdateClientConnState(*ccs) 调用浏览,其实最终来到了
baseBalancer.UpdateClientConnState 办法,咱们查看一下源码:

func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
    ...
    addrsSet := resolver.NewAddressMap()
    for _, a := range s.ResolverState.Addresses {addrsSet.Set(a, nil)
        if _, ok := b.subConns.Get(a); !ok {sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
            if err != nil {logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
                continue
            }
            b.subConns.Set(a, sc)
            b.scStates[sc] = connectivity.Idle
            b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
            sc.Connect()}
    }
    for _, a := range b.subConns.Keys() {sci, _ := b.subConns.Get(a)
        sc := sci.(balancer.SubConn)
        if _, ok := addrsSet.Get(a); !ok {b.cc.RemoveSubConn(sc)
            b.subConns.Delete(a)
        }
    }
    if len(s.ResolverState.Addresses) == 0 {b.ResolverError(errors.New("produced zero addresses"))
        return balancer.ErrBadResolverState
    }
    return nil
}

从源码得悉,该办法做了以下几件事:

  • 对新的 endpoint NewSubConn 并且 Connect
  • 移出旧的曾经不存在的 endpoint 及其 Conn 信息

总的来说就是更新负载均衡器内可用的链接信息。

balancer.ClientConn.NewSubConn

balancer.ClientConn 是一个接口,其代表 gRPC 的一个链接,而 ccBalancerWrapper 就为其实现类,先看看该接口的申明:

type ClientConn interface {
    // NewSubConn 平衡器调用 NewSubConn 来创立一个新的 SubConn,它不会阻塞并期待建设连贯,// SubConn 的行为能够通过 NewSubConnOptions 来管制。NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)

    // RemoveSubConn 从 ClientConn 中删除 SubConn。SubConn 将敞开。RemoveSubConn(SubConn)
    // UpdateAddresses 更新传入的 SubConn 中应用的地址,gRPC 查看以后连贯的地址是否仍在新列表中。如果存在,将放弃连贯,// 否则,连贯将失常敞开,并创立一个新连贯。// 这将触发 SubConn 的状态转换。UpdateAddresses(SubConn, []resolver.Address)
    
    // UpdateState 告诉 gRPC 平衡器的外部状态已更改。// gRPC 将更新 ClientConn 的连贯状态,并在新的 Picker 上调用 Pick 来抉择新的 SubConn。UpdateState(State)
    
    // 平衡器调用 ResolveNow 以告诉 gRPC 进行名称解析。ResolveNow(resolver.ResolveNowOptions)
    
    // Target 返回此 ClientConn 的拨号指标。// 已弃用:改用 BuildOptions 中的 Target 字段
    Target() string}

再看一下 ccBalancerWrapper 的创立:

func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
    ccb := &ccBalancerWrapper{
        cc:       cc,
        updateCh: buffer.NewUnbounded(),
        closed:   grpcsync.NewEvent(),
        done:     grpcsync.NewEvent(),
        subConns: make(map[*acBalancerWrapper]struct{}),
    }
    go ccb.watcher()
    ccb.balancer = b.Build(ccb, bopts)
    _, ccb.hasExitIdle = ccb.balancer.(balancer.ExitIdler)
    return ccb
}

留神

记住 go ccb.watcher() 这一行代码,后续还会回到这个办法来。

baseBalancer.UpdateClientConnState 中对新退出的 endpoint 进行 NewSubConnConnect 解决,咱们先来看看 NewSubConn 办法做了哪些事件,
来到 ccBalancerWrapper.NewSubConn 办法中:

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {if len(addrs) <= 0 {return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
    }
    ccb.mu.Lock()
    defer ccb.mu.Unlock()
    if ccb.subConns == nil {return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
    }
    ac, err := ccb.cc.newAddrConn(addrs, opts)
    if err != nil {return nil, err}
    acbw := &acBalancerWrapper{ac: ac}
    acbw.ac.mu.Lock()
    ac.acbw = acbw
    acbw.ac.mu.Unlock()
    ccb.subConns[acbw] = struct{}{}
    return acbw, nil
}

从该办法可知,次要是通过 gprc.ClientConn.newAddrConn 创立一个 addrConn 对象,并且创立一个
balancer.SubConn 的实现类对象 acBalancerWrapper,将其退出到 ccBalancerWrapper.subConns 中进行治理。

阐明

由此可知,baseBalancer.UpdateClientConnState 判断地址变更后的 address 是否为新退出的就由
ccBalancerWrapper.subConns 来比照即可得悉。

接着咱们持续看看 Connect 做了什么事件,下面曾经通过 acBalancerWrapper 创立了一个 balancer.SubConn 的实现对象,接着利用该对象进行了
Connect 办法调用,咱们来到 acBalancerWrapper.Connect() 办法中:

func (acbw *acBalancerWrapper) Connect() {acbw.mu.Lock()
    defer acbw.mu.Unlock()
    go acbw.ac.connect()}
func (ac *addrConn) connect() error {ac.mu.Lock()
    if ac.state == connectivity.Shutdown {ac.mu.Unlock()
        return errConnClosing
    }
    if ac.state != connectivity.Idle {ac.mu.Unlock()
        return nil
    }
    ac.updateConnectivityState(connectivity.Connecting, nil)
    ac.mu.Unlock()
    
    ac.resetTransport()
    return nil
}

ac.updateConnectivityState 更新链接状态,ac.resetTransport 次要工作内容就是从 resolver.Address 列表中依照去创立链接并同样调用 ac.updateConnectivityState 更新状态,具体源码可自行浏览,
咱们接着 ac.updateConnectivityState 办法往下走,其实该办法调用了 grpc.ClientConn.handleSubConnStateChange 办法,最终又回到了 ccBalancerWrapper.handleSubConnStateChange 办法中,其办法调用链如下:

ac.updateConnectivityState -> grpc.ClientConn.handleSubConnStateChange -> ccBalancerWrapper.handleSubConnStateChange

来看一下最初一个办法 ccBalancerWrapper.handleSubConnStateChange 的源码:

func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
    if sc == nil {return}
    ccb.updateCh.Put(&scStateUpdate{
        sc:    sc,
        state: s,
        err:   err,
    })
}

该办法把一个 balancer.SubConnconnectivity.State 丢进了一个切片,而后通过一个 channel 管制另一个 goroutine 取数据

func (b *Unbounded) Put(t interface{}) {b.mu.Lock()
    if len(b.backlog) == 0 {
        select {
        case b.c <- t:
            b.mu.Unlock()
            return
        default:
        }
    }
    b.backlog = append(b.backlog, t)
    b.mu.Unlock()}

这里的数据写入后,在哪里读取,这就回到上文须要大家重点记住的一个 goroutine 调用了,还记得吗,试着回顾一下,没错就是 go ccb.watcher()

咱们来看看 watcher 办法,由上文可知,咱们写如的数据是 scStateUpdate 对象,因而如下源码就仅看获取该对象的 case 即可,省略了临时不须要关注的代码:

func (ccb *ccBalancerWrapper) watcher() {
    for {
        select {case t := <-ccb.updateCh.Get():
            ccb.updateCh.Load()
            if ccb.closed.HasFired() {break}
            switch u := t.(type) {
            case *scStateUpdate:
                ccb.balancerMu.Lock()
                ccb.balancer.UpdateSubConnState(u.sc, balancer.SubConnState{ConnectivityState: u.state, ConnectionError: u.err})
                ccb.balancerMu.Unlock()
            case ...:
                ...
            default:
                logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t)
            }
        case <-ccb.closed.Done():}
        ...
    }
}

由源码得悉,其最终调用了 balancer.Balancer.UpdateSubConnState 办法,咱们以 roundrobin 负载均衡器来查看,由上文知,gRPC 的 balancer 最终实现类是
baseBalancer,因而 balancer.Balancer.UpdateSubConnState 最终落到了 baseBalancer.UpdateSubConnState 办法上,

func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
    s := state.ConnectivityState
    ...
    oldS, ok := b.scStates[sc]
    if !ok {
        ...
        return
    }
    if oldS == connectivity.TransientFailure &&
        (s == connectivity.Connecting || s == connectivity.Idle) {
        if s == connectivity.Idle {sc.Connect()
        }
        return
    }
    b.scStates[sc] = s
    switch s {
    case connectivity.Idle:
        sc.Connect()
    case connectivity.Shutdown:
        // When an address was removed by resolver, b called RemoveSubConn but
        // kept the sc's state in scStates. Remove state for this sc here.
        delete(b.scStates, sc)
    case connectivity.TransientFailure:
        // Save error to be reported via picker.
        b.connErr = state.ConnectionError
    }
    
    b.state = b.csEvltr.RecordTransition(oldS, s)
    ...
    if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
        b.state == connectivity.TransientFailure {b.regeneratePicker()
    }
    b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}

该办法中最终只会有状态 connectivity.ReadySubConn 往下走,其余的状态要么被从新发动 Connect,要么被移出
最初一行代码发动 balancer.ClientConn.UpdateState 调用,因为 ccBalancerWrapperbalancer.ClientConn 的实现,因而来到
balancer.ClientConn.UpdateState 下,该办法做了两件事件:

  • 更新 balancer.Picker
  • 调用 grpc.connectivityStateManager.updateState 办法,该办法开释一个 channel 信号,告诉 goroutine 进行信息处理,该 goroutine 咱们后续再讲。

上文讲了这么多,那么负载算法在哪里,又何时调用呢?
由上文可知,baseBalancer.UpdateSubConnState 更新了一个 picker,这个 picker 来自哪里?追溯一下源码联合 roundrobin 负载均衡器可知,该 picker是在
balancer.Builder 的实现类调用 base.NewBalancerBuilder 创立实例时传入的 base.PickBuilder 实现类 rrPickerBuilder 结构进去的,看一下 rrPickerBuilder
的源码可知 Pick 办法中就是对 SubConn 进行负载算法的具体逻辑了。

func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {p.mu.Lock()
    sc := p.subConns[p.next]
    p.next = (p.next + 1) % len(p.subConns)
    p.mu.Unlock()
    return balancer.PickResult{SubConn: sc}, nil
}

那么该办法什么时候调用呢?这里间接给出答案,在 grpc.ClientConn 发动 Invoke 办法调用时会通过调用链调用到,咱们后续源码浏览到那里在来剖析。

自定义负载均衡器

自定义负载均衡器首先须要理解 gRPC 的插件式编程,这部分内容能够自行 google。

环境

etcd
go

负载平衡指标

随机抉择

  1. 实现 balancer.Builder
    咱们就不一一实现其办法了,因为负载均衡器的重点在负载平衡算法,即实现 base.PickerBuilder,咱们间接用 gRPC 提供的 base.NewBalancerBuilder 来创立 balancer.Builder

    const Name = "random"
    
    func init() {balancer.Register(newBuilder())
    }
    
    func newBuilder() balancer.Builder {return base.NewBalancerBuilder(Name, &randomPickerBuilder{}, base.Config{HealthCheck: true})
    }
  2. 实现 base.PickerBuilder

    func (r *randomPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {if len(info.ReadySCs) == 0 {return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
     }
     readyScs := make([]Conn, 0, len(info.ReadySCs))
     for sc, info := range info.ReadySCs {
         readyScs = append(readyScs, Conn{
             SubConn:     sc,
             SubConnInfo: info,
         })
     }
     return &randomPicker{
         subConns: readyScs,
         r:        rand.New(rand.NewSource(time.Now().UnixNano())),
     }
    }
  3. 实现 balancer.Picker
    balancer.Picker 才是咱们须要扩大的逻辑,即依照本人想要的负载平衡算法从 SunConn 列表中抉择一个可用的 SubConn 创立链接。
func (r *randomPicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) {next := r.r.Int() % len(r.subConns)
    sc := r.subConns[next]
    fmt.Printf("picked: %+v\n", sc.SubConnInfo.Address.Addr)
    return balancer.PickResult{SubConn: sc.SubConn,}, nil
}
  1. 应用自定义负载均衡器

    r := resolverBuilder.NewCustomBuilder(resolverBuilder.Scheme)
    options := []grpc.DialOption{grpc.WithInsecure(), grpc.WithResolvers(r), grpc.WithBalancerName(builder.Name)}
    conn, err := grpc.Dial(resolverBuilder.Format("grpc-server"), options...)

演示成果

  1. 启动多个 server 实例,我这里启动了三个

    $ go run server.go -addr localhost:8888
    $ go run server.go -addr localhost:8889
    $ go run server.go -addr localhost:8890
  2. 屡次启动 client,察看 Pick 的日志输入

    go run client.go
    endpoints:  [localhost:8888 localhost:8889 localhost:8888 localhost:8889 localhost:8890]
    picked: localhost:8888
    output:  hi
go run client.go
endpoints:  [localhost:8888 localhost:8889 localhost:8888 localhost:8889 localhost:8890]
picked: localhost:8890
output:  hi
go run client.go
endpoints:  [localhost:8888 localhost:8889 localhost:8888 localhost:8889 localhost:8890]
picked: localhost:8889
output:  hi

总结

grpc 通过服务发现或者直连模式获取到 gRPC server 的实例的 endpoints,而后告诉负载均衡器进行 SubConn 更新,对于新退出的 endpoint 进行实例创立,移出废除的 endpoint,
最初通过状态更新将状态为 ReadySubConn 进行治理,gRPC 在调用 Invoke(即 client 发动申请)时,则会通过负载均衡器中的 Picker 去依照某一个负载平衡算法抉择一个 SubConn
创立链接,如果创立胜利则不再进行其余 SubConn 的尝试,否则会依照肯定的退却算法进行重试,直到退却失败或者创立链接胜利为止。

自定义负载均衡器的外围逻辑在于对 Picker 的实现,从 SubConn 列表中依照负载平衡算法抉择一个 SubConn 创立链接,自定义负载均衡器和 Resolver 一样都是用到了插件式编程提供了扩大能力。

本次源码浏览只是为了了解 gRPC 的调用流程,其中有很多细节在源码正文中有阐明,其能够加深咱们对 gRPC 的了解,因而在了解本文介绍后,能够再次浏览源码加深一下了解。

源码

https://github.com/anqiansong…

正文完
 0