关于golang:Go发起HTTP20请求流程分析前篇

38次阅读

共计 13799 个字符,预计需要花费 35 分钟才能阅读完成。

来自公众号:新世界杂货铺

前言

继 Go 中的 HTTP 申请之——HTTP1.1 申请流程剖析之后,两头断断续续,历时近一月,终于才敢开始码字写下本文。

浏览倡议

HTTP2.0 在建设 TCP 连贯和平安的 TLS 传输通道与 HTTP1.1 的流程基本一致。所以笔者倡议没有看过 Go 中的 HTTP 申请之——HTTP1.1 申请流程剖析这篇文章的先去补一下课,本文会基于前一篇文章仅介绍和 HTTP2.0 相干的逻辑。

(*Transport).roundTrip

(*Transport).roundTrip办法会调用 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults) 初始化 TLSClientConfig 以及h2transport,而这两者都和 HTTP2.0 有着严密的分割。

TLSClientConfig: 初始化 client 反对的 http 协定, 并在 tls 握手时告知 server。

h2transport: 如果本次申请是 http2,那么 h2transport 会接管连贯,申请和响应的解决逻辑。

上面看看源码:

func (t *Transport) onceSetNextProtoDefaults() {
    // ... 此处省略代码...
    t2, err := http2configureTransport(t)
    if err != nil {log.Printf("Error enabling Transport HTTP/2 support: %v", err)
        return
    }
    t.h2transport = t2

    // ... 此处省略代码...
}
func http2configureTransport(t1 *Transport) (*http2Transport, error) {connPool := new(http2clientConnPool)
    t2 := &http2Transport{ConnPool: http2noDialClientConnPool{connPool},
        t1:       t1,
    }
    connPool.t = t2
    if err := http2registerHTTPSProtocol(t1, http2noDialH2RoundTripper{t2}); err != nil {return nil, err}
    if t1.TLSClientConfig == nil {t1.TLSClientConfig = new(tls.Config)
    }
    if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "h2") {t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...)
    }
    if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") {t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1")
    }
    upgradeFn := func(authority string, c *tls.Conn) RoundTripper {addr := http2authorityAddr("https", authority)
        if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil {go c.Close()
            return http2erringRoundTripper{err}
        } else if !used {
            // Turns out we don't need this c.
            // For example, two goroutines made requests to the same host
            // at the same time, both kicking off TCP dials. (since protocol
            // was unknown)
            go c.Close()}
        return t2
    }
    if m := t1.TLSNextProto; len(m) == 0 {t1.TLSNextProto = map[string]func(string, *tls.Conn) RoundTripper{"h2": upgradeFn,}
    } else {m["h2"] = upgradeFn
    }
    return t2, nil
}

笔者将上述的源码简略拆解为以下几个步骤:

  1. 新建一个 http2clientConnPool 并复制给 t2,当前 http2 的申请会优先从该连接池中获取连贯。
  2. 初始化 TLSClientConfig,并将反对的h2http1.1协定增加到 TLSClientConfig.NextProtos 中。
  3. 定义一个 h2upgradeFn存储到 t1.TLSNextProto 里。

鉴于前一篇文章对新建连贯前的步骤有了较为具体的介绍,所以这里间接看和 server 建设连贯的局部源码,即 (*Transport).dialConn 办法:

func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
    // ... 此处省略代码...
    if cm.scheme() == "https" && t.hasCustomTLSDialer() {// ... 此处省略代码...} else {conn, err := t.dial(ctx, "tcp", cm.addr())
        if err != nil {return nil, wrapErr(err)
        }
        pconn.conn = conn
        if cm.scheme() == "https" {
            var firstTLSHost string
            if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {return nil, wrapErr(err)
            }
            if err = pconn.addTLS(firstTLSHost, trace); err != nil {return nil, wrapErr(err)
            }
        }
    }

    // Proxy setup.
    // ... 此处省略代码...

    if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: next(cm.targetAddr, pconn.conn.(*tls.Conn))}, nil
        }
    }

    // ... 此处省略代码...
}

笔者对上述的源码形容如下:

  1. 调用 t.dial(ctx, "tcp", cm.addr()) 创立 TCP 连贯。
  2. 如果是 https 的申请,则对申请建设平安的 tls 传输通道。
  3. 查看 tls 的握手状态,如果和 server 协商的 NegotiatedProtocol 协定不为空,且 client 的 t.TLSNextProto 有该协定,则返回 alt 不为空的长久连贯(HTTP1.1 不会进入 if 条件里)。

