关于golang:Go发起HTTP20请求流程分析中篇数据帧流控制

4次阅读

共计 15497 个字符,预计需要花费 39 分钟才能阅读完成。

来自公众号:新世界杂货铺

浏览倡议

这是 HTTP2.0 系列的第二篇,所以笔者举荐浏览程序如下:

  1. Go 中的 HTTP 申请之——HTTP1.1 申请流程剖析
  2. Go 发动 HTTP2.0 申请流程剖析(前篇)

本篇次要分为三个局部:数据帧,流控制器以及通过剖析源码逐渐理解流控制。

本无意将这三个局部拆成三篇文章,但它们之间又有分割,所以最初仍旧决定放在一篇文章外面。因为内容较多,笔者认为分三次别离浏览三个局部较佳。

数据帧

HTTP2 通信的最小单位是数据帧,每一个帧都蕴含两局部:帧头 Payload。不同数据流的帧能够交织发送(同一个数据流的帧必须程序发送),而后再依据每个帧头的数据流标识符从新组装。

因为 Payload 中为无效数据,故仅对帧头进行剖析形容。

帧头

帧头总长度为 9 个字节,并蕴含四个局部,别离是:

  1. Payload 的长度,占用三个字节。
  2. 数据帧类型,占用一个字节。
  3. 数据帧标识符,占用一个字节。
  4. 数据流 ID,占用四个字节。

用图示意如下:

数据帧的格局和各局部的含意曾经分明了,那么咱们看看代码中怎么读取一个帧头:

func http2readFrameHeader(buf []byte, r io.Reader) (http2FrameHeader, error) {_, err := io.ReadFull(r, buf[:http2frameHeaderLen])
    if err != nil {return http2FrameHeader{}, err
    }
    return http2FrameHeader{Length:   (uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])),
        Type:     http2FrameType(buf[3]),
        Flags:    http2Flags(buf[4]),
        StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1),
        valid:    true,
    }, nil
}

在下面的代码中 http2frameHeaderLen 是一个常量,其值为 9。

从 io.Reader 中读取 9 个字节后,将前三个字节和后四个字节均转为 uint32 的类型,从而失去 Payload 长度和数据流 ID。另外须要了解的是帧头的前三个字节和后四个字节存储格局为大端(大小端笔者就不在这里解释了,请尚不理解的读者自行百度)。

数据帧类型

依据 http://http2.github.io/http2-…,数据帧类型总共有 10 个。在 go 源码中均有体现:

const (
    http2FrameData         http2FrameType = 0x0
    http2FrameHeaders      http2FrameType = 0x1
    http2FramePriority     http2FrameType = 0x2
    http2FrameRSTStream    http2FrameType = 0x3
    http2FrameSettings     http2FrameType = 0x4
    http2FramePushPromise  http2FrameType = 0x5
    http2FramePing         http2FrameType = 0x6
    http2FrameGoAway       http2FrameType = 0x7
    http2FrameWindowUpdate http2FrameType = 0x8
    http2FrameContinuation http2FrameType = 0x9
)

http2FrameData:次要用于发送申请 body 和接管响应的数据帧。

http2FrameHeaders:次要用于发送申请 header 和接管响应 header 的数据帧。

http2FrameSettings:次要用于 client 和 server 交换设置相干的数据帧。

http2FrameWindowUpdate:次要用于流控制的数据帧。

其余数据帧类型因为本文不波及,故不做形容。

数据帧标识符

因为数据帧标识符品种较多,笔者在这里仅介绍其中局部标识符,先看源码:

const (
    // Data Frame
    http2FlagDataEndStream http2Flags = 0x1
  
  // Headers Frame
    http2FlagHeadersEndStream  http2Flags = 0x1
  
  // Settings Frame
    http2FlagSettingsAck http2Flags = 0x1
    // 此处省略定义其余数据帧标识符的代码
)

