乐趣区

聊聊jdk httpclient的ConnectionPool


本文主要研究一下 jdk httpclient 的 ConnectionPool
HttpConnection
HttpConnection.getConnection
java.net.http/jdk/internal/net/http/HttpConnection.java
/**
* Factory for retrieving HttpConnections. A connection can be retrieved
* from the connection pool, or a new one created if none available.
*
* The given {@code addr} is the ultimate destination. Any proxies,
* etc, are determined from the request. Returns a concrete instance which
* is one of the following:
* {@link PlainHttpConnection}
* {@link PlainTunnelingConnection}
*
* The returned connection, if not from the connection pool, must have its,
* connect() or connectAsync() method invoked, which (when it completes
* successfully ) renders the connection usable for requests.
*/
public static HttpConnection getConnection(InetSocketAddress addr,
HttpClientImpl client,
HttpRequestImpl request,
Version version) {
// The default proxy selector may select a proxy whose address is
// unresolved. We must resolve the address before connecting to it.
InetSocketAddress proxy = Utils.resolveAddress(request.proxy());
HttpConnection c = null;
boolean secure = request.secure();
ConnectionPool pool = client.connectionPool();

if (!secure) {
c = pool.getConnection(false, addr, proxy);
if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) {
final HttpConnection conn = c;
if (DEBUG_LOGGER.on())
DEBUG_LOGGER.log(conn.getConnectionFlow()
+ “: plain connection retrieved from HTTP/1.1 pool”);
return c;
} else {
return getPlainConnection(addr, proxy, request, client);
}
} else {// secure
if (version != HTTP_2) {// only HTTP/1.1 connections are in the pool
c = pool.getConnection(true, addr, proxy);
}
if (c != null && c.isOpen()) {
final HttpConnection conn = c;
if (DEBUG_LOGGER.on())
DEBUG_LOGGER.log(conn.getConnectionFlow()
+ “: SSL connection retrieved from HTTP/1.1 pool”);
return c;
} else {
String[] alpn = null;
if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) {
alpn = new String[] { “h2”, “http/1.1”};
}
return getSSLConnection(addr, proxy, alpn, request, client);
}
}
}
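这里非 https 的请求走 pool.getConnection(false, addr, proxy);https 且不是 HTTP/2 的请求走 pool.getConnection(true, addr, proxy);https 的 HTTP/2 请求不会从该连接池获取,而是直接通过 getSSLConnection 新建连接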
HttpConnection.closeOrReturnToCache
java.net.http/jdk/internal/net/http/HttpConnection.java
void closeOrReturnToCache(HttpHeaders hdrs) {
if (hdrs == null) {
// the connection was closed by server, eof
close();
return;
}
if (!isOpen()) {
return;
}
HttpClientImpl client = client();
if (client == null) {
close();
return;
}
ConnectionPool pool = client.connectionPool();
boolean keepAlive = hdrs.firstValue(“Connection”)
.map((s) -> !s.equalsIgnoreCase(“close”))
.orElse(true);

if (keepAlive) {
Log.logTrace(“Returning connection to the pool: {0}”, this);
pool.returnToPool(this);
} else {
close();
}
}
调用 pool.returnToPool(this)归还连接
ConnectionPool
java.net.http/jdk/internal/net/http/ConnectionPool.java
/**
* Http 1.1 connection pool.
*/
final class ConnectionPool {

static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
“jdk.httpclient.keepalive.timeout”, 1200); // seconds
static final long MAX_POOL_SIZE = Utils.getIntegerNetProperty(
“jdk.httpclient.connectionPoolSize”, 0); // unbounded
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);

// Pools of idle connections

private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
private final ExpiryList expiryList;
private final String dbgTag; // used for debug
boolean stopped;

//……
/**
* Entries in connection pool are keyed by destination address and/or
* proxy address:
* case 1: plain TCP not via proxy (destination only)
* case 2: plain TCP via proxy (proxy only)
* case 3: SSL not via proxy (destination only)
* case 4: SSL over tunnel (destination and proxy)
*/
static class CacheKey {
final InetSocketAddress proxy;
final InetSocketAddress destination;

CacheKey(InetSocketAddress destination, InetSocketAddress proxy) {
this.proxy = proxy;
this.destination = destination;
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final CacheKey other = (CacheKey) obj;
if (!Objects.equals(this.proxy, other.proxy)) {
return false;
}
if (!Objects.equals(this.destination, other.destination)) {
return false;
}
return true;
}

@Override
public int hashCode() {
return Objects.hash(proxy, destination);
}
}

synchronized HttpConnection getConnection(boolean secure,
InetSocketAddress addr,
InetSocketAddress proxy) {
if (stopped) return null;
CacheKey key = new CacheKey(addr, proxy);
HttpConnection c = secure ? findConnection(key, sslPool)
: findConnection(key, plainPool);
//System.out.println (“getConnection returning: ” + c);
return c;
}

private HttpConnection
findConnection(CacheKey key,
HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
LinkedList<HttpConnection> l = pool.get(key);
if (l == null || l.isEmpty()) {
return null;
} else {
HttpConnection c = l.removeFirst();
expiryList.remove(c);
return c;
}
}
/**
* Returns the connection to the pool.
*/
void returnToPool(HttpConnection conn) {
returnToPool(conn, Instant.now(), KEEP_ALIVE);
}

// Called also by whitebox tests
void returnToPool(HttpConnection conn, Instant now, long keepAlive) {

// Don’t call registerCleanupTrigger while holding a lock,
// but register it before the connection is added to the pool,
// since we don’t want to trigger the cleanup if the connection
// is not in the pool.
CleanupTrigger cleanup = registerCleanupTrigger(conn);

// it’s possible that cleanup may have been called.
HttpConnection toClose = null;
synchronized(this) {
if (cleanup.isDone()) {
return;
} else if (stopped) {
conn.close();
return;
}
if (MAX_POOL_SIZE > 0 && expiryList.size() >= MAX_POOL_SIZE) {
toClose = expiryList.removeOldest();
if (toClose != null) removeFromPool(toClose);
}
if (conn instanceof PlainHttpConnection) {
putConnection(conn, plainPool);
} else {
assert conn.isSecure();
putConnection(conn, sslPool);
}
expiryList.add(conn, now, keepAlive);
}
if (toClose != null) {
if (debug.on()) {
debug.log(“Maximum pool size reached: removing oldest connection %s”,
toClose.dbgString());
}
close(toClose);
}
//System.out.println(“Return to pool: ” + conn);
}

private void removeFromPool(HttpConnection c) {
assert Thread.holdsLock(this);
if (c instanceof PlainHttpConnection) {
removeFromPool(c, plainPool);
} else {
assert c.isSecure();
removeFromPool(c, sslPool);
}
}

private boolean
removeFromPool(HttpConnection c,
HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
//System.out.println(“cacheCleaner removing: ” + c);
assert Thread.holdsLock(this);
CacheKey k = c.cacheKey();
List<HttpConnection> l = pool.get(k);
if (l == null || l.isEmpty()) {
pool.remove(k);
return false;
}
return l.remove(c);
}

private void
putConnection(HttpConnection c,
HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
CacheKey key = c.cacheKey();
LinkedList<HttpConnection> l = pool.get(key);
if (l == null) {
l = new LinkedList<>();
pool.put(key, l);
}
l.add(c);
}

void stop() {
List<HttpConnection> closelist = Collections.emptyList();
try {
synchronized (this) {
stopped = true;
closelist = expiryList.stream()
.map(e -> e.connection)
.collect(Collectors.toList());
expiryList.clear();
plainPool.clear();
sslPool.clear();
}
} finally {
closelist.forEach(this::close);
}
}
}

借用连接调用 getConnection 方法,最后是调用 findConnection 方法,从 LinkedList<HttpConnection> 移除掉第一个,再从 expiryList 移除掉该连接
归还连接调用 returnToPool 方法,如果 MAX_POOL_SIZE 大于 0 且当前 expiryList 的大小已达到 MAX_POOL_SIZE,则先从 expiryList 移除最老的一个连接,再将其从 HashMap<CacheKey,LinkedList<HttpConnection>> 移除,并在释放锁之后 close 掉;之后调用 putConnection 往 HashMap<CacheKey,LinkedList<HttpConnection>> 添加该连接,最后再往 expiryList 添加该连接
可以看见 ConnectionPool 维护了 HashMap<CacheKey,LinkedList<HttpConnection>> 以及 ExpiryList 两个重要的属性,借用时从这两个地方移除,归还时往这两个地方添加;不一样的是归还时如果 MAX_POOL_SIZE 大于 0,则会对 expiryList 大小进行判断,超过最大值则移除最老的连接,并将其从这两个地方移除掉
MAX_POOL_SIZE 读取的是 jdk.httpclient.connectionPoolSize,读取不到默认为 0,表示无限
ConnectionPool 有个 stop 方法,在 HttpClient 的 stop 时候调用(SelectorManager 线程退出时触发),stop 方法会清除连接池并关闭连接

ExpiryList
java.net.http/jdk/internal/net/http/ConnectionPool.java
/**
 * Keeps ExpiryEntry instances in a LinkedList ordered by deadline: the
 * entry expiring soonest sits at the tail, the entry expiring latest at
 * the head. New entries usually belong at (or near) the head, while
 * expired entries are purged starting from the tail.
 */
private static final class ExpiryList {
    private final LinkedList<ExpiryEntry> list = new LinkedList<>();
    private volatile boolean mayContainEntries;

    int size() {
        return list.size();
    }

    // Loosely accurate flag, refreshed at the end of the operations
    // performed on this list; reading it does not require
    // synchronizing on the ConnectionPool.
    boolean purgeMaybeRequired() {
        return mayContainEntries;
    }

    // Deadline of the entry that expires next, if any.
    // Call only while holding the ConnectionPool lock.
    Optional<Instant> nextExpiryDeadline() {
        return list.isEmpty()
                ? Optional.empty()
                : Optional.of(list.getLast().expiry);
    }

    // Removes the tail entry and returns its connection, or null
    // when the list is empty.
    // Call only while holding the ConnectionPool lock.
    HttpConnection removeOldest() {
        ExpiryEntry oldest = list.pollLast();
        if (oldest == null) {
            return null;
        }
        return oldest.connection;
    }

    // Call only while holding the ConnectionPool lock.
    void add(HttpConnection conn) {
        add(conn, Instant.now(), KEEP_ALIVE);
    }

    // Used by whitebox test.
    void add(HttpConnection conn, Instant now, long keepAlive) {
        Instant deadline = now.truncatedTo(ChronoUnit.SECONDS)
                              .plus(keepAlive, ChronoUnit.SECONDS);
        ExpiryEntry fresh = new ExpiryEntry(conn, deadline);

        // Walk from the head (farthest deadline) and insert in front of
        // the first entry that expires earlier than the new one. The new
        // entry most likely has the farthest deadline, so the walk is
        // expected to stop early.
        ListIterator<ExpiryEntry> cursor = list.listIterator();
        boolean inserted = false;
        while (!inserted && cursor.hasNext()) {
            if (deadline.isAfter(cursor.next().expiry)) {
                cursor.previous();
                cursor.add(fresh);
                inserted = true;
            }
        }
        if (!inserted) {
            // no earlier deadline found (or empty list): goes at the tail
            list.add(fresh);
        }
        mayContainEntries = true;
    }

    // Removes the first entry wrapping the given connection, if present.
    // Call only while holding the ConnectionPool lock.
    void remove(HttpConnection c) {
        if (c == null || list.isEmpty()) {
            return;
        }
        for (Iterator<ExpiryEntry> it = list.iterator(); it.hasNext(); ) {
            if (it.next().connection.equals(c)) {
                it.remove();
                mayContainEntries = !list.isEmpty();
                return;
            }
        }
    }

    // Purges every entry whose deadline is not after now (now included)
    // and returns the connections of the purged entries.
    // Call only while holding the ConnectionPool lock.
    List<HttpConnection> purgeUntil(Instant now) {
        if (list.isEmpty()) {
            return Collections.emptyList();
        }

        List<HttpConnection> expired = new ArrayList<>();

        // The closest deadlines are at the tail, so iterate backwards
        // and stop at the first entry that has not expired yet.
        Iterator<ExpiryEntry> fromTail = list.descendingIterator();
        while (fromTail.hasNext()) {
            ExpiryEntry candidate = fromTail.next();
            // isAfter (rather than !isBefore) so that an entry whose
            // expiry == now is purged as well
            if (candidate.expiry.isAfter(now)) {
                break; // the list is sorted
            }
            fromTail.remove();
            expired.add(candidate.connection);
        }
        mayContainEntries = !list.isEmpty();
        return expired;
    }

    // Call only while holding the ConnectionPool lock.
    java.util.stream.Stream<ExpiryEntry> stream() {
        return list.stream();
    }

    // Call only while holding the ConnectionPool lock.
    void clear() {
        list.clear();
        mayContainEntries = false;
    }
}

/** Pairs a pooled connection with the instant at which it expires. */
static final class ExpiryEntry {
    final HttpConnection connection;
    final Instant expiry; // absolute time in seconds of expiry time

    ExpiryEntry(HttpConnection conn, Instant deadline) {
        connection = conn;
        expiry = deadline;
    }
}

ExpiryList 内部使用了 LinkedList<ExpiryEntry>,而且使用 ExpiryEntry 对 connection 进行包装
ExpiryEntry 里头除了 HttpConnection,还维护了 expiry 时间,表示该连接的失效时间
对 ExpiryList 的添加操作是根据当前时间的秒数 +KEEP_ALIVE 参数计算出 expiry 时间,KEEP_ALIVE 读取的是 jdk.httpclient.keepalive.timeout,读取不到默认是 1200 秒;之后根据失效时间插入到 LinkedList<ExpiryEntry>,失效时间长的在 list 头部,快失效的在 list 尾部
对 ExpiryList 的移除操作有两类,一类是移除最老的,通过 pollLast 操作完成,一类是移除指定连接,即使用 ListIterator 遍历 LinkedList<ExpiryEntry> 进行匹配再移除
这里维护了 mayContainEntries 变量(volatile),在对 LinkedList<ExpiryEntry> 进行操作时更新,用于粗略表示 ExpiryList 里是否还有连接;读取它不需要对 ConnectionPool 加锁同步,从而避免每次判断都要进入同步块

ConnectionPool.purgeExpiredConnectionsAndReturnNextDeadline
java.net.http/jdk/internal/net/http/ConnectionPool.java
/**
 * Purges expired connections, using the current time, and reports how
 * long until the next pooled connection is scheduled to expire.
 * Returns 0 right away when the expiry list reports that no purge may
 * be required.
 * @return the delay, in milliseconds, until the next connection expires.
 */
long purgeExpiredConnectionsAndReturnNextDeadline() {
    return expiryList.purgeMaybeRequired()
            ? purgeExpiredConnectionsAndReturnNextDeadline(Instant.now())
            : 0;
}

// Used for whitebox testing.
// Purges the connections whose deadline is not after 'now', removes them
// from their pool, closes them once the lock is released, and returns
// the number of milliseconds between 'now' and the next expiry deadline
// (0 when nothing is left in the expiry list).
long purgeExpiredConnectionsAndReturnNextDeadline(Instant now) {
    // Elements may be in the process of being added to the expiry
    // list, but those cannot have outlasted their keep alive timer
    // yet since they are only just being added.
    if (!expiryList.purgeMaybeRequired()) {
        return 0;
    }

    final List<HttpConnection> expired;
    final long delayMillis;
    synchronized (this) {
        expired = expiryList.purgeUntil(now);
        for (HttpConnection conn : expired) {
            HashMap<CacheKey,LinkedList<HttpConnection>> owner =
                    (conn instanceof PlainHttpConnection) ? plainPool : sslPool;
            boolean wasPresent = removeFromPool(conn, owner);
            assert wasPresent;
        }
        Instant next = expiryList.nextExpiryDeadline().orElse(now);
        delayMillis = now.until(next, ChronoUnit.MILLIS);
    }
    expired.forEach(this::close);
    return delayMillis;
}

由于 ExpiryList 的 connection 具有失效时间,因而存在清理失效连接的步骤,这个步骤是通过 purgeExpiredConnectionsAndReturnNextDeadline 来完成
purgeExpiredConnectionsAndReturnNextDeadline 方法被 SelectorManager 调用,用于计算 selector.select 的 timeout 时间
该方法首先调用 expiryList.purgeMaybeRequired()访问 mayContainEntries,看 expiryList 有无连接,没有连接直接返回 0;之后调用 expiryList.purgeUntil(now)移除并获取目前过期的连接,然后挨个从 HashMap<CacheKey,LinkedList<HttpConnection>> 移除并计算 nextPurge,最后挨个 close 掉移除的连接

CleanupTrigger
java.net.http/jdk/internal/net/http/ConnectionPool.java

private CleanupTrigger registerCleanupTrigger(HttpConnection conn) {
// Connect the connection flow to a pub/sub pair that will take the
// connection out of the pool and close it if anything happens
// while the connection is sitting in the pool.
CleanupTrigger cleanup = new CleanupTrigger(conn);
FlowTube flow = conn.getConnectionFlow();
if (debug.on()) debug.log(“registering %s”, cleanup);
flow.connectFlows(cleanup, cleanup);
return cleanup;
}

void cleanup(HttpConnection c, Throwable error) {
if (debug.on())
debug.log(“%s : ConnectionPool.cleanup(%s)”,
String.valueOf(c.getConnectionFlow()), error);
synchronized(this) {
removeFromPool(c);
expiryList.remove(c);
}
c.close();
}

/**
* An object that subscribes to the flow while the connection is in
* the pool. Anything that comes in will cause the connection to be closed
* and removed from the pool.
*/
private final class CleanupTrigger implements
FlowTube.TubeSubscriber, FlowTube.TubePublisher,
Flow.Subscription {

private final HttpConnection connection;
private volatile boolean done;

public CleanupTrigger(HttpConnection connection) {
this.connection = connection;
}

public boolean isDone() { return done;}

private void triggerCleanup(Throwable error) {
done = true;
cleanup(connection, error);
}

@Override public void request(long n) {}
@Override public void cancel() {}

@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(1);
}
@Override
public void onError(Throwable error) {triggerCleanup(error); }
@Override
public void onComplete() { triggerCleanup(null); }
@Override
public void onNext(List<ByteBuffer> item) {
triggerCleanup(new IOException(“Data received while in pool”));
}

@Override
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
subscriber.onSubscribe(this);
}

@Override
public String toString() {
return “CleanupTrigger(” + connection.getConnectionFlow() + “)”;
}
}

在调用 returnToPool 的时候,会调用 registerCleanupTrigger,创建一个 CleanupTrigger,然后调用 conn.getConnectionFlow()获取 flow,再调用 flow.connectFlows(cleanup, cleanup)
CleanupTrigger 既是 FlowTube.TubeSubscriber 也是 FlowTube.TubePublisher,在 onNext、onComplete 及 onError 方法里头都会通过 triggerCleanup 调用 cleanup 方法,将连接从 HashMap<CacheKey,LinkedList<HttpConnection>> 及 expiryList 移除并 close 掉;其中 onNext 表示连接在池中空闲时收到了数据,同样被当作异常处理
这个 CleanupTrigger 的功能可能类似于主动式的连接健康检查,在底层连接发生异常关闭的时候,通知到连接池这边,触发清理这些脏的连接

小结
jdk httpclient 的 ConnectionPool 相对于 apache commons-pool 而言比较简单,有几个参数(实际作用于 ExpiryList):
MAX_POOL_SIZE(jdk.httpclient.connectionPoolSize),默认为 0,表示无限;KEEP_ALIVE(jdk.httpclient.keepalive.timeout),默认是 1200 秒

ConnectionPool 同时维护了两个属性:HashMap<CacheKey,LinkedList<HttpConnection>> 及 expiryList,前者使用目标 ip 地址及代理地址作为 CacheKey,每个地址维护一个连接池;后者不分 cacheKey,对每个在连接池中的 connection 进行包装,根据 KEEP_ALIVE 记录了失效时间。
SelectorManager 调用 purgeExpiredConnectionsAndReturnNextDeadline 计算 select 的 timeout 时间,这个方法会清理 (移除并 close) 过期的连接
除了 SelectorManager 清理过期的连接外,connection 还通过 FlowTube 间接触发 CleanupTrigger,去清理关闭或异常的连接

doc
java.net.http javadoc

退出移动版