Go: understanding how the net/http client obtains a connection, via the dial "operation was canceled" error

Below is the demo code (it may not always reproduce the issue).
Client-side demo code:

package main

import (
    "context"
    "fmt"
    "io"
    "math"
    "net"
    "net/http"
    "sync"
    "time"
)

func main() {

    dialer := &net.Dialer{
        Timeout:   10 * time.Second,
        KeepAlive: 15 * time.Second,
    }

    tr := &http.Transport{
        MaxIdleConnsPerHost: math.MaxInt,
        DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
            before := time.Now()
            conn, err := dialer.DialContext(ctx, network, addr)
            if err != nil {
                // the "operation was canceled" error is reported here
                fmt.Printf("dial err=%v,cost=%v[ms]\n", err, time.Now().Sub(before).Milliseconds())
            }
            return conn, err
        },
    }

    httpClient := &http.Client{
        Transport: tr,
        Timeout:   10 * time.Second,
    }

    var wg sync.WaitGroup

    maxGoroutines := 100
    eachRunTimes := 100000
    for i := 0; i < maxGoroutines; i++ {
        wg.Add(1)
        go func() {
            for j := 0; j < eachRunTimes; j++ {
                head(httpClient)
            }
            wg.Done()
        }()
    }
    wg.Wait()
}

func head(client *http.Client) {
    resp, err := client.Head("http://localhost:9080/nop")
    if err != nil {
        // no error is reported here
        fmt.Printf("httpClient.Head err=%v\n", err)
        return
    }
    defer resp.Body.Close()
    io.Copy(io.Discard, resp.Body)
}

Server-side demo code:

package main

import "net/http"

func main() {
    http.HandleFunc("/nop", func(writer http.ResponseWriter, request *http.Request) {
        writer.WriteHeader(200)
    })

    http.ListenAndServe(":9080", nil)
}

We then noticed that Dial reported "operation was canceled", yet client.Head (which is really just client.Do) returned no error at all: the error was never surfaced to the outer http client. Why? Let's work through the source code, and along the way walk through how the client obtains a connection for each request.

Start with the doc comment of DialContext. It already hints that RoundTrip races the dial against the idle pool: if an idle connection becomes available before DialContext completes, the idle connection is used:

    // DialContext specifies the dial function for creating unencrypted TCP connections.
    // If DialContext is nil (and the deprecated Dial below is also nil),
    // then the transport dials using package net.
    //
    // DialContext runs concurrently with calls to RoundTrip.
    // A RoundTrip call that initiates a dial may end up using
    // a connection dialed previously when the earlier connection
    // becomes idle before the later DialContext completes.
    DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
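
This race is observable from the outside with net/http/httptrace: GotConnInfo.Reused reports whether the request ended up on a previously-dialed (idle) connection instead of one dialed just for it. A minimal sketch against the demo server above (the tracing code is an illustration, not part of the original walkthrough):

package main

import (
    "context"
    "fmt"
    "net/http"
    "net/http/httptrace"
)

func main() {
    trace := &httptrace.ClientTrace{
        // GotConn fires once the Transport has picked a connection for this
        // request; Reused reports whether it came from the idle pool.
        GotConn: func(info httptrace.GotConnInfo) {
            fmt.Printf("reused=%v wasIdle=%v idleTime=%v\n",
                info.Reused, info.WasIdle, info.IdleTime)
        },
    }
    ctx := httptrace.WithClientTrace(context.Background(), trace)
    req, err := http.NewRequestWithContext(ctx, http.MethodHead, "http://localhost:9080/nop", nil)
    if err != nil {
        panic(err)
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        fmt.Println("request failed:", err)
        return
    }
    resp.Body.Close()
}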

getConn is where a connection is obtained:

// getConn dials and creates a new persistConn to the target as
// specified in the connectMethod. This includes doing a proxy CONNECT
// and/or setting up TLS.  If this doesn't return an error, the persistConn
// is ready to write requests to.
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
    req := treq.Request
    trace := treq.trace
    ctx := req.Context()
    if trace != nil && trace.GetConn != nil {
        trace.GetConn(cm.addr())
    }

    w := &wantConn{
        cm:         cm,
        key:        cm.key(),
        ctx:        ctx,
        ready:      make(chan struct{}, 1),
        beforeDial: testHookPrePendingDial,
        afterDial:  testHookPostPendingDial,
    }
    defer func() {
        if err != nil {
            w.cancel(t, err)
        }
    }()

    // here we first try to get a connection from the idle pool
    // Queue for idle connection.
    if delivered := t.queueForIdleConn(w); delivered {
        pc := w.pc
        // Trace only for HTTP/1.
        // HTTP/2 calls trace.GotConn itself.
        if pc.alt == nil && trace != nil && trace.GotConn != nil {
            trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
        }
        // set request canceler to some non-nil function so we
        // can detect whether it was cleared between now and when
        // we enter roundTrip
        t.setReqCanceler(treq.cancelKey, func(error) {})
        return pc, nil
    }

    cancelc := make(chan error, 1)
    t.setReqCanceler(treq.cancelKey, func(err error) { cancelc <- err })

    // here we queue for permission to dial a new connection
    // Queue for permission to dial.
    t.queueForDial(w)
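
The key structure here is wantConn: the request queues for an idle connection and for a new dial at the same time, and whichever source delivers a connection first wins. A heavily simplified sketch of that "first delivery wins" pattern (hypothetical types, not the real net/http internals):

package main

import (
    "fmt"
    "time"
)

// result stands in for a usable connection; both the "idle pool" and the
// "dialer" try to hand one to the same waiter, and only the first succeeds.
type result struct{ from string }

// waiter is a toy version of wantConn: ready is buffered with capacity 1.
type waiter struct{ ready chan result }

// tryDeliver plays the role of wantConn.tryDeliver: the first caller wins,
// later callers learn their connection was not needed.
func (w *waiter) tryDeliver(r result) bool {
    select {
    case w.ready <- r:
        return true
    default:
        return false
    }
}

func main() {
    w := &waiter{ready: make(chan result, 1)}

    // "idle pool": an existing connection becomes free almost immediately.
    go func() {
        time.Sleep(1 * time.Millisecond)
        if !w.tryDeliver(result{from: "idle"}) {
            fmt.Println("idle conn not needed")
        }
    }()

    // "dialer": a fresh dial that takes longer; in net/http the losing dial
    // is either parked in the idle pool or aborted when the request's
    // context is canceled.
    go func() {
        time.Sleep(50 * time.Millisecond)
        if !w.tryDeliver(result{from: "dial"}) {
            fmt.Println("dial finished too late")
        }
    }()

    fmt.Println("request uses connection from:", (<-w.ready).from)
    time.Sleep(100 * time.Millisecond) // give the losing goroutine time to report
}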

Now the key part: how a connection is obtained from the idle pool:

// queueForIdleConn queues w to receive the next idle connection for w.cm.
// As an optimization hint to the caller, queueForIdleConn reports whether
// it successfully delivered an already-idle connection.
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
    if t.DisableKeepAlives {
        return false
    }

    t.idleMu.Lock()
    defer t.idleMu.Unlock()

    // Stop closing connections that become idle - we might want one.
    // (That is, undo the effect of t.CloseIdleConnections.)
    t.closeIdle = false

    if w == nil {
        // Happens in test hook.
        return false
    }

    // If IdleConnTimeout is set, calculate the oldest
    // persistConn.idleAt time we're willing to use a cached idle
    // conn.
    var oldTime time.Time
    if t.IdleConnTimeout > 0 {
        oldTime = time.Now().Add(-t.IdleConnTimeout)
    }

    // look in idleConn to see whether there is an existing usable conn
    // Look for most recently-used idle connection.
    if list, ok := t.idleConn[w.key]; ok {
        stop := false
        delivered := false
        for len(list) > 0 && !stop {
            pconn := list[len(list)-1]

            // See whether this connection has been idle too long, considering
            // only the wall time (the Round(0)), in case this is a laptop or VM
            // coming out of suspend with previously cached idle connections.
            tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
            if tooOld {
                // Async cleanup. Launch in its own goroutine (as if a
                // time.AfterFunc called it); it acquires idleMu, which we're
                // holding, and does a synchronous net.Conn.Close.
                go pconn.closeConnIfStillIdle()
            }
            if pconn.isBroken() || tooOld {
                // If either persistConn.readLoop has marked the connection
                // broken, but Transport.removeIdleConn has not yet removed it
                // from the idle list, or if this persistConn is too old (it was
                // idle too long), then ignore it and look for another. In both
                // cases it's already in the process of being closed.
                list = list[:len(list)-1]
                continue
            }
            delivered = w.tryDeliver(pconn, nil)
            if delivered {
                if pconn.alt != nil {
                    // HTTP/2: multiple clients can share pconn.
                    // Leave it in the list.
                } else {
                    // HTTP/1: only one client can use pconn.
                    // Remove it from the list.
                    t.idleLRU.remove(pconn)
                    list = list[:len(list)-1]
                }
            }
            stop = true
        }
        if len(list) > 0 {
            t.idleConn[w.key] = list
        } else {
            delete(t.idleConn, w.key)
        }
        if stop {
            return delivered
        }
    }

    // the most important part: w is registered in idleConnWait here, so that
    // the next time a connection is returned idle, tryPutIdleConn will wake a
    // waiting request first
    // Register to receive next connection that becomes idle.
    if t.idleConnWait == nil {
        t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
    }
    q := t.idleConnWait[w.key]
    q.cleanFront()
    q.pushBack(w)
    t.idleConnWait[w.key] = q
    return false
}
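
The flip side of that registration is the hand-back path: when a connection becomes free, tryPutIdleConn first tries to wake a request waiting in idleConnWait and only otherwise parks the conn in idleConn. A much simplified sketch of that idea (hypothetical types, not the real implementation):

package main

import "fmt"

// conn and waiter are stand-ins for *persistConn and *wantConn.
type conn struct{ id int }

type waiter struct{ ready chan *conn } // capacity 1, like wantConn.ready

type pool struct {
    waiters []*waiter // simplified idleConnWait for a single connectMethodKey
    idle    []*conn   // simplified idleConn
}

// putIdle mirrors the idea behind Transport.tryPutIdleConn: a connection that
// becomes free is offered to queued waiters first, and only parked in the
// idle list when nobody is (still) waiting.
func (p *pool) putIdle(c *conn) {
    for len(p.waiters) > 0 {
        w := p.waiters[0]
        p.waiters = p.waiters[1:]
        select {
        case w.ready <- c: // the waiter still needs a conn: hand it over
            return
        default: // the waiter was already satisfied (e.g. its own dial won)
        }
    }
    p.idle = append(p.idle, c) // nobody waiting: park it as idle
}

func main() {
    p := &pool{}
    w := &waiter{ready: make(chan *conn, 1)}
    p.waiters = append(p.waiters, w)

    p.putIdle(&conn{id: 1})
    fmt.Println("waiter got conn", (<-w.ready).id) // conn 1 goes to the waiter
    p.putIdle(&conn{id: 2})
    fmt.Println("idle pool size:", len(p.idle)) // conn 2 is parked as idle
}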

On the request side, when a deadline is set the Client wraps the response body in cancelTimerBody, so the stopTimer returned by setRequestCancel (shown further below) fires once the body is closed or a read fails:

// cancelTimerBody is an io.ReadCloser that wraps rc with two features:
// 1) On Read error or close, the stop func is called.
// 2) On Read failure, if reqDidTimeout is true, the error is wrapped and
//    marked as net.Error that hit its timeout.
type cancelTimerBody struct {
    stop          func() // stops the time.Timer waiting to cancel the request
    rc            io.ReadCloser
    reqDidTimeout func() bool
}

The error string "operation was canceled" itself comes from net's dialSerial, which checks the dial context before trying each address:

// dialSerial connects to a list of addresses in sequence, returning
// either the first successful connection, or the first error.
func (sd *sysDialer) dialSerial(ctx context.Context, ras addrList) (Conn, error) {
    var firstErr error // The error from the first address is most relevant.

    for i, ra := range ras {
        select {
        case <-ctx.Done():
            // the "operation was canceled" error is thrown from here
            return nil, &OpError{Op: "dial", Net: sd.network, Source: sd.LocalAddr, Addr: ra, Err: mapErr(ctx.Err())}
        default:
        }

        dialCtx := ctx
        if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
            partialDeadline, err := partialDeadline(time.Now(), deadline, len(ras)-i)
            if err != nil {
                // Ran out of time.
                if firstErr == nil {
                    firstErr = &OpError{Op: "dial", Net: sd.network, Source: sd.LocalAddr, Addr: ra, Err: err}
                }
                break
            }
            if partialDeadline.Before(deadline) {
                var cancel context.CancelFunc
                dialCtx, cancel = context.WithDeadline(ctx, partialDeadline)
                defer cancel()
            }
        ...

The <-ctx.Done() in dialSerial is triggered by the stopTimer returned by setRequestCancel():

// setRequestCancel sets req.Cancel and adds a deadline context to req
// if deadline is non-zero. The RoundTripper's type is used to
// determine whether the legacy CancelRequest behavior should be used.
//
// As background, there are three ways to cancel a request:
// First was Transport.CancelRequest. (deprecated)
// Second was Request.Cancel.
// Third was Request.Context.
// This function populates the second and third, and uses the first if it really needs to.
func setRequestCancel(req *Request, rt RoundTripper, deadline time.Time) (stopTimer func(), didTimeout func() bool) {
    if deadline.IsZero() {
        return nop, alwaysFalse
    }
    knownTransport := knownRoundTripperImpl(rt, req)
    oldCtx := req.Context()
    ...
}
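
The same error can be reproduced in isolation: cancel the context while net.Dialer.DialContext is still connecting and dialSerial returns exactly this error. A minimal sketch (192.0.2.1 is a documentation-range address assumed to black-hole the connection so the dial stays in flight; that choice is an assumption, not from the article):

package main

import (
    "context"
    "fmt"
    "net"
    "time"
)

func main() {
    dialer := &net.Dialer{Timeout: 10 * time.Second}

    ctx, cancel := context.WithCancel(context.Background())
    // Cancel the context while the dial is still in flight, standing in for
    // the stopTimer that fires once the request has already been served on
    // another connection.
    go func() {
        time.Sleep(100 * time.Millisecond)
        cancel()
    }()

    // 192.0.2.1 (TEST-NET-1) is assumed not to answer, so the dial blocks
    // long enough to observe the cancellation.
    _, err := dialer.DialContext(ctx, "tcp", "192.0.2.1:80")
    fmt.Println(err) // e.g. dial tcp 192.0.2.1:80: operation was canceled
}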

Putting it together, the rough flow is:

1. getConn first calls queueForIdleConn; if an idle connection can be delivered immediately, it is returned and no dial is needed.
2. Otherwise the request is registered in idleConnWait and, via queueForDial, queued to dial a new connection; whichever side delivers a connection to the wantConn first wins.
3. Because Client.Timeout is set, setRequestCancel wraps the request context with a deadline and returns a stopTimer; cancelTimerBody calls that stopTimer when the response body is closed (or a read fails), canceling the request context.
4. If the request was already served on an idle connection, a still-in-flight dial sees <-ctx.Done() in dialSerial and aborts with "operation was canceled"; getConn has long since returned, so client.Do never sees this error.

Summary

The reason the dial error is not surfaced to the http client is that it is not a failure to establish a connection at all: while the dial was still in progress, the request had already been completed on another connection (an idle one delivered first), which eventually triggered ctx.Done() and aborted the dial with this error. Since getConn already succeeded with the idle connection, the aborted dial is simply discarded and the error never reaches the outermost http client.
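
A practical consequence for the demo client: inside the custom DialContext, this benign case can be told apart from a real connect failure by checking whether the dial's context has already been canceled. A hedged sketch of such a wrapper (the function name is an illustration of mine):

package main

import (
    "context"
    "fmt"
    "net"
    "net/http"
    "time"
)

// loggingDial wraps a net.Dialer and distinguishes dials aborted by context
// cancellation (the request was served elsewhere, or canceled as a whole)
// from genuine connect failures.
func loggingDial(dialer *net.Dialer) func(ctx context.Context, network, addr string) (net.Conn, error) {
    return func(ctx context.Context, network, addr string) (net.Conn, error) {
        conn, err := dialer.DialContext(ctx, network, addr)
        if err != nil {
            if ctx.Err() != nil {
                // The dial lost the race or the request was canceled; this
                // error never reaches client.Do, so treat it as noise here.
                fmt.Printf("dial aborted: ctx err=%v, dial err=%v\n", ctx.Err(), err)
            } else {
                fmt.Printf("dial failed: %v\n", err)
            }
        }
        return conn, err
    }
}

func main() {
    dialer := &net.Dialer{Timeout: 10 * time.Second, KeepAlive: 15 * time.Second}
    _ = &http.Transport{DialContext: loggingDial(dialer)}
}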