http2FlagDataEndStream:在前篇中提到,调用 (*http2ClientConn).newStream 办法会创立一个数据流,那这个数据流什么时候完结呢,这就是 http2FlagDataEndStream 的作用。

当 client 收到有响应 body 的响应时(HEAD 申请无响应 body,301,302 等响应也无响应 body),始终读到 http2FrameData 数据帧的标识符为 http2FlagDataEndStream 则意味着本次申请完结能够敞开以后数据流。

http2FlagHeadersEndStream:如果读到的 http2FrameHeaders 数据帧有此标识符也意味着本次申请完结。

http2FlagSettingsAck:该标示符意味着对方确认收到 http2FrameSettings 数据帧。

流控制器

流控制是一种阻止发送方向接管方发送大量数据的机制,免得超出后者的需要或解决能力。Go 中 HTTP2 通过 http2flow 构造体进行流控制:

type http2flow struct {
    // n is the number of DATA bytes we're allowed to send.
    // A flow is kept both on a conn and a per-stream.
    n int32

    // conn points to the shared connection-level flow that is
    // shared by all streams on that conn. It is nil for the flow
    // that's on the conn directly.
    conn *http2flow
}

字段含意英文正文曾经形容的很分明了,所以笔者不再翻译。上面看一下和流控制无关的办法。

(*http2flow).available

此办法返回以后流控制可发送的最大字节数:

func (f *http2flow) available() int32 {
    n := f.n
    if f.conn != nil && f.conn.n < n {n = f.conn.n}
    return n
}
  • 如果 f.conn 为 nil 则意味着此控制器的管制级别为连贯,那么可发送的最大字节数就是f.n
  • 如果 f.conn 不为 nil 则意味着此控制器的管制级别为数据流,且以后数据流可发送的最大字节数不能超过以后连贯可发送的最大字节数。

(*http2flow).take

此办法用于耗费以后流控制器的可发送字节数:

func (f *http2flow) take(n int32) {if n > f.available() {panic("internal error: took too much")
    }
    f.n -= n
    if f.conn != nil {f.conn.n -= n}
}

通过理论须要传递一个参数,告知以后流控制器想要发送的数据大小。如果发送的大小超过流控制器容许的大小,则panic,如果未超过流控制器容许的大小,则将以后数据流和以后连贯的可发送字节数-n

(*http2flow).add

有耗费就有新增,此办法用于减少流控制器可发送的最大字节数:

func (f *http2flow) add(n int32) bool {
    sum := f.n + n
    if (sum > n) == (f.n > 0) {
        f.n = sum
        return true
    }
    return false
}

下面的代码惟一须要留神的中央是,当 sum 超过 int32 负数最大值 (2^31-1) 时会返回 false。

回顾 :在前篇中提到的(*http2Transport).NewClientConn 办法和 (*http2ClientConn).newStream 办法均通过 (*http2flow).add 初始化可发送数据窗口大小。

有了帧和流控制器的基本概念,上面咱们联合源码来剖析总结流控制的具体实现。

(*http2ClientConn).readLoop

前篇剖析 (*http2Transport).newClientConn 时止步于读循环,那么明天咱们就从 (*http2ClientConn).readLoop 开始。

func (cc *http2ClientConn) readLoop() {rl := &http2clientConnReadLoop{cc: cc}
    defer rl.cleanup()
    cc.readerErr = rl.run()
    if ce, ok := cc.readerErr.(http2ConnectionError); ok {cc.wmu.Lock()
        cc.fr.WriteGoAway(0, http2ErrCode(ce), nil)
        cc.wmu.Unlock()}
}

由上可知,readLoop 的逻辑比较简单,其外围逻辑在 (*http2clientConnReadLoop).run 办法里。

