获取数据库连贯是通过DataSource发动的,如果利用应用HikariPool作为连接池的话,须要配置DataSource为HikariDataSource,利用通过调用HikariDataSource的getConnection办法获取数据库连贯。
敞开数据库连贯是间接调用获取到的数据库连贯对象(Connection对象)的close办法实现的。
明天要钻研的课题:获取及敞开数据库连贯,获取数据库连贯其实就是从HikariDataSource的getConnection办法动手,直到getConnection办法获取到数据库连贯对象。而后再钻研数据库连贯对象的close办法。
获取数据库连贯
间接看HikariDataSource的getConnection办法:
public Connection getConnection() throws SQLException { if (isClosed()) { throw new SQLException("HikariDataSource " + this + " has been closed."); } if (fastPathPool != null) { return fastPathPool.getConnection(); } // See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java HikariPool result = pool; if (result == null) { synchronized (this) { result = pool; if (result == null) { validate(); LOGGER.info("{} - Starting...", getPoolName()); try { pool = result = new HikariPool(this); this.seal(); } catch (PoolInitializationException pie) { if (pie.getCause() instanceof SQLException) { throw (SQLException) pie.getCause(); } else { throw pie; } } LOGGER.info("{} - Start completed.", getPoolName()); } } }
fastPathPool在HikariDataSource实例化的时候被初始化为HikariPool对象。如果fastPathPool在getConnection办法调用的时候尚未初始化,就在getConnection办法内实现初始化。之后再调用HikariPool对象的getConnection办法。
HikariPool#getConnection办法
间接看代码:
public Connection getConnection() throws SQLException { return getConnection(connectionTimeout); }
间接调用了getConnection(connectionTimeout)办法:
public Connection getConnection(final long hardTimeout) throws SQLException { suspendResumeLock.acquire(); final long startTime = currentTime(); try { long timeout = hardTimeout; do { PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS); if (poolEntry == null) { break; // We timed out... break and throw exception } final long now = currentTime(); if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) { closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE); timeout = hardTimeout - elapsedMillis(startTime); } else { metricsTracker.recordBorrowStats(poolEntry, startTime); return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now); } } while (timeout > 0L); metricsTracker.recordBorrowTimeoutStats(startTime); throw createTimeoutException(startTime); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new SQLException(poolName + " - Interrupted during connection acquisition", e); } finally { suspendResumeLock.release(); } }
首先调用suspendResumeLock.acquire();办法,suspendResumeLock的次要作用是限流,初始化的时候设置最大并发线程数为MAX_PERMITS = 10000,如果同时获取连贯的线程未超过限度数量的话,容许获取连贯,否则,挂起、排队(外部通过Semaphore类实现,Semaphore是AQS的扩大实现)期待其余线程获取到连贯、开释suspendResumeLock之后,能力持续获取到数据库连贯。
而后调用connectionBag的borrow办法(稍后剖析源码),从数据库连接池中获取连贯。
如果没能从数据库连接池获取到连贯的话,抛异样。
查看从连接池获取到的连贯,如果连贯被标记为Evicted(该连贯因为超时或其余起因应该被回收),或者最初一次拜访工夫到以后工夫之间的时间差超过了aliveBypassWindowMs的设置并且尝试后发现该连贯曾经生效(连不上了),则敞开该连贯,timeout尚未超时的话从新调用connectionBag的borrow办法获取连贯,直到超时或获取到可用的连贯。
如果连贯可用,则调用metricsTracker.recordBorrowStats,之后返回:
return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
留神最终返回的并不是java.sql.Connection对象,而是通过poolEnry.createProxyConnection办法创立的代理对象!
最终通过JavassistProxyFactory创立代理对象HikariProxyConnection。
返回代理数据库连贯对象的同时,创立leakTask。leakTask对象的作用是:如果leakDetectionThreshold不为0的话,创立一个leakTask工作,在leakDetectionThreshold时长之后执行该工作抛出数据库连贯泄露异样。该工作不被执行(不产生数据库泄露异样)的惟一条件就是在leakDetectionThreshold时长范畴内,数据库连贯胜利执行完SQL语句后通过调用代理Connection对象的close办法勾销该工作。
OK,getConnection办法源码剖析实现了,目前为止,咱们晓得:
- 通过connectionBag.borrow办法从连接池获取连贯数据库连贯的封装对象poolEntry
- 最终获取到的数据库连贯是代理对象HikariProxyConnection
上面咱们首先剖析一下connectionBag.borrow办法,borrow办法才是从连接池获取连贯的要害。
connectionBag#borrow办法
上代码,分段剖析,先看第一局部:
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException { // Try the thread-local list first final List<Object> list = threadList.get(); for (int i = list.size() - 1; i >= 0; i--) { final Object entry = list.remove(i); @SuppressWarnings("unchecked") final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry; if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { return bagEntry; } }
首先从threadList中获取,如果能获取到闲暇的数据库连贯、并且能胜利通过CAS操作批改连贯状态(从STATE_NOT_IN_USE批改为STATE_IN_USE)的话,间接返回。
threadList是ThreadLocal对象,以后线程如果曾经获取过连贯,在应用完归还给连接池的过程中,该连贯会写入threadList。如果以后线程再次获取连贯的话,就能够从threadList中获取,必定是性能最好的获取形式。
接下来,如果threadList中没有获取到:
final int waiting = waiters.incrementAndGet(); try { for (T bagEntry : sharedList) { if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { // If we may have stolen another waiter's connection, request another bag add. if (waiting > 1) { listener.addBagItem(waiting - 1); } return bagEntry; } }
获取到期待获取连贯的线程数waiters,加1。
遍历sharedList,如果sharedList中有闲暇连贯,并且可能胜利批改状态的话,阐明获取胜利,间接返回。
从sharedList获取连贯胜利的话,还要检查一下以后是否还有其余期待获取连贯的线程(waiter>1),如果有的话阐明连接池中的连贯不太够用,则调用listener.addBagItem(waiting - 1)减少连贯。listerner就是HikariPool对象,调用的其实就是HikariPool的addBagItem办法。
addBagItem办法的代码非常简单,就不贴出了,逻辑也很简略:期待获取连贯的线程数如果大于等于期待退出连接池的连接数的话,就通过addConnectionExecutor减少连贯到连接池中,否则,不须要减少,只须要稍等一下,addConnectionExecutor就会把期待队列中的连贯退出池中。
持续看源码,如果从sharedList中也没有获取到连贯的话:
listener.addBagItem(waiting); timeout = timeUnit.toNanos(timeout); do { final long start = currentTime(); final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS); if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { return bagEntry; } timeout -= elapsedNanos(start); } while (timeout > 10_000); return null; } finally { waiters.decrementAndGet(); } }
调用listener.addBagItem(waiting),办法逻辑曾经剖析过了。
从handoffQueue获取连贯,如果获取到的连贯状不是闲暇状态(其余线程姗姗来迟了),在timeout工夫范畴内持续循环从handoffQueue获取。
如果handoffQueue中没有获取到(此时必定期待超时了),则世界返回null。
否则,如果获取到了闲暇连贯,并且可能胜利批改连贯状态为STATE_IN_USE的话,返回该连贯。
最终,不论获取胜利与否,waiters减1。
borrow办法剖析实现!咱们晓得从连接池获取连贯的根本逻辑是:
- 首先从ThreadLocal变量threadList中获取
- 如果没有获取到的话,从sharedList中获取
- 还是没有获取到的话,最初才从handoffQueue中获取
源码曾经剖析的很分明了,逻辑也解释的清晰明了。然而为什么要这么做?
首先从threadList中获取连贯的逻辑比拟容易了解,如果以后线程曾经获取过连贯,用完之后存入threadList中,再次获取的话间接返回threadList中的连贯,效率必定是最高的,性能最优。
然而从sharedList中获取不到的话,为什么还要再从handoffQueue中获取?什么状况下会产生从sharedList中获取不到、从handoffQueue中能获取到?
仔细分析一下sharedList以及handoffQueue的构造、以及连贯退出连接池的逻辑,就能明确这么做的起因了。
连贯退出连接池的过程为:首先退出sharedList,之后再退出handoffQueue。
假如获取连贯的线程读取sharedList的时候,sharedList没有闲暇的连贯,此时假如有一个新创建的连贯正好刚刚退出到sharedList中。这种状况下获取连贯的线程就从sharedList中完满错过了新退出的连贯。
而后新退出的连贯会退出handoffQueue,handoffQueue是手递手队列,退出连贯的线程会期待获取连贯的线程从队列中拿走该连贯,两个人一拍即合,相互满足需要,实现“手递手”工作。
这个场景下,HikariPool的handoffQueue的设计肯定是性能最好的,必定回比重复遍历sharedList的性能好很多。
从连接池中获取连贯的过程,源码级剖析实现!
敞开连贯 HikariProxyConnection#close办法
开篇曾经说过了,敞开连贯其实就是调用代理连贯对象HikariProxyConnection的close办法。close办法是在他的父类、抽象类ProxyConnection中定义的:
public final void close() throws SQLException { // Closing statements can cause connection eviction, so this must run before the conditional below closeStatements(); if (delegate != ClosedConnection.CLOSED_CONNECTION) { leakTask.cancel(); try { if (isCommitStateDirty && !isAutoCommit) { delegate.rollback(); lastAccess = currentTime(); LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate); } if (dirtyBits != 0) { poolEntry.resetConnectionState(this, dirtyBits); lastAccess = currentTime(); } delegate.clearWarnings(); } catch (SQLException e) { // when connections are aborted, exceptions are often thrown that should not reach the application if (!poolEntry.isMarkedEvicted()) { throw checkException(e); } } finally { delegate = ClosedConnection.CLOSED_CONNECTION; poolEntry.recycle(lastAccess); } } }
首先调用closeStatements办法,敞开该连贯关上的Statement。
而后,调用leakTask.cancel();勾销连贯泄露查看工作。
之后,调用poolEntry.resetConnectionState重置连贯属性,筹备偿还到连接池。
最初,调用poolEntry.recycle(lastAccess);办法名阐明所有:回收连贯。
poolEntry#recycle(lastAccess)
代码很简略:
void recycle(final long lastAccessed) { if (connection != null) { this.lastAccessed = lastAccessed; hikariPool.recycle(this); } }
记录lastAccessed后,调用hikariPool.recycle(this)。
hikariPool#recycle
重点来了,调用了connectionBag的requite办法。
@Override void recycle(final PoolEntry poolEntry) { metricsTracker.recordConnectionUsage(poolEntry); connectionBag.requite(poolEntry); }
connectionBag#requite
requite负责偿还PoolEntry对象到连接池:
public void requite(final T bagEntry) { bagEntry.setState(STATE_NOT_IN_USE); for (int i = 0; waiters.get() > 0; i++) { if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) { return; } else if ((i & 0xff) == 0xff) { parkNanos(MICROSECONDS.toNanos(10)); } else { Thread.yield(); } }
首先将poolEntry对象设置为闲暇状态(STATE_NOT_IN_USE)。
之后查看如果有期待获取连贯的线程存在的话,帮忙其疾速拿到连贯:
如果以后对象曾经不再是闲暇状态(曾经被getConnection线程拿走了),则间接返回。
否则如果没被拿走的话,以后连贯对象放入handoffQueue,阻塞期待,如果被getConnection线程从handoffQueue中拿走的话,返回。
否则,循环肯定次数(0xff=256次)后挂起期待10秒,直到连贯被期待获取连贯的线程拿走。(这个中央不是很了解,为什么要这么做?因为敞开连贯的调用方应该是利用线程,做完本人该做的事件、偿还连贯的时候,还要负责把连贯交给下一个期待获取连贯的线程,难道不会影响以后业务的疾速返回吗?)
最初,如果threadList的容量小于50的话,将以后poolEntry对象放入threadList。
小结
HikariPool的初始化、连贯创立及退出连接池、连贯获取、连贯偿还等所有连接池相干的次要逻辑剖析结束。
Thanks a lot!
上一篇 连接池 HikariPool (一) - 根底框架及初始化过程