grpc 版本1.50
client.go 代码:
func main() { flag.Parse() // Set up a connection to the server. conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewGreeterClient(conn) // Contact the server and print out its response. ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name}) if err != nil { log.Fatalf("could not greet: %v", err) } log.Printf("Greeting: %s", r.GetMessage())}
Dial 源码:
func Dial(target string, opts ...DialOption) (*ClientConn, error) { return DialContext(context.Background(), target, opts...)}
DialContext 源码:
省略次局部代码
// 首先会创立 ClientConn,初始化相干字段func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { cc := &ClientConn{ target: target, csMgr: &connectivityStateManager{}, conns: make(map[*addrConn]struct{}), dopts: defaultDialOptions(), blockingpicker: newPickerWrapper(), czData: new(channelzData), firstResolveEvent: grpcsync.NewEvent(), } // 将用户设置的连贯参数更新到客户端连接器 ClientConn for _, opt := range opts { opt.apply(&cc.dopts) } return cc, nil}
connect() 办法:
func (ac *addrConn) connect() error { ac.mu.Lock() // if 校验状态 if ac.state == connectivity.Shutdown { ac.mu.Unlock() return errConnClosing } if ac.state != connectivity.Idle { ac.mu.Unlock() return nil } ac.updateConnectivityState(connectivity.Connecting, nil) ac.mu.Unlock() // 次要看这个办法,重试连贯 ac.resetTransport() return nil}
进入 resetTransport() 源码
func (ac *addrConn) resetTransport() { ac.mu.Lock() // 判断状态若为 shutdown,则不再连贯间接推出 if ac.state == connectivity.Shutdown { ac.mu.Unlock() return } addrs := ac.addrs // 连贯失败,须要进行重试的 // Backoff 是须要期待的工夫,ac.backoffIdx示意第几次重试 backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx) // 计算出本次向gRPC服务尝试建设TCP连贯的最长工夫 // 若超过这个工夫还是连贯不上,则被动断开,期待尝试下次连贯 // This will be the duration that dial gets to finish. dialDuration := minConnectTimeout if ac.dopts.minConnectTimeout != nil { dialDuration = ac.dopts.minConnectTimeout() } if dialDuration < backoffFor { // Give dial more time as we keep failing to connect. dialDuration = backoffFor } connectDeadline := time.Now().Add(dialDuration) // 更新构造addrConn状态为connecting ac.updateConnectivityState(connectivity.Connecting, nil) ac.mu.Unlock() // 向服务器连贯失败后须要做的逻辑 if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil { ac.cc.resolveNow(resolver.ResolveNowOptions{}) // After exhausting all addresses, the addrConn enters // TRANSIENT_FAILURE. ac.mu.Lock() if ac.state == connectivity.Shutdown { ac.mu.Unlock() return } ac.updateConnectivityState(connectivity.TransientFailure, err) // Backoff. b := ac.resetBackoff ac.mu.Unlock() // 定时的超工夫 timer := time.NewTimer(backoffFor) // 1.timer.C如果连贯超时,重试的次数+1,持续下一次重试连贯,但须要期待一段时间 // 2. b,间接敞开,将持续进行从新连贯 // 2. ac.ctx.Done,走这里的话,上下文完结,这里不会再次重试了 select { case <-timer.C: ac.mu.Lock() ac.backoffIdx++ ac.mu.Unlock() case <-b: timer.Stop() case <-ac.ctx.Done(): timer.Stop() return } ac.mu.Lock() // 状态 != shutdown就更新为闲暇状态 if ac.state != connectivity.Shutdown { ac.updateConnectivityState(connectivity.Idle, err) } ac.mu.Unlock() return } // 连贯胜利,从新设置backoff为原始值0 ac.mu.Lock() ac.backoffIdx = 0 ac.mu.Unlock()}
如何计算重试连贯等待时间?
进入Backoff办法
func (bc Exponential) Backoff(retries int) time.Duration { if retries == 0 { return bc.Config.BaseDelay } backoff, max := float64(bc.Config.BaseDelay), float64(bc.Config.MaxDelay) for backoff < max && retries > 0 { // 幂次方 backoff *= bc.Config.Multiplier retries-- } // 不能超过最大延时工夫 if backoff > max { backoff = max } // Randomize backoff delays so that if a cluster of requests start at // the same time, they won't operate in lockstep. backoff *= 1 + bc.Config.Jitter*(grpcrand.Float64()*2-1) if backoff < 0 { return 0 } return time.Duration(backoff)}
总结:
- 连贯失败后,客户端会进行重试连贯。
- 重试次数越多,期待下一次连接时间也会变长,但不能超过MaxDelay值。
更多Go云原生学习材料,收录于Github:https://github.com/metashops/GoFamily
本文由mdnice多平台公布