grpc 版本 1.50
client.go 代码:
func main() {flag.Parse()
// Set up a connection to the server.
conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
// Contact the server and print out its response.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
if err != nil {log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.GetMessage())
}
Dial 源码:
func Dial(target string, opts ...DialOption) (*ClientConn, error) {return DialContext(context.Background(), target, opts...)
}
DialContext 源码:
省略次局部代码
// 首先会创立 ClientConn,初始化相干字段
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),}
// 将用户设置的连贯参数更新到客户端连接器 ClientConn
for _, opt := range opts {opt.apply(&cc.dopts)
}
return cc, nil
}
connect() 办法:
func (ac *addrConn) connect() error {ac.mu.Lock()
// if 校验状态
if ac.state == connectivity.Shutdown {ac.mu.Unlock()
return errConnClosing
}
if ac.state != connectivity.Idle {ac.mu.Unlock()
return nil
}
ac.updateConnectivityState(connectivity.Connecting, nil)
ac.mu.Unlock()
// 次要看这个办法,重试连贯
ac.resetTransport()
return nil
}
进入 resetTransport() 源码
func (ac *addrConn) resetTransport() {ac.mu.Lock()
// 判断状态若为 shutdown,则不再连贯间接推出
if ac.state == connectivity.Shutdown {ac.mu.Unlock()
return
}
addrs := ac.addrs
// 连贯失败,须要进行重试的
// Backoff 是须要期待的工夫,ac.backoffIdx 示意第几次重试
backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
// 计算出本次向 gRPC 服务尝试建设 TCP 连贯的最长工夫
// 若超过这个工夫还是连贯不上,则被动断开,期待尝试下次连贯
// This will be the duration that dial gets to finish.
dialDuration := minConnectTimeout
if ac.dopts.minConnectTimeout != nil {dialDuration = ac.dopts.minConnectTimeout()
}
if dialDuration < backoffFor {
// Give dial more time as we keep failing to connect.
dialDuration = backoffFor
}
connectDeadline := time.Now().Add(dialDuration)
// 更新构造 addrConn 状态为 connecting
ac.updateConnectivityState(connectivity.Connecting, nil)
ac.mu.Unlock()
// 向服务器连贯失败后须要做的逻辑
if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {ac.cc.resolveNow(resolver.ResolveNowOptions{})
// After exhausting all addresses, the addrConn enters
// TRANSIENT_FAILURE.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {ac.mu.Unlock()
return
}
ac.updateConnectivityState(connectivity.TransientFailure, err)
// Backoff.
b := ac.resetBackoff
ac.mu.Unlock()
// 定时的超工夫
timer := time.NewTimer(backoffFor)
// 1.timer.C 如果连贯超时,重试的次数 +1,持续下一次重试连贯,但须要期待一段时间
// 2. b, 间接敞开,将持续进行从新连贯
// 2. ac.ctx.Done,走这里的话,上下文完结,这里不会再次重试了
select {
case <-timer.C:
ac.mu.Lock()
ac.backoffIdx++
ac.mu.Unlock()
case <-b:
timer.Stop()
case <-ac.ctx.Done():
timer.Stop()
return
}
ac.mu.Lock()
// 状态 != shutdown 就更新为闲暇状态
if ac.state != connectivity.Shutdown {ac.updateConnectivityState(connectivity.Idle, err)
}
ac.mu.Unlock()
return
}
// 连贯胜利,从新设置 backoff 为原始值 0
ac.mu.Lock()
ac.backoffIdx = 0
ac.mu.Unlock()}
如何计算重试连贯等待时间?
进入 Backoff 办法
func (bc Exponential) Backoff(retries int) time.Duration {
if retries == 0 {return bc.Config.BaseDelay}
backoff, max := float64(bc.Config.BaseDelay), float64(bc.Config.MaxDelay)
for backoff < max && retries > 0 {
// 幂次方
backoff *= bc.Config.Multiplier
retries--
}
// 不能超过最大延时工夫
if backoff > max {backoff = max}
// Randomize backoff delays so that if a cluster of requests start at
// the same time, they won't operate in lockstep.
backoff *= 1 + bc.Config.Jitter*(grpcrand.Float64()*2-1)
if backoff < 0 {return 0}
return time.Duration(backoff)
}
总结:
- 连贯失败后,客户端会进行重试连贯。
- 重试次数越多,期待下一次连接时间也会变长,但不能超过 MaxDelay 值。
更多 Go 云原生学习材料,收录于 Github:https://github.com/metashops/GoFamily
本文由 mdnice 多平台公布