笔者对上述的第三点进行开展。经笔者在本地 debug 验证,当 client 和 server 都反对 http2 时,s.NegotiatedProtocol的值为 h2s.NegotiatedProtocolIsMutual的值为true

在下面剖析 http2configureTransport 函数时,咱们晓得 TLSNextProto 注册了一个 key 为 h2 的函数,所以调用 next 理论就是调用后面的 upgradeFn 函数。

upgradeFn会调用 connPool.addConnIfNeeded 向 http2 的连接池增加一个 tls 传输通道,并最终返回后面曾经创立好的 t2http2Transport

func (p *http2clientConnPool) addConnIfNeeded(key string, t *http2Transport, c *tls.Conn) (used bool, err error) {p.mu.Lock()
    // ... 此处省略代码...
    // 次要用于判断是否有必要像连接池增加新的连贯
    // 判断连接池中是否已有同 host 连贯,如果有且该链接可能解决新的申请则间接返回
    call, dup := p.addConnCalls[key]
    if !dup {
        // ... 此处省略代码...
        call = &http2addConnCall{
            p:    p,
            done: make(chan struct{}),
        }
        p.addConnCalls[key] = call
        go call.run(t, key, c)
    }
    p.mu.Unlock()

    <-call.done
    if call.err != nil {return false, call.err}
    return !dup, nil
}
func (c *http2addConnCall) run(t *http2Transport, key string, tc *tls.Conn) {cc, err := t.NewClientConn(tc)

    p := c.p
    p.mu.Lock()
    if err != nil {c.err = err} else {p.addConnLocked(key, cc)
    }
    delete(p.addConnCalls, key)
    p.mu.Unlock()
    close(c.done)
}

剖析上述的源码咱们可能失去两点论断:

  1. 执行完 upgradeFn 之后,(*Transport).dialConn 返回的长久化连贯中 alt 字段曾经不是 nil 了。
  2. t.NewClientConn(tc)新建进去的连贯会保留在 http2 的连接池即 http2clientConnPool 中,下一小结将对 NewClientConn 开展剖析。

最初咱们回到(*Transport).roundTrip 办法并剖析其中的要害源码:

func (t *Transport) roundTrip(req *Request) (*Response, error) {t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
    // ... 此处省略代码...
    for {
        select {case <-ctx.Done():
            req.closeBody()
            return nil, ctx.Err()
        default:
        }

        // ... 此处省略代码...
        pconn, err := t.getConn(treq, cm)
        if err != nil {t.setReqCanceler(req, nil)
            req.closeBody()
            return nil, err
        }

        var resp *Response
        if pconn.alt != nil {
            // HTTP/2 path.
            t.setReqCanceler(req, nil) // not cancelable with CancelRequest
            resp, err = pconn.alt.RoundTrip(req)
        } else {resp, err = pconn.roundTrip(treq)
        }
        if err == nil {return resp, nil}

        // ... 此处省略代码...
    }
}

联合后面的剖析,pconn.alt在 server 和 client 都反对 http2 协定的状况下是不为 nil 的。所以,http2 的申请会走 pconn.alt.RoundTrip(req) 分支,也就是说 http2 的申请流程就被 http2Transport 接管啦。

(*http2Transport).NewClientConn

(*http2Transport).NewClientConn 外部会调用t.newClientConn(c, t.disableKeepAlives())

因为本节内容较多,所以笔者不再一次性贴出源码,而是按关键步骤剖析并分块儿贴出源码。

1、初始化一个http2ClientConn

cc := &http2ClientConn{
    t:                     t,
    tconn:                 c,
    readerDone:            make(chan struct{}),
    nextStreamID:          1,
    maxFrameSize:          16 << 10,           // spec default
    initialWindowSize:     65535,              // spec default
    maxConcurrentStreams:  1000,               // "infinite", per spec. 1000 seems good enough.
    peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
    streams:               make(map[uint32]*http2clientStream),
    singleUse:             singleUse,
    wantSettingsAck:       true,
    pings:                 make(map[[8]byte]chan struct{}),
}

下面的源码新建了一个默认的 http2ClientConn。

initialWindowSize:初始化窗口大小为 65535,这个值之后会初始化每一个数据流可发送的数据窗口大小。

maxConcurrentStreams:示意每个连贯上容许最多有多少个数据流同时传输数据。

streams:以后连贯上的数据流。

singleUse: 管制 http2 的连贯是否容许多个数据流共享,其值由 t.disableKeepAlives() 管制。

2、创立一个条件锁并且新建 Writer&Reader。

cc.cond = sync.NewCond(&cc.mu)
cc.flow.add(int32(http2initialWindowSize))
cc.bw = bufio.NewWriter(http2stickyErrWriter{c, &cc.werr})
cc.br = bufio.NewReader(c)