func (rl *http2clientConnReadLoop) run() error {
    cc := rl.cc
    rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
    gotReply := false // ever saw a HEADERS reply
    gotSettings := false
    for {f, err := cc.fr.ReadFrame()
    // 此处省略代码
        maybeIdle := false // whether frame might transition us to idle

        switch f := f.(type) {
        case *http2MetaHeadersFrame:
            err = rl.processHeaders(f)
            maybeIdle = true
            gotReply = true
        case *http2DataFrame:
            err = rl.processData(f)
            maybeIdle = true
        case *http2GoAwayFrame:
            err = rl.processGoAway(f)
            maybeIdle = true
        case *http2RSTStreamFrame:
            err = rl.processResetStream(f)
            maybeIdle = true
        case *http2SettingsFrame:
            err = rl.processSettings(f)
        case *http2PushPromiseFrame:
            err = rl.processPushPromise(f)
        case *http2WindowUpdateFrame:
            err = rl.processWindowUpdate(f)
        case *http2PingFrame:
            err = rl.processPing(f)
        default:
            cc.logf("Transport: unhandled response frame type %T", f)
        }
        if err != nil {
            if http2VerboseLogs {cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, http2summarizeFrame(f), err)
            }
            return err
        }
        if rl.closeWhenIdle && gotReply && maybeIdle {cc.closeIfIdle()
        }
    }
}

由上可知,(*http2clientConnReadLoop).run的外围逻辑是读取数据帧而后对不同的数据帧进行不同的解决。

cc.fr.ReadFrame()会依据后面介绍的数据帧格局读出数据帧。

前篇中提到应用了一个反对 h2 协定的图片进行剖析,本篇持续复用该图片对 (*http2clientConnReadLoop).run 办法进行 debug。

收到 http2FrameSettings 数据帧

读循环会最先读到 http2FrameSettings 数据帧。读到该数据帧后会调用 (*http2clientConnReadLoop).processSettings 办法。(*http2clientConnReadLoop).processSettings次要蕴含 3 个逻辑。

1、判断是否是 http2FrameSettings 的 ack 信息,如果是间接返回,否则持续前面的步骤。

if f.IsAck() {
  if cc.wantSettingsAck {
    cc.wantSettingsAck = false
    return nil
  }
  return http2ConnectionError(http2ErrCodeProtocol)
}

2、解决不同 http2FrameSettings 的数据帧,并依据 server 传递的信息,批改 maxConcurrentStreams 等的值。

err := f.ForeachSetting(func(s http2Setting) error {
  switch s.ID {
    case http2SettingMaxFrameSize:
    cc.maxFrameSize = s.Val
    case http2SettingMaxConcurrentStreams:
    cc.maxConcurrentStreams = s.Val
    case http2SettingMaxHeaderListSize:
    cc.peerMaxHeaderListSize = uint64(s.Val)
    case http2SettingInitialWindowSize:
    if s.Val > math.MaxInt32 {return http2ConnectionError(http2ErrCodeFlowControl)
    }
    delta := int32(s.Val) - int32(cc.initialWindowSize)
    for _, cs := range cc.streams {cs.flow.add(delta)
    }
    cc.cond.Broadcast()
    cc.initialWindowSize = s.Val
    default:
    // TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably.
    cc.vlogf("Unhandled Setting: %v", s)
  }
  return nil
})

当收到 ID 为 http2SettingInitialWindowSize 的帧时,会调整以后连贯中所有数据流的可发送数据窗口大小,并批改以后连贯的initialWindowSize(每个新创建的数据流均会应用该值初始化可发送数据窗口大小)为s.Val

3、发送 http2FrameSettings 的 ack 信息给 server。

    cc.wmu.Lock()
    defer cc.wmu.Unlock()

    cc.fr.WriteSettingsAck()
    cc.bw.Flush()
    return cc.werr

收到 http2WindowUpdateFrame 数据帧

在笔者 debug 的过程中,解决完 http2FrameSettings 数据帧后,紧接着就收到了 http2WindowUpdateFrame 数据帧。收到该数据帧后会调用 (*http2clientConnReadLoop).processWindowUpdate 办法:

