前言
本文将对Druid
数据库连接池的源码进行剖析和学习,以理解Druid
数据库连接池的工作原理。Druid
数据库连接池的根本逻辑简直全副在DruidDataSource
类中,所以本文次要是围绕DruidDataSource
的各项性能开展阐述。
Druid
版本:1.2.11
注释
一. DruidDataSource初始化
DruidDataSource
初始化有两种形式,如下所示。
- 将
DruidDataSource
实例创立进去后,被动调用其init()
办法实现初始化; - 首次调用
DruidDataSource
的getConnection()
办法时,会调用到init()
办法实现初始化。
因为init()
办法过长,上面将分点介绍init()
办法实现的要害事件。
1. 双重查看inited状态
对inited状态进行Double Check,避免DruidDataSource
初始化两次。源码示意如下。
public void init() throws SQLException {
if (inited) {
return;
}
......
final ReentrantLock lock = this.lock;
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw new SQLException("interrupt", e);
}
boolean init = false;
try {
if (inited) {
return;
}
......
} catch (SQLException e) {
......
} catch (InterruptedException e) {
......
} catch (RuntimeException e) {
......
} catch (Error e) {
......
} finally {
inited = true;
lock.unlock();
......
}
}
2. 判断数据库类型
依据jdbcUrl失去数据库类型dbTypeName。源码如下所示。
if (this.dbTypeName == null || this.dbTypeName.length() == 0) {
this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null);
}
3. 参数校验
对一些要害参数进行校验。源码如下所示。
// 连接池最大连贯数量不能小于等于0
if (maxActive <= 0) {
throw new IllegalArgumentException("illegal maxActive " + maxActive);
}
// 连接池最大连贯数量不能小于最小连贯数量
if (maxActive < minIdle) {
throw new IllegalArgumentException("illegal maxActive " + maxActive);
}
// 连接池初始连贯数量不能大于最大连贯数量
if (getInitialSize() > maxActive) {
throw new IllegalArgumentException("illegal initialSize " + this.initialSize + ", maxActive " + maxActive);
}
// 不容许同时开启基于日志伎俩记录连接池状态和全局状态监控
if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat) {
throw new IllegalArgumentException("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true");
}
// 连贯最大闲暇工夫不能小于连贯最小闲暇工夫
if (maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis) {
throw new SQLException("maxEvictableIdleTimeMillis must be grater than minEvictableIdleTimeMillis");
}
// 不容许开启了保活机制但保活间隔时间小于等于回收查看工夫距离
if (keepAlive && keepAliveBetweenTimeMillis <= timeBetweenEvictionRunsMillis) {
throw new SQLException("keepAliveBetweenTimeMillis must be grater than timeBetweenEvictionRunsMillis");
}
4. SPI机制加载过滤器
调用到DruidDataSource#initFromSPIServiceLoader
办法,基于SPI机制加载过滤器Filter
。源码如下所示。
private void initFromSPIServiceLoader() {
if (loadSpifilterSkip) {
return;
}
if (autoFilters == null) {
List<Filter> filters = new ArrayList<Filter>();
// 基于ServiceLoader加载Filter
ServiceLoader<Filter> autoFilterLoader = ServiceLoader.load(Filter.class);
// 遍历加载的每一个Filter,依据@AutoLoad注解的属性判断是否加载该Filter
for (Filter filter : autoFilterLoader) {
AutoLoad autoLoad = filter.getClass().getAnnotation(AutoLoad.class);
if (autoLoad != null && autoLoad.value()) {
filters.add(filter);
}
}
autoFilters = filters;
}
// 将每个须要加载的Filter增加到filters字段中,并去重
for (Filter filter : autoFilters) {
if (LOG.isInfoEnabled()) {
LOG.info("load filter from spi :" + filter.getClass().getName());
}
addFilter(filter);
}
}
5. 加载驱动
调用DruidDataSource#resolveDriver
办法,依据配置的驱动名称加载数据库驱动。源码如下所示。
protected void resolveDriver() throws SQLException {
if (this.driver == null) {
// 若没有配置驱动名则尝试从jdbcUrl中获取
if (this.driverClass == null || this.driverClass.isEmpty()) {
this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
}
// Mock驱动相干
if (MockDriver.class.getName().equals(driverClass)) {
driver = MockDriver.instance;
} else if ("com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver".equals(driverClass)) {
// ClickHouse相干
Properties info = new Properties();
info.put("user", username);
info.put("password", password);
info.putAll(connectProperties);
driver = new BalancedClickhouseDriver(jdbcUrl, info);
} else {
if (jdbcUrl == null && (driverClass == null || driverClass.length() == 0)) {
throw new SQLException("url not set");
}
// 加载驱动
driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
}
} else {
if (this.driverClass == null) {
this.driverClass = driver.getClass().getName();
}
}
}
6. 初始化连贯有效性校验器
调用DruidDataSource#initValidConnectionChecker
办法,初始化ValidConnectionChecker
,用于校验某个连贯是否可用。源码如下所示。
private void initValidConnectionChecker() {
if (this.validConnectionChecker != null) {
return;
}
String realDriverClassName = driver.getClass().getName();
// 不同的数据库初始化不同的ValidConnectionChecker
if (JdbcUtils.isMySqlDriver(realDriverClassName)) {
// MySQL数据库还反对应用ping的形式来校验连贯活性,这比执行一条简略查问语句来判活更高效
// 由usePingMethod参数决定是否开启
this.validConnectionChecker = new MySqlValidConnectionChecker(usePingMethod);
} else if (realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER)
|| realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER2)) {
this.validConnectionChecker = new OracleValidConnectionChecker();
} else if (realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER)
|| realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER_SQLJDBC4)
|| realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER_JTDS)) {
this.validConnectionChecker = new MSSQLValidConnectionChecker();
} else if (realDriverClassName.equals(JdbcConstants.POSTGRESQL_DRIVER)
|| realDriverClassName.equals(JdbcConstants.ENTERPRISEDB_DRIVER)
|| realDriverClassName.equals(JdbcConstants.POLARDB_DRIVER)) {
this.validConnectionChecker = new PGValidConnectionChecker();
} else if (realDriverClassName.equals(JdbcConstants.OCEANBASE_DRIVER)
|| (realDriverClassName.equals(JdbcConstants.OCEANBASE_DRIVER2))) {
DbType dbType = DbType.of(this.dbTypeName);
this.validConnectionChecker = new OceanBaseValidConnectionChecker(dbType);
}
}
7. 初始化全局状态统计器
如果useGlobalDataSourceStat设置为true,则初始化全局状态统计器,用于统计和剖析数据库连接池的性能数据。源码片段如下所示。
if (isUseGlobalDataSourceStat()) {
dataSourceStat = JdbcDataSourceStat.getGlobal();
if (dataSourceStat == null) {
dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbTypeName);
JdbcDataSourceStat.setGlobal(dataSourceStat);
}
if (dataSourceStat.getDbType() == null) {
dataSourceStat.setDbType(this.dbTypeName);
}
} else {
dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbTypeName, this.connectProperties);
}
8. 初始化连接池数组并预热
创立三个连接池数组,别离是connections(用于寄存能获取的连贯对象),evictConnections(用于寄存须要抛弃的连贯对象)和keepAliveConnections(用于寄存须要保活的连贯对象)。连接池的预热有两种,如果配置了asyncInit为true,且异步线程池不为空,则执行异步连接池预热,反之执行同步连接池预热。
// 用于寄存能获取的连贯对象,真正意义上的连接池
// 曾经被获取的连贯不在其中
connections = new DruidConnectionHolder[maxActive];
// 用于寄存须要被敞开抛弃的连贯
evictConnections = new DruidConnectionHolder[maxActive];
// 用于寄存须要保活的连贯
keepAliveConnections = new DruidConnectionHolder[maxActive];
SQLException connectError = null;
// 有线程池且异步初始化配置为true,则异步预热
if (createScheduler != null && asyncInit) {
for (int i = 0; i < initialSize; ++i) {
submitCreateTask(true);
}
} else if (!asyncInit) {
// 同步预热,预热连接数由initialSize配置
while (poolingCount < initialSize) {
try {
PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
// 对DruidDataSource和Connection做了一层封装
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
connections[poolingCount++] = holder;
} catch (SQLException ex) {
LOG.error("init datasource error, url: " + this.getUrl(), ex);
if (initExceptionThrow) {
connectError = ex;
break;
} else {
Thread.sleep(3000);
}
}
}
if (poolingCount > 0) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
}
}
9. 创立日志记录线程并启动
调用DruidDataSource#createAndLogThread
办法创立通过打印日志来记录连接池状态的线程。createAndLogThread()
办法如下所示。
private void createAndLogThread() {
// timeBetweenLogStatsMillis小于等于0示意不开启打印日志记录连接池状态的性能
if (this.timeBetweenLogStatsMillis <= 0) {
return;
}
String threadName = "Druid-ConnectionPool-Log-" + System.identityHashCode(this);
// 创立线程
logStatsThread = new LogStatsThread(threadName);
// 启动线程
logStatsThread.start();
this.resetStatEnable = false;
}
createAndLogThread()
办法会创立LogStatsThread
并启动,即会调用到LogStatsThread
的run()
办法。LogStatsThread
线程的run()
办法如下所示。
public void run() {
try {
for (; ; ) {
try {
// 每距离timeBetweenLogStatsMillis就打印一次连接池状态
logStats();
} catch (Exception e) {
LOG.error("logStats error", e);
}
Thread.sleep(timeBetweenLogStatsMillis);
}
} catch (InterruptedException e) {
}
}
上述run()
办法中会每距离timeBetweenLogStatsMillis的工夫就调用一次logStats()
办法来打印连接池状态。logStats()
办法如下所示。
public void logStats() {
final DruidDataSourceStatLogger statLogger = this.statLogger;
if (statLogger == null) {
return;
}
// 拿到各种连接池的状态
DruidDataSourceStatValue statValue = getStatValueAndReset();
// 打印
statLogger.log(statValue);
}
在logStats()
办法中会先调用getStatValueAndReset()
办法来拿到各种连接池的状态,而后调用DruidDataSourceStatLogger
实现打印。最初看一眼getStatValueAndReset()
办法外面拿哪些连接池状态,getStatValueAndReset()
办法代码片段如下所示。
public DruidDataSourceStatValue getStatValueAndReset() {
DruidDataSourceStatValue value = new DruidDataSourceStatValue();
lock.lock();
try {
value.setPoolingCount(this.poolingCount);
value.setPoolingPeak(this.poolingPeak);
value.setPoolingPeakTime(this.poolingPeakTime);
value.setActiveCount(this.activeCount);
value.setActivePeak(this.activePeak);
value.setActivePeakTime(this.activePeakTime);
value.setConnectCount(this.connectCount);
value.setCloseCount(this.closeCount);
value.setWaitThreadCount(lock.getWaitQueueLength(notEmpty));
value.setNotEmptyWaitCount(this.notEmptyWaitCount);
value.setNotEmptyWaitNanos(this.notEmptyWaitNanos);
value.setKeepAliveCheckCount(this.keepAliveCheckCount);
// 重置参数
this.poolingPeak = 0;
this.poolingPeakTime = 0;
this.activePeak = 0;
this.activePeakTime = 0;
this.connectCount = 0;
this.closeCount = 0;
this.keepAliveCheckCount = 0;
this.notEmptyWaitCount = 0;
this.notEmptyWaitNanos = 0;
} finally {
lock.unlock();
}
value.setName(this.getName());
value.setDbType(this.dbTypeName);
value.setDriverClassName(this.getDriverClassName());
......
value.setSqlSkipCount(this.getDataSourceStat().getSkipSqlCountAndReset());
value.setSqlList(this.getDataSourceStat().getSqlStatMapAndReset());
return value;
}
10. 创立创立连贯的线程并启动
调用DruidDataSource#createAndStartCreatorThread
办法来创立创立连贯的线程CreateConnectionThread
并启动。createAndStartCreatorThread()
办法如下所示。
protected void createAndStartCreatorThread() {
// 只有异步创立连贯的线程池为空时,才创立CreateConnectionThread
if (createScheduler == null) {
String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this);
createConnectionThread = new CreateConnectionThread(threadName);
// 启动线程
createConnectionThread.start();
return;
}
initedLatch.countDown();
}
CreateConnectionThread
只有在异步创立连贯的线程池createScheduler为空时,才会被创立进去,并且在CreateConnectionThread
的run()
办法一开始,就会调用initedLatch的countDown()
办法,其中initedLatch是一个初始值为2的CountDownLatch
对象,另外一次countDown()
调用在DestroyConnectionThread
的run()
办法中,目标就是init()
办法执行完以前,创立连贯的线程和销毁连贯的线程肯定要创立进去并启动结束。
createAndLogThread();
// 在外部会调用到initedLatch.countDown()
createAndStartCreatorThread();
// 在外部最终会调用initedLatch.countDown()
createAndStartDestroyThread();
initedLatch.await();
11. 创立销毁连贯的线程并启动
调用DruidDataSource#createAndStartDestroyThread
办法来创立销毁连贯的线程DestroyConnectionThread
并启动。createAndStartDestroyThread()
办法如下所示。
protected void createAndStartDestroyThread() {
// 销毁连贯的工作
destroyTask = new DestroyTask();
// 如果销毁连贯的线程池不会为空,则让其周期执行销毁连贯的工作
if (destroyScheduler != null) {
long period = timeBetweenEvictionRunsMillis;
if (period <= 0) {
period = 1000;
}
destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period,
TimeUnit.MILLISECONDS);
initedLatch.countDown();
return;
}
// 如果销毁连贯的线程池为空,则创立销毁连贯的线程
String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this);
destroyConnectionThread = new DestroyConnectionThread(threadName);
// 启动线程
destroyConnectionThread.start();
}
createAndStartDestroyThread()
办法中会先判断销毁连贯的线程池是否存在,如果存在,则不再创立DestroyConnectionThread
,而是会让销毁连贯的线程池来执行销毁工作,如果不存在,则创立DestroyConnectionThread
并启动,此时initedLatch的countDown()
调用是在DestroyConnectionThread
的run()
办法中。DestroyConnectionThread#run
办法源码如下所示。
public void run() {
// run()办法只有执行了,就调用initedLatch#countDown
initedLatch.countDown();
for (; ; ) {
// 每距离timeBetweenEvictionRunsMillis执行一次DestroyTask的run()办法
try {
if (closed || closing) {
break;
}
if (timeBetweenEvictionRunsMillis > 0) {
Thread.sleep(timeBetweenEvictionRunsMillis);
} else {
Thread.sleep(1000);
}
if (Thread.interrupted()) {
break;
}
// 执行DestroyTask的run()办法来销毁须要销毁的线程
destroyTask.run();
} catch (InterruptedException e) {
break;
}
}
}
DestroyConnectionThread#run
办法只有被调用到,那么就会调用initedLatch的countDown()
办法,此时阻塞在init()
办法中的initedLatch.await()
办法上的线程就会被唤醒并持续往下执行。
二. DruidDataSource连贯创立
DruidDataSource
连贯的创立由CreateConnectionThread
线程实现,其run()
办法如下所示。
public void run() {
initedLatch.countDown();
long lastDiscardCount = 0;
int errorCount = 0;
for (; ; ) {
try {
lock.lockInterruptibly();
} catch (InterruptedException e2) {
break;
}
long discardCount = DruidDataSource.this.discardCount;
boolean discardChanged = discardCount - lastDiscardCount > 0;
lastDiscardCount = discardCount;
try {
// emptyWait为true示意生产连接线程须要期待,无需生产连贯
boolean emptyWait = true;
// 产生了创立谬误,且池中已无连贯,且抛弃连贯的统计没有扭转
// 此时生产连接线程须要生产连贯
if (createError != null
&& poolingCount == 0
&& !discardChanged) {
emptyWait = false;
}
if (emptyWait
&& asyncInit && createCount < initialSize) {
emptyWait = false;
}
if (emptyWait) {
// 池中已有连接数大于等于正在期待连贯的利用线程数
// 且以后是非keepAlive场景
// 且以后是非间断失败
// 此时生产连贯的线程在empty上期待
// keepAlive && activeCount + poolingCount < minIdle时会在shrink()办法中触发emptySingal()来增加连贯
// isFailContinuous()返回true示意间断失败,即屡次(默认2次)创立物理连贯失败
if (poolingCount >= notEmptyWaitThreadCount
&& (!(keepAlive && activeCount + poolingCount < minIdle))
&& !isFailContinuous()
) {
empty.await();
}
// 避免创立超过maxActive数量的连贯
if (activeCount + poolingCount >= maxActive) {
empty.await();
continue;
}
}
} catch (InterruptedException e) {
......
} finally {
lock.unlock();
}
PhysicalConnectionInfo connection = null;
try {
connection = createPhysicalConnection();
} catch (SQLException e) {
LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode()
+ ", state " + e.getSQLState(), e);
errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
// 屡次创立失败
setFailContinuous(true);
// 如果配置了疾速失败,就唤醒所有在notEmpty上期待的利用线程
if (failFast) {
lock.lock();
try {
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
if (breakAfterAcquireFailure) {
break;
}
try {
Thread.sleep(timeBetweenConnectErrorMillis);
} catch (InterruptedException interruptEx) {
break;
}
}
} catch (RuntimeException e) {
LOG.error("create connection RuntimeException", e);
setFailContinuous(true);
continue;
} catch (Error e) {
LOG.error("create connection Error", e);
setFailContinuous(true);
break;
}
if (connection == null) {
continue;
}
// 把连贯增加到连接池
boolean result = put(connection);
if (!result) {
JdbcUtils.close(connection.getPhysicalConnection());
LOG.info("put physical connection to pool failed.");
}
errorCount = 0;
if (closing || closed) {
break;
}
}
}
CreateConnectionThread
的run()
办法整体就是在一个死循环中一直的期待,被唤醒,而后创立线程。当一个物理连贯被创立进去后,会调用DruidDataSource#put
办法将其放到连接池connections中,put()
办法源码如下所示。
protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) {
DruidConnectionHolder holder = null;
try {
holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);
} catch (SQLException ex) {
......
return false;
}
return put(holder, physicalConnectionInfo.createTaskId, false);
}
private boolean put(DruidConnectionHolder holder, long createTaskId, boolean checkExists) {
// 波及到连接池中连贯数量扭转的操作,都须要加锁
lock.lock();
try {
if (this.closing || this.closed) {
return false;
}
// 池中已有连接数曾经大于等于最大连接数,则不再把连贯加到连接池并间接返回false
if (poolingCount >= maxActive) {
if (createScheduler != null) {
clearCreateTask(createTaskId);
}
return false;
}
// 查看反复增加
if (checkExists) {
for (int i = 0; i < poolingCount; i++) {
if (connections[i] == holder) {
return false;
}
}
}
// 连贯放入连接池
connections[poolingCount] = holder;
// poolingCount++
incrementPoolingCount();
if (poolingCount > poolingPeak) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
}
// 唤醒在notEmpty上期待连贯的利用线程
notEmpty.signal();
notEmptySignalCount++;
if (createScheduler != null) {
clearCreateTask(createTaskId);
if (poolingCount + createTaskCount < notEmptyWaitThreadCount
&& activeCount + poolingCount + createTaskCount < maxActive) {
emptySignal();
}
}
} finally {
lock.unlock();
}
return true;
}
put()
办法会先将物理连贯从PhysicalConnectionInfo
中获取进去并封装成一个DruidConnectionHolder
,DruidConnectionHolder
就是Druid
连接池中的连贯。新增加的连贯会寄存在连接池数组connections的poolingCount地位,而后poolingCount会加1,也就是poolingCount代表着连接池中能够获取的连贯的数量。
三. DruidDataSource连贯销毁
DruidDataSource
连贯的创立由DestroyConnectionThread
线程实现,其run()
办法如下所示。
public void run() {
// run()办法只有执行了,就调用initedLatch#countDown
initedLatch.countDown();
for (; ; ) {
// 每距离timeBetweenEvictionRunsMillis执行一次DestroyTask的run()办法
try {
if (closed || closing) {
break;
}
if (timeBetweenEvictionRunsMillis > 0) {
Thread.sleep(timeBetweenEvictionRunsMillis);
} else {
Thread.sleep(1000);
}
if (Thread.interrupted()) {
break;
}
// 执行DestroyTask的run()办法来销毁须要销毁的连贯
destroyTask.run();
} catch (InterruptedException e) {
break;
}
}
}
DestroyConnectionThread
的run()
办法就是在一个死循环中每距离timeBetweenEvictionRunsMillis的工夫就执行一次DestroyTask
的run()
办法。DestroyTask#run
办法实现如下所示。
public void run() {
// 依据一系列条件判断并销毁连贯
shrink(true, keepAlive);
// RemoveAbandoned机制
if (isRemoveAbandoned()) {
removeAbandoned();
}
}
在DestroyTask#run
办法中会调用DruidDataSource#shrink
办法来依据设定的条件来判断出须要销毁和保活的连贯。DruidDataSource#shrink
办法如下所示。
// checkTime参数示意在将一个连贯进行销毁前,是否须要判断一下闲暇工夫
public void shrink(boolean checkTime, boolean keepAlive) {
// 加锁
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
return;
}
// needFill = keepAlive && poolingCount + activeCount < minIdle
// needFill为true时,会调用empty.signal()唤醒生产连贯的线程来生产连贯
boolean needFill = false;
// evictCount记录须要销毁的连接数
// keepAliveCount记录须要保活的连接数
int evictCount = 0;
int keepAliveCount = 0;
int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
fatalErrorCountLastShrink = fatalErrorCount;
try {
if (!inited) {
return;
}
// checkCount = 池中已有连接数 - 最小闲暇连接数
// 失常状况下,最多可能将前checkCount个连贯进行销毁
final int checkCount = poolingCount - minIdle;
final long currentTimeMillis = System.currentTimeMillis();
// 失常状况下,须要遍历池中所有连贯
// 从前往后遍历,i为数组索引
for (int i = 0; i < poolingCount; ++i) {
DruidConnectionHolder connection = connections[i];
// 如果产生了致命谬误(onFatalError == true)且致命谬误产生工夫(lastFatalErrorTimeMillis)在连贯建设工夫之后
// 把连贯退出到保活连贯数组中
if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
keepAliveConnections[keepAliveCount++] = connection;
continue;
}
if (checkTime) {
// phyTimeoutMillis示意连贯的物理存活超时工夫,默认值是-1
if (phyTimeoutMillis > 0) {
// phyConnectTimeMillis示意连贯的物理存活工夫
long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
// 连贯的物理存活工夫大于phyTimeoutMillis,则将这个连贯放入evictConnections数组
if (phyConnectTimeMillis > phyTimeoutMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
// idleMillis示意连贯的闲暇工夫
long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
// minEvictableIdleTimeMillis示意连贯容许的最小闲暇工夫,默认是30分钟
// keepAliveBetweenTimeMillis示意保活间隔时间,默认是2分钟
// 如果连贯的闲暇工夫小于minEvictableIdleTimeMillis且还小于keepAliveBetweenTimeMillis
// 则connections数组中以后连贯之后的连贯都会满足闲暇工夫小于minEvictableIdleTimeMillis且还小于keepAliveBetweenTimeMillis
// 此时跳出遍历,不再查看其余的连贯
if (idleMillis < minEvictableIdleTimeMillis
&& idleMillis < keepAliveBetweenTimeMillis
) {
break;
}
// 连贯的闲暇工夫大于等于容许的最小闲暇工夫
if (idleMillis >= minEvictableIdleTimeMillis) {
if (checkTime && i < checkCount) {
// i < checkCount这个条件的了解如下:
// 每次shrink()办法执行时,connections数组中只有索引0到checkCount-1的连贯才容许被销毁
// 这样能力保障销毁完连贯后,connections数组中至多还有minIdle个连贯
evictConnections[evictCount++] = connection;
continue;
} else if (idleMillis > maxEvictableIdleTimeMillis) {
// 如果闲暇工夫过久,曾经大于了容许的最大闲暇工夫(默认7小时)
// 那么无论如何都要销毁这个连贯
evictConnections[evictCount++] = connection;
continue;
}
}
// 如果开启了保活机制,且连贯闲暇工夫大于等于了保活间隔时间
// 此时将连贯退出到保活连贯数组中
if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
keepAliveConnections[keepAliveCount++] = connection;
}
} else {
// checkTime为false,那么前checkCount个连贯间接进行销毁,不再判断这些连贯的闲暇工夫是否超过阈值
if (i < checkCount) {
evictConnections[evictCount++] = connection;
} else {
break;
}
}
}
// removeCount = 销毁连接数 + 保活连接数
// removeCount示意本次从connections数组中拿掉的连接数
// 注:肯定是从返回后拿,失常状况下最初minIdle个连贯是平安的
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {
// [0, 1, 2, 3, 4, null, null, null] -> [3, 4, 2, 3, 4, null, null, null]
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
// [3, 4, 2, 3, 4, null, null, null] -> [3, 4, null, null, null, null, null, null, null]
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
// 更新池中连接数
poolingCount -= removeCount;
}
keepAliveCheckCount += keepAliveCount;
// 如果池中连接数加上沉闷连接数(借出去的连贯)小于最小闲暇连接数
// 则将needFill设为true,后续须要唤醒生产连贯的线程来生产连贯
if (keepAlive && poolingCount + activeCount < minIdle) {
needFill = true;
}
} finally {
lock.unlock();
}
if (evictCount > 0) {
// 遍历evictConnections数组,销毁其中的连贯
for (int i = 0; i < evictCount; ++i) {
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCountUpdater.incrementAndGet(this);
}
Arrays.fill(evictConnections, null);
}
if (keepAliveCount > 0) {
// 遍历keepAliveConnections数组,对其中的连贯做可用性校验
// 校验通过连贯就放入connections数组,没通过连贯就销毁
for (int i = keepAliveCount - 1; i >= 0; --i) {
DruidConnectionHolder holer = keepAliveConnections[i];
Connection connection = holer.getConnection();
holer.incrementKeepAliveCheckCount();
boolean validate = false;
try {
this.validateConnection(connection);
validate = true;
} catch (Throwable error) {
if (LOG.isDebugEnabled()) {
LOG.debug("keepAliveErr", error);
}
}
boolean discard = !validate;
if (validate) {
holer.lastKeepTimeMillis = System.currentTimeMillis();
boolean putOk = put(holer, 0L, true);
if (!putOk) {
discard = true;
}
}
if (discard) {
try {
connection.close();
} catch (Exception e) {
}
lock.lock();
try {
discardCount++;
if (activeCount + poolingCount <= minIdle) {
emptySignal();
}
} finally {
lock.unlock();
}
}
}
this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
Arrays.fill(keepAliveConnections, null);
}
// 如果needFill为true则唤醒生产连贯的线程来生产连贯
if (needFill) {
lock.lock();
try {
// 计算须要生产连贯的个数
int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
for (int i = 0; i < fillCount; ++i) {
emptySignal();
}
} finally {
lock.unlock();
}
} else if (onFatalError || fatalErrorIncrement > 0) {
lock.lock();
try {
emptySignal();
} finally {
lock.unlock();
}
}
}
在DruidDataSource#shrink
办法中,外围逻辑是遍历connections数组中的连贯,并判断这些连贯是须要销毁还是须要保活。通常状况下,connections数组中的前checkCount(checkCount = poolingCount – minIdle)个连贯是“危险”的,因为这些连贯只有满足了:闲暇工夫 >= minEvictableIdleTimeMillis(容许的最小闲暇工夫),那么就须要被销毁,而connections数组中的最初minIdle个连贯是“绝对平安”的,因为这些连贯只有在满足:闲暇工夫 > maxEvictableIdleTimeMillis(容许的最大闲暇工夫)时,才会被销毁。这么判断的起因,次要就是须要让连接池里可能保障至多有minIdle个闲暇连贯能够让利用线程获取。
当确定好了须要销毁和须要保活的连贯后,此时会先将connections数组清理,只保留平安的连贯,这个过程示意图如下。
最初,会遍历evictConnections数组,销毁数组中的连贯,遍历keepAliveConnections数组,对其中的每个连贯做可用性校验,如果校验可用,那么就从新放回connections数组,否则销毁。
四. DruidDataSource连贯获取
DruidDataSource
获取连贯的入口办法是DruidDataSource#getConnection
办法,实现如下。
public DruidPooledConnection getConnection() throws SQLException {
// maxWait示意获取连贯时最大等待时间,单位毫秒,默认值为-1
return getConnection(maxWait);
}
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
// 首次获取连贯时触发数据库连接池初始化
init();
if (filters.size() > 0) {
FilterChainImpl filterChain = new FilterChainImpl(this);
return filterChain.dataSource_connect(this, maxWaitMillis);
} else {
// 间接获取连贯
return getConnectionDirect(maxWaitMillis);
}
}
DruidDataSource#getConnection
办法会调用到DruidDataSource#getConnectionDirect
办法来获取连贯,实现如下所示。
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
int notFullTimeoutRetryCnt = 0;
for (; ; ) {
DruidPooledConnection poolableConnection;
try {
// 从连接池拿到连贯
poolableConnection = getConnectionInternal(maxWaitMillis);
} catch (GetConnectionTimeoutException ex) {
// 拿连贯时有异样,能够重试
// 重试次数由notFullTimeoutRetryCount指定
if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
notFullTimeoutRetryCnt++;
if (LOG.isWarnEnabled()) {
LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
}
continue;
}
throw ex;
}
// 如果配置了testOnBorrow = true,那么每次拿到连贯后,都须要校验这个连贯的有效性
if (testOnBorrow) {
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
// 如果连贯不可用,则销毁连贯,而后从新从池中获取
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}
discardConnection(poolableConnection.holder);
continue;
}
} else {
if (poolableConnection.conn.isClosed()) {
discardConnection(poolableConnection.holder);
continue;
}
// 如果配置testOnBorrow = fasle但testWhileIdle = true
// 则判断连贯闲暇工夫是否大于等于timeBetweenEvictionRunsMillis
// 如果是,则校验连贯的有效性
if (testWhileIdle) {
final DruidConnectionHolder holder = poolableConnection.holder;
long currentTimeMillis = System.currentTimeMillis();
// lastActiveTimeMillis是连贯最近一次沉闷工夫
// 新建连贯,偿还连贯到连接池,都会更新这个工夫
long lastActiveTimeMillis = holder.lastActiveTimeMillis;
// lastExecTimeMillis是连贯最近一次执行工夫
// 新建连贯,设置连贯的事务是否主动提交,记录SQL到事务信息中,都会更新这个工夫
long lastExecTimeMillis = holder.lastExecTimeMillis;
// lastKeepTimeMillis是连贯最近一次保活工夫
// 在连贯被保活并放回连接池时,会更新这个工夫
long lastKeepTimeMillis = holder.lastKeepTimeMillis;
// 如果配置checkExecuteTime为true,则最近沉闷工夫取值为最近执行工夫
if (checkExecuteTime
&& lastExecTimeMillis != lastActiveTimeMillis) {
lastActiveTimeMillis = lastExecTimeMillis;
}
// 如果连贯最近一次做的操作是保活,那么最近沉闷工夫取值为最近保活工夫
if (lastKeepTimeMillis > lastActiveTimeMillis) {
lastActiveTimeMillis = lastKeepTimeMillis;
}
// 计算闲暇工夫
long idleMillis = currentTimeMillis - lastActiveTimeMillis;
// testWhileIdle为true时的判断工夫距离
long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;
if (timeBetweenEvictionRunsMillis <= 0) {
// timeBetweenEvictionRunsMillis如果小于等于0,那么重置为60秒
timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
}
// 如果闲暇工夫大于等于timeBetweenEvictionRunsMillis,则执行连贯的有效性校验
if (idleMillis >= timeBetweenEvictionRunsMillis
|| idleMillis < 0
) {
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}
discardConnection(poolableConnection.holder);
continue;
}
}
}
}
// 如果设置removeAbandoned为true
// 则将连贯放到activeConnections沉闷连贯map中
if (removeAbandoned) {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
poolableConnection.connectStackTrace = stackTrace;
poolableConnection.setConnectedTimeNano();
poolableConnection.traceEnable = true;
activeConnectionLock.lock();
try {
activeConnections.put(poolableConnection, PRESENT);
} finally {
activeConnectionLock.unlock();
}
}
if (!this.defaultAutoCommit) {
poolableConnection.setAutoCommit(false);
}
return poolableConnection;
}
}
DruidDataSource#getConnectionDirect
办法中会先调用getConnectionInternal()
办法从连接池中拿连贯,而后如果开启了testOnBorrow,则校验一下连贯的有效性,如果有效则从新调用getConnectionInternal()
办法拿连贯,直到拿到的连贯通过校验。如果没有开启testOnBorrow然而开启了testWhileIdle,则会判断连贯的闲暇工夫是否大于等于timeBetweenEvictionRunsMillis参数,如果满足则校验一下连贯的有效性,若没有通过校验,那么须要从新调用getConnectionInternal()
办法拿连贯,直到拿到的连贯通过校验或者连贯的闲暇工夫小于timeBetweenEvictionRunsMillis。
上面看一下理论从连接池拿连贯的getConnectionInternal()
办法的实现,如下所示。
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
......
final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
final int maxWaitThreadCount = this.maxWaitThreadCount;
DruidConnectionHolder holder;
// 在死循环中从连接池拿连贯
// 一开始createDirect为false,示意先从池子中拿
for (boolean createDirect = false; ; ) {
if (createDirect) {
// createDirect为true示意间接创立连贯
createStartNanosUpdater.set(this, System.nanoTime());
// creatingCount为0示意以后没有其它连贯正在被创立
if (creatingCountUpdater.compareAndSet(this, 0, 1)) {
// 创立物理连贯
PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection();
holder = new DruidConnectionHolder(this, pyConnInfo);
holder.lastActiveTimeMillis = System.currentTimeMillis();
creatingCountUpdater.decrementAndGet(this);
directCreateCountUpdater.incrementAndGet(this);
......
boolean discard;
lock.lock();
try {
// 如果以后正在应用的连接数未达到最大连接数
// 则以后正在应用的连接数加1
// 否则销毁刚刚创立进去的连贯
if (activeCount < maxActive) {
activeCount++;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();
}
break;
} else {
discard = true;
}
} finally {
lock.unlock();
}
if (discard) {
JdbcUtils.close(pyConnInfo.getPhysicalConnection());
}
}
}
// 上锁
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("interrupt", e);
}
try {
// maxWaitThreadCount示意容许的最大期待连贯的利用线程数
// notEmptyWaitThreadCount示意正在期待连贯的利用线程数
// 期待连贯的利用线程数达到最大值时,抛出异样
if (maxWaitThreadCount > 0
&& notEmptyWaitThreadCount >= maxWaitThreadCount) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "
+ lock.getQueueLength());
}
// 产生了致命谬误,且设置了致命谬误数最大值大于0,且正在应用的连接数大于等于致命谬误数最大值
if (onFatalError
&& onFatalErrorMaxActive > 0
&& activeCount >= onFatalErrorMaxActive) {
// 拼接异样并抛出
......
throw new SQLException(
errorMsg.toString(), lastFatalError);
}
connectCount++;
// 如果配置的创立连贯的线程池是一个定时线程池
// 且连接池曾经没有可用连贯,
// 且以后借出的连接数未达到容许的最大连接数
// 且以后没有其它线程(利用线程,创立连贯的线程,创立连贯的线程池里的线程)在创立连贯
// 此时将createDirect置为true,让以后利用线程间接创立连贯
if (createScheduler != null
&& poolingCount == 0
&& activeCount < maxActive
&& creatingCountUpdater.get(this) == 0
&& createScheduler instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
if (executor.getQueue().size() > 0) {
createDirect = true;
continue;
}
}
if (maxWait > 0) {
// 如果设置了期待连贯的最大等待时间,则调用pollLast()办法来拿连贯
// pollLast()办法执行时如果池中没有连贯,则利用线程会在notEmpty上最多期待maxWait的工夫
holder = pollLast(nanos);
} else {
// 调用takeLast()办法拿连贯时,如果池中没有连贯,则会在notEmpty上始终期待,直到池中有连贯
holder = takeLast();
}
if (holder != null) {
if (holder.discard) {
continue;
}
// 正在应用的连接数加1
activeCount++;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();
}
}
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException(e.getMessage(), e);
} catch (SQLException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw e;
} finally {
lock.unlock();
}
break;
}
// 如果拿到的连贯为null,阐明拿连贯时期待超时了
// 此时抛出连贯超时异样
if (holder == null) {
......
final Throwable createError;
try {
lock.lock();
......
createError = this.createError;
} finally {
lock.unlock();
}
......
if (createError != null) {
throw new GetConnectionTimeoutException(errorMessage, createError);
} else {
throw new GetConnectionTimeoutException(errorMessage);
}
}
holder.incrementUseCount();
DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
return poolalbeConnection;
}
getConnectionInternal()
办法中拿到连贯的形式有三种,如下所示。
- 间接创立连贯。须要满足配置的创立连贯的线程池是一个定时线程池,且连接池曾经没有可用连贯,且以后借出的连接数未达到容许的最大连接数,且以后没有其它线程在创立连贯;
- 从池中拿连贯,并最多期待maxWait的工夫。须要设置了maxWait;
- 从池中拿连贯,并始终期待直到拿到连贯。
上面最初看一下超时期待拿连贯的DruidDataSource#pollLast
办法的实现。
private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
long estimate = nanos;
for (; ; ) {
if (poolingCount == 0) {
// 如果池中曾经没有连贯,则唤醒在empty上期待的创立连接线程来创立连贯
emptySignal();
if (failFast && isFailContinuous()) {
throw new DataSourceNotAvailableException(createError);
}
// 等待时间耗尽,返回null
if (estimate <= 0) {
waitNanosLocal.set(nanos - estimate);
return null;
}
// 利用线程行将在上面的notEmpty上期待
// 这里先把期待获取连贯的利用线程数加1
notEmptyWaitThreadCount++;
if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
}
try {
long startEstimate = estimate;
// 利用线程在notEmpty上期待
// 有连贯被创立或者被偿还时,会唤醒在notEmpty上期待的利用线程
estimate = notEmpty.awaitNanos(estimate);
notEmptyWaitCount++;
notEmptyWaitNanos += (startEstimate - estimate);
if (!enable) {
connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null) {
throw disableException;
}
throw new DataSourceDisableException();
}
} catch (InterruptedException ie) {
notEmpty.signal();
notEmptySignalCount++;
throw ie;
} finally {
notEmptyWaitThreadCount--;
}
if (poolingCount == 0) {
if (estimate > 0) {
// 若唤醒后池中还是没有连贯,且此时等待时间还有残余
// 则从新在notEmpty上期待
continue;
}
waitNanosLocal.set(nanos - estimate);
return null;
}
}
// poolingCount--
decrementPoolingCount();
// 从池中拿到连贯
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
long waitNanos = nanos - estimate;
last.setLastNotEmptyWaitNanos(waitNanos);
return last;
}
}
五. DruidDataSource连贯偿还
Druid
数据库连接池中,每一个物理连贯都会被包装成DruidConnectionHolder
,在提供给利用线程前,还会将DruidConnectionHolder
包装成DruidPooledConnection
,类图如下所示。
利用线程中应用连贯结束后,会调用DruidPooledConnection
的close()
办法来偿还连贯,也就是将连贯放回连接池。DruidPooledConnection#close
办法如下所示。
public void close() throws SQLException {
if (this.disable) {
return;
}
DruidConnectionHolder holder = this.holder;
if (holder == null) {
if (dupCloseLogEnable) {
LOG.error("dup close");
}
return;
}
DruidAbstractDataSource dataSource = holder.getDataSource();
// 判断偿还连贯的线程和获取连贯的线程是否是同一个线程
boolean isSameThread = this.getOwnerThread() == Thread.currentThread();
// 如果不是同一个线程,则设置asyncCloseConnectionEnable为true
if (!isSameThread) {
dataSource.setAsyncCloseConnectionEnable(true);
}
// 如果开启了removeAbandoned机制
// 或者asyncCloseConnectionEnable为true
// 则调用syncClose()办法来偿还连贯
// syncClose()办法中会先加锁,而后调用recycle()办法来回收连贯
if (dataSource.isAsyncCloseConnectionEnable()) {
syncClose();
return;
}
if (!CLOSING_UPDATER.compareAndSet(this, 0, 1)) {
return;
}
try {
for (ConnectionEventListener listener : holder.getConnectionEventListeners()) {
listener.connectionClosed(new ConnectionEvent(this));
}
List<Filter> filters = dataSource.getProxyFilters();
if (filters.size() > 0) {
FilterChainImpl filterChain = new FilterChainImpl(dataSource);
filterChain.dataSource_recycle(this);
} else {
// 回收连贯
recycle();
}
} finally {
CLOSING_UPDATER.set(this, 0);
}
this.disable = true;
}
在DruidPooledConnection#close
办法中,会先判断本次偿还连贯的线程和获取连贯的线程是否是同一个线程,如果不是,则先加锁而后再调用recycle()
办法来回收连贯,如果是则间接调用recycle()
办法来回收连贯。当开启了removeAbandoned机制时,就可能会呈现偿还连贯的线程和获取连贯的线程不是同一个线程的状况,这是因为一旦开启了removeAbandoned机制,那么每一个被借出的连贯都会被放到activeConnections沉闷连贯map中,并且在销毁连贯的线程DestroyConnectionThread
中会每距离timeBetweenEvictionRunsMillis的工夫就遍历一次activeConnections沉闷连贯map,一旦有沉闷连贯被借出的工夫大于了removeAbandonedTimeoutMillis,那么销毁连贯的线程DestroyConnectionThread
就会被动去回收这个连贯,以避免连贯透露。
上面看一下DruidPooledConnection#recycle
办法的实现。
public void recycle() throws SQLException {
if (this.disable) {
return;
}
DruidConnectionHolder holder = this.holder;
if (holder == null) {
if (dupCloseLogEnable) {
LOG.error("dup close");
}
return;
}
if (!this.abandoned) {
DruidAbstractDataSource dataSource = holder.getDataSource();
// 调用DruidAbstractDataSource#recycle回收以后连贯
dataSource.recycle(this);
}
this.holder = null;
conn = null;
transactionInfo = null;
closed = true;
}
在DruidPooledConnection#recycle
办法中会调用到DruidDataSource#recycle
办法来回收连贯。DruidDataSource#recycle
办法实现如下所示。
protected void recycle(DruidPooledConnection pooledConnection) throws SQLException {
final DruidConnectionHolder holder = pooledConnection.holder;
......
final boolean isAutoCommit = holder.underlyingAutoCommit;
final boolean isReadOnly = holder.underlyingReadOnly;
final boolean testOnReturn = this.testOnReturn;
try {
// 如果是非主动提交且存在事务
// 则回滚事务
if ((!isAutoCommit) && (!isReadOnly)) {
pooledConnection.rollback();
}
// 重置连贯信息(配置还原为默认值,敞开Statement,革除连贯的Warnings等)
boolean isSameThread = pooledConnection.ownerThread == Thread.currentThread();
if (!isSameThread) {
final ReentrantLock lock = pooledConnection.lock;
lock.lock();
try {
holder.reset();
} finally {
lock.unlock();
}
} else {
holder.reset();
}
......
// 开启了testOnReturn机制,则校验连贯有效性
if (testOnReturn) {
boolean validate = testConnectionInternal(holder, physicalConnection);
// 校验不通过则敞开物理连贯
if (!validate) {
JdbcUtils.close(physicalConnection);
destroyCountUpdater.incrementAndGet(this);
lock.lock();
try {
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
} finally {
lock.unlock();
}
return;
}
}
......
lock.lock();
try {
// 连贯行将放回连接池,须要将active设置为false
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
// 将连贯放到connections数组的poolingCount地位
// 而后poolingCount加1
// 而后唤醒在notEmpty上期待连贯的一个利用线程
result = putLast(holder, currentTimeMillis);
recycleCount++;
} finally {
lock.unlock();
}
if (!result) {
JdbcUtils.close(holder.conn);
LOG.info("connection recyle failed.");
}
} catch (Throwable e) {
......
}
}
DruidDataSource#recycle
办法中会先重置连贯信息,行将连贯的一些配置重置为默认值,而后敞开连贯的Statement
和Warnings,如果开启了testOnReturn机制,则还须要校验一下连贯的有效性,校验不通过则间接敞开物理连贯,最初,将连贯放回到connections数组的poolingCount地位,而后唤醒一个在notEmpty上期待连贯的利用线程。
六. removeAbandoned机制
Druid
数据库连接池提供了removeAbandoned机制来避免连贯透露。要开启removeAbandoned机制,须要设置如下参数。
参数 | 阐明 |
---|---|
removeAbandoned | 产生连贯透露时,是否须要回收透露的连贯。默认为false,示意不回收。 |
removeAbandonedTimeoutMillis | 判断产生连贯透露的超时工夫。默认为300秒。 |
上面将对开启removeAbandoned机制后,如何回收产生了透露的连贯进行阐明。当利用线程从连接池获取到一个连贯后,如果开启了removeAbandoned机制,那么会将这个连贯放到activeConnections沉闷连贯map中,对应的办法为DruidDataSource#getConnectionDirect
,源码片段如下所示。
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
int notFullTimeoutRetryCnt = 0;
for (; ; ) {
DruidPooledConnection poolableConnection;
......
if (removeAbandoned) {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
poolableConnection.connectStackTrace = stackTrace;
// 设置connectedTimeNano,用于后续判断连贯借出工夫是否大于removeAbandonedTimeoutMillis
poolableConnection.setConnectedTimeNano();
poolableConnection.traceEnable = true;
activeConnectionLock.lock();
try {
// 将从连接池获取到的连贯放到activeConnections中
activeConnections.put(poolableConnection, PRESENT);
} finally {
activeConnectionLock.unlock();
}
}
if (!this.defaultAutoCommit) {
poolableConnection.setAutoCommit(false);
}
return poolableConnection;
}
}
又已知Druid
数据库连接池有一个销毁连贯的线程会每距离timeBetweenEvictionRunsMillis执行一次DestroyTask#run
办法来销毁连贯,DestroyTask#run
办法如下所示。
public void run() {
shrink(true, keepAlive);
// 如果开启了removeAbandoned机制
// 则执行removeAbandoned()办法来检测产生了透露的连贯并回收
if (isRemoveAbandoned()) {
removeAbandoned();
}
}
DestroyTask#run
办法的最初会判断是否开启了removeAbandoned机制,如果开启了则会执行DruidDataSource#removeAbandoned
办法来检测哪些连贯产生了透露,并被动回收这些连贯。DruidDataSource#removeAbandoned
办法如下所示。
public int removeAbandoned() {
int removeCount = 0;
long currrentNanos = System.nanoTime();
List<DruidPooledConnection> abandonedList = new ArrayList<DruidPooledConnection>();
activeConnectionLock.lock();
try {
Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator();
for (; iter.hasNext(); ) {
DruidPooledConnection pooledConnection = iter.next();
// 运行中的连贯不会被断定为产生了透露
if (pooledConnection.isRunning()) {
continue;
}
long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000);
// 判断连贯借出工夫是否达到连贯透露的超时工夫
if (timeMillis >= removeAbandonedTimeoutMillis) {
// 将产生了透露的连贯从activeConnections中移除
iter.remove();
pooledConnection.setTraceEnable(false);
// 将产生了泄露的连贯增加到abandonedList汇合中
abandonedList.add(pooledConnection);
}
}
} finally {
activeConnectionLock.unlock();
}
if (abandonedList.size() > 0) {
// 遍历abandonedList汇合
// 被动调用每个产生了透露的DruidPooledConnection的close()办法来回收连贯
for (DruidPooledConnection pooledConnection : abandonedList) {
final ReentrantLock lock = pooledConnection.lock;
lock.lock();
try {
if (pooledConnection.isDisable()) {
continue;
}
} finally {
lock.unlock();
}
JdbcUtils.close(pooledConnection);
pooledConnection.abandond();
removeAbandonedCount++;
removeCount++;
......
}
}
return removeCount;
}
DruidDataSource#removeAbandoned
办法中次要实现的事件就是将每个产生了透露的连贯从activeConnections中挪动到abandonedList中,而后遍历abandonedList中的每个连贯并调用DruidPooledConnection#close
办法,最终实现透露连贯的回收。
总结
Druid
数据库连接池中,利用线程向连接池获取连贯时,如果池中没有连贯,则利用线程会在notEmpty上期待,同时Druid
数据库连接池中有一个创立连贯的线程,会继续的向连接池创立连贯,如果连接池已满,则创立连贯的线程会在empty上期待。
当有连贯被生产,或者有连贯被偿还,会唤醒在notEmpty上期待的利用线程,同理有连贯被销毁时,会唤醒在empty上期待的生产连贯的线程。
Druid
数据库连接池中还有一个销毁连贯的线程,会每距离timeBetweenEvictionRunsMillis的工夫执行一次DestroyTask
工作来销毁连贯,这些被销毁的连贯能够是存活工夫达到最大值的连贯,也能够是闲暇工夫达到指定值的连贯。如果还开启了保活机制,那么闲暇工夫大于keepAliveBetweenTimeMillis的连贯都会被校验一次有效性,校验不通过的连贯会被销毁。
最初,Druid
数据库连接池提供了removeAbandoned机制来避免连贯透露,当开启了removeAbandoned机制时,每一个被利用线程获取的连贯都会被增加到activeConnections沉闷连贯map中,如果这个连贯在利用线程中应用结束后没有被敞开,那么Druid
数据库连接池会从activeConnections中将其辨认进去并被动回收。
发表回复