乐趣区

关于程序员:第三期gRPC客户端与服务端连接失败后是否会有重试机制

grpc 版本 1.50

client.go 代码:

func main() {flag.Parse()
    // Set up a connection to the server.
    conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()
    c := pb.NewGreeterClient(conn)

    // Contact the server and print out its response.
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
    if err != nil {log.Fatalf("could not greet: %v", err)
    }
    log.Printf("Greeting: %s", r.GetMessage())
}

Dial 源码:

func Dial(target string, opts ...DialOption) (*ClientConn, error) {return DialContext(context.Background(), target, opts...)
}

DialContext 源码:

省略次局部代码

// 首先会创立 ClientConn,初始化相干字段
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
    cc := &ClientConn{
        target:            target,
        csMgr:             &connectivityStateManager{},
        conns:             make(map[*addrConn]struct{}),
        dopts:             defaultDialOptions(),
        blockingpicker:    newPickerWrapper(),
        czData:            new(channelzData),
        firstResolveEvent: grpcsync.NewEvent(),}

    // 将用户设置的连贯参数更新到客户端连接器 ClientConn
    for _, opt := range opts {opt.apply(&cc.dopts)
    }

    return cc, nil
}

connect() 办法:

func (ac *addrConn) connect() error {ac.mu.Lock()
    // if 校验状态
    if ac.state == connectivity.Shutdown {ac.mu.Unlock()
        return errConnClosing
    }
    if ac.state != connectivity.Idle {ac.mu.Unlock()
        return nil
    }
    ac.updateConnectivityState(connectivity.Connecting, nil)
    ac.mu.Unlock()
    // 次要看这个办法,重试连贯
    ac.resetTransport()
    return nil
}

进入 resetTransport() 源码

func (ac *addrConn) resetTransport() {ac.mu.Lock()
    // 判断状态若为 shutdown,则不再连贯间接推出
    if ac.state == connectivity.Shutdown {ac.mu.Unlock()
        return
    }

    addrs := ac.addrs
    // 连贯失败,须要进行重试的
    // Backoff 是须要期待的工夫,ac.backoffIdx 示意第几次重试
    backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
    // 计算出本次向 gRPC 服务尝试建设 TCP 连贯的最长工夫
    // 若超过这个工夫还是连贯不上,则被动断开,期待尝试下次连贯
    // This will be the duration that dial gets to finish.
    dialDuration := minConnectTimeout
    if ac.dopts.minConnectTimeout != nil {dialDuration = ac.dopts.minConnectTimeout()
    }

    if dialDuration < backoffFor {
        // Give dial more time as we keep failing to connect.
        dialDuration = backoffFor
    }
    connectDeadline := time.Now().Add(dialDuration)

    // 更新构造 addrConn 状态为 connecting
    ac.updateConnectivityState(connectivity.Connecting, nil)
    ac.mu.Unlock()

    // 向服务器连贯失败后须要做的逻辑
    if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {ac.cc.resolveNow(resolver.ResolveNowOptions{})
        // After exhausting all addresses, the addrConn enters
        // TRANSIENT_FAILURE.
        ac.mu.Lock()
        if ac.state == connectivity.Shutdown {ac.mu.Unlock()
            return
        }
        ac.updateConnectivityState(connectivity.TransientFailure, err)

        // Backoff.
        b := ac.resetBackoff
        ac.mu.Unlock()

        // 定时的超工夫
        timer := time.NewTimer(backoffFor)
        // 1.timer.C 如果连贯超时,重试的次数 +1,持续下一次重试连贯,但须要期待一段时间
        // 2. b, 间接敞开,将持续进行从新连贯
        // 2. ac.ctx.Done,走这里的话,上下文完结,这里不会再次重试了
        select {
        case <-timer.C:
            ac.mu.Lock()
            ac.backoffIdx++
            ac.mu.Unlock()
        case <-b:
            timer.Stop()
        case <-ac.ctx.Done():
            timer.Stop()
            return
        }

        ac.mu.Lock()
        // 状态 != shutdown 就更新为闲暇状态
        if ac.state != connectivity.Shutdown {ac.updateConnectivityState(connectivity.Idle, err)
        }
        ac.mu.Unlock()
        return
    }
    // 连贯胜利,从新设置 backoff 为原始值 0
    ac.mu.Lock()
    ac.backoffIdx = 0
    ac.mu.Unlock()}

如何计算重试连贯等待时间?

进入 Backoff 办法


func (bc Exponential) Backoff(retries int) time.Duration {
    if retries == 0 {return bc.Config.BaseDelay}
    backoff, max := float64(bc.Config.BaseDelay), float64(bc.Config.MaxDelay)
    for backoff < max && retries > 0 {
        // 幂次方
        backoff *= bc.Config.Multiplier
        retries--
    }
    // 不能超过最大延时工夫
    if backoff > max {backoff = max}
    // Randomize backoff delays so that if a cluster of requests start at
    // the same time, they won't operate in lockstep.
    backoff *= 1 + bc.Config.Jitter*(grpcrand.Float64()*2-1)
    if backoff < 0 {return 0}
    return time.Duration(backoff)
}

总结:

  • 连贯失败后,客户端会进行重试连贯。
  • 重试次数越多,期待下一次连接时间也会变长,但不能超过 MaxDelay 值。

更多 Go 云原生学习材料,收录于 Github:https://github.com/metashops/GoFamily

本文由 mdnice 多平台公布

退出移动版