func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame) error {
    cc := rl.cc
    cs := cc.streamByID(f.StreamID, false)
    if f.StreamID != 0 && cs == nil {return nil}

    cc.mu.Lock()
    defer cc.mu.Unlock()

    fl := &cc.flow
    if cs != nil {fl = &cs.flow}
    if !fl.add(int32(f.Increment)) {return http2ConnectionError(http2ErrCodeFlowControl)
    }
    cc.cond.Broadcast()
    return nil
}

下面的逻辑次要用于更新以后连贯和数据流的可发送数据窗口大小。如果 http2WindowUpdateFrame 帧中的 StreamID 为 0,则更新以后连贯的可发送数据窗口大小,否则更新对应数据流可发送数据窗口大小。

留神 :在 debug 的过程,收到http2WindowUpdateFrame 数据帧后,又收到一次http2FrameSettings,且该数据帧标识符为http2FlagSettingsAck

笔者在这里特意揭示,这是因为前篇中提到的(*http2Transport).NewClientConn 办法,也向 server 发送了 http2FrameSettings 数据帧和 http2WindowUpdateFrame 数据帧。

另外,在解决 http2FrameSettingshttp2WindowUpdateFrame过程中,均呈现了 cc.cond.Broadcast() 调用,该调用次要用于唤醒因为以下两种状况而 Wait 的申请:

  1. 因以后连贯解决的数据流曾经达到 maxConcurrentStreams 的下限(详见前篇中 (*http2ClientConn).awaitOpenSlotForRequest 办法剖析)。
  2. 因发送数据流已达可发送数据窗口下限而期待可发送数据窗口更新的申请(后续会介绍)。

收到 http2MetaHeadersFrame 数据帧

收到此数据帧意味着某一个申请曾经开始接管响应数据。此数据帧对应的处理函数为(*http2clientConnReadLoop).processHeaders

func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) error {
    cc := rl.cc
    cs := cc.streamByID(f.StreamID, false)
    // 此处省略代码
    res, err := rl.handleResponse(cs, f)
    if err != nil {
        // 此处省略代码
        cs.resc <- http2resAndError{err: err}
        return nil // return nil from process* funcs to keep conn alive
    }
    if res == nil {// (nil, nil) special case. See handleResponse docs.
        return nil
    }
    cs.resTrailer = &res.Trailer
    cs.resc <- http2resAndError{res: res}
    return nil
}

首先咱们先看 cs.resc <- http2resAndError{res: res} 这一行代码,向数据流写入 http2resAndError 即本次申请的响应。在 (*http2ClientConn).roundTrip 办法中有这样一行代码readLoopResCh := cs.resc

回顾 :前篇(*http2ClientConn).roundTrip 办法的第 7 点和本局部关联起来就能够造成一个残缺的申请链。

接下来咱们对 rl.handleResponse 办法开展剖析。

(*http2clientConnReadLoop).handleResponse

(*http2clientConnReadLoop).handleResponse的次要作用是构建一个 Response 变量,上面对该函数的关键步骤进行形容。

1、构建一个 Response 变量。

header := make(Header)
res := &Response{
  Proto:      "HTTP/2.0",
  ProtoMajor: 2,
  Header:     header,
  StatusCode: statusCode,
  Status:     status + " " + StatusText(statusCode),
}

2、构建 header(本篇不对 header 进行开展剖析)。

for _, hf := range f.RegularFields() {key := CanonicalHeaderKey(hf.Name)
  if key == "Trailer" {
    t := res.Trailer
    if t == nil {t = make(Header)
      res.Trailer = t
    }
    http2foreachHeaderElement(hf.Value, func(v string) {t[CanonicalHeaderKey(v)] = nil
    })
  } else {header[key] = append(header[key], hf.Value)
  }
}

3、解决响应 body 的 ContentLength。