新建 Writer&Reader 没什么好说的,须要留神的是cc.flow.add(int32(http2initialWindowSize))

cc.flow.add将以后连贯的可写流控制窗口大小设置为http2initialWindowSize, 即 65535。

3、新建一个读写数据帧的 Framer。

cc.fr = http2NewFramer(cc.bw, cc.br)
cc.fr.ReadMetaHeaders = hpack.NewDecoder(http2initialHeaderTableSize, nil)
cc.fr.MaxHeaderListSize = t.maxHeaderListSize()

4、向 server 发送开场白,并发送一些初始化数据帧。

initialSettings := []http2Setting{{ID: http2SettingEnablePush, Val: 0},
    {ID: http2SettingInitialWindowSize, Val: http2transportDefaultStreamFlow},
}
if max := t.maxHeaderListSize(); max != 0 {initialSettings = append(initialSettings, http2Setting{ID: http2SettingMaxHeaderListSize, Val: max})
}

cc.bw.Write(http2clientPreface)
cc.fr.WriteSettings(initialSettings...)
cc.fr.WriteWindowUpdate(0, http2transportDefaultConnFlow)
cc.inflow.add(http2transportDefaultConnFlow + http2initialWindowSize)
cc.bw.Flush()

client 向 server 发送的开场白内容如下:

const (
    // client 首先想 server 发送以 PRI 结尾的一串字符串。http2ClientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
)
var (http2clientPreface = []byte(http2ClientPreface)
)

发送完开场白后,client 向 server 发送 SETTINGS 数据帧。

http2SettingEnablePush: 告知 server 客户端是否开启 push 性能。

http2SettingInitialWindowSize:告知 server 客户端可承受的最大数据窗口是http2transportDefaultStreamFlow(4M)。

发送完 SETTINGS 数据帧后,发送 WINDOW_UPDATE 数据帧, 因为第一个参数为 0 即 streamID 为 0,则是告知 server 此连贯可承受的最大数据窗口为http2transportDefaultConnFlow(1G)。

发送完 WINDOW_UPDATE 数据帧后,将 client 的可读流控制窗口大小设置为http2transportDefaultConnFlow + http2initialWindowSize

5、开启读循环并返回

go cc.readLoop()

(*http2Transport).RoundTrip

(*http2Transport).RoundTrip 只是一个入口函数,它会调用(*http2Transport). RoundTripOpt 办法。

(*http2Transport). RoundTripOpt 有两个步骤比拟要害:

t.connPool().GetClientConn(req, addr): 在 http2 的连接池外面获取一个可用连贯,其中连接池的类型为 http2noDialClientConnPool,参考http2configureTransport 函数。

cc.roundTrip(req): 通过获取到的可用连贯发送申请并返回响应。

(http2noDialClientConnPool).GetClientConn

依据理论的 debug 后果(http2noDialClientConnPool).GetClientConn 最终会调用(*http2clientConnPool).getClientConn(req *Request, addr string, dialOnMiss bool)

通过(http2noDialClientConnPool).GetClientConn 获取连贯时传递给(*http2clientConnPool).getClientConn 办法的第三个参数始终为false,该参数为 false 时代表着即便无奈失常获取可用连贯,也不在这个环节从新发动拨号流程。

在(*http2clientConnPool).getClientConn 中会遍历同地址的连贯,并判断连贯的状态从而获取一个能够解决申请的连贯。

for _, cc := range p.conns[addr] {if st := cc.idleState(); st.canTakeNewRequest {if p.shouldTraceGetConn(st) {http2traceGetConn(req, addr)
        }
        p.mu.Unlock()
        return cc, nil
    }
}

cc.idleState()判断以后连接池中的连贯是否解决新的申请:

1、以后连贯是否能被多个申请共享,如果仅单个申请应用且曾经有一个数据流,则以后连贯不能解决新的申请。

if cc.singleUse && cc.nextStreamID > 1 {return}

2、以下几点均为 true 时,才代表以后连贯可能解决新的申请:

  • 连贯状态失常,即未敞开并且不处于正在敞开的状态。
  • 以后连贯正在解决的数据流小于maxConcurrentStreams
  • 下一个要解决的数据流 + 以后连贯处于期待状态的申请 *2 < math.MaxInt32。
  • 以后连贯没有长时间处于闲暇状态(次要通过 cc.tooIdleLocked() 判断)。
st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
        int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
        !cc.tooIdleLocked()

当从链接池胜利获取到一个能够解决申请的连贯,就能够和 server 进行数据交互,即 (*http2ClientConn).roundTrip 流程。

