Backend: How gRPC Client Long-Lived Connections Are Implemented

Author: 熊喵君. Original link: https://pandaychen.github.io/…

Reposted from the Go 语言中文网 WeChat public account.

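0x00 Preface

HTTP2 is a full-duplex streaming protocol. The server can also ping the client proactively, and the server has additional settings for checking connection health and for limiting how often a client may send ping frames. gRPC uses HTTP2 as its underlying transport, so a gRPC client uses long-lived connections by default.
Consider a scenario where the client and the server must keep a persistent long-lived connection: whenever either side disconnects abnormally or restarts, the connection has to be retried and kept alive (assuming, of course, that both sides restart successfully). In gRPC, when the server restarts abnormally, a client holding an established connection usually receives the following error: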

rpc error: code = Unavailable desc = transport is closing

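Most gRPC client wrappers do not handle this case well; see the Warden issue on retrying after the client connection is dropped by a server restart[1]. There are three recommended ways to deal with this error:

  1. Retry: when a client call fails, retry gracefully with exponential backoff
  2. Add a keepalive policy
  3. Add an auto-reconnect policy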

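This article analyzes how to implement such client-side keepalive logic. Since the topic is keepalive, let's first look at gRPC's keepalive mechanism[2].

0x01 The HTTP2 GOAWAY Frame

HTTP2 uses the GOAWAY frame to control connection shutdown: GOAWAY initiates the shutdown of a connection or signals a serious error condition. Its semantics let an endpoint gracefully stop accepting new streams while still finishing the streams that were already established. When a client receives this frame it closes the connection itself, and the next time it needs to send data it establishes a new connection. GOAWAY is what makes gRPC's graceful stop (GracefulStop) mechanism possible.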

gRPC client keepalive

The gRPC client provides the following keepalive configuration:

var kacp = keepalive.ClientParameters{
 Time:                10 * time.Second, // send pings every 10 seconds if there is no activity
 Timeout:             time.Second,      // wait 1 second for ping ack before considering the connection dead
 PermitWithoutStream: true,             // send pings even without active streams
}
// Pass the keepalive configuration to Dial
conn, err := grpc.Dial(*addr, grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp))

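The keepalive.ClientParameters fields mean the following:

  • Time: if there is no activity, send a ping every 10s
  • Timeout: if the ping ack does not arrive within 1s, the connection is considered dead
  • PermitWithoutStream: whether pings may be sent when there is no active stream

This brings to mind the ssh client[3] and the mysql client in my own projects, which do something similar: they start a separate goroutine to implement keepalive, as in the following code (using ssh as an example):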

    go func() {
        t := time.NewTicker(2 * time.Second)
        defer t.Stop()
        for range t.C {
            _, _, err := client.Conn.SendRequest("keepalive@golang.org", true, nil)
            if err != nil {
                return
            }
        }
    }()

gRPC implementation

grpc-go's newHTTP2Client[4] function contains the following logic: when a new HTTP2 client is created, it starts a goroutine to handle keepalive.

    // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
    // and starts to receive messages on it. Non-nil error returns if construction
    // fails.
    func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
        ...
        if t.keepaliveEnabled {
            t.kpDormancyCond = sync.NewCond(&t.mu)
            go t.keepalive()
        }
        ...
    }

Next, let's look at the implementation of the keepalive method[5]:

    func (t *http2Client) keepalive() {
        p := &ping{data: [8]byte{}}       // ping payload
        timer := time.NewTimer(t.kp.Time) // start a timer that fires after the configured Time
        for {
            select {
            // The timer fired.
            case <-timer.C:
                if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
                    timer.Reset(t.kp.Time)
                    continue
                }
                // Check if keepalive should go dormant.
                t.mu.Lock()
                if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
                    // Make awakenKeepalive writable.
                    <-t.awakenKeepalive
                    t.mu.Unlock()
                    select {
                    case <-t.awakenKeepalive:
                        // If the control gets here a ping has been sent
                        // need to reset the timer with keepalive.Timeout.
                    case <-t.ctx.Done():
                        return
                    }
                } else {
                    t.mu.Unlock()
                    if channelz.IsOn() {
                        atomic.AddInt64(&t.czData.kpCount, 1)
                    }
                    // Send ping.
                    t.controlBuf.put(p)
                }

                // By the time control gets here a ping has been sent one way or the other.
                timer.Reset(t.kp.Timeout)
                select {
                case <-timer.C:
                    if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
                        timer.Reset(t.kp.Time)
                        continue
                    }
                    t.Close()
                    return
                case <-t.ctx.Done():
                    if !timer.Stop() {
                        <-timer.C
                    }
                    return
                }
            // The parent context was cancelled.
            case <-t.ctx.Done():
                if !timer.Stop() {
                    // Stop returned false: the timer had already fired, so drain its channel.
                    <-timer.C
                }
                return
            }
        }
    }

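The execution logic of the client keepalive implementation can be summarized as follows:

  • Fill in the ping payload, [8]byte{}, and create a timer that fires after the user-configured Time
  • Loop over a select with two branches: one runs when the timer fires; the other is t.ctx.Done(), which means the layer above keepalive called cancel to end the context subtree
  • The core logic lives in the timer-fired branch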
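The decisions made inside the timer-fired branch can be isolated into two small pure functions. This is a simplified model of the code quoted above, not grpc-go's API: `onKeepaliveTimer`, `onPingTimeout` and the `action` type are hypothetical names, and the dormant state is only reported, not implemented.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type action string

const (
	rearm     action = "re-arm timer"
	dormant   action = "go dormant"
	sendPing  action = "send ping"
	closeConn action = "close connection"
)

// onKeepaliveTimer models what happens when the Time timer fires.
// activity is 1 if any data was read since the last check.
func onKeepaliveTimer(activity *uint32, activeStreams int, permitWithoutStream bool) action {
	if atomic.CompareAndSwapUint32(activity, 1, 0) {
		return rearm // traffic was seen, no ping needed
	}
	if activeStreams < 1 && !permitWithoutStream {
		return dormant // nothing to keep alive until a stream is created
	}
	return sendPing
}

// onPingTimeout models what happens when the Timeout timer fires after a ping.
func onPingTimeout(activity *uint32) action {
	if atomic.CompareAndSwapUint32(activity, 1, 0) {
		return rearm // something (for example the ping ack) arrived in time
	}
	return closeConn
}

func main() {
	var activity uint32 = 1
	fmt.Println(onKeepaliveTimer(&activity, 0, true)) // traffic was seen since the last check
	fmt.Println(onKeepaliveTimer(&activity, 0, true)) // idle since the last check
	fmt.Println(onPingTimeout(&activity))             // nothing arrived after the ping
}
```

The compare-and-swap both reads and clears the activity flag, so each timer period starts from a clean slate without taking a lock.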

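gRPC server keepalive

The gRPC server side has two main pieces of logic:

  • Receive and respond to the client's ping frames
  • Start a separate goroutine to probe whether the client is still alive

The gRPC server exposes its keepalive configuration in two parts, keepalive.EnforcementPolicy and keepalive.ServerParameters, as follows: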

var kaep = keepalive.EnforcementPolicy{
 MinTime:             5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
 PermitWithoutStream: true,            // Allow pings even when there are no active streams
}

var kasp = keepalive.ServerParameters{
 MaxConnectionIdle:     15 * time.Second, // If a client is idle for 15 seconds, send a GOAWAY
 MaxConnectionAge:      30 * time.Second, // If any connection is alive for more than 30 seconds, send a GOAWAY
 MaxConnectionAgeGrace: 5 * time.Second,  // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
 Time:                  5 * time.Second,  // Ping the client if it is idle for 5 seconds to ensure the connection is still active
 Timeout:               1 * time.Second,  // Wait 1 second for the ping ack before assuming the connection is dead
}

func main(){
 ...
 s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
 ...
}

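keepalive.EnforcementPolicy:

  • MinTime: if the interval between two client pings is shorter than 5s, the server treats it as a violation and eventually terminates the connection
  • PermitWithoutStream: allow pings even when there is no active stream

keepalive.ServerParameters:

  • MaxConnectionIdle: if a client is idle for more than 15s, send a GOAWAY
  • MaxConnectionAge: if any connection has been alive for more than 30s, send a GOAWAY. To avoid sending a large number of GOAWAY frames at the same moment, grpc-go adds a random jitter of ±10% to this value, so here the deadline falls between 30-3 and 30+3 seconds
  • MaxConnectionAgeGrace: before the connection is forcibly closed, allow 5s for pending RPCs to complete
  • Time: if a client is idle for more than 5s, send it a ping
  • Timeout: if the ping is not acknowledged within 1s, the connection is considered dead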

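gRPC implementation

The server-side logic that answers a client's ping frame is in the handlePing method[6]. handlePing checks whether the ping violates either of two policies; each violation increments pingStrikes, and once the count exceeds maxPingStrikes (2) the server logs an error, sends a GOAWAY frame and drops the connection. The implementation is as follows: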

    func (t *http2Server) handlePing(f *http2.PingFrame) {
        if f.IsAck() {
            if f.Data == goAwayPing.data && t.drainChan != nil {
                close(t.drainChan)
                return
            }
            // Maybe it's a BDP ping.
            if t.bdpEst != nil {
                t.bdpEst.calculate(f.Data)
            }
            return
        }
        pingAck := &ping{ack: true}
        copy(pingAck.data[:], f.Data[:])
        t.controlBuf.put(pingAck)

        now := time.Now()
        defer func() {
            t.lastPingAt = now
        }()
        // A reset ping strikes means that we don't need to check for policy
        // violation for this ping and the pingStrikes counter should be set
        // to 0.
        if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
            t.pingStrikes = 0
            return
        }
        t.mu.Lock()
        ns := len(t.activeStreams)
        t.mu.Unlock()
        if ns < 1 && !t.kep.PermitWithoutStream {
            // Keepalive shouldn't be active thus, this new ping should
            // have come after at least defaultPingTimeout.
            if t.lastPingAt.Add(defaultPingTimeout).After(now) {
                t.pingStrikes++
            }
        } else {
            // Check if keepalive policy is respected.
            if t.lastPingAt.Add(t.kep.MinTime).After(now) {
                t.pingStrikes++
            }
        }

        if t.pingStrikes > maxPingStrikes {
            // Send goaway and close the connection.
            if logger.V(logLevel) {
                logger.Errorf("transport: Got too many pings from the client, closing the connection.")
            }
            t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
        }
    }

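Note the two conditions under which pingStrikes is incremented:

  • t.lastPingAt.Add(defaultPingTimeout).After(now): there is no active stream and PermitWithoutStream is false, yet the ping arrived less than defaultPingTimeout after the previous one
  • t.lastPingAt.Add(t.kep.MinTime).After(now): in all other cases, the ping arrived less than MinTime after the previous one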

    func (t *http2Server) handlePing(f *http2.PingFrame) {
        ...
        if ns < 1 && !t.kep.PermitWithoutStream {
            // Keepalive shouldn't be active thus, this new ping should
            // have come after at least defaultPingTimeout.
            if t.lastPingAt.Add(defaultPingTimeout).After(now) {
                t.pingStrikes++
            }
        } else {
            // Check if keepalive policy is respected.
            if t.lastPingAt.Add(t.kep.MinTime).After(now) {
                t.pingStrikes++
            }
        }
        if t.pingStrikes > maxPingStrikes {
            // Send goaway and close the connection.
            errorf("transport: Got too many pings from the client, closing the connection.")
            t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
        }
    }

keepalive-related code

When the gRPC server creates a new HTTP2 server transport, it starts a separate goroutine to handle the keepalive logic; see the newHTTP2Server method[7]:

    func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
     ...
     go t.keepalive()
     ...
    }

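A brief look at the keepalive implementation: the core logic starts three timers, maxIdle, maxAge and keepAlive, and then handles their firing in a for/select loop:

  • maxIdle logic: check whether the client has been idle for longer than the configured time; if so, call t.drain, which sends a GOAWAY frame
  • maxAge logic: when it fires, first call t.drain to send a GOAWAY frame, then reset the timer to MaxConnectionAgeGrace; when it fires again, call t.Close() to close the connection outright (a graceful shutdown of sorts)
  • keepalive logic: first check whether activity is 1; if it is not, set pingSent to true and send a ping, then reset the timer to Timeout; when it fires again, if activity is still not 1 (nothing, including the ping reply, was received) and pingSent is true, call t.Close() to close the connection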

    func (t *http2Server) keepalive() {
        p := &ping{}
        var pingSent bool
        maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
        maxAge := time.NewTimer(t.kp.MaxConnectionAge)
        keepalive := time.NewTimer(t.kp.Time)
        // NOTE: All exit paths of this function should reset their
        // respective timers. A failure to do so will cause the
        // following clean-up to deadlock and eventually leak.
        defer func() {
            // Clean up the timers before exiting.
            if !maxIdle.Stop() {
                <-maxIdle.C
            }
            if !maxAge.Stop() {
                <-maxAge.C
            }
            if !keepalive.Stop() {
                <-keepalive.C
            }
        }()
        for {
            select {
            case <-maxIdle.C:
                t.mu.Lock()
                idle := t.idle
                if idle.IsZero() { // The connection is non-idle.
                    t.mu.Unlock()
                    maxIdle.Reset(t.kp.MaxConnectionIdle)
                    continue
                }
                val := t.kp.MaxConnectionIdle - time.Since(idle)
                t.mu.Unlock()
                if val <= 0 {
                    // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
                    // Gracefully close the connection.
                    t.drain(http2.ErrCodeNo, []byte{})
                    // Resetting the timer so that the clean-up doesn't deadlock.
                    maxIdle.Reset(infinity)
                    return
                }
                maxIdle.Reset(val)
            case <-maxAge.C:
                t.drain(http2.ErrCodeNo, []byte{})
                maxAge.Reset(t.kp.MaxConnectionAgeGrace)
                select {
                case <-maxAge.C:
                    // Close the connection after grace period.
                    t.Close()
                    // Resetting the timer so that the clean-up doesn't deadlock.
                    maxAge.Reset(infinity)
                case <-t.ctx.Done():
                }
                return
            case <-keepalive.C:
                if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
                    pingSent = false
                    keepalive.Reset(t.kp.Time)
                    continue
                }
                if pingSent {
                    t.Close()
                    // Resetting the timer so that the clean-up doesn't deadlock.
                    keepalive.Reset(infinity)
                    return
                }
                pingSent = true
                if channelz.IsOn() {
                    atomic.AddInt64(&t.czData.kpCount, 1)
                }
                t.controlBuf.put(p)
                keepalive.Reset(t.kp.Timeout)
            case <-t.ctx.Done():
                return
            }
        }
    }

Implementing a robust long-lived-connection client

References

[1] Warden issue on retrying after the client connection is dropped by a server restart: https://github.com/go-kratos/…
[2] keepalive mechanism: https://github.com/grpc/grpc/…
[3] ssh client: https://pandaychen.github.io/… 客户端 -keepalive- 机制
[4] newHTTP2Client: https://github.com/grpc/grpc-…
[5] keepalive method: https://github.com/grpc/grpc-…
[6] handlePing method: https://github.com/grpc/grpc-…
[7] newHTTP2Server method: https://github.com/grpc/grpc-…
[8] Server: https://github.com/grpc/grpc-…
[9] Client: https://github.com/grpc/grpc-…
[10] GRPC 开箱手册 (gRPC out-of-the-box handbook): https://juejin.im/post/684490…