streamEnded := f.StreamEnded()
isHead := cs.req.Method == "HEAD"
if !streamEnded || isHead {
  res.ContentLength = -1
  if clens := res.Header["Content-Length"]; len(clens) == 1 {if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil {res.ContentLength = clen64} else {
      // TODO: care? unlike http/1, it won't mess up our framing, so it's
      // more safe smuggling-wise to ignore.
    }
  } else if len(clens) > 1 {
    // TODO: care? unlike http/1, it won't mess up our framing, so it's
    // more safe smuggling-wise to ignore.
  }
}

由上可知,以后数据流没有完结或者是 HEAD 申请才读取 ContentLength。如果 header 中的 ContentLength 不非法则 res.ContentLength 的值为 -1

4、构建res.Body

cs.bufPipe = http2pipe{b: &http2dataBuffer{expected: res.ContentLength}}
cs.bytesRemain = res.ContentLength
res.Body = http2transportResponseBody{cs}
go cs.awaitRequestCancel(cs.req)

if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" {res.Header.Del("Content-Encoding")
  res.Header.Del("Content-Length")
  res.ContentLength = -1
  res.Body = &http2gzipReader{body: res.Body}
  res.Uncompressed = true
}

依据 Content-Encoding 的编码方式,会构建两种不同的 Body:

  1. 非 gzip 编码时,结构的 res.Body 类型为http2transportResponseBody
  2. gzip 编码时,结构的 res.Body 类型为http2gzipReader

收到 http2DataFrame 数据帧

收到此数据帧意味着咱们开始接管实在的响应,即平时开发中须要解决的业务数据。此数据帧对应的处理函数为(*http2clientConnReadLoop).processData

因为 server 无奈及时晓得数据流在 client 端的状态,所以 server 可能会向 client 中一个曾经不存在的数据流发送数据:

cc := rl.cc
cs := cc.streamByID(f.StreamID, f.StreamEnded())
data := f.Data()
if cs == nil {cc.mu.Lock()
  neverSent := cc.nextStreamID
  cc.mu.Unlock()
 // 此处省略代码
  if f.Length > 0 {cc.mu.Lock()
    cc.inflow.add(int32(f.Length))
    cc.mu.Unlock()

    cc.wmu.Lock()
    cc.fr.WriteWindowUpdate(0, uint32(f.Length))
    cc.bw.Flush()
    cc.wmu.Unlock()}
  return nil
}

接管到的数据帧在 client 没有对应的数据流解决时,通过流控制器为以后连贯可读窗口大小减少 f.Length,并且通过http2FrameWindowUpdate 数据帧告知 server 将以后连贯的可写窗口大小减少f.Length

如果 client 有对应的数据流且 f.Length 大于 0:

1、如果是 head 申请完结以后数据流并返回。

if cs.req.Method == "HEAD" && len(data) > 0 {cc.logf("protocol error: received DATA on a HEAD request")
  rl.endStreamError(cs, http2StreamError{
    StreamID: f.StreamID,
    Code:     http2ErrCodeProtocol,
  })
  return nil
}

2、查看以后数据流是否解决 f.Length 长度的数据。

cc.mu.Lock()
if cs.inflow.available() >= int32(f.Length) {cs.inflow.take(int32(f.Length))
} else {cc.mu.Unlock()
  return http2ConnectionError(http2ErrCodeFlowControl)
}

由上可知以后数据流如果可能解决该数据,通过流控制器调用 cs.inflow.take 减小以后数据流可承受窗口大小。

3、以后数据流被重置或者被敞开即 cs.didReset 为 true 时又或者数据帧有填充数据时须要调整流控制窗口。

var refund int
if pad := int(f.Length) - len(data); pad > 0 {refund += pad}
// Return len(data) now if the stream is already closed,
// since data will never be read.
didReset := cs.didReset
if didReset {refund += len(data)
}
if refund > 0 {cc.inflow.add(int32(refund))
  cc.wmu.Lock()
  cc.fr.WriteWindowUpdate(0, uint32(refund))
  if !didReset {cs.inflow.add(int32(refund))
    cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))
  }
  cc.bw.Flush()
  cc.wmu.Unlock()}
