共计 8345 个字符,预计需要花费 21 分钟才能阅读完成。
获取数据库连贯是通过 DataSource 发动的,如果利用应用 HikariPool 作为连接池的话,须要配置 DataSource 为 HikariDataSource,利用通过调用 HikariDataSource 的 getConnection 办法获取数据库连贯。
敞开数据库连贯是间接调用获取到的数据库连贯对象(Connection 对象)的 close 办法实现的。
明天要钻研的课题:获取及敞开数据库连贯,获取数据库连贯其实就是从 HikariDataSource 的 getConnection 办法动手,直到 getConnection 办法获取到数据库连贯对象。而后再钻研数据库连贯对象的 close 办法。
获取数据库连贯
间接看 HikariDataSource 的 getConnection 办法:
public Connection getConnection() throws SQLException
{if (isClosed()) {throw new SQLException("HikariDataSource" + this + "has been closed.");
}
if (fastPathPool != null) {return fastPathPool.getConnection();
}
// See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
HikariPool result = pool;
if (result == null) {synchronized (this) {
result = pool;
if (result == null) {validate();
LOGGER.info("{} - Starting...", getPoolName());
try {pool = result = new HikariPool(this);
this.seal();}
catch (PoolInitializationException pie) {if (pie.getCause() instanceof SQLException) {throw (SQLException) pie.getCause();}
else {throw pie;}
}
LOGGER.info("{} - Start completed.", getPoolName());
}
}
}
fastPathPool 在 HikariDataSource 实例化的时候被初始化为 HikariPool 对象。如果 fastPathPool 在 getConnection 办法调用的时候尚未初始化,就在 getConnection 办法内实现初始化。之后再调用 HikariPool 对象的 getConnection 办法。
HikariPool#getConnection 办法
间接看代码:
public Connection getConnection() throws SQLException
{return getConnection(connectionTimeout);
}
间接调用了 getConnection(connectionTimeout) 办法:
public Connection getConnection(final long hardTimeout) throws SQLException
{suspendResumeLock.acquire();
final long startTime = currentTime();
try {
long timeout = hardTimeout;
do {PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
if (poolEntry == null) {break; // We timed out... break and throw exception}
final long now = currentTime();
if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) {closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
timeout = hardTimeout - elapsedMillis(startTime);
}
else {metricsTracker.recordBorrowStats(poolEntry, startTime);
return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
}
} while (timeout > 0L);
metricsTracker.recordBorrowTimeoutStats(startTime);
throw createTimeoutException(startTime);
}
catch (InterruptedException e) {Thread.currentThread().interrupt();
throw new SQLException(poolName + "- Interrupted during connection acquisition", e);
}
finally {suspendResumeLock.release();
}
}
首先调用 suspendResumeLock.acquire(); 办法,suspendResumeLock 的次要作用是限流,初始化的时候设置最大并发线程数为 MAX_PERMITS = 10000,如果同时获取连贯的线程未超过限度数量的话,容许获取连贯,否则,挂起、排队(外部通过 Semaphore 类实现,Semaphore 是 AQS 的扩大实现)期待其余线程获取到连贯、开释 suspendResumeLock 之后,能力持续获取到数据库连贯。
而后调用 connectionBag 的 borrow 办法(稍后剖析源码),从数据库连接池中获取连贯。
如果没能从数据库连接池获取到连贯的话,抛异样。
查看从连接池获取到的连贯,如果连贯被标记为 Evicted(该连贯因为超时或其余起因应该被回收),或者最初一次拜访工夫到以后工夫之间的时间差超过了 aliveBypassWindowMs 的设置并且尝试后发现该连贯曾经生效(连不上了),则敞开该连贯,timeout 尚未超时的话从新调用 connectionBag 的 borrow 办法获取连贯,直到超时或获取到可用的连贯。
如果连贯可用,则调用 metricsTracker.recordBorrowStats,之后返回:
return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
留神最终返回的并不是 java.sql.Connection 对象,而是通过 poolEnry.createProxyConnection 办法创立的代理对象!
最终通过 JavassistProxyFactory 创立代理对象 HikariProxyConnection。
返回代理数据库连贯对象的同时,创立 leakTask。leakTask 对象的作用是:如果 leakDetectionThreshold 不为 0 的话,创立一个 leakTask 工作,在 leakDetectionThreshold 时长之后执行该工作抛出数据库连贯泄露异样。该工作不被执行(不产生数据库泄露异样)的惟一条件就是在 leakDetectionThreshold 时长范畴内,数据库连贯胜利执行完 SQL 语句后通过调用代理 Connection 对象的 close 办法勾销该工作。
OK,getConnection 办法源码剖析实现了,目前为止,咱们晓得:
- 通过 connectionBag.borrow 办法从连接池获取连贯数据库连贯的封装对象 poolEntry
- 最终获取到的数据库连贯是代理对象 HikariProxyConnection
上面咱们首先剖析一下 connectionBag.borrow 办法,borrow 办法才是从连接池获取连贯的要害。
connectionBag#borrow 办法
上代码,分段剖析,先看第一局部:
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
// Try the thread-local list first
final List<Object> list = threadList.get();
for (int i = list.size() - 1; i >= 0; i--) {final Object entry = list.remove(i);
@SuppressWarnings("unchecked")
final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {return bagEntry;}
}
首先从 threadList 中获取,如果能获取到闲暇的数据库连贯、并且能胜利通过 CAS 操作批改连贯状态(从 STATE_NOT_IN_USE 批改为 STATE_IN_USE)的话,间接返回。
threadList 是 ThreadLocal 对象,以后线程如果曾经获取过连贯,在应用完归还给连接池的过程中,该连贯会写入 threadList。如果以后线程再次获取连贯的话,就能够从 threadList 中获取,必定是性能最好的获取形式。
接下来,如果 threadList 中没有获取到:
final int waiting = waiters.incrementAndGet();
try {for (T bagEntry : sharedList) {if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
// If we may have stolen another waiter's connection, request another bag add.
if (waiting > 1) {listener.addBagItem(waiting - 1);
}
return bagEntry;
}
}
获取到期待获取连贯的线程数 waiters,加 1。
遍历 sharedList,如果 sharedList 中有闲暇连贯,并且可能胜利批改状态的话,阐明获取胜利,间接返回。
从 sharedList 获取连贯胜利的话,还要检查一下以后是否还有其余期待获取连贯的线程(waiter>1), 如果有的话阐明连接池中的连贯不太够用,则调用 listener.addBagItem(waiting – 1) 减少连贯。listerner 就是 HikariPool 对象,调用的其实就是 HikariPool 的 addBagItem 办法。
addBagItem 办法的代码非常简单,就不贴出了,逻辑也很简略:期待获取连贯的线程数如果大于等于期待退出连接池的连接数的话,就通过 addConnectionExecutor 减少连贯到连接池中,否则,不须要减少,只须要稍等一下,addConnectionExecutor 就会把期待队列中的连贯退出池中。
持续看源码,如果从 sharedList 中也没有获取到连贯的话:
listener.addBagItem(waiting);
timeout = timeUnit.toNanos(timeout);
do {final long start = currentTime();
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {return bagEntry;}
timeout -= elapsedNanos(start);
} while (timeout > 10_000);
return null;
}
finally {waiters.decrementAndGet();
}
}
调用 listener.addBagItem(waiting),办法逻辑曾经剖析过了。
从 handoffQueue 获取连贯,如果获取到的连贯状不是闲暇状态(其余线程姗姗来迟了),在 timeout 工夫范畴内持续循环从 handoffQueue 获取。
如果 handoffQueue 中没有获取到(此时必定期待超时了),则世界返回 null。
否则,如果获取到了闲暇连贯,并且可能胜利批改连贯状态为 STATE_IN_USE 的话,返回该连贯。
最终,不论获取胜利与否,waiters 减 1。
borrow 办法剖析实现!咱们晓得从连接池获取连贯的根本逻辑是:
- 首先从 ThreadLocal 变量 threadList 中获取
- 如果没有获取到的话,从 sharedList 中获取
- 还是没有获取到的话,最初才从 handoffQueue 中获取
源码曾经剖析的很分明了,逻辑也解释的清晰明了。然而为什么要这么做?
首先从 threadList 中获取连贯的逻辑比拟容易了解,如果以后线程曾经获取过连贯,用完之后存入 threadList 中,再次获取的话间接返回 threadList 中的连贯,效率必定是最高的,性能最优。
然而从 sharedList 中获取不到的话,为什么还要再从 handoffQueue 中获取?什么状况下会产生从 sharedList 中获取不到、从 handoffQueue 中能获取到?
仔细分析一下 sharedList 以及 handoffQueue 的构造、以及连贯退出连接池的逻辑,就能明确这么做的起因了。
连贯退出连接池的过程为:首先退出 sharedList,之后再退出 handoffQueue。
假如获取连贯的线程读取 sharedList 的时候,sharedList 没有闲暇的连贯,此时假如有一个新创建的连贯正好刚刚退出到 sharedList 中。这种状况下获取连贯的线程就从 sharedList 中完满错过了新退出的连贯。
而后新退出的连贯会退出 handoffQueue,handoffQueue 是手递手队列,退出连贯的线程会期待获取连贯的线程从队列中拿走该连贯,两个人一拍即合,相互满足需要,实现“手递手”工作。
这个场景下,HikariPool 的 handoffQueue 的设计肯定是性能最好的,必定回比重复遍历 sharedList 的性能好很多。
从连接池中获取连贯的过程,源码级剖析实现!
敞开连贯 HikariProxyConnection#close 办法
开篇曾经说过了,敞开连贯其实就是调用代理连贯对象 HikariProxyConnection 的 close 办法。close 办法是在他的父类、抽象类 ProxyConnection 中定义的:
public final void close() throws SQLException
{
// Closing statements can cause connection eviction, so this must run before the conditional below
closeStatements();
if (delegate != ClosedConnection.CLOSED_CONNECTION) {leakTask.cancel();
try {if (isCommitStateDirty && !isAutoCommit) {delegate.rollback();
lastAccess = currentTime();
LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate);
}
if (dirtyBits != 0) {poolEntry.resetConnectionState(this, dirtyBits);
lastAccess = currentTime();}
delegate.clearWarnings();}
catch (SQLException e) {
// when connections are aborted, exceptions are often thrown that should not reach the application
if (!poolEntry.isMarkedEvicted()) {throw checkException(e);
}
}
finally {
delegate = ClosedConnection.CLOSED_CONNECTION;
poolEntry.recycle(lastAccess);
}
}
}
首先调用 closeStatements 办法,敞开该连贯关上的 Statement。
而后,调用 leakTask.cancel(); 勾销连贯泄露查看工作。
之后,调用 poolEntry.resetConnectionState 重置连贯属性,筹备偿还到连接池。
最初,调用 poolEntry.recycle(lastAccess); 办法名阐明所有:回收连贯。
poolEntry#recycle(lastAccess)
代码很简略:
void recycle(final long lastAccessed)
{if (connection != null) {
this.lastAccessed = lastAccessed;
hikariPool.recycle(this);
}
}
记录 lastAccessed 后,调用 hikariPool.recycle(this)。
hikariPool#recycle
重点来了,调用了 connectionBag 的 requite 办法。
@Override
void recycle(final PoolEntry poolEntry)
{metricsTracker.recordConnectionUsage(poolEntry);
connectionBag.requite(poolEntry);
}
connectionBag#requite
requite 负责偿还 PoolEntry 对象到连接池:
public void requite(final T bagEntry)
{bagEntry.setState(STATE_NOT_IN_USE);
for (int i = 0; waiters.get() > 0; i++) {if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {return;}
else if ((i & 0xff) == 0xff) {parkNanos(MICROSECONDS.toNanos(10));
}
else {Thread.yield();
}
}
首先将 poolEntry 对象设置为闲暇状态(STATE_NOT_IN_USE)。
之后查看如果有期待获取连贯的线程存在的话,帮忙其疾速拿到连贯:
如果以后对象曾经不再是闲暇状态(曾经被 getConnection 线程拿走了),则间接返回。
否则如果没被拿走的话,以后连贯对象放入 handoffQueue,阻塞期待,如果被 getConnection 线程从 handoffQueue 中拿走的话,返回。
否则,循环肯定次数(0xff=256 次)后挂起期待 10 秒,直到连贯被期待获取连贯的线程拿走。(这个中央不是很了解,为什么要这么做?因为敞开连贯的调用方应该是利用线程,做完本人该做的事件、偿还连贯的时候,还要负责把连贯交给下一个期待获取连贯的线程,难道不会影响以后业务的疾速返回吗?)
最初,如果 threadList 的容量小于 50 的话,将以后 poolEntry 对象放入 threadList。
小结
HikariPool 的初始化、连贯创立及退出连接池、连贯获取、连贯偿还等所有连接池相干的次要逻辑剖析结束。
Thanks a lot!
上一篇 连接池 HikariPool(一)– 根底框架及初始化过程