Preface
This article looks into the connect timeout exception of the JDK httpclient (java.net.http).
Example code
@Test
public void testConnectTimeout() throws IOException, InterruptedException {
HttpClient client = HttpClient.newBuilder()
.build();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("https://twitter.com"))
.build();
long start = System.currentTimeMillis();
try{
HttpResponse<String> result = client.send(request, HttpResponse.BodyHandlers.ofString());
System.out.println(result.body());
}finally {
long cost = System.currentTimeMillis() - start;
System.out.println("cost:"+cost);
}
}
The exception log is as follows:
cost:75814
java.net.ConnectException: Operation timed out
at java.net.http/jdk.internal.net.http.HttpClientImpl.send(HttpClientImpl.java:561)
at java.net.http/jdk.internal.net.http.HttpClientFacade.send(HttpClientFacade.java:119)
at com.example.HttpClientTest.testConnectTimeout(HttpClientTest.java:464)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:131)
Caused by: java.net.ConnectException: Operation timed out
at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779)
at java.net.http/jdk.internal.net.http.PlainHttpConnection$ConnectEvent.handle(PlainHttpConnection.java:128)
at java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.handleEvent(HttpClientImpl.java:957)
at java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.lambda$run$3(HttpClientImpl.java:912)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
at java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.run(HttpClientImpl.java:912)
Exchange.responseAsync
java.net.http/jdk/internal/net/http/Exchange.java
public CompletableFuture<Response> responseAsync() {
return responseAsyncImpl(null);
}
CompletableFuture<Response> responseAsyncImpl(HttpConnection connection) {
SecurityException e = checkPermissions();
if (e != null) {
return MinimalFuture.failedFuture(e);
} else {
return responseAsyncImpl0(connection);
}
}
CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
Function<ExchangeImpl<T>, CompletableFuture<Response>> after407Check;
bodyIgnored = null;
if (request.expectContinue()) {
request.addSystemHeader("Expect", "100-Continue");
Log.logTrace("Sending Expect: 100-Continue");
// wait for 100-Continue before sending body
after407Check = this::expectContinue;
} else {
// send request body and proceed.
after407Check = this::sendRequestBody;
}
// The ProxyAuthorizationRequired can be triggered either by
// establishExchange (case of HTTP/2 SSL tunneling through HTTP/1.1 proxy
// or by sendHeaderAsync (case of HTTP/1.1 SSL tunneling through HTTP/1.1 proxy
// Therefore we handle it with a call to this checkFor407(…) after these
// two places.
Function<ExchangeImpl<T>, CompletableFuture<Response>> afterExch407Check =
(ex) -> ex.sendHeadersAsync()
.handle((r,t) -> this.checkFor407(r, t, after407Check))
.thenCompose(Function.identity());
return establishExchange(connection)
.handle((r,t) -> this.checkFor407(r,t, afterExch407Check))
.thenCompose(Function.identity());
}
// get/set the exchange impl, solving race condition issues with
// potential concurrent calls to cancel() or cancel(IOException)
private CompletableFuture<? extends ExchangeImpl<T>>
establishExchange(HttpConnection connection) {
if (debug.on()) {
debug.log("establishing exchange for %s,%n\t proxy=%s",
request, request.proxy());
}
// check if we have been cancelled first.
Throwable t = getCancelCause();
checkCancelled();
if (t != null) {
return MinimalFuture.failedFuture(t);
}
CompletableFuture<? extends ExchangeImpl<T>> cf, res;
cf = ExchangeImpl.get(this, connection);
// We should probably use a VarHandle to get/set exchangeCF
// instead – as we need CAS semantics.
synchronized (this) {exchangeCF = cf;};
res = cf.whenComplete((r,x) -> {
synchronized(Exchange.this) {
if (exchangeCF == cf) exchangeCF = null;
}
});
checkCancelled();
return res.thenCompose((eimpl) -> {
// recheck for cancelled, in case of race conditions
exchImpl = eimpl;
IOException tt = getCancelCause();
checkCancelled();
if (tt != null) {
return MinimalFuture.failedFuture(tt);
} else {
// Now we’re good to go. Because exchImpl is no longer
// null cancel() will be able to propagate directly to
// the impl after this point (if needed).
return MinimalFuture.completedFuture(eimpl);
} });
}
responseAsync goes through responseAsyncImpl0 and establishExchange, which ends up calling ExchangeImpl.get(this, connection).
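The handle(...).thenCompose(Function.identity()) idiom seen in responseAsyncImpl0 lets a single callback look at both the result and the exception of the previous stage and then decide which future to continue with. Below is a minimal sketch of that idiom only. The class and method names (HandleComposeSketch, next, chain) are made up for illustration and are not part of the JDK; the "407" string merely stands in for the proxy-authentication case.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical illustration; none of these names come from the JDK sources.
class HandleComposeSketch {

    // Looks at the result and the error of the previous stage and returns the NEXT future.
    static CompletableFuture<String> next(String result, Throwable error) {
        if (error != null) {
            // Stand-in for a recovery path such as the one taken by checkFor407.
            return CompletableFuture.completedFuture("recovered from " + error.getMessage());
        }
        return CompletableFuture.completedFuture(result.toUpperCase());
    }

    static CompletableFuture<String> chain(CompletableFuture<String> first) {
        // handle(...) produces a CompletableFuture<CompletableFuture<String>>;
        // thenCompose(Function.identity()) flattens it back to CompletableFuture<String>.
        return first.handle(HandleComposeSketch::next)
                    .thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        System.out.println(chain(CompletableFuture.completedFuture("ok")).join());
        System.out.println(chain(CompletableFuture.failedFuture(new IllegalStateException("407"))).join());
    }
}
```

The point of the flattening step is that the callback can return an asynchronous continuation on both the success path and the failure path, which plain thenCompose cannot do because it is skipped when the previous stage fails.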
ExchangeImpl.get
java.net.http/jdk/internal/net/http/ExchangeImpl.java
/**
* Initiates a new exchange and assigns it to a connection if one exists
* already. connection usually null.
*/
static <U> CompletableFuture<? extends ExchangeImpl<U>>
get(Exchange<U> exchange, HttpConnection connection)
{
if (exchange.version() == HTTP_1_1) {
if (debug.on())
debug.log("get: HTTP/1.1: new Http1Exchange");
return createHttp1Exchange(exchange, connection);
} else {
Http2ClientImpl c2 = exchange.client().client2(); // #### improve
HttpRequestImpl request = exchange.request();
CompletableFuture<Http2Connection> c2f = c2.getConnectionFor(request, exchange);
if (debug.on())
debug.log("get: Trying to get HTTP/2 connection");
return c2f.handle((h2c, t) -> createExchangeImpl(h2c, t, exchange, connection))
.thenCompose(Function.identity());
}
}
For a request whose version is not HTTP/1.1, this calls Http2ClientImpl.getConnectionFor to obtain a connection.
Http2ClientImpl.getConnectionFor
java.net.http/jdk/internal/net/http/Http2ClientImpl.java
/**
* When HTTP/2 requested only. The following describes the aggregate behavior including the
* calling code. In all cases, the HTTP2 connection cache
* is checked first for a suitable connection and that is returned if available.
* If not, a new connection is opened, except in https case when a previous negotiate failed.
* In that case, we want to continue using http/1.1. When a connection is to be opened and
* if multiple requests are sent in parallel then each will open a new connection.
*
* If negotiation/upgrade succeeds then
* one connection will be put in the cache and the others will be closed
* after the initial request completes (not strictly necessary for h2, only for h2c)
*
* If negotiate/upgrade fails, then any opened connections remain open (as http/1.1)
* and will be used and cached in the http/1 cache. Note, this method handles the
* https failure case only (by completing the CF with an ALPN exception, handled externally)
* The h2c upgrade is handled externally also.
*
* Specific CF behavior of this method.
* 1. completes with ALPN exception: h2 negotiate failed for first time. failure recorded.
* 2. completes with other exception: failure not recorded. Caller must handle
* 3. completes normally with null: no connection in cache for h2c or h2 failed previously
* 4. completes normally with connection: h2 or h2c connection in cache. Use it.
*/
CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req,
Exchange<?> exchange) {
URI uri = req.uri();
InetSocketAddress proxy = req.proxy();
String key = Http2Connection.keyFor(uri, proxy);
synchronized (this) {
Http2Connection connection = connections.get(key);
if (connection != null) {
try {
if (connection.closed || !connection.reserveStream(true)) {
if (debug.on())
debug.log("removing found closed or closing connection: %s", connection);
deleteConnection(connection);
} else {
// fast path if connection already exists
if (debug.on())
debug.log("found connection in the pool: %s", connection);
return MinimalFuture.completedFuture(connection);
}
} catch (IOException e) {
// thrown by connection.reserveStream()
return MinimalFuture.failedFuture(e);
}
}
if (!req.secure() || failures.contains(key)) {
// secure: negotiate failed before. Use http/1.1
// !secure: no connection available in cache. Attempt upgrade
if (debug.on()) debug.log("not found in connection pool");
return MinimalFuture.completedFuture(null);
}
}
return Http2Connection
.createAsync(req, this, exchange)
.whenComplete((conn, t) -> {
synchronized (Http2ClientImpl.this) {
if (conn != null) {
try {
conn.reserveStream(true);
} catch (IOException e) {
throw new UncheckedIOException(e); // shouldn’t happen
}
offerConnection(conn);
} else {
Throwable cause = Utils.getCompletionCause(t);
if (cause instanceof Http2Connection.ALPNException)
failures.add(key);
}
}
});
}
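If no usable connection is found in the cache (and the request is secure with no earlier negotiation failure recorded), a new one is created through Http2Connection.createAsync.
In whenComplete, a successfully created connection is offered to the connection cache via offerConnection; if creation failed with an ALPNException, the key is added to failures.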
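The "check the cache, otherwise create asynchronously and cache on completion" flow can be sketched in a few lines. This is a simplified model under stated assumptions, not the JDK implementation: connections are plain strings, ConnectionCacheSketch and isCached are hypothetical names, and every failure is recorded, whereas the JDK code above records only ALPN failures.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Simplified, hypothetical model of the cache-or-create flow; not JDK code.
class ConnectionCacheSketch {
    private final Map<String, String> connections = new HashMap<>(); // key -> "connection"
    private final Set<String> failures = new HashSet<>();            // keys whose creation failed

    CompletableFuture<String> getConnectionFor(String key,
                                               Supplier<CompletableFuture<String>> createAsync) {
        synchronized (this) {
            String cached = connections.get(key);
            if (cached != null) {
                // Fast path: reuse the cached connection.
                return CompletableFuture.completedFuture(cached);
            }
            if (failures.contains(key)) {
                // An earlier attempt failed: complete with null so the caller can fall back.
                return CompletableFuture.completedFuture(null);
            }
        }
        // Slow path: create outside the lock, update the shared state when the future completes.
        return createAsync.get().whenComplete((conn, error) -> {
            synchronized (ConnectionCacheSketch.this) {
                if (conn != null) {
                    connections.put(key, conn); // offer the new connection to the cache
                } else {
                    failures.add(key);          // simplification: the JDK records ALPN failures only
                }
            }
        });
    }

    synchronized boolean isCached(String key) {
        return connections.containsKey(key);
    }

    public static void main(String[] args) {
        ConnectionCacheSketch cache = new ConnectionCacheSketch();
        String key = "example-key"; // arbitrary key for the demo
        cache.getConnectionFor(key, () -> CompletableFuture.completedFuture("conn-1")).join();
        System.out.println("cached: " + cache.isCached(key));
    }
}
```

As in the JDK method, two parallel callers that both miss the cache will each create a connection, because creation happens outside the synchronized block.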
Http2Connection.createAsync
java.net.http/jdk/internal/net/http/Http2Connection.java
// Requires TLS handshake. So, is really async
static CompletableFuture<Http2Connection> createAsync(HttpRequestImpl request,
Http2ClientImpl h2client,
Exchange<?> exchange) {
assert request.secure();
AbstractAsyncSSLConnection connection = (AbstractAsyncSSLConnection)
HttpConnection.getConnection(request.getAddress(),
h2client.client(),
request,
HttpClient.Version.HTTP_2);
// Expose the underlying connection to the exchange’s aborter so it can
// be closed if a timeout occurs.
exchange.connectionAborter.connection(connection);
return connection.connectAsync(exchange)
.thenCompose(unused -> connection.finishConnect())
.thenCompose(unused -> checkSSLConfig(connection))
.thenCompose(notused-> {
CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
try {
Http2Connection hc = new Http2Connection(request, h2client, connection);
cf.complete(hc);
} catch (IOException e) {
cf.completeExceptionally(e);
}
return cf; } );
}
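This first calls HttpConnection.getConnection to obtain the underlying connection object (as far as the JDK 11 sources show, this factory consults the connection pool only for HTTP/1.1 connections, so for the HTTP_2 version requested here a new one is created), and then calls connectAsync to actually connect.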
AsyncSSLConnection
java.net.http/jdk/internal/net/http/AsyncSSLConnection.java
@Override
public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {
return plainConnection
.connectAsync(exchange)
.thenApply(unused -> {
// create the SSLTube wrapping the SocketTube, with the given engine
flow = new SSLTube(engine,
client().theExecutor(),
client().getSSLBufferSupplier()::recycle,
plainConnection.getConnectionFlow());
return null; } );
}
This delegates to plainConnection.connectAsync.
PlainHttpConnection.connectAsync
java.net.http/jdk/internal/net/http/PlainHttpConnection.java
@Override
public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {
CompletableFuture<Void> cf = new MinimalFuture<>();
try {
assert !connected : "Already connected";
assert !chan.isBlocking() : "Unexpected blocking channel";
boolean finished;
connectTimerEvent = newConnectTimer(exchange, cf);
if (connectTimerEvent != null) {
if (debug.on())
debug.log("registering connect timer: " + connectTimerEvent);
client().registerTimer(connectTimerEvent);
}
PrivilegedExceptionAction<Boolean> pa =
() -> chan.connect(Utils.resolveAddress(address));
try {
finished = AccessController.doPrivileged(pa);
} catch (PrivilegedActionException e) {
throw e.getCause();
}
if (finished) {
if (debug.on()) debug.log("connect finished without blocking");
cf.complete(null);
} else {
if (debug.on()) debug.log("registering connect event");
client().registerEvent(new ConnectEvent(cf));
}
} catch (Throwable throwable) {
cf.completeExceptionally(Utils.toConnectException(throwable));
try {
close();
} catch (Exception x) {
if (debug.on())
debug.log("Failed to close channel after unsuccessful connect");
}
}
return cf;
}
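If the client has a connectTimeout configured, a connectTimerEvent is created and registered here.
It then calls chan.connect to start connecting; if the connection does not complete immediately, a ConnectEvent is registered.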
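For reference, this is roughly how a connect timeout can be configured on the client so that the connect timer described above comes into play. It is a sketch under assumptions: the 3 second value is arbitrary, and ConnectTimeoutSketch and clientWithConnectTimeout are names made up for this example. HttpClient.Builder.connectTimeout and HttpConnectTimeoutException are part of the public java.net.http API.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpConnectTimeoutException;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

class ConnectTimeoutSketch {

    // Builds a client whose connection attempts are abandoned after the given duration.
    static HttpClient clientWithConnectTimeout(Duration timeout) {
        return HttpClient.newBuilder()
                .connectTimeout(timeout)
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        // 3 seconds is an arbitrary value chosen for the demo.
        HttpClient client = clientWithConnectTimeout(Duration.ofSeconds(3));
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://twitter.com"))
                .build();
        long start = System.currentTimeMillis();
        try {
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println("status:" + response.statusCode());
        } catch (HttpConnectTimeoutException e) {
            // Raised when the client's own connect timer fires before the connection is made.
            System.out.println("connect timed out: " + e.getMessage());
        } catch (IOException e) {
            // Any other failure, for example a refused connection.
            System.out.println("other I/O failure: " + e);
        } finally {
            System.out.println("cost:" + (System.currentTimeMillis() - start));
        }
    }
}
```

With a timeout set, an unreachable host should fail after about the configured duration instead of waiting for the operating system to give up.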
SocketChannelImpl.connect
java.base/sun/nio/ch/SocketChannelImpl.java
@Override
public boolean connect(SocketAddress sa) throws IOException {
InetSocketAddress isa = Net.checkAddress(sa);
SecurityManager sm = System.getSecurityManager();
if (sm != null)
sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
InetAddress ia = isa.getAddress();
if (ia.isAnyLocalAddress())
ia = InetAddress.getLocalHost();
try {
readLock.lock();
try {
writeLock.lock();
try {
int n = 0;
boolean blocking = isBlocking();
try {
beginConnect(blocking, isa);
do {
n = Net.connect(fd, ia, isa.getPort());
} while (n == IOStatus.INTERRUPTED && isOpen());
} finally {
endConnect(blocking, (n > 0));
}
assert IOStatus.check(n);
return n > 0;
} finally {
writeLock.unlock();
}
} finally {
readLock.unlock();
}
} catch (IOException ioe) {
// connect failed, close the channel
close();
throw SocketExceptions.of(ioe, isa);
}
}
The connection is made through Net.connect, which calls a native method.
ConnectEvent
java.net.http/jdk/internal/net/http/PlainHttpConnection.java
final class ConnectEvent extends AsyncEvent {
private final CompletableFuture<Void> cf;
ConnectEvent(CompletableFuture<Void> cf) {
this.cf = cf;
}
@Override
public SelectableChannel channel() {
return chan;
}
@Override
public int interestOps() {
return SelectionKey.OP_CONNECT;
}
@Override
public void handle() {
try {
assert !connected : "Already connected";
assert !chan.isBlocking() : "Unexpected blocking channel";
if (debug.on())
debug.log("ConnectEvent: finishing connect");
boolean finished = chan.finishConnect();
assert finished : "Expected channel to be connected";
if (debug.on())
debug.log("ConnectEvent: connect finished: %s Local addr: %s",
finished, chan.getLocalAddress());
// complete async since the event runs on the SelectorManager thread
cf.completeAsync(() -> null, client().theExecutor());
} catch (Throwable e) {
Throwable t = Utils.toConnectException(e);
client().theExecutor().execute(() -> cf.completeExceptionally(t));
close();
}
}
@Override
public void abort(IOException ioe) {
client().theExecutor().execute(() -> cf.completeExceptionally(ioe));
close();
}
}
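SelectorManager triggers handle on the events that are ready; for a ConnectEvent, that means calling ConnectEvent.handle.
ConnectEvent.handle executes chan.finishConnect(); if an exception is caught, it completes the future with cf.completeExceptionally(t) and closes the connection.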
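The same connect / OP_CONNECT / finishConnect sequence can be reproduced with the public NIO API. The sketch below is not the HttpClient implementation: NonBlockingConnectSketch and its connect helper are hypothetical names, a single-use Selector replaces the SelectorManager thread, and the demo connects to a local ServerSocketChannel so that it needs no external network. The timeout must be greater than zero, because select(0) waits indefinitely.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NonBlockingConnectSketch {

    // Returns true once the channel is connected, false if the wait expired first.
    // A failed attempt surfaces as an IOException (for example ConnectException)
    // thrown by connect or finishConnect.
    static boolean connect(SocketAddress target, long timeoutMillis) throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel chan = SocketChannel.open()) {
            chan.configureBlocking(false);
            if (chan.connect(target)) {
                return true; // connected without blocking, no event needed
            }
            // Not finished yet: wait for the channel to become ready for finishConnect.
            chan.register(selector, SelectionKey.OP_CONNECT);
            if (selector.select(timeoutMillis) == 0) {
                return false; // nothing ready in time; closing the channel abandons the attempt
            }
            return chan.finishConnect();
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 lets the OS pick a free port on the loopback interface.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            System.out.println("connected: " + connect(server.getLocalAddress(), 2000));
        }
    }
}
```

Bounding the wait in select is the application-level counterpart of the connect timer: without it, the attempt lasts until the operating system reports the failure through finishConnect.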
SocketChannelImpl.finishConnect
java.base/sun/nio/ch/SocketChannelImpl.java
@Override
public boolean finishConnect() throws IOException {
try {
readLock.lock();
try {
writeLock.lock();
try {
// no-op if already connected
if (isConnected())
return true;
boolean blocking = isBlocking();
boolean connected = false;
try {
beginFinishConnect(blocking);
int n = 0;
if (blocking) {
do {
n = checkConnect(fd, true);
} while ((n == 0 || n == IOStatus.INTERRUPTED) && isOpen());
} else {
n = checkConnect(fd, false);
}
connected = (n > 0);
} finally {
endFinishConnect(blocking, connected);
}
assert (blocking && connected) ^ !blocking;
return connected;
} finally {
writeLock.unlock();
}
} finally {
readLock.unlock();
}
} catch (IOException ioe) {
// connect failed, close the channel
close();
throw SocketExceptions.of(ioe, remoteAddress);
}
}
checkConnect is a native method; when the connection attempt times out, it throws java.net.ConnectException: Operation timed out.
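TCP connection SYN timeout (net.ipv4.tcp_syn_retries)
When the client establishes a connection to the server, it sends a SYN packet. If no SYN+ACK arrives from the server within a certain time, the client retransmits the SYN; the number of retries is determined by net.ipv4.tcp_syn_retries.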
/ # sysctl -a | grep tcp_syn_retries
sysctl: error reading key 'net.ipv6.conf.all.stable_secret': I/O error
net.ipv4.tcp_syn_retries = 6
sysctl: error reading key 'net.ipv6.conf.default.stable_secret': I/O error
sysctl: error reading key 'net.ipv6.conf.eth0.stable_secret': I/O error
sysctl: error reading key 'net.ipv6.conf.lo.stable_secret': I/O error
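The Linux default is 6 retries. After the initial SYN the client waits 2^0 seconds; if there is no reply, it sends the first retry and waits 2^1 seconds, and so on, until the sixth retry, which waits 2^6 seconds. The total is therefore 1s+2s+4s+8s+16s+32s+64s=127s. So on Linux, if the httpclient has no connect timeout set, it relies on the system's TCP SYN timeout: after 127s the Java native call throws java.net.ConnectException: Operation timed out.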
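The arithmetic is easy to check in code. This small sketch assumes, as described above, an initial wait of 1 second that doubles after every retransmission; SynBackoffSketch and totalTimeoutSeconds are names made up for this example.

```java
// Hypothetical helper that reproduces the backoff arithmetic from the text.
class SynBackoffSketch {

    // Total seconds spent waiting before the connect attempt is given up,
    // assuming a 1 s initial wait that doubles after each retransmitted SYN.
    static long totalTimeoutSeconds(int synRetries) {
        long total = 0;
        long wait = 1; // wait after the initial SYN: 2^0 seconds
        // One wait after the initial SYN plus one after each of the synRetries retries.
        for (int attempt = 0; attempt <= synRetries; attempt++) {
            total += wait;
            wait *= 2;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(totalTimeoutSeconds(6)); // prints 127
    }
}
```

Under the same assumption, each additional retry roughly doubles the total wait, which is why lowering tcp_syn_retries shortens the system-level timeout so sharply.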
On macOS, according to Overriding the default Linux kernel 20-second TCP socket connect timeout, the timeout is 75s, which roughly matches the 75814ms printed by the example code above.
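Summary
When connecting with the JDK httpclient, if the client's connectTimeout is not set, the actual timeout depends on the operating system's TCP settings.
If the client's SYN goes unanswered, the tcp_syn_retries setting determines when the native method throws java.net.ConnectException: Operation timed out.
On Linux, tcp_syn_retries defaults to 6, i.e. 6 retries taking 1s+2s+4s+8s+16s+32s+64s=127s in total; if no SYN+ACK has arrived from the server by then, java.net.ConnectException: Operation timed out is thrown.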
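References
Timeouts in the TCP protocol
An analysis of the timeout for establishing a TCP connection on Linux
Overriding the default Linux kernel 20-second TCP socket connect timeout
Setting the default 20-second TCP connect timeout on Linux
A few examples of SYN packet loss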