cc.mu.Unlock()
  • 如果数据帧有填充数据则计算须要返还的填充数据长度。
  • 如果数据流有效该数据帧的长度须要全副返还。

最初,依据计算的 refund 减少以后连贯或者以后数据流的可承受窗口大小,并且同时告知 server 减少以后连贯或者以后数据流的可写窗口大小。

4、数据长度大于 0 且数据流失常则将数据写入数据流缓冲区。

if len(data) > 0 && !didReset {if _, err := cs.bufPipe.Write(data); err != nil {rl.endStreamError(cs, err)
    return err
  }
}

回顾 :后面的(*http2clientConnReadLoop).handleResponse 办法中有这样一行代码res.Body = http2transportResponseBody{cs},所以在业务开发时可能通过 Response 读取到数据流中的缓冲数据。

(http2transportResponseBody).Read

在后面的内容里,如果数据流状态失常且数据帧没有填充数据则数据流和连贯的可接管窗口会始终变小,而这部分内容就是减少数据流的可承受窗口大小。

因为篇幅和宗旨的问题笔者仅剖析形容该办法内和流控制无关的局部。

1、读取响应数据后计算以后连贯须要减少的可承受窗口大小。

cc.mu.Lock()
defer cc.mu.Unlock()
var connAdd, streamAdd int32
// Check the conn-level first, before the stream-level.
if v := cc.inflow.available(); v < http2transportDefaultConnFlow/2 {
  connAdd = http2transportDefaultConnFlow - v
  cc.inflow.add(connAdd)
}

如果以后连贯可承受窗口的大小曾经小于http2transportDefaultConnFlow(1G)的一半,则以后连贯可接管窗口大小须要减少http2transportDefaultConnFlow - cc.inflow.available()

回顾 http2transportDefaultConnFlow 在前篇 (*http2Transport).NewClientConn 办法局部有提到,且连贯刚建设时会通过 http2WindowUpdateFrame 数据帧告知 server 以后连贯可发送窗口大小减少http2transportDefaultConnFlow

2、读取响应数据后计算以后数据流须要减少的可承受窗口大小。

if err == nil { // No need to refresh if the stream is over or failed.
  // Consider any buffered body data (read from the conn but not
  // consumed by the client) when computing flow control for this
  // stream.
  v := int(cs.inflow.available()) + cs.bufPipe.Len()
  if v < http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh {streamAdd = int32(http2transportDefaultStreamFlow - v)
    cs.inflow.add(streamAdd)
  }
}

如果以后数据流可承受窗口大小加上以后数据流缓冲区残余未读数据的长度小于http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh(4M-4KB),则以后数据流可承受窗口大小须要减少http2transportDefaultStreamFlow - v

回顾 http2transportDefaultStreamFlow 在前篇 (*http2Transport).NewClientConn 办法和 (*http2ClientConn).newStream 办法中均有提到。

连贯刚建设时,发送 http2FrameSettings 数据帧,告知 server 每个数据流的可发送窗口大小为http2transportDefaultStreamFlow

newStream 时,数据流默认的可接管窗口大小为http2transportDefaultStreamFlow

3、将连贯和数据流别离须要减少的窗口大小通过 http2WindowUpdateFrame 数据帧告知 server。

if connAdd != 0 || streamAdd != 0 {cc.wmu.Lock()
  defer cc.wmu.Unlock()
  if connAdd != 0 {cc.fr.WriteWindowUpdate(0, http2mustUint31(connAdd))
  }
  if streamAdd != 0 {cc.fr.WriteWindowUpdate(cs.ID, http2mustUint31(streamAdd))
  }
  cc.bw.Flush()}

以上就是 server 向 client 发送数据的流控制逻辑。

(*http2clientStream).writeRequestBody