(*http2ClientConn).roundTrip

1、在真正开始解决申请前,还要进行 header 查看,http2 对 http1.1 的某些 header 是不反对的,笔者就不对这个逻辑进行剖析了,间接上源码:

func http2checkConnHeaders(req *Request) error {if v := req.Header.Get("Upgrade"); v != "" {return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"])
    }
    if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != ""&& vv[0] !="chunked") {return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv)
    }
    if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != ""&& !strings.EqualFold(vv[0],"close") && !strings.EqualFold(vv[0],"keep-alive")) {return fmt.Errorf("http2: invalid Connection request header: %q", vv)
    }
    return nil
}
func http2commaSeparatedTrailers(req *Request) (string, error) {keys := make([]string, 0, len(req.Trailer))
    for k := range req.Trailer {k = CanonicalHeaderKey(k)
        switch k {
        case "Transfer-Encoding", "Trailer", "Content-Length":
            return "", &http2badStringError{"invalid Trailer key", k}
        }
        keys = append(keys, k)
    }
    if len(keys) > 0 {sort.Strings(keys)
        return strings.Join(keys, ","), nil
    }
    return "", nil
}

2、调用(*http2ClientConn).awaitOpenSlotForRequest,始终等到以后连贯解决的数据流小于maxConcurrentStreams, 如果此函数返回谬误,则本次申请失败。

2.1、double check 以后连贯可用。

if cc.closed || !cc.canTakeNewRequestLocked() {
    if waitingForConn != nil {close(waitingForConn)
    }
    return http2errClientConnUnusable
}

2.2、如果以后连贯解决的数据流小于 maxConcurrentStreams 则间接返回 nil。笔者置信大部分逻辑走到这儿就返回了。

if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
    if waitingForConn != nil {close(waitingForConn)
    }
    return nil
}

2.3、如果以后连贯解决的数据流的确曾经达到下限,则开始进入期待流程。

if waitingForConn == nil {waitingForConn = make(chan struct{})
    go func() {if err := http2awaitRequestCancel(req, waitingForConn); err != nil {cc.mu.Lock()
            waitingForConnErr = err
            cc.cond.Broadcast()
            cc.mu.Unlock()}
    }()}
cc.pendingRequests++
cc.cond.Wait()
cc.pendingRequests--

通过下面的逻辑晓得,以后连贯解决的数据流达到下限后有两种状况,一是期待申请被勾销,二是期待其余申请完结。如果有其余数据流完结并唤醒以后期待的申请,则反复 2.1、2.2 和 2.3 的步骤。

3、调用 cc.newStream() 在连贯上创立一个数据流(创立数据流是线程平安的,因为源码中在调用 awaitOpenSlotForRequest 之前先加锁,直到写入申请的 header 之后才开释锁)。

func (cc *http2ClientConn) newStream() *http2clientStream {
    cs := &http2clientStream{
        cc:        cc,
        ID:        cc.nextStreamID,
        resc:      make(chan http2resAndError, 1),
        peerReset: make(chan struct{}),
        done:      make(chan struct{}),
    }
    cs.flow.add(int32(cc.initialWindowSize))
    cs.flow.setConnFlow(&cc.flow)
    cs.inflow.add(http2transportDefaultStreamFlow)
    cs.inflow.setConnFlow(&cc.inflow)
    cc.nextStreamID += 2
    cc.streams[cs.ID] = cs
    return cs
}

笔者对上述代码简略形容如下:

  • 新建一个http2clientStream,数据流 ID 为cc.nextStreamID,新建数据流后,cc.nextStreamID +=2
  • 数据流通过 http2resAndError 管道接管申请的响应。
  • 初始化以后数据流的可写流控制窗口大小为cc.initialWindowSize,并保留连贯的可写流控制指针。
  • 初始化以后数据流的可读流控制窗口大小为http2transportDefaultStreamFlow,并保留连贯的可读流控制指针。
  • 最初将新建的数据流注册到以后连贯中。

4、调用 cc.t.getBodyWriterState(cs, body) 会返回一个 http2bodyWriterState 构造体。通过该构造体能够晓得申请 body 是否发送胜利。

func (t *http2Transport) getBodyWriterState(cs *http2clientStream, body io.Reader) (s http2bodyWriterState) {
    s.cs = cs
    if body == nil {return}
    resc := make(chan error, 1)
    s.resc = resc
    s.fn = func() {cs.cc.mu.Lock()
        cs.startedWrite = true
        cs.cc.mu.Unlock()
        resc <- cs.writeRequestBody(body, cs.req.Body)
    }
    s.delay = t.expectContinueTimeout()
    if s.delay == 0 ||
        !httpguts.HeaderValuesContainsToken(cs.req.Header["Expect"],
            "100-continue") {return}
    // 此处省略代码,因为绝大部分申请都不会设置 100-continue 的标头
    return
}

