共计 7013 个字符,预计需要花费 18 分钟才能阅读完成。
下面是演示代码(不一定能复现):
client 端演示代码:
package main
import (
"context"
"fmt"
"io"
"math"
"net"
"net/http"
"sync"
"time"
)
// main floods the local demo server with HEAD requests: 100 goroutines each
// issue 100000 requests through one shared http.Client. A custom DialContext
// hook wraps the default dialer and prints every dial failure together with
// how long the failed dial took.
func main() {
	dialer := &net.Dialer{
		Timeout:   10 * time.Second,
		KeepAlive: 15 * time.Second,
	}
	tr := &http.Transport{
		// Effectively no per-host cap on idle connections kept for reuse.
		MaxIdleConnsPerHost: math.MaxInt,
		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
			before := time.Now()
			conn, err := dialer.DialContext(ctx, network, addr)
			if err != nil {
				// This is where "operation was canceled" is reported.
				fmt.Printf("dial err=%v,cost=%v[ms]\n", err, time.Since(before).Milliseconds())
			}
			return conn, err
		},
	}
	httpClient := &http.Client{
		Transport: tr,
		Timeout:   10 * time.Second,
	}

	var wg sync.WaitGroup
	maxGoroutines := 100
	eachRunTimes := 100000
	for i := 0; i < maxGoroutines; i++ {
		wg.Add(1)
		go func() {
			// Deferred so the WaitGroup is released however the loop exits.
			defer wg.Done()
			for j := 0; j < eachRunTimes; j++ {
				head(httpClient)
			}
		}()
	}
	wg.Wait()
}
// head sends a single HEAD request to the local demo server through client.
// On success it drains and closes the response body, so the Transport can
// reuse the connection. On failure it prints the error and returns; nothing
// is returned to the caller.
func head(client *http.Client) {
	resp, err := client.Head("http://localhost:9080/nop")
	if err != nil {
		// In the demo no error shows up here, even while DialContext is
		// reporting "operation was canceled".
		fmt.Printf("httpClient.Head err=%v\n", err)
		return
	}
	// Register the close immediately after the error check so the body is
	// always released, independent of what happens while draining it.
	defer resp.Body.Close()
	// Draining is best-effort; a read error is deliberately ignored here.
	_, _ = io.Copy(io.Discard, resp.Body)
}
server 端演示代码:
package main
import ("net/http")
// main runs the demo server: every request to /nop is answered with an
// empty 200 OK response on port 9080.
func main() {
	http.HandleFunc("/nop", func(writer http.ResponseWriter, _ *http.Request) {
		writer.WriteHeader(http.StatusOK)
	})
	// ListenAndServe only returns when the server fails (for example, when
	// port 9080 is already in use). Surface that failure instead of letting
	// the process exit silently with status 0.
	if err := http.ListenAndServe(":9080", nil); err != nil {
		panic(err)
	}
}
然后我们发现 Dial 报错 operation was canceled,但是 client.Head(其实就是 client.Do)却没有返回错误,也就是没有把错误抛给外部的 http client。这是为什么呢?我们通过源码来分析,顺便梳理一下 client 每次获取连接的过程:
我们先来看下 DialContext 的注释。它提到 RoundTrip 会同时从 idle(空闲连接池)和 dial(新建连接)两条路径获取连接;如果 idle 连接比 DialContext 先返回,就会直接使用这个 idleConn:
// DialContext specifies the dial function for creating unencrypted TCP connections.
// If DialContext is nil (and the deprecated Dial below is also nil),
// then the transport dials using package net.
//
// DialContext runs concurrently with calls to RoundTrip.
// A RoundTrip call that initiates a dial may end up using
// a connection dialed previously when the earlier connection
// becomes idle before the later DialContext completes.
DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
getConn 用来获取连接:
// getConn dials and creates a new persistConn to the target as
// specified in the connectMethod. This includes doing a proxy CONNECT
// and/or setting up TLS. If this doesn't return an error, the persistConn
// is ready to write requests to.
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
req := treq.Request
trace := treq.trace
ctx := req.Context()
if trace != nil && trace.GetConn != nil {trace.GetConn(cm.addr())
}
w := &wantConn{
cm: cm,
key: cm.key(),
ctx: ctx,
ready: make(chan struct{}, 1),
beforeDial: testHookPrePendingDial,
afterDial: testHookPostPendingDial,
}
defer func() {
if err != nil {w.cancel(t, err)
}
}()
// 这里从 idle 获取连贯
// Queue for idle connection.
if delivered := t.queueForIdleConn(w); delivered {
pc := w.pc
// Trace only for HTTP/1.
// HTTP/2 calls trace.GotConn itself.
if pc.alt == nil && trace != nil && trace.GotConn != nil {trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
}
// set request canceler to some non-nil function so we
// can detect whether it was cleared between now and when
// we enter roundTrip
t.setReqCanceler(treq.cancelKey, func(error) {})
return pc, nil
}
cancelc := make(chan error, 1)
t.setReqCanceler(treq.cancelKey, func(err error) {cancelc <- err})
// 这里排队等待 dial,新建连接
// Queue for permission to dial.
t.queueForDial(w)
重点看下 idle 是怎么获取连接的:
// queueForIdleConn queues w to receive the next idle connection for w.cm.
// As an optimization hint to the caller, queueForIdleConn reports whether
// it successfully delivered an already-idle connection.
//
// Flow, all under t.idleMu:
//   - With keep-alives disabled it returns false immediately, without queueing w.
//   - Otherwise it scans t.idleConn[w.key] from the most recently used end,
//     skipping broken or too-old conns, and tries to deliver the first usable
//     one to w. It returns as soon as one delivery attempt has been made.
//   - If there is no idle list for the key, or every entry was skipped, w is
//     appended to t.idleConnWait[w.key] and false is returned.
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
if t.DisableKeepAlives {return false}
t.idleMu.Lock()
defer t.idleMu.Unlock()
// Stop closing connections that become idle - we might want one.
// (That is, undo the effect of t.CloseIdleConnections.)
t.closeIdle = false
if w == nil {
// Happens in test hook.
return false
}
// If IdleConnTimeout is set, calculate the oldest
// persistConn.idleAt time we're willing to use a cached idle
// conn.
var oldTime time.Time
if t.IdleConnTimeout > 0 {oldTime = time.Now().Add(-t.IdleConnTimeout)
}
// Check t.idleConn for an already-available conn for this key.
// Look for most recently-used idle connection.
if list, ok := t.idleConn[w.key]; ok {
stop := false
// NOTE: this shadows the named result `delivered`; the value is returned
// explicitly by `return delivered` below.
delivered := false
for len(list) > 0 && !stop {pconn := list[len(list)-1]
// See whether this connection has been idle too long, considering
// only the wall time (the Round(0)), in case this is a laptop or VM
// coming out of suspend with previously cached idle connections.
tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
if tooOld {
// Async cleanup. Launch in its own goroutine (as if a
// time.AfterFunc called it); it acquires idleMu, which we're
// holding, and does a synchronous net.Conn.Close.
go pconn.closeConnIfStillIdle()}
if pconn.isBroken() || tooOld {
// If either persistConn.readLoop has marked the connection
// broken, but Transport.removeIdleConn has not yet removed it
// from the idle list, or if this persistConn is too old (it was
// idle too long), then ignore it and look for another. In both
// cases it's already in the process of being closed.
list = list[:len(list)-1]
continue
}
// NOTE(review): tryDeliver is not shown here; presumably it fails when w
// has already been satisfied (e.g. by a concurrent dial) or canceled.
delivered = w.tryDeliver(pconn, nil)
if delivered {
if pconn.alt != nil {
// HTTP/2: multiple clients can share pconn.
// Leave it in the list.
} else {
// HTTP/1: only one client can use pconn.
// Remove it from the list.
t.idleLRU.remove(pconn)
list = list[:len(list)-1]
}
}
// Only one delivery attempt is made: stop whether or not it succeeded.
// On failure the conn stays in the list for someone else.
stop = true
}
// Write back the trimmed list; drop the map entry once it is empty.
if len(list) > 0 {t.idleConn[w.key] = list
} else {delete(t.idleConn, w.key)
}
// A delivery attempt was made, so do not also queue w as an idle waiter.
if stop {return delivered}
}
// The key step: w is queued in idleConnWait here. Per the article's note,
// when a connection is later returned to the idle pool (via tryPutIdleConn,
// not shown here), it is handed preferentially to a waiting wantConn. That
// delivery can race with, and win against, the dial started for the same
// request.
// Register to receive next connection that becomes idle.
if t.idleConnWait == nil {t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
}
q := t.idleConnWait[w.key]
// NOTE(review): cleanFront is not shown; by its name it presumably drops
// stale waiters from the head of the queue before appending w.
q.cleanFront()
q.pushBack(w)
// q is stored back into the map because it is read out by value above.
t.idleConnWait[w.key] = q
return false
}
// cancelTimerBody decorates a response body (rc) with timer bookkeeping.
// According to its upstream documentation, its methods (not part of this
// excerpt) do two things:
//   - call stop whenever a Read fails or the body is closed;
//   - when reqDidTimeout reports true, wrap a Read failure so it is marked
//     as a net.Error that hit its timeout.
type cancelTimerBody struct {
	// stop halts the time.Timer that is waiting to cancel the request.
	stop func()
	// rc is the wrapped body being read from and closed.
	rc io.ReadCloser
	// reqDidTimeout is consulted on Read failure to decide whether the
	// error should be reported as a timeout.
	reqDidTimeout func() bool
}
// dialSerial connects to a list of addresses in sequence, returning
// either the first successful connection, or the first error.
func (sd *sysDialer) dialSerial(ctx context.Context, ras addrList) (Conn, error) {
var firstErr error // The error from the first address is most relevant.
for i, ra := range ras {
select {case <-ctx.Done():
// operation was canceled 错误是从这里抛出来的
return nil, &OpError{Op: "dial", Net: sd.network, Source: sd.LocalAddr, Addr: ra, Err: mapErr(ctx.Err())}
default:
}
dialCtx := ctx
if deadline, hasDeadline := ctx.Deadline(); hasDeadline {partialDeadline, err := partialDeadline(time.Now(), deadline, len(ras)-i)
if err != nil {
// Ran out of time.
if firstErr == nil {firstErr = &OpError{Op: "dial", Net: sd.network, Source: sd.LocalAddr, Addr: ra, Err: err}
}
break
}
if partialDeadline.Before(deadline) {
var cancel context.CancelFunc
dialCtx, cancel = context.WithDeadline(ctx, partialDeadline)
defer cancel()}
...
// dialSerial 里面的 <-ctx.Done() 是由 setRequestCancel() 返回的 stopTimer 触发的
// setRequestCancel sets req.Cancel and adds a deadline context to req
// if deadline is non-zero. The RoundTripper's type is used to
// determine whether the legacy CancelRequest behavior should be used.
//
// As background, there are three ways to cancel a request:
// First was Transport.CancelRequest. (deprecated)
// Second was Request.Cancel.
// Third was Request.Context.
// This function populates the second and third, and uses the first if it really needs to.
func setRequestCancel(req *Request, rt RoundTripper, deadline time.Time) (stopTimer func(), didTimeout func() bool) {if deadline.IsZero() {return nop, alwaysFalse}
knownTransport := knownRoundTripperImpl(rt, req)
oldCtx := req.Context()
...
}
大致的流程如下:
总结
dial 报错却没有抛给 http client,原因是这里的 dial 报错并不是建立连接本身失败。在拨号(dial)进行的过程中,该请求已经先拿到了另一个被异步归还的空闲连接(idleConn),并用它完成了请求。请求结束时,stopTimer 取消了请求的 ctx,触发 ctx.Done,仍在进行中的 dial 因此报出 operation was canceled。由于请求本身已经成功,这个错误不会抛给最外层的 http client。
正文完