前篇中 (*http2ClientConn).roundTrip 未对 (*http2clientStream).writeRequestBody 进行剖析,上面咱们看看该办法的源码:

func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) {
    cc := cs.cc
    sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
  // 此处省略代码
    req := cs.req
    hasTrailers := req.Trailer != nil
    remainLen := http2actualContentLength(req)
    hasContentLen := remainLen != -1

    var sawEOF bool
    for !sawEOF {n, err := body.Read(buf[:len(buf)-1])
    // 此处省略代码
        remain := buf[:n]
        for len(remain) > 0 && err == nil {
            var allowed int32
            allowed, err = cs.awaitFlowControl(len(remain))
            switch {
            case err == http2errStopReqBodyWrite:
                return err
            case err == http2errStopReqBodyWriteAndCancel:
                cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
                return err
            case err != nil:
                return err
            }
            cc.wmu.Lock()
            data := remain[:allowed]
            remain = remain[allowed:]
            sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
            err = cc.fr.WriteData(cs.ID, sentEnd, data)
            if err == nil {err = cc.bw.Flush()
            }
            cc.wmu.Unlock()}
        if err != nil {return err}
    }
  // 此处省略代码
    return err
}

下面的逻辑可简略总结为:不停的读取申请 body 而后将读取的内容通过 cc.fr.WriteData 转为 http2FrameData 数据帧发送给 server,直到申请 body 读完为止。其中和流控制无关的办法是awaitFlowControl,上面咱们对该办法进行剖析。

(*http2clientStream).awaitFlowControl

此办法的次要作用是期待以后数据流可写窗口有容量可能写入数据。

func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
    cc := cs.cc
    cc.mu.Lock()
    defer cc.mu.Unlock()
    for {
        if cc.closed {return 0, http2errClientConnClosed}
        if cs.stopReqBody != nil {return 0, cs.stopReqBody}
        if err := cs.checkResetOrDone(); err != nil {return 0, err}
        if a := cs.flow.available(); a > 0 {
            take := a
            if int(take) > maxBytes {take = int32(maxBytes) // can't truncate int; take is int32
            }
            if take > int32(cc.maxFrameSize) {take = int32(cc.maxFrameSize)
            }
            cs.flow.take(take)
            return take, nil
        }
        cc.cond.Wait()}
}

依据源码能够晓得,数据流被敞开或者进行发送申请 body,则以后数据流无奈写入数据。当数据流状态失常时,又分为两种状况:

  1. 以后数据流可写窗口残余可写数据大于 0,则计算可写字节数,并将以后数据流可写窗口大小耗费take
  2. 以后数据流可写窗口残余可写数据小于等于 0,则会始终期待直到被唤醒并进入下一次查看。

下面的第二种状况在 收到 http2WindowUpdateFrame 数据帧 这一节中提到过。

server 读取以后数据流的数据后会向 client 对应数据流发送 http2WindowUpdateFrame 数据帧,client 收到该数据帧后会增大对应数据流可写窗口,并执行 cc.cond.Broadcast() 唤醒因发送数据已达流控制下限而期待的数据流持续发送数据。

以上就是 client 向 server 发送数据的流控制逻辑。

总结

  1. 帧头长度为 9 个字节,并蕴含四个局部:Payload 的长度、帧类型、帧标识符和数据流 ID。
  2. 流控制可分为两个步骤:

    • 初始时,通过 http2FrameSettings 数据帧和 http2WindowUpdateFrame 数据帧告知对方以后连贯读写窗口大小以及连贯中数据流读写窗口大小。
    • 在读写数据过程中,通过发送 http2WindowUpdateFrame 数据帧管制另一端的写窗口大小。

预报

前篇和中篇曾经实现,下一期将对 http2.0 标头压缩 进行剖析。

最初,衷心希望本文可能对各位读者有肯定的帮忙。

:写本文时,笔者所用 go 版本为: go1.14.2

正文完
 0