来自公众号:新世界杂货铺

前言

继Go中的HTTP申请之——HTTP1.1申请流程剖析之后,两头断断续续,历时近一月,终于才敢开始码字写下本文。

浏览倡议

HTTP2.0在建设TCP连贯和平安的TLS传输通道与HTTP1.1的流程基本一致。所以笔者倡议没有看过Go中的HTTP申请之——HTTP1.1申请流程剖析这篇文章的先去补一下课,本文会基于前一篇文章仅介绍和HTTP2.0相干的逻辑。

(*Transport).roundTrip

(*Transport).roundTrip办法会调用t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)初始化TLSClientConfig以及h2transport,而这两者都和HTTP2.0有着严密的分割。

TLSClientConfig: 初始化client反对的http协定, 并在tls握手时告知server。

h2transport: 如果本次申请是http2,那么h2transport会接管连贯,申请和响应的解决逻辑。

上面看看源码:

func (t *Transport) onceSetNextProtoDefaults() {    // ...此处省略代码...    t2, err := http2configureTransport(t)    if err != nil {        log.Printf("Error enabling Transport HTTP/2 support: %v", err)        return    }    t.h2transport = t2    // ...此处省略代码...}func http2configureTransport(t1 *Transport) (*http2Transport, error) {    connPool := new(http2clientConnPool)    t2 := &http2Transport{        ConnPool: http2noDialClientConnPool{connPool},        t1:       t1,    }    connPool.t = t2    if err := http2registerHTTPSProtocol(t1, http2noDialH2RoundTripper{t2}); err != nil {        return nil, err    }    if t1.TLSClientConfig == nil {        t1.TLSClientConfig = new(tls.Config)    }    if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "h2") {        t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...)    }    if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") {        t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1")    }    upgradeFn := func(authority string, c *tls.Conn) RoundTripper {        addr := http2authorityAddr("https", authority)        if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil {            go c.Close()            return http2erringRoundTripper{err}        } else if !used {            // Turns out we don't need this c.            // For example, two goroutines made requests to the same host            // at the same time, both kicking off TCP dials. (since protocol            // was unknown)            go c.Close()        }        return t2    }    if m := t1.TLSNextProto; len(m) == 0 {        t1.TLSNextProto = map[string]func(string, *tls.Conn) RoundTripper{            "h2": upgradeFn,        }    } else {        m["h2"] = upgradeFn    }    return t2, nil}

笔者将上述的源码简略拆解为以下几个步骤:

  1. 新建一个http2clientConnPool并复制给t2,当前http2的申请会优先从该连接池中获取连贯。
  2. 初始化TLSClientConfig,并将反对的h2http1.1协定增加到TLSClientConfig.NextProtos中。
  3. 定义一个h2upgradeFn存储到t1.TLSNextProto里。

鉴于前一篇文章对新建连贯前的步骤有了较为具体的介绍,所以这里间接看和server建设连贯的局部源码,即(*Transport).dialConn办法:

func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {    // ...此处省略代码...    if cm.scheme() == "https" && t.hasCustomTLSDialer() {        // ...此处省略代码...    } else {        conn, err := t.dial(ctx, "tcp", cm.addr())        if err != nil {            return nil, wrapErr(err)        }        pconn.conn = conn        if cm.scheme() == "https" {            var firstTLSHost string            if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {                return nil, wrapErr(err)            }            if err = pconn.addTLS(firstTLSHost, trace); err != nil {                return nil, wrapErr(err)            }        }    }    // Proxy setup.    // ...此处省略代码...    if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {        if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {            return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: next(cm.targetAddr, pconn.conn.(*tls.Conn))}, nil        }    }    // ...此处省略代码...}

笔者对上述的源码形容如下:

  1. 调用t.dial(ctx, "tcp", cm.addr())创立TCP连贯。
  2. 如果是https的申请, 则对申请建设平安的tls传输通道。
  3. 查看tls的握手状态,如果和server协商的NegotiatedProtocol协定不为空,且client的t.TLSNextProto有该协定,则返回alt不为空的长久连贯(HTTP1.1不会进入if条件里)。

笔者对上述的第三点进行开展。经笔者在本地debug验证,当client和server都反对http2时,s.NegotiatedProtocol的值为h2s.NegotiatedProtocolIsMutual的值为true

在下面剖析http2configureTransport函数时,咱们晓得TLSNextProto注册了一个key为h2的函数,所以调用next理论就是调用后面的upgradeFn函数。

upgradeFn会调用connPool.addConnIfNeeded向http2的连接池增加一个tls传输通道,并最终返回后面曾经创立好的t2http2Transport

func (p *http2clientConnPool) addConnIfNeeded(key string, t *http2Transport, c *tls.Conn) (used bool, err error) {    p.mu.Lock()    // ...此处省略代码...    // 次要用于判断是否有必要像连接池增加新的连贯    // 判断连接池中是否已有同host连贯,如果有且该链接可能解决新的申请则间接返回    call, dup := p.addConnCalls[key]    if !dup {        // ...此处省略代码...        call = &http2addConnCall{            p:    p,            done: make(chan struct{}),        }        p.addConnCalls[key] = call        go call.run(t, key, c)    }    p.mu.Unlock()    <-call.done    if call.err != nil {        return false, call.err    }    return !dup, nil}func (c *http2addConnCall) run(t *http2Transport, key string, tc *tls.Conn) {    cc, err := t.NewClientConn(tc)    p := c.p    p.mu.Lock()    if err != nil {        c.err = err    } else {        p.addConnLocked(key, cc)    }    delete(p.addConnCalls, key)    p.mu.Unlock()    close(c.done)}

剖析上述的源码咱们可能失去两点论断:

  1. 执行完upgradeFn之后,(*Transport).dialConn返回的长久化连贯中alt字段曾经不是nil了。
  2. t.NewClientConn(tc)新建进去的连贯会保留在http2的连接池即http2clientConnPool中,下一小结将对NewClientConn开展剖析。

最初咱们回到(*Transport).roundTrip办法并剖析其中的要害源码:

func (t *Transport) roundTrip(req *Request) (*Response, error) {    t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)    // ...此处省略代码...    for {        select {        case <-ctx.Done():            req.closeBody()            return nil, ctx.Err()        default:        }        // ...此处省略代码...        pconn, err := t.getConn(treq, cm)        if err != nil {            t.setReqCanceler(req, nil)            req.closeBody()            return nil, err        }        var resp *Response        if pconn.alt != nil {            // HTTP/2 path.            t.setReqCanceler(req, nil) // not cancelable with CancelRequest            resp, err = pconn.alt.RoundTrip(req)        } else {            resp, err = pconn.roundTrip(treq)        }        if err == nil {            return resp, nil        }        // ...此处省略代码...    }}

联合后面的剖析,pconn.alt在server和client都反对http2协定的状况下是不为nil的。所以,http2的申请会走pconn.alt.RoundTrip(req)分支,也就是说http2的申请流程就被http2Transport接管啦。

(*http2Transport).NewClientConn

(*http2Transport).NewClientConn外部会调用t.newClientConn(c, t.disableKeepAlives())

因为本节内容较多,所以笔者不再一次性贴出源码,而是按关键步骤剖析并分块儿贴出源码。

1、初始化一个http2ClientConn

cc := &http2ClientConn{    t:                     t,    tconn:                 c,    readerDone:            make(chan struct{}),    nextStreamID:          1,    maxFrameSize:          16 << 10,           // spec default    initialWindowSize:     65535,              // spec default    maxConcurrentStreams:  1000,               // "infinite", per spec. 1000 seems good enough.    peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.    streams:               make(map[uint32]*http2clientStream),    singleUse:             singleUse,    wantSettingsAck:       true,    pings:                 make(map[[8]byte]chan struct{}),}

下面的源码新建了一个默认的http2ClientConn。

initialWindowSize:初始化窗口大小为65535,这个值之后会初始化每一个数据流可发送的数据窗口大小。

maxConcurrentStreams:示意每个连贯上容许最多有多少个数据流同时传输数据。

streams:以后连贯上的数据流。

singleUse: 管制http2的连贯是否容许多个数据流共享,其值由t.disableKeepAlives()管制。

2、创立一个条件锁并且新建Writer&Reader。

cc.cond = sync.NewCond(&cc.mu)cc.flow.add(int32(http2initialWindowSize))cc.bw = bufio.NewWriter(http2stickyErrWriter{c, &cc.werr})cc.br = bufio.NewReader(c)

新建Writer&Reader没什么好说的,须要留神的是cc.flow.add(int32(http2initialWindowSize))

cc.flow.add将以后连贯的可写流控制窗口大小设置为http2initialWindowSize,即65535。

3、新建一个读写数据帧的Framer。

cc.fr = http2NewFramer(cc.bw, cc.br)cc.fr.ReadMetaHeaders = hpack.NewDecoder(http2initialHeaderTableSize, nil)cc.fr.MaxHeaderListSize = t.maxHeaderListSize()

4、向server发送开场白,并发送一些初始化数据帧。

initialSettings := []http2Setting{    {ID: http2SettingEnablePush, Val: 0},    {ID: http2SettingInitialWindowSize, Val: http2transportDefaultStreamFlow},}if max := t.maxHeaderListSize(); max != 0 {    initialSettings = append(initialSettings, http2Setting{ID: http2SettingMaxHeaderListSize, Val: max})}cc.bw.Write(http2clientPreface)cc.fr.WriteSettings(initialSettings...)cc.fr.WriteWindowUpdate(0, http2transportDefaultConnFlow)cc.inflow.add(http2transportDefaultConnFlow + http2initialWindowSize)cc.bw.Flush()

client向server发送的开场白内容如下:

const (    // client首先想server发送以PRI结尾的一串字符串。    http2ClientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")var (    http2clientPreface = []byte(http2ClientPreface))

发送完开场白后,client向server发送SETTINGS数据帧。

http2SettingEnablePush: 告知server客户端是否开启push性能。

http2SettingInitialWindowSize:告知server客户端可承受的最大数据窗口是http2transportDefaultStreamFlow(4M)。

发送完SETTINGS数据帧后,发送WINDOW_UPDATE数据帧, 因为第一个参数为0即streamID为0,则是告知server此连贯可承受的最大数据窗口为http2transportDefaultConnFlow(1G)。

发送完WINDOW_UPDATE数据帧后,将client的可读流控制窗口大小设置为http2transportDefaultConnFlow + http2initialWindowSize

5、开启读循环并返回

go cc.readLoop()

(*http2Transport).RoundTrip

(*http2Transport).RoundTrip只是一个入口函数,它会调用(*http2Transport). RoundTripOpt办法。

(*http2Transport). RoundTripOpt有两个步骤比拟要害:

t.connPool().GetClientConn(req, addr): 在http2的连接池外面获取一个可用连贯,其中连接池的类型为http2noDialClientConnPool,参考http2configureTransport函数。

cc.roundTrip(req): 通过获取到的可用连贯发送申请并返回响应。

(http2noDialClientConnPool).GetClientConn

依据理论的debug后果(http2noDialClientConnPool).GetClientConn最终会调用(*http2clientConnPool).getClientConn(req *Request, addr string, dialOnMiss bool)

通过(http2noDialClientConnPool).GetClientConn获取连贯时传递给(*http2clientConnPool).getClientConn办法的第三个参数始终为false,该参数为false时代表着即便无奈失常获取可用连贯,也不在这个环节从新发动拨号流程。

在(*http2clientConnPool).getClientConn中会遍历同地址的连贯,并判断连贯的状态从而获取一个能够解决申请的连贯。

for _, cc := range p.conns[addr] {    if st := cc.idleState(); st.canTakeNewRequest {        if p.shouldTraceGetConn(st) {            http2traceGetConn(req, addr)        }        p.mu.Unlock()        return cc, nil    }}

cc.idleState()判断以后连接池中的连贯是否解决新的申请:

1、以后连贯是否能被多个申请共享,如果仅单个申请应用且曾经有一个数据流,则以后连贯不能解决新的申请。

if cc.singleUse && cc.nextStreamID > 1 {    return}

2、以下几点均为true时,才代表以后连贯可能解决新的申请:

  • 连贯状态失常,即未敞开并且不处于正在敞开的状态。
  • 以后连贯正在解决的数据流小于maxConcurrentStreams
  • 下一个要解决的数据流 + 以后连贯处于期待状态的申请*2 < math.MaxInt32。
  • 以后连贯没有长时间处于闲暇状态(次要通过cc.tooIdleLocked()判断)。
st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&        int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&        !cc.tooIdleLocked()

当从链接池胜利获取到一个能够解决申请的连贯,就能够和server进行数据交互,即(*http2ClientConn).roundTrip流程。

(*http2ClientConn).roundTrip

1、在真正开始解决申请前,还要进行header查看,http2对http1.1的某些header是不反对的,笔者就不对这个逻辑进行剖析了,间接上源码:

func http2checkConnHeaders(req *Request) error {    if v := req.Header.Get("Upgrade"); v != "" {        return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"])    }    if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") {        return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv)    }    if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && !strings.EqualFold(vv[0], "close") && !strings.EqualFold(vv[0], "keep-alive")) {        return fmt.Errorf("http2: invalid Connection request header: %q", vv)    }    return nil}func http2commaSeparatedTrailers(req *Request) (string, error) {    keys := make([]string, 0, len(req.Trailer))    for k := range req.Trailer {        k = CanonicalHeaderKey(k)        switch k {        case "Transfer-Encoding", "Trailer", "Content-Length":            return "", &http2badStringError{"invalid Trailer key", k}        }        keys = append(keys, k)    }    if len(keys) > 0 {        sort.Strings(keys)        return strings.Join(keys, ","), nil    }    return "", nil}

2、调用(*http2ClientConn).awaitOpenSlotForRequest,始终等到以后连贯解决的数据流小于maxConcurrentStreams, 如果此函数返回谬误,则本次申请失败。

2.1、double check以后连贯可用。

if cc.closed || !cc.canTakeNewRequestLocked() {    if waitingForConn != nil {        close(waitingForConn)    }    return http2errClientConnUnusable}

2.2、如果以后连贯解决的数据流小于maxConcurrentStreams则间接返回nil。笔者置信大部分逻辑走到这儿就返回了。

if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {    if waitingForConn != nil {        close(waitingForConn)    }    return nil}

2.3、如果以后连贯解决的数据流的确曾经达到下限,则开始进入期待流程。

if waitingForConn == nil {    waitingForConn = make(chan struct{})    go func() {        if err := http2awaitRequestCancel(req, waitingForConn); err != nil {            cc.mu.Lock()            waitingForConnErr = err            cc.cond.Broadcast()            cc.mu.Unlock()        }    }()}cc.pendingRequests++cc.cond.Wait()cc.pendingRequests--

通过下面的逻辑晓得,以后连贯解决的数据流达到下限后有两种状况,一是期待申请被勾销,二是期待其余申请完结。如果有其余数据流完结并唤醒以后期待的申请,则反复2.1、2.2和2.3的步骤。

3、调用cc.newStream()在连贯上创立一个数据流(创立数据流是线程平安的,因为源码中在调用awaitOpenSlotForRequest之前先加锁,直到写入申请的header之后才开释锁)。

func (cc *http2ClientConn) newStream() *http2clientStream {    cs := &http2clientStream{        cc:        cc,        ID:        cc.nextStreamID,        resc:      make(chan http2resAndError, 1),        peerReset: make(chan struct{}),        done:      make(chan struct{}),    }    cs.flow.add(int32(cc.initialWindowSize))    cs.flow.setConnFlow(&cc.flow)    cs.inflow.add(http2transportDefaultStreamFlow)    cs.inflow.setConnFlow(&cc.inflow)    cc.nextStreamID += 2    cc.streams[cs.ID] = cs    return cs}

笔者对上述代码简略形容如下:

  • 新建一个http2clientStream,数据流ID为cc.nextStreamID,新建数据流后,cc.nextStreamID +=2
  • 数据流通过http2resAndError管道接管申请的响应。
  • 初始化以后数据流的可写流控制窗口大小为cc.initialWindowSize,并保留连贯的可写流控制指针。
  • 初始化以后数据流的可读流控制窗口大小为http2transportDefaultStreamFlow,并保留连贯的可读流控制指针。
  • 最初将新建的数据流注册到以后连贯中。

4、调用cc.t.getBodyWriterState(cs, body)会返回一个http2bodyWriterState构造体。通过该构造体能够晓得申请body是否发送胜利。

func (t *http2Transport) getBodyWriterState(cs *http2clientStream, body io.Reader) (s http2bodyWriterState) {    s.cs = cs    if body == nil {        return    }    resc := make(chan error, 1)    s.resc = resc    s.fn = func() {        cs.cc.mu.Lock()        cs.startedWrite = true        cs.cc.mu.Unlock()        resc <- cs.writeRequestBody(body, cs.req.Body)    }    s.delay = t.expectContinueTimeout()    if s.delay == 0 ||        !httpguts.HeaderValuesContainsToken(            cs.req.Header["Expect"],            "100-continue") {        return    }    // 此处省略代码,因为绝大部分申请都不会设置100-continue的标头    return}

s.fn: 标记以后数据流开始写入数据,并且将申请body的发送后果写入s.resc管道(本文暂不对writeRequestBody开展剖析,下篇文章会对其进行剖析)。

5、因为是多个申请共享一个连贯,那么向连贯写入数据帧时须要加锁,比方加锁写入申请头。

cc.wmu.Lock()endStream := !hasBody && !hasTrailerswerr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)cc.wmu.Unlock()

6、如果有申请body,则开始写入申请body,没有申请body则设置响应header的超时工夫(有申请body时,响应header的超时工夫须要在申请body写完之后设置)。

if hasBody {    bodyWriter.scheduleBodyWrite()} else {    http2traceWroteRequest(cs.trace, nil)    if d := cc.responseHeaderTimeout(); d != 0 {        timer := time.NewTimer(d)        defer timer.Stop()        respHeaderTimer = timer.C    }}

scheduleBodyWrite的内容如下:

func (s http2bodyWriterState) scheduleBodyWrite() {    if s.timer == nil {        // We're not doing a delayed write (see        // getBodyWriterState), so just start the writing        // goroutine immediately.        go s.fn()        return    }    http2traceWait100Continue(s.cs.trace)    if s.timer.Stop() {        s.timer.Reset(s.delay)    }}

因为笔者的申请header中没有携带100-continue标头,所以在后面的getBodyWriterState函数中初始化的s.timer为nil即调用scheduleBodyWrite会立刻开始发送申请body。

7、轮询管道获取响应后果。

在看轮询源码之前,先看一个简略的函数:

handleReadLoopResponse := func(re http2resAndError) (*Response, bool, error) {    res := re.res    if re.err != nil || res.StatusCode > 299 {        bodyWriter.cancel()        cs.abortRequestBodyWrite(http2errStopReqBodyWrite)    }    if re.err != nil {        cc.forgetStreamID(cs.ID)        return nil, cs.getStartedWrite(), re.err    }    res.Request = req    res.TLS = cc.tlsState    return res, false, nil}

该函数次要就是判断读到的响应是否失常,并依据响应的后果结构(*http2ClientConn).roundTrip的返回值。

理解了handleReadLoopResponse之后,上面就看看轮询的逻辑:

for {    select {    case re := <-readLoopResCh:        return handleReadLoopResponse(re)    // 此处省略代码(蕴含申请勾销,申请超时等管道的轮询)    case err := <-bodyWriter.resc:        // Prefer the read loop's response, if available. Issue 16102.        select {        case re := <-readLoopResCh:            return handleReadLoopResponse(re)        default:        }        if err != nil {            cc.forgetStreamID(cs.ID)            return nil, cs.getStartedWrite(), err        }        bodyWritten = true        if d := cc.responseHeaderTimeout(); d != 0 {            timer := time.NewTimer(d)            defer timer.Stop()            respHeaderTimer = timer.C        }    }}

笔者仅对下面的第二种状况即申请body发送实现进行形容:

  • 是否读到响应,如果可能读取响应则间接返回。
  • 判断申请body是否发送胜利,如果发送失败,间接返回。
  • 如果申请body发送胜利,则设置响应header的超时工夫。

总结

本文次要形容了两个方面的内容:

  1. 确认client和server都反对http2协定,并构建一个http2的连贯,同时开启该连贯的读循环。
  2. 通过http2连接池获取一个http2连贯,并发送申请和读取响应。

预报

鉴于HTTTP2.0的内容较多,且文章篇幅过长时不易浏览,笔者将后续要剖析的内容拆为两个局部:

  1. 形容数据帧和流控制以及读循环读到响应并发送给readLoopResCh管道。
  2. http2.0标头压缩逻辑。

最初,衷心希望本文可能对各位读者有肯定的帮忙。

:

  1. 写本文时, 笔者所用go版本为: go1.14.2。
  2. 本文对h2c的状况不予以考虑。
  3. 因为笔者剖析的是申请流程,所以没有在本地搭建server,而是应用了一个反对http2连贯的图片一步步的debug。eg: https://dss0.bdstatic.com/5aV...

参考

https://developers.google.com...