共计 15497 个字符,预计需要花费 39 分钟才能阅读完成。
来自公众号:新世界杂货铺
浏览倡议
这是 HTTP2.0 系列的第二篇,所以笔者举荐浏览程序如下:
- Go 中的 HTTP 申请之——HTTP1.1 申请流程剖析
- Go 发动 HTTP2.0 申请流程剖析(前篇)
本篇次要分为三个局部:数据帧,流控制器以及通过剖析源码逐渐理解流控制。
本无意将这三个局部拆成三篇文章,但它们之间又有分割,所以最初仍旧决定放在一篇文章外面。因为内容较多,笔者认为分三次别离浏览三个局部较佳。
数据帧
HTTP2 通信的最小单位是数据帧,每一个帧都蕴含两局部:帧头 和Payload。不同数据流的帧能够交织发送(同一个数据流的帧必须程序发送),而后再依据每个帧头的数据流标识符从新组装。
因为 Payload 中为无效数据,故仅对帧头进行剖析形容。
帧头
帧头总长度为 9 个字节,并蕴含四个局部,别离是:
- Payload 的长度,占用三个字节。
- 数据帧类型,占用一个字节。
- 数据帧标识符,占用一个字节。
- 数据流 ID,占用四个字节。
用图示意如下:
数据帧的格局和各局部的含意曾经分明了,那么咱们看看代码中怎么读取一个帧头:
func http2readFrameHeader(buf []byte, r io.Reader) (http2FrameHeader, error) {_, err := io.ReadFull(r, buf[:http2frameHeaderLen])
if err != nil {return http2FrameHeader{}, err
}
return http2FrameHeader{Length: (uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])),
Type: http2FrameType(buf[3]),
Flags: http2Flags(buf[4]),
StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1),
valid: true,
}, nil
}
在下面的代码中 http2frameHeaderLen
是一个常量,其值为 9。
从 io.Reader 中读取 9 个字节后,将前三个字节和后四个字节均转为 uint32
的类型,从而失去 Payload 长度和数据流 ID。另外须要了解的是帧头的前三个字节和后四个字节存储格局为大端(大小端笔者就不在这里解释了,请尚不理解的读者自行百度)。
数据帧类型
依据 http://http2.github.io/http2-…,数据帧类型总共有 10 个。在 go 源码中均有体现:
const (
http2FrameData http2FrameType = 0x0
http2FrameHeaders http2FrameType = 0x1
http2FramePriority http2FrameType = 0x2
http2FrameRSTStream http2FrameType = 0x3
http2FrameSettings http2FrameType = 0x4
http2FramePushPromise http2FrameType = 0x5
http2FramePing http2FrameType = 0x6
http2FrameGoAway http2FrameType = 0x7
http2FrameWindowUpdate http2FrameType = 0x8
http2FrameContinuation http2FrameType = 0x9
)
http2FrameData
:次要用于发送申请 body 和接管响应的数据帧。
http2FrameHeaders
:次要用于发送申请 header 和接管响应 header 的数据帧。
http2FrameSettings
:次要用于 client 和 server 交换设置相干的数据帧。
http2FrameWindowUpdate
:次要用于流控制的数据帧。
其余数据帧类型因为本文不波及,故不做形容。
数据帧标识符
因为数据帧标识符品种较多,笔者在这里仅介绍其中局部标识符,先看源码:
const (
// Data Frame
http2FlagDataEndStream http2Flags = 0x1
// Headers Frame
http2FlagHeadersEndStream http2Flags = 0x1
// Settings Frame
http2FlagSettingsAck http2Flags = 0x1
// 此处省略定义其余数据帧标识符的代码
)
http2FlagDataEndStream
:在前篇中提到,调用 (*http2ClientConn).newStream
办法会创立一个数据流,那这个数据流什么时候完结呢,这就是 http2FlagDataEndStream
的作用。
当 client 收到有响应 body 的响应时(HEAD 申请无响应 body,301,302 等响应也无响应 body),始终读到 http2FrameData
数据帧的标识符为 http2FlagDataEndStream
则意味着本次申请完结能够敞开以后数据流。
http2FlagHeadersEndStream
:如果读到的 http2FrameHeaders
数据帧有此标识符也意味着本次申请完结。
http2FlagSettingsAck
:该标示符意味着对方确认收到 http2FrameSettings
数据帧。
流控制器
流控制是一种阻止发送方向接管方发送大量数据的机制,免得超出后者的需要或解决能力。Go 中 HTTP2 通过 http2flow
构造体进行流控制:
type http2flow struct {
// n is the number of DATA bytes we're allowed to send.
// A flow is kept both on a conn and a per-stream.
n int32
// conn points to the shared connection-level flow that is
// shared by all streams on that conn. It is nil for the flow
// that's on the conn directly.
conn *http2flow
}
字段含意英文正文曾经形容的很分明了,所以笔者不再翻译。上面看一下和流控制无关的办法。
(*http2flow).available
此办法返回以后流控制可发送的最大字节数:
func (f *http2flow) available() int32 {
n := f.n
if f.conn != nil && f.conn.n < n {n = f.conn.n}
return n
}
- 如果
f.conn
为 nil 则意味着此控制器的管制级别为连贯,那么可发送的最大字节数就是f.n
。 - 如果
f.conn
不为 nil 则意味着此控制器的管制级别为数据流,且以后数据流可发送的最大字节数不能超过以后连贯可发送的最大字节数。
(*http2flow).take
此办法用于耗费以后流控制器的可发送字节数:
func (f *http2flow) take(n int32) {if n > f.available() {panic("internal error: took too much")
}
f.n -= n
if f.conn != nil {f.conn.n -= n}
}
通过理论须要传递一个参数,告知以后流控制器想要发送的数据大小。如果发送的大小超过流控制器容许的大小,则panic
,如果未超过流控制器容许的大小,则将以后数据流和以后连贯的可发送字节数-n
。
(*http2flow).add
有耗费就有新增,此办法用于减少流控制器可发送的最大字节数:
func (f *http2flow) add(n int32) bool {
sum := f.n + n
if (sum > n) == (f.n > 0) {
f.n = sum
return true
}
return false
}
下面的代码惟一须要留神的中央是,当 sum 超过 int32 负数最大值 (2^31-1) 时会返回 false。
回顾 :在前篇中提到的(*http2Transport).NewClientConn
办法和 (*http2ClientConn).newStream
办法均通过 (*http2flow).add
初始化可发送数据窗口大小。
有了帧和流控制器的基本概念,上面咱们联合源码来剖析总结流控制的具体实现。
(*http2ClientConn).readLoop
前篇剖析 (*http2Transport).newClientConn
时止步于读循环,那么明天咱们就从 (*http2ClientConn).readLoop
开始。
func (cc *http2ClientConn) readLoop() {rl := &http2clientConnReadLoop{cc: cc}
defer rl.cleanup()
cc.readerErr = rl.run()
if ce, ok := cc.readerErr.(http2ConnectionError); ok {cc.wmu.Lock()
cc.fr.WriteGoAway(0, http2ErrCode(ce), nil)
cc.wmu.Unlock()}
}
由上可知,readLoop 的逻辑比较简单,其外围逻辑在 (*http2clientConnReadLoop).run
办法里。
func (rl *http2clientConnReadLoop) run() error {
cc := rl.cc
rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
gotReply := false // ever saw a HEADERS reply
gotSettings := false
for {f, err := cc.fr.ReadFrame()
// 此处省略代码
maybeIdle := false // whether frame might transition us to idle
switch f := f.(type) {
case *http2MetaHeadersFrame:
err = rl.processHeaders(f)
maybeIdle = true
gotReply = true
case *http2DataFrame:
err = rl.processData(f)
maybeIdle = true
case *http2GoAwayFrame:
err = rl.processGoAway(f)
maybeIdle = true
case *http2RSTStreamFrame:
err = rl.processResetStream(f)
maybeIdle = true
case *http2SettingsFrame:
err = rl.processSettings(f)
case *http2PushPromiseFrame:
err = rl.processPushPromise(f)
case *http2WindowUpdateFrame:
err = rl.processWindowUpdate(f)
case *http2PingFrame:
err = rl.processPing(f)
default:
cc.logf("Transport: unhandled response frame type %T", f)
}
if err != nil {
if http2VerboseLogs {cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, http2summarizeFrame(f), err)
}
return err
}
if rl.closeWhenIdle && gotReply && maybeIdle {cc.closeIfIdle()
}
}
}
由上可知,(*http2clientConnReadLoop).run
的外围逻辑是读取数据帧而后对不同的数据帧进行不同的解决。
cc.fr.ReadFrame()
会依据后面介绍的数据帧格局读出数据帧。
前篇中提到应用了一个反对 h2 协定的图片进行剖析,本篇持续复用该图片对 (*http2clientConnReadLoop).run
办法进行 debug。
收到 http2FrameSettings 数据帧
读循环会最先读到 http2FrameSettings
数据帧。读到该数据帧后会调用 (*http2clientConnReadLoop).processSettings
办法。(*http2clientConnReadLoop).processSettings
次要蕴含 3 个逻辑。
1、判断是否是 http2FrameSettings
的 ack 信息,如果是间接返回,否则持续前面的步骤。
if f.IsAck() {
if cc.wantSettingsAck {
cc.wantSettingsAck = false
return nil
}
return http2ConnectionError(http2ErrCodeProtocol)
}
2、解决不同 http2FrameSettings
的数据帧,并依据 server 传递的信息,批改 maxConcurrentStreams
等的值。
err := f.ForeachSetting(func(s http2Setting) error {
switch s.ID {
case http2SettingMaxFrameSize:
cc.maxFrameSize = s.Val
case http2SettingMaxConcurrentStreams:
cc.maxConcurrentStreams = s.Val
case http2SettingMaxHeaderListSize:
cc.peerMaxHeaderListSize = uint64(s.Val)
case http2SettingInitialWindowSize:
if s.Val > math.MaxInt32 {return http2ConnectionError(http2ErrCodeFlowControl)
}
delta := int32(s.Val) - int32(cc.initialWindowSize)
for _, cs := range cc.streams {cs.flow.add(delta)
}
cc.cond.Broadcast()
cc.initialWindowSize = s.Val
default:
// TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably.
cc.vlogf("Unhandled Setting: %v", s)
}
return nil
})
当收到 ID 为 http2SettingInitialWindowSize
的帧时,会调整以后连贯中所有数据流的可发送数据窗口大小,并批改以后连贯的initialWindowSize
(每个新创建的数据流均会应用该值初始化可发送数据窗口大小)为s.Val
。
3、发送 http2FrameSettings
的 ack 信息给 server。
cc.wmu.Lock()
defer cc.wmu.Unlock()
cc.fr.WriteSettingsAck()
cc.bw.Flush()
return cc.werr
收到 http2WindowUpdateFrame 数据帧
在笔者 debug 的过程中,解决完 http2FrameSettings
数据帧后,紧接着就收到了 http2WindowUpdateFrame
数据帧。收到该数据帧后会调用 (*http2clientConnReadLoop).processWindowUpdate
办法:
func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame) error {
cc := rl.cc
cs := cc.streamByID(f.StreamID, false)
if f.StreamID != 0 && cs == nil {return nil}
cc.mu.Lock()
defer cc.mu.Unlock()
fl := &cc.flow
if cs != nil {fl = &cs.flow}
if !fl.add(int32(f.Increment)) {return http2ConnectionError(http2ErrCodeFlowControl)
}
cc.cond.Broadcast()
return nil
}
下面的逻辑次要用于更新以后连贯和数据流的可发送数据窗口大小。如果 http2WindowUpdateFrame 帧中的 StreamID 为 0,则更新以后连贯的可发送数据窗口大小,否则更新对应数据流可发送数据窗口大小。
留神 :在 debug 的过程,收到http2WindowUpdateFrame
数据帧后,又收到一次http2FrameSettings
,且该数据帧标识符为http2FlagSettingsAck
。
笔者在这里特意揭示,这是因为前篇中提到的(*http2Transport).NewClientConn 办法,也向 server 发送了 http2FrameSettings 数据帧和 http2WindowUpdateFrame 数据帧。
另外,在解决 http2FrameSettings
和http2WindowUpdateFrame
过程中,均呈现了 cc.cond.Broadcast()
调用,该调用次要用于唤醒因为以下两种状况而 Wait
的申请:
- 因以后连贯解决的数据流曾经达到
maxConcurrentStreams
的下限(详见前篇中(*http2ClientConn).awaitOpenSlotForRequest
办法剖析)。 - 因发送数据流已达可发送数据窗口下限而期待可发送数据窗口更新的申请(后续会介绍)。
收到 http2MetaHeadersFrame 数据帧
收到此数据帧意味着某一个申请曾经开始接管响应数据。此数据帧对应的处理函数为(*http2clientConnReadLoop).processHeaders
:
func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) error {
cc := rl.cc
cs := cc.streamByID(f.StreamID, false)
// 此处省略代码
res, err := rl.handleResponse(cs, f)
if err != nil {
// 此处省略代码
cs.resc <- http2resAndError{err: err}
return nil // return nil from process* funcs to keep conn alive
}
if res == nil {// (nil, nil) special case. See handleResponse docs.
return nil
}
cs.resTrailer = &res.Trailer
cs.resc <- http2resAndError{res: res}
return nil
}
首先咱们先看 cs.resc <- http2resAndError{res: res}
这一行代码,向数据流写入 http2resAndError
即本次申请的响应。在 (*http2ClientConn).roundTrip
办法中有这样一行代码readLoopResCh := cs.resc
。
回顾 :前篇(*http2ClientConn).roundTrip
办法的第 7 点和本局部关联起来就能够造成一个残缺的申请链。
接下来咱们对 rl.handleResponse
办法开展剖析。
(*http2clientConnReadLoop).handleResponse
(*http2clientConnReadLoop).handleResponse
的次要作用是构建一个 Response
变量,上面对该函数的关键步骤进行形容。
1、构建一个 Response
变量。
header := make(Header)
res := &Response{
Proto: "HTTP/2.0",
ProtoMajor: 2,
Header: header,
StatusCode: statusCode,
Status: status + " " + StatusText(statusCode),
}
2、构建 header(本篇不对 header 进行开展剖析)。
for _, hf := range f.RegularFields() {key := CanonicalHeaderKey(hf.Name)
if key == "Trailer" {
t := res.Trailer
if t == nil {t = make(Header)
res.Trailer = t
}
http2foreachHeaderElement(hf.Value, func(v string) {t[CanonicalHeaderKey(v)] = nil
})
} else {header[key] = append(header[key], hf.Value)
}
}
3、解决响应 body 的 ContentLength。
streamEnded := f.StreamEnded()
isHead := cs.req.Method == "HEAD"
if !streamEnded || isHead {
res.ContentLength = -1
if clens := res.Header["Content-Length"]; len(clens) == 1 {if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil {res.ContentLength = clen64} else {
// TODO: care? unlike http/1, it won't mess up our framing, so it's
// more safe smuggling-wise to ignore.
}
} else if len(clens) > 1 {
// TODO: care? unlike http/1, it won't mess up our framing, so it's
// more safe smuggling-wise to ignore.
}
}
由上可知,以后数据流没有完结或者是 HEAD 申请才读取 ContentLength。如果 header 中的 ContentLength 不非法则 res.ContentLength 的值为 -1。
4、构建res.Body
。
cs.bufPipe = http2pipe{b: &http2dataBuffer{expected: res.ContentLength}}
cs.bytesRemain = res.ContentLength
res.Body = http2transportResponseBody{cs}
go cs.awaitRequestCancel(cs.req)
if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" {res.Header.Del("Content-Encoding")
res.Header.Del("Content-Length")
res.ContentLength = -1
res.Body = &http2gzipReader{body: res.Body}
res.Uncompressed = true
}
依据 Content-Encoding
的编码方式,会构建两种不同的 Body:
- 非 gzip 编码时,结构的 res.Body 类型为
http2transportResponseBody
。 - gzip 编码时,结构的 res.Body 类型为
http2gzipReader
。
收到 http2DataFrame 数据帧
收到此数据帧意味着咱们开始接管实在的响应,即平时开发中须要解决的业务数据。此数据帧对应的处理函数为(*http2clientConnReadLoop).processData
。
因为 server 无奈及时晓得数据流在 client 端的状态,所以 server 可能会向 client 中一个曾经不存在的数据流发送数据:
cc := rl.cc
cs := cc.streamByID(f.StreamID, f.StreamEnded())
data := f.Data()
if cs == nil {cc.mu.Lock()
neverSent := cc.nextStreamID
cc.mu.Unlock()
// 此处省略代码
if f.Length > 0 {cc.mu.Lock()
cc.inflow.add(int32(f.Length))
cc.mu.Unlock()
cc.wmu.Lock()
cc.fr.WriteWindowUpdate(0, uint32(f.Length))
cc.bw.Flush()
cc.wmu.Unlock()}
return nil
}
接管到的数据帧在 client 没有对应的数据流解决时,通过流控制器为以后连贯可读窗口大小减少 f.Length
,并且通过http2FrameWindowUpdate
数据帧告知 server 将以后连贯的可写窗口大小减少f.Length
。
如果 client 有对应的数据流且 f.Length
大于 0:
1、如果是 head 申请完结以后数据流并返回。
if cs.req.Method == "HEAD" && len(data) > 0 {cc.logf("protocol error: received DATA on a HEAD request")
rl.endStreamError(cs, http2StreamError{
StreamID: f.StreamID,
Code: http2ErrCodeProtocol,
})
return nil
}
2、查看以后数据流是否解决 f.Length
长度的数据。
cc.mu.Lock()
if cs.inflow.available() >= int32(f.Length) {cs.inflow.take(int32(f.Length))
} else {cc.mu.Unlock()
return http2ConnectionError(http2ErrCodeFlowControl)
}
由上可知以后数据流如果可能解决该数据,通过流控制器调用 cs.inflow.take
减小以后数据流可承受窗口大小。
3、以后数据流被重置或者被敞开即 cs.didReset
为 true 时又或者数据帧有填充数据时须要调整流控制窗口。
var refund int
if pad := int(f.Length) - len(data); pad > 0 {refund += pad}
// Return len(data) now if the stream is already closed,
// since data will never be read.
didReset := cs.didReset
if didReset {refund += len(data)
}
if refund > 0 {cc.inflow.add(int32(refund))
cc.wmu.Lock()
cc.fr.WriteWindowUpdate(0, uint32(refund))
if !didReset {cs.inflow.add(int32(refund))
cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))
}
cc.bw.Flush()
cc.wmu.Unlock()}
cc.mu.Unlock()
- 如果数据帧有填充数据则计算须要返还的填充数据长度。
- 如果数据流有效该数据帧的长度须要全副返还。
最初,依据计算的 refund 减少以后连贯或者以后数据流的可承受窗口大小,并且同时告知 server 减少以后连贯或者以后数据流的可写窗口大小。
4、数据长度大于 0 且数据流失常则将数据写入数据流缓冲区。
if len(data) > 0 && !didReset {if _, err := cs.bufPipe.Write(data); err != nil {rl.endStreamError(cs, err)
return err
}
}
回顾 :后面的(*http2clientConnReadLoop).handleResponse
办法中有这样一行代码res.Body = http2transportResponseBody{cs}
,所以在业务开发时可能通过 Response 读取到数据流中的缓冲数据。
(http2transportResponseBody).Read
在后面的内容里,如果数据流状态失常且数据帧没有填充数据则数据流和连贯的可接管窗口会始终变小,而这部分内容就是减少数据流的可承受窗口大小。
因为篇幅和宗旨的问题笔者仅剖析形容该办法内和流控制无关的局部。
1、读取响应数据后计算以后连贯须要减少的可承受窗口大小。
cc.mu.Lock()
defer cc.mu.Unlock()
var connAdd, streamAdd int32
// Check the conn-level first, before the stream-level.
if v := cc.inflow.available(); v < http2transportDefaultConnFlow/2 {
connAdd = http2transportDefaultConnFlow - v
cc.inflow.add(connAdd)
}
如果以后连贯可承受窗口的大小曾经小于http2transportDefaultConnFlow
(1G)的一半,则以后连贯可接管窗口大小须要减少http2transportDefaultConnFlow - cc.inflow.available()
。
回顾 :http2transportDefaultConnFlow
在前篇 (*http2Transport).NewClientConn
办法局部有提到,且连贯刚建设时会通过 http2WindowUpdateFrame
数据帧告知 server 以后连贯可发送窗口大小减少http2transportDefaultConnFlow
。
2、读取响应数据后计算以后数据流须要减少的可承受窗口大小。
if err == nil { // No need to refresh if the stream is over or failed.
// Consider any buffered body data (read from the conn but not
// consumed by the client) when computing flow control for this
// stream.
v := int(cs.inflow.available()) + cs.bufPipe.Len()
if v < http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh {streamAdd = int32(http2transportDefaultStreamFlow - v)
cs.inflow.add(streamAdd)
}
}
如果以后数据流可承受窗口大小加上以后数据流缓冲区残余未读数据的长度小于http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh
(4M-4KB),则以后数据流可承受窗口大小须要减少http2transportDefaultStreamFlow - v
。
回顾 :http2transportDefaultStreamFlow
在前篇 (*http2Transport).NewClientConn
办法和 (*http2ClientConn).newStream
办法中均有提到。
连贯刚建设时,发送 http2FrameSettings
数据帧,告知 server 每个数据流的可发送窗口大小为http2transportDefaultStreamFlow
。
在 newStream
时,数据流默认的可接管窗口大小为http2transportDefaultStreamFlow
。
3、将连贯和数据流别离须要减少的窗口大小通过 http2WindowUpdateFrame
数据帧告知 server。
if connAdd != 0 || streamAdd != 0 {cc.wmu.Lock()
defer cc.wmu.Unlock()
if connAdd != 0 {cc.fr.WriteWindowUpdate(0, http2mustUint31(connAdd))
}
if streamAdd != 0 {cc.fr.WriteWindowUpdate(cs.ID, http2mustUint31(streamAdd))
}
cc.bw.Flush()}
以上就是 server 向 client 发送数据的流控制逻辑。
(*http2clientStream).writeRequestBody
前篇中 (*http2ClientConn).roundTrip
未对 (*http2clientStream).writeRequestBody
进行剖析,上面咱们看看该办法的源码:
func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) {
cc := cs.cc
sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
// 此处省略代码
req := cs.req
hasTrailers := req.Trailer != nil
remainLen := http2actualContentLength(req)
hasContentLen := remainLen != -1
var sawEOF bool
for !sawEOF {n, err := body.Read(buf[:len(buf)-1])
// 此处省略代码
remain := buf[:n]
for len(remain) > 0 && err == nil {
var allowed int32
allowed, err = cs.awaitFlowControl(len(remain))
switch {
case err == http2errStopReqBodyWrite:
return err
case err == http2errStopReqBodyWriteAndCancel:
cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
return err
case err != nil:
return err
}
cc.wmu.Lock()
data := remain[:allowed]
remain = remain[allowed:]
sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
err = cc.fr.WriteData(cs.ID, sentEnd, data)
if err == nil {err = cc.bw.Flush()
}
cc.wmu.Unlock()}
if err != nil {return err}
}
// 此处省略代码
return err
}
下面的逻辑可简略总结为:不停的读取申请 body 而后将读取的内容通过 cc.fr.WriteData
转为 http2FrameData
数据帧发送给 server,直到申请 body 读完为止。其中和流控制无关的办法是awaitFlowControl
,上面咱们对该办法进行剖析。
(*http2clientStream).awaitFlowControl
此办法的次要作用是期待以后数据流可写窗口有容量可能写入数据。
func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
cc := cs.cc
cc.mu.Lock()
defer cc.mu.Unlock()
for {
if cc.closed {return 0, http2errClientConnClosed}
if cs.stopReqBody != nil {return 0, cs.stopReqBody}
if err := cs.checkResetOrDone(); err != nil {return 0, err}
if a := cs.flow.available(); a > 0 {
take := a
if int(take) > maxBytes {take = int32(maxBytes) // can't truncate int; take is int32
}
if take > int32(cc.maxFrameSize) {take = int32(cc.maxFrameSize)
}
cs.flow.take(take)
return take, nil
}
cc.cond.Wait()}
}
依据源码能够晓得,数据流被敞开或者进行发送申请 body,则以后数据流无奈写入数据。当数据流状态失常时,又分为两种状况:
- 以后数据流可写窗口残余可写数据大于 0,则计算可写字节数,并将以后数据流可写窗口大小耗费
take
。 - 以后数据流可写窗口残余可写数据小于等于 0,则会始终期待直到被唤醒并进入下一次查看。
下面的第二种状况在 收到 http2WindowUpdateFrame 数据帧 这一节中提到过。
server 读取以后数据流的数据后会向 client 对应数据流发送 http2WindowUpdateFrame
数据帧,client 收到该数据帧后会增大对应数据流可写窗口,并执行 cc.cond.Broadcast()
唤醒因发送数据已达流控制下限而期待的数据流持续发送数据。
以上就是 client 向 server 发送数据的流控制逻辑。
总结
- 帧头长度为 9 个字节,并蕴含四个局部:Payload 的长度、帧类型、帧标识符和数据流 ID。
-
流控制可分为两个步骤:
- 初始时,通过
http2FrameSettings
数据帧和http2WindowUpdateFrame
数据帧告知对方以后连贯读写窗口大小以及连贯中数据流读写窗口大小。 - 在读写数据过程中,通过发送
http2WindowUpdateFrame
数据帧管制另一端的写窗口大小。
- 初始时,通过
预报
前篇和中篇曾经实现,下一期将对 http2.0 标头压缩 进行剖析。
最初,衷心希望本文可能对各位读者有肯定的帮忙。
注:写本文时,笔者所用 go 版本为: go1.14.2