s.fn: 标记以后数据流开始写入数据,并且将申请 body 的发送后果写入 s.resc 管道(本文暂不对 writeRequestBody 开展剖析,下篇文章会对其进行剖析)。

5、因为是多个申请共享一个连贯,那么向连贯写入数据帧时须要加锁,比方加锁写入申请头。

cc.wmu.Lock()
endStream := !hasBody && !hasTrailers
werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
cc.wmu.Unlock()

6、如果有申请 body,则开始写入申请 body,没有申请 body 则设置响应 header 的超时工夫(有申请 body 时,响应 header 的超时工夫须要在申请 body 写完之后设置)。

if hasBody {bodyWriter.scheduleBodyWrite()
} else {http2traceWroteRequest(cs.trace, nil)
    if d := cc.responseHeaderTimeout(); d != 0 {timer := time.NewTimer(d)
        defer timer.Stop()
        respHeaderTimer = timer.C
    }
}

scheduleBodyWrite的内容如下:

func (s http2bodyWriterState) scheduleBodyWrite() {
    if s.timer == nil {
        // We're not doing a delayed write (see
        // getBodyWriterState), so just start the writing
        // goroutine immediately.
        go s.fn()
        return
    }
    http2traceWait100Continue(s.cs.trace)
    if s.timer.Stop() {s.timer.Reset(s.delay)
    }
}

因为笔者的申请 header 中没有携带 100-continue 标头,所以在后面的 getBodyWriterState 函数中初始化的 s.timer 为 nil 即调用 scheduleBodyWrite 会立刻开始发送申请 body。

7、轮询管道获取响应后果。

在看轮询源码之前,先看一个简略的函数:

handleReadLoopResponse := func(re http2resAndError) (*Response, bool, error) {
    res := re.res
    if re.err != nil || res.StatusCode > 299 {bodyWriter.cancel()
        cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
    }
    if re.err != nil {cc.forgetStreamID(cs.ID)
        return nil, cs.getStartedWrite(), re.err}
    res.Request = req
    res.TLS = cc.tlsState
    return res, false, nil
}

该函数次要就是判断读到的响应是否失常,并依据响应的后果结构 (*http2ClientConn).roundTrip 的返回值。

理解了 handleReadLoopResponse 之后,上面就看看轮询的逻辑:

for {
    select {
    case re := <-readLoopResCh:
        return handleReadLoopResponse(re)
    // 此处省略代码(蕴含申请勾销,申请超时等管道的轮询)case err := <-bodyWriter.resc:
        // Prefer the read loop's response, if available. Issue 16102.
        select {
        case re := <-readLoopResCh:
            return handleReadLoopResponse(re)
        default:
        }
        if err != nil {cc.forgetStreamID(cs.ID)
            return nil, cs.getStartedWrite(), err}
        bodyWritten = true
        if d := cc.responseHeaderTimeout(); d != 0 {timer := time.NewTimer(d)
            defer timer.Stop()
            respHeaderTimer = timer.C
        }
    }
}

笔者仅对下面的第二种状况即申请 body 发送实现进行形容:

  • 是否读到响应,如果可能读取响应则间接返回。
  • 判断申请 body 是否发送胜利,如果发送失败,间接返回。
  • 如果申请 body 发送胜利,则设置响应 header 的超时工夫。

总结

本文次要形容了两个方面的内容:

  1. 确认 client 和 server 都反对 http2 协定,并构建一个 http2 的连贯,同时开启该连贯的读循环。
  2. 通过 http2 连接池获取一个 http2 连贯,并发送申请和读取响应。

预报

鉴于 HTTTP2.0 的内容较多,且文章篇幅过长时不易浏览,笔者将后续要剖析的内容拆为两个局部:

  1. 形容数据帧和流控制以及读循环读到响应并发送给 readLoopResCh 管道。
  2. http2.0 标头压缩逻辑。

最初,衷心希望本文可能对各位读者有肯定的帮忙。

:

  1. 写本文时,笔者所用 go 版本为: go1.14.2。
  2. 本文对 h2c 的状况不予以考虑。
  3. 因为笔者剖析的是申请流程,所以没有在本地搭建 server,而是应用了一个反对 http2 连贯的图片一步步的 debug。eg: https://dss0.bdstatic.com/5aV…

参考

https://developers.google.com…

正文完
 0