下面是演示代码(不一定能复现):
client端演示代码:
package main

import (
	"context"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"sync"
	"time"
)

// main sends HEAD requests from 100 goroutines, 100000 times each, through a
// single shared http.Client. The custom DialContext prints every dial error,
// which makes it visible that a dial can fail with "operation was canceled"
// while client.Head itself returns no error.
func main() {
	dialer := &net.Dialer{
		Timeout:   10 * time.Second,
		KeepAlive: 15 * time.Second,
	}
	tr := &http.Transport{
		// Effectively no per-host cap on idle connections.
		MaxIdleConnsPerHost: math.MaxInt,
		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
			start := time.Now()
			conn, err := dialer.DialContext(ctx, network, addr)
			if err != nil {
				// This is where "operation was canceled" is reported.
				fmt.Printf("dial err=%v,cost=%v[ms]\n", err, time.Since(start).Milliseconds())
			}
			return conn, err
		},
	}
	httpClient := &http.Client{
		Transport: tr,
		Timeout:   10 * time.Second,
	}

	var wg sync.WaitGroup
	maxGoroutines := 100
	eachRunTimes := 100000
	for i := 0; i < maxGoroutines; i++ {
		wg.Add(1)
		go func() {
			// Deferred so the WaitGroup is released even if head panics.
			defer wg.Done()
			for j := 0; j < eachRunTimes; j++ {
				head(httpClient)
			}
		}()
	}
	wg.Wait()
}

// head issues one HEAD request to the demo server. It drains and closes the
// response body so the transport can return the connection to its idle pool.
func head(client *http.Client) {
	resp, err := client.Head("http://localhost:9080/nop")
	if err != nil {
		// No error shows up here, even when a dial above was canceled.
		fmt.Printf("httpClient.Head err=%v\n", err)
		return
	}
	// Close right after the error check, per net/http convention.
	defer resp.Body.Close()
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		fmt.Printf("drain body err=%v\n", err)
	}
}
server端演示代码:
package main

import (
	"log"
	"net/http"
	"time"
)

// main starts the demo server on :9080. The /nop route replies with an empty
// 200 response.
func main() {
	http.HandleFunc("/nop", func(writer http.ResponseWriter, _ *http.Request) {
		writer.WriteHeader(http.StatusOK)
	})

	// A nil Handler means http.DefaultServeMux, where /nop is registered above.
	srv := &http.Server{
		Addr: ":9080",
		// Bound header reads so a stalled client cannot pin a connection.
		// ReadTimeout and IdleTimeout are deliberately left at zero: with both
		// unset, keep-alive connections stay open indefinitely, the same as with
		// http.ListenAndServe.
		ReadHeaderTimeout: 10 * time.Second,
	}
	// Report startup failures such as the port already being in use, rather
	// than exiting silently.
	log.Fatal(srv.ListenAndServe())
}
然后我们发现 Dial 报错 operation was canceled，但是 client.Head（其实就是 client.Do）却没有返回错误，没有把错误抛给外层的 http client。这是为什么呢？我们通过源码来分析，顺便引出 client 每次获取连接的过程：
我们先来看下 DialContext 的注释：它提到 RoundTrip 会同时从 idle（空闲连接池）和 dial（新建连接）两条路径获取连接，如果 idle 连接比 DialContext 先返回，就会使用这个 idleConn：
// DialContext specifies the dial function for creating unencrypted TCP connections. // If DialContext is nil (and the deprecated Dial below is also nil), // then the transport dials using package net. // // DialContext runs concurrently with calls to RoundTrip. // A RoundTrip call that initiates a dial may end up using // a connection dialed previously when the earlier connection // becomes idle before the later DialContext completes. DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
getConn 用来获取连接：
// getConn dials and creates a new persistConn to the target as// specified in the connectMethod. This includes doing a proxy CONNECT// and/or setting up TLS. If this doesn't return an error, the persistConn// is ready to write requests to.func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) { req := treq.Request trace := treq.trace ctx := req.Context() if trace != nil && trace.GetConn != nil { trace.GetConn(cm.addr()) } w := &wantConn{ cm: cm, key: cm.key(), ctx: ctx, ready: make(chan struct{}, 1), beforeDial: testHookPrePendingDial, afterDial: testHookPostPendingDial, } defer func() { if err != nil { w.cancel(t, err) } }() // 这里从idle获取连贯 // Queue for idle connection. if delivered := t.queueForIdleConn(w); delivered { pc := w.pc // Trace only for HTTP/1. // HTTP/2 calls trace.GotConn itself. if pc.alt == nil && trace != nil && trace.GotConn != nil { trace.GotConn(pc.gotIdleConnTrace(pc.idleAt)) } // set request canceler to some non-nil function so we // can detect whether it was cleared between now and when // we enter roundTrip t.setReqCanceler(treq.cancelKey, func(error) {}) return pc, nil } cancelc := make(chan error, 1) t.setReqCanceler(treq.cancelKey, func(err error) { cancelc <- err }) // 这里从idle获取连贯 // Queue for permission to dial. t.queueForDial(w)
重点看下是如何从 idle 获取连接的：
// queueForIdleConn queues w to receive the next idle connection for w.cm.
// As an optimization hint to the caller, queueForIdleConn reports whether
// it successfully delivered an already-idle connection.
//
// It first tries to hand w an existing idle connection for w.key, taking the
// most recently used entry from the tail of t.idleConn[w.key]. If none is
// usable, it registers w in t.idleConnWait and returns false. In that case
// getConn goes on to queue a dial as well, so w may later be satisfied by
// whichever path finishes first.
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
	if t.DisableKeepAlives {
		return false
	}

	t.idleMu.Lock()
	defer t.idleMu.Unlock()

	// Stop closing connections that become idle - we might want one.
	// (That is, undo the effect of t.CloseIdleConnections.)
	t.closeIdle = false

	if w == nil {
		// Happens in test hook.
		return false
	}

	// If IdleConnTimeout is set, calculate the oldest
	// persistConn.idleAt time we're willing to use a cached idle
	// conn.
	var oldTime time.Time
	if t.IdleConnTimeout > 0 {
		oldTime = time.Now().Add(-t.IdleConnTimeout)
	}

	// Check t.idleConn for a ready-to-use connection.
	// Look for most recently-used idle connection.
	if list, ok := t.idleConn[w.key]; ok {
		stop := false
		delivered := false
		for len(list) > 0 && !stop {
			pconn := list[len(list)-1]

			// See whether this connection has been idle too long, considering
			// only the wall time (the Round(0)), in case this is a laptop or VM
			// coming out of suspend with previously cached idle connections.
			tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
			if tooOld {
				// Async cleanup. Launch in its own goroutine (as if a
				// time.AfterFunc called it); it acquires idleMu, which we're
				// holding, and does a synchronous net.Conn.Close.
				go pconn.closeConnIfStillIdle()
			}
			if pconn.isBroken() || tooOld {
				// If either persistConn.readLoop has marked the connection
				// broken, but Transport.removeIdleConn has not yet removed it
				// from the idle list, or if this persistConn is too old (it was
				// idle too long), then ignore it and look for another. In both
				// cases it's already in the process of being closed.
				list = list[:len(list)-1]
				continue
			}
			// The first usable connection ends the scan (stop = true below)
			// whether or not tryDeliver accepted it.
			delivered = w.tryDeliver(pconn, nil)
			if delivered {
				if pconn.alt != nil {
					// HTTP/2: multiple clients can share pconn.
					// Leave it in the list.
				} else {
					// HTTP/1: only one client can use pconn.
					// Remove it from the list.
					t.idleLRU.remove(pconn)
					list = list[:len(list)-1]
				}
			}
			stop = true
		}
		// Write back the trimmed list, or drop the key entirely once empty.
		if len(list) > 0 {
			t.idleConn[w.key] = list
		} else {
			delete(t.idleConn, w.key)
		}
		if stop {
			return delivered
		}
	}

	// Key step: enqueue w in idleConnWait so that, the next time a connection
	// is returned to the idle pool, tryPutIdleConn hands it to this waiting
	// request first. This can happen before the dial that getConn starts
	// has finished.
	// Register to receive next connection that becomes idle.
	if t.idleConnWait == nil {
		t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
	}
	q := t.idleConnWait[w.key]
	q.cleanFront()
	q.pushBack(w)
	t.idleConnWait[w.key] = q
	return false
}
// cancelTimerBody is an io.ReadCloser that wraps rc with two features:
// 1) On Read error or close, the stop func is called.
// 2) On Read failure, if reqDidTimeout is true, the error is wrapped and
// marked as net.Error that hit its timeout.
type cancelTimerBody struct {
	// stop halts the timer that would otherwise cancel the request.
	// NOTE(review): presumably this is the stopTimer returned by
	// setRequestCancel; the wiring is not shown here.
	stop func() // stops the time.Timer waiting to cancel the request
	// rc is the wrapped ReadCloser.
	rc io.ReadCloser
	// reqDidTimeout reports whether the request timed out; it decides whether
	// a Read failure is wrapped as a timeout net.Error (feature 2 above).
	reqDidTimeout func() bool
}
// dialSerial connects to a list of addresses in sequence, returning// either the first successful connection, or the first error.func (sd *sysDialer) dialSerial(ctx context.Context, ras addrList) (Conn, error) { var firstErr error // The error from the first address is most relevant. for i, ra := range ras { select { case <-ctx.Done(): // operation was canceled 谬误是从这里抛出来的 return nil, &OpError{Op: "dial", Net: sd.network, Source: sd.LocalAddr, Addr: ra, Err: mapErr(ctx.Err())} default: } dialCtx := ctx if deadline, hasDeadline := ctx.Deadline(); hasDeadline { partialDeadline, err := partialDeadline(time.Now(), deadline, len(ras)-i) if err != nil { // Ran out of time. if firstErr == nil { firstErr = &OpError{Op: "dial", Net: sd.network, Source: sd.LocalAddr, Addr: ra, Err: err} } break } if partialDeadline.Before(deadline) { var cancel context.CancelFunc dialCtx, cancel = context.WithDeadline(ctx, partialDeadline) defer cancel() } ...
// dialSerial外面的<-ctx.Done()是setRequestCancel()返回的stopTimer所激发的// setRequestCancel sets req.Cancel and adds a deadline context to req// if deadline is non-zero. The RoundTripper's type is used to// determine whether the legacy CancelRequest behavior should be used.//// As background, there are three ways to cancel a request:// First was Transport.CancelRequest. (deprecated)// Second was Request.Cancel.// Third was Request.Context.// This function populates the second and third, and uses the first if it really needs to.func setRequestCancel(req *Request, rt RoundTripper, deadline time.Time) (stopTimer func(), didTimeout func() bool) { if deadline.IsZero() { return nop, alwaysFalse } knownTransport := knownRoundTripperImpl(rt, req) oldCtx := req.Context() ...}
大致的流程如下:
总结
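dial 报错却没有抛给 http client 的原因是：这里的 dial 报错并不是建立连接本身失败。getConn 会同时等待 idle 连接和新的 dial（见 queueForIdleConn 中登记的 idleConnWait）。如果在 dial 完成之前，已有别的连接变为空闲并被交给了当前请求，请求就直接用这个连接完成了。请求结束时，setRequestCancel 返回的 stopTimer 会取消请求的 ctx，仍在进行中的 dial 随即在 dialSerial 的 <-ctx.Done() 处返回 operation was canceled。由于当前请求早已拿到了连接，这个 dial 错误不会再返回给请求，所以也不会抛给最外层的 http client。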