共计 36514 个字符,预计需要花费 92 分钟才能阅读完成。
前言
本文将对 Druid
数据库连接池的源码进行剖析和学习,以理解 Druid
数据库连接池的工作原理。Druid
数据库连接池的根本逻辑简直全副在 DruidDataSource
类中,所以本文次要是围绕 DruidDataSource
的各项性能开展阐述。
Druid
版本:1.2.11
注释
一. DruidDataSource 初始化
DruidDataSource
初始化有两种形式,如下所示。
- 将
DruidDataSource
实例创立进去后,被动调用其init()
办法实现初始化; - 首次调用
DruidDataSource
的getConnection()
办法时,会调用到init()
办法实现初始化。
因为 init()
办法过长,上面将分点介绍 init()
办法实现的要害事件。
1. 双重查看 inited 状态
对 inited 状态进行 Double Check,避免DruidDataSource
初始化两次。源码示意如下。
public void init() throws SQLException {if (inited) {return;}
......
final ReentrantLock lock = this.lock;
try {lock.lockInterruptibly();
} catch (InterruptedException e) {throw new SQLException("interrupt", e);
}
boolean init = false;
try {if (inited) {return;}
......
} catch (SQLException e) {......} catch (InterruptedException e) {......} catch (RuntimeException e) {......} catch (Error e) {......} finally {
inited = true;
lock.unlock();
......
}
}
2. 判断数据库类型
依据 jdbcUrl 失去数据库类型dbTypeName。源码如下所示。
if (this.dbTypeName == null || this.dbTypeName.length() == 0) {this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null);
}
3. 参数校验
对一些要害参数进行校验。源码如下所示。
// 连接池最大连贯数量不能小于等于 0
if (maxActive <= 0) {throw new IllegalArgumentException("illegal maxActive" + maxActive);
}
// 连接池最大连贯数量不能小于最小连贯数量
if (maxActive < minIdle) {throw new IllegalArgumentException("illegal maxActive" + maxActive);
}
// 连接池初始连贯数量不能大于最大连贯数量
if (getInitialSize() > maxActive) {throw new IllegalArgumentException("illegal initialSize" + this.initialSize + ", maxActive" + maxActive);
}
// 不容许同时开启基于日志伎俩记录连接池状态和全局状态监控
if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat) {throw new IllegalArgumentException("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true");
}
// 连贯最大闲暇工夫不能小于连贯最小闲暇工夫
if (maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis) {throw new SQLException("maxEvictableIdleTimeMillis must be grater than minEvictableIdleTimeMillis");
}
// 不容许开启了保活机制但保活间隔时间小于等于回收查看工夫距离
if (keepAlive && keepAliveBetweenTimeMillis <= timeBetweenEvictionRunsMillis) {throw new SQLException("keepAliveBetweenTimeMillis must be grater than timeBetweenEvictionRunsMillis");
}
4. SPI 机制加载过滤器
调用到 DruidDataSource#initFromSPIServiceLoader
办法,基于 SPI 机制加载过滤器Filter
。源码如下所示。
private void initFromSPIServiceLoader() {if (loadSpifilterSkip) {return;}
if (autoFilters == null) {List<Filter> filters = new ArrayList<Filter>();
// 基于 ServiceLoader 加载 Filter
ServiceLoader<Filter> autoFilterLoader = ServiceLoader.load(Filter.class);
// 遍历加载的每一个 Filter,依据 @AutoLoad 注解的属性判断是否加载该 Filter
for (Filter filter : autoFilterLoader) {AutoLoad autoLoad = filter.getClass().getAnnotation(AutoLoad.class);
if (autoLoad != null && autoLoad.value()) {filters.add(filter);
}
}
autoFilters = filters;
}
// 将每个须要加载的 Filter 增加到 filters 字段中,并去重
for (Filter filter : autoFilters) {if (LOG.isInfoEnabled()) {LOG.info("load filter from spi :" + filter.getClass().getName());
}
addFilter(filter);
}
}
5. 加载驱动
调用 DruidDataSource#resolveDriver
办法,依据配置的驱动名称加载数据库驱动。源码如下所示。
protected void resolveDriver() throws SQLException {if (this.driver == null) {
// 若没有配置驱动名则尝试从 jdbcUrl 中获取
if (this.driverClass == null || this.driverClass.isEmpty()) {this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
}
// Mock 驱动相干
if (MockDriver.class.getName().equals(driverClass)) {driver = MockDriver.instance;} else if ("com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver".equals(driverClass)) {
// ClickHouse 相干
Properties info = new Properties();
info.put("user", username);
info.put("password", password);
info.putAll(connectProperties);
driver = new BalancedClickhouseDriver(jdbcUrl, info);
} else {if (jdbcUrl == null && (driverClass == null || driverClass.length() == 0)) {throw new SQLException("url not set");
}
// 加载驱动
driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
}
} else {if (this.driverClass == null) {this.driverClass = driver.getClass().getName();}
}
}
6. 初始化连贯有效性校验器
调用 DruidDataSource#initValidConnectionChecker
办法,初始化ValidConnectionChecker
,用于校验某个连贯是否可用。源码如下所示。
private void initValidConnectionChecker() {if (this.validConnectionChecker != null) {return;}
String realDriverClassName = driver.getClass().getName();
// 不同的数据库初始化不同的 ValidConnectionChecker
if (JdbcUtils.isMySqlDriver(realDriverClassName)) {
// MySQL 数据库还反对应用 ping 的形式来校验连贯活性,这比执行一条简略查问语句来判活更高效
// 由 usePingMethod 参数决定是否开启
this.validConnectionChecker = new MySqlValidConnectionChecker(usePingMethod);
} else if (realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER)
|| realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER2)) {this.validConnectionChecker = new OracleValidConnectionChecker();
} else if (realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER)
|| realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER_SQLJDBC4)
|| realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER_JTDS)) {this.validConnectionChecker = new MSSQLValidConnectionChecker();
} else if (realDriverClassName.equals(JdbcConstants.POSTGRESQL_DRIVER)
|| realDriverClassName.equals(JdbcConstants.ENTERPRISEDB_DRIVER)
|| realDriverClassName.equals(JdbcConstants.POLARDB_DRIVER)) {this.validConnectionChecker = new PGValidConnectionChecker();
} else if (realDriverClassName.equals(JdbcConstants.OCEANBASE_DRIVER)
|| (realDriverClassName.equals(JdbcConstants.OCEANBASE_DRIVER2))) {DbType dbType = DbType.of(this.dbTypeName);
this.validConnectionChecker = new OceanBaseValidConnectionChecker(dbType);
}
}
7. 初始化全局状态统计器
如果 useGlobalDataSourceStat 设置为true,则初始化全局状态统计器,用于统计和剖析数据库连接池的性能数据。源码片段如下所示。
if (isUseGlobalDataSourceStat()) {dataSourceStat = JdbcDataSourceStat.getGlobal();
if (dataSourceStat == null) {dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbTypeName);
JdbcDataSourceStat.setGlobal(dataSourceStat);
}
if (dataSourceStat.getDbType() == null) {dataSourceStat.setDbType(this.dbTypeName);
}
} else {dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbTypeName, this.connectProperties);
}
8. 初始化连接池数组并预热
创立三个连接池数组,别离是 connections(用于寄存能获取的连贯对象),evictConnections(用于寄存须要抛弃的连贯对象)和keepAliveConnections(用于寄存须要保活的连贯对象)。连接池的预热有两种,如果配置了asyncInit 为true,且异步线程池不为空,则执行异步连接池预热,反之执行同步连接池预热。
// 用于寄存能获取的连贯对象,真正意义上的连接池
// 曾经被获取的连贯不在其中
connections = new DruidConnectionHolder[maxActive];
// 用于寄存须要被敞开抛弃的连贯
evictConnections = new DruidConnectionHolder[maxActive];
// 用于寄存须要保活的连贯
keepAliveConnections = new DruidConnectionHolder[maxActive];
SQLException connectError = null;
// 有线程池且异步初始化配置为 true,则异步预热
if (createScheduler != null && asyncInit) {for (int i = 0; i < initialSize; ++i) {submitCreateTask(true);
}
} else if (!asyncInit) {
// 同步预热,预热连接数由 initialSize 配置
while (poolingCount < initialSize) {
try {PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
// 对 DruidDataSource 和 Connection 做了一层封装
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
connections[poolingCount++] = holder;
} catch (SQLException ex) {LOG.error("init datasource error, url:" + this.getUrl(), ex);
if (initExceptionThrow) {
connectError = ex;
break;
} else {Thread.sleep(3000);
}
}
}
if (poolingCount > 0) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();}
}
9. 创立日志记录线程并启动
调用 DruidDataSource#createAndLogThread
办法创立通过打印日志来记录连接池状态的线程。createAndLogThread()
办法如下所示。
private void createAndLogThread() {
// timeBetweenLogStatsMillis 小于等于 0 示意不开启打印日志记录连接池状态的性能
if (this.timeBetweenLogStatsMillis <= 0) {return;}
String threadName = "Druid-ConnectionPool-Log-" + System.identityHashCode(this);
// 创立线程
logStatsThread = new LogStatsThread(threadName);
// 启动线程
logStatsThread.start();
this.resetStatEnable = false;
}
createAndLogThread()
办法会创立 LogStatsThread
并启动,即会调用到 LogStatsThread
的run()
办法。LogStatsThread
线程的 run()
办法如下所示。
public void run() {
try {for (; ;) {
try {
// 每距离 timeBetweenLogStatsMillis 就打印一次连接池状态
logStats();} catch (Exception e) {LOG.error("logStats error", e);
}
Thread.sleep(timeBetweenLogStatsMillis);
}
} catch (InterruptedException e) {}}
上述 run()
办法中会每距离 timeBetweenLogStatsMillis 的工夫就调用一次 logStats()
办法来打印连接池状态。logStats()
办法如下所示。
public void logStats() {
final DruidDataSourceStatLogger statLogger = this.statLogger;
if (statLogger == null) {return;}
// 拿到各种连接池的状态
DruidDataSourceStatValue statValue = getStatValueAndReset();
// 打印
statLogger.log(statValue);
}
在 logStats()
办法中会先调用 getStatValueAndReset()
办法来拿到各种连接池的状态,而后调用 DruidDataSourceStatLogger
实现打印。最初看一眼 getStatValueAndReset()
办法外面拿哪些连接池状态,getStatValueAndReset()
办法代码片段如下所示。
public DruidDataSourceStatValue getStatValueAndReset() {DruidDataSourceStatValue value = new DruidDataSourceStatValue();
lock.lock();
try {value.setPoolingCount(this.poolingCount);
value.setPoolingPeak(this.poolingPeak);
value.setPoolingPeakTime(this.poolingPeakTime);
value.setActiveCount(this.activeCount);
value.setActivePeak(this.activePeak);
value.setActivePeakTime(this.activePeakTime);
value.setConnectCount(this.connectCount);
value.setCloseCount(this.closeCount);
value.setWaitThreadCount(lock.getWaitQueueLength(notEmpty));
value.setNotEmptyWaitCount(this.notEmptyWaitCount);
value.setNotEmptyWaitNanos(this.notEmptyWaitNanos);
value.setKeepAliveCheckCount(this.keepAliveCheckCount);
// 重置参数
this.poolingPeak = 0;
this.poolingPeakTime = 0;
this.activePeak = 0;
this.activePeakTime = 0;
this.connectCount = 0;
this.closeCount = 0;
this.keepAliveCheckCount = 0;
this.notEmptyWaitCount = 0;
this.notEmptyWaitNanos = 0;
} finally {lock.unlock();
}
value.setName(this.getName());
value.setDbType(this.dbTypeName);
value.setDriverClassName(this.getDriverClassName());
......
value.setSqlSkipCount(this.getDataSourceStat().getSkipSqlCountAndReset());
value.setSqlList(this.getDataSourceStat().getSqlStatMapAndReset());
return value;
}
10. 创立创立连贯的线程并启动
调用 DruidDataSource#createAndStartCreatorThread
办法来创立创立连贯的线程 CreateConnectionThread
并启动。createAndStartCreatorThread()
办法如下所示。
protected void createAndStartCreatorThread() {
// 只有异步创立连贯的线程池为空时,才创立 CreateConnectionThread
if (createScheduler == null) {String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this);
createConnectionThread = new CreateConnectionThread(threadName);
// 启动线程
createConnectionThread.start();
return;
}
initedLatch.countDown();}
CreateConnectionThread
只有在异步创立连贯的线程池 createScheduler 为空时,才会被创立进去,并且在 CreateConnectionThread
的run()
办法一开始,就会调用 initedLatch 的countDown()
办法,其中 initedLatch 是一个初始值为 2 的 CountDownLatch
对象,另外一次 countDown()
调用在 DestroyConnectionThread
的run()
办法中,目标就是 init()
办法执行完以前,创立连贯的线程和销毁连贯的线程肯定要创立进去并启动结束。
createAndLogThread();
// 在外部会调用到 initedLatch.countDown()
createAndStartCreatorThread();
// 在外部最终会调用 initedLatch.countDown()
createAndStartDestroyThread();
initedLatch.await();
11. 创立销毁连贯的线程并启动
调用 DruidDataSource#createAndStartDestroyThread
办法来创立销毁连贯的线程 DestroyConnectionThread
并启动。createAndStartDestroyThread()
办法如下所示。
protected void createAndStartDestroyThread() {
// 销毁连贯的工作
destroyTask = new DestroyTask();
// 如果销毁连贯的线程池不会为空,则让其周期执行销毁连贯的工作
if (destroyScheduler != null) {
long period = timeBetweenEvictionRunsMillis;
if (period <= 0) {period = 1000;}
destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period,
TimeUnit.MILLISECONDS);
initedLatch.countDown();
return;
}
// 如果销毁连贯的线程池为空,则创立销毁连贯的线程
String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this);
destroyConnectionThread = new DestroyConnectionThread(threadName);
// 启动线程
destroyConnectionThread.start();}
createAndStartDestroyThread()
办法中会先判断销毁连贯的线程池是否存在,如果存在,则不再创立 DestroyConnectionThread
,而是会让销毁连贯的线程池来执行销毁工作,如果不存在,则创立DestroyConnectionThread
并启动,此时 initedLatch 的countDown()
调用是在 DestroyConnectionThread
的run()
办法中。DestroyConnectionThread#run
办法源码如下所示。
public void run() {// run()办法只有执行了,就调用 initedLatch#countDown
initedLatch.countDown();
for (; ;) {// 每距离 timeBetweenEvictionRunsMillis 执行一次 DestroyTask 的 run()办法
try {if (closed || closing) {break;}
if (timeBetweenEvictionRunsMillis > 0) {Thread.sleep(timeBetweenEvictionRunsMillis);
} else {Thread.sleep(1000);
}
if (Thread.interrupted()) {break;}
// 执行 DestroyTask 的 run()办法来销毁须要销毁的线程
destroyTask.run();} catch (InterruptedException e) {break;}
}
}
DestroyConnectionThread#run
办法只有被调用到,那么就会调用 initedLatch 的countDown()
办法,此时阻塞在 init()
办法中的 initedLatch.await()
办法上的线程就会被唤醒并持续往下执行。
二. DruidDataSource 连贯创立
DruidDataSource
连贯的创立由 CreateConnectionThread
线程实现,其 run()
办法如下所示。
public void run() {initedLatch.countDown();
long lastDiscardCount = 0;
int errorCount = 0;
for (; ;) {
try {lock.lockInterruptibly();
} catch (InterruptedException e2) {break;}
long discardCount = DruidDataSource.this.discardCount;
boolean discardChanged = discardCount - lastDiscardCount > 0;
lastDiscardCount = discardCount;
try {
// emptyWait 为 true 示意生产连接线程须要期待,无需生产连贯
boolean emptyWait = true;
// 产生了创立谬误,且池中已无连贯,且抛弃连贯的统计没有扭转
// 此时生产连接线程须要生产连贯
if (createError != null
&& poolingCount == 0
&& !discardChanged) {emptyWait = false;}
if (emptyWait
&& asyncInit && createCount < initialSize) {emptyWait = false;}
if (emptyWait) {
// 池中已有连接数大于等于正在期待连贯的利用线程数
// 且以后是非 keepAlive 场景
// 且以后是非间断失败
// 此时生产连贯的线程在 empty 上期待
// keepAlive && activeCount + poolingCount < minIdle 时会在 shrink()办法中触发 emptySingal()来增加连贯
// isFailContinuous()返回 true 示意间断失败,即屡次(默认 2 次)创立物理连贯失败
if (poolingCount >= notEmptyWaitThreadCount
&& (!(keepAlive && activeCount + poolingCount < minIdle))
&& !isFailContinuous()) {empty.await();
}
// 避免创立超过 maxActive 数量的连贯
if (activeCount + poolingCount >= maxActive) {empty.await();
continue;
}
}
} catch (InterruptedException e) {......} finally {lock.unlock();
}
PhysicalConnectionInfo connection = null;
try {connection = createPhysicalConnection();
} catch (SQLException e) {LOG.error("create connection SQLException, url:" + jdbcUrl + ", errorCode" + e.getErrorCode()
+ ", state" + e.getSQLState(), e);
errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
// 屡次创立失败
setFailContinuous(true);
// 如果配置了疾速失败,就唤醒所有在 notEmpty 上期待的利用线程
if (failFast) {lock.lock();
try {notEmpty.signalAll();
} finally {lock.unlock();
}
}
if (breakAfterAcquireFailure) {break;}
try {Thread.sleep(timeBetweenConnectErrorMillis);
} catch (InterruptedException interruptEx) {break;}
}
} catch (RuntimeException e) {LOG.error("create connection RuntimeException", e);
setFailContinuous(true);
continue;
} catch (Error e) {LOG.error("create connection Error", e);
setFailContinuous(true);
break;
}
if (connection == null) {continue;}
// 把连贯增加到连接池
boolean result = put(connection);
if (!result) {JdbcUtils.close(connection.getPhysicalConnection());
LOG.info("put physical connection to pool failed.");
}
errorCount = 0;
if (closing || closed) {break;}
}
}
CreateConnectionThread
的 run()
办法整体就是在一个死循环中一直的期待,被唤醒,而后创立线程。当一个物理连贯被创立进去后,会调用 DruidDataSource#put
办法将其放到连接池 connections 中,put()
办法源码如下所示。
protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) {
DruidConnectionHolder holder = null;
try {holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);
} catch (SQLException ex) {
......
return false;
}
return put(holder, physicalConnectionInfo.createTaskId, false);
}
private boolean put(DruidConnectionHolder holder, long createTaskId, boolean checkExists) {
// 波及到连接池中连贯数量扭转的操作,都须要加锁
lock.lock();
try {if (this.closing || this.closed) {return false;}
// 池中已有连接数曾经大于等于最大连接数,则不再把连贯加到连接池并间接返回 false
if (poolingCount >= maxActive) {if (createScheduler != null) {clearCreateTask(createTaskId);
}
return false;
}
// 查看反复增加
if (checkExists) {for (int i = 0; i < poolingCount; i++) {if (connections[i] == holder) {return false;}
}
}
// 连贯放入连接池
connections[poolingCount] = holder;
// poolingCount++
incrementPoolingCount();
if (poolingCount > poolingPeak) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();}
// 唤醒在 notEmpty 上期待连贯的利用线程
notEmpty.signal();
notEmptySignalCount++;
if (createScheduler != null) {clearCreateTask(createTaskId);
if (poolingCount + createTaskCount < notEmptyWaitThreadCount
&& activeCount + poolingCount + createTaskCount < maxActive) {emptySignal();
}
}
} finally {lock.unlock();
}
return true;
}
put()
办法会先将物理连贯从 PhysicalConnectionInfo
中获取进去并封装成一个 DruidConnectionHolder
,DruidConnectionHolder
就是 Druid
连接池中的连贯。新增加的连贯会寄存在连接池数组 connections 的poolingCount地位,而后 poolingCount 会加 1,也就是 poolingCount 代表着连接池中能够获取的连贯的数量。
三. DruidDataSource 连贯销毁
DruidDataSource
连贯的创立由 DestroyConnectionThread
线程实现,其 run()
办法如下所示。
public void run() {// run()办法只有执行了,就调用 initedLatch#countDown
initedLatch.countDown();
for (; ;) {// 每距离 timeBetweenEvictionRunsMillis 执行一次 DestroyTask 的 run()办法
try {if (closed || closing) {break;}
if (timeBetweenEvictionRunsMillis > 0) {Thread.sleep(timeBetweenEvictionRunsMillis);
} else {Thread.sleep(1000);
}
if (Thread.interrupted()) {break;}
// 执行 DestroyTask 的 run()办法来销毁须要销毁的连贯
destroyTask.run();} catch (InterruptedException e) {break;}
}
}
DestroyConnectionThread
的 run()
办法就是在一个死循环中每距离 timeBetweenEvictionRunsMillis 的工夫就执行一次 DestroyTask
的run()
办法。DestroyTask#run
办法实现如下所示。
public void run() {
// 依据一系列条件判断并销毁连贯
shrink(true, keepAlive);
// RemoveAbandoned 机制
if (isRemoveAbandoned()) {removeAbandoned();
}
}
在 DestroyTask#run
办法中会调用 DruidDataSource#shrink
办法来依据设定的条件来判断出须要销毁和保活的连贯。DruidDataSource#shrink
办法如下所示。
// checkTime 参数示意在将一个连贯进行销毁前,是否须要判断一下闲暇工夫
public void shrink(boolean checkTime, boolean keepAlive) {
// 加锁
try {lock.lockInterruptibly();
} catch (InterruptedException e) {return;}
// needFill = keepAlive && poolingCount + activeCount < minIdle
// needFill 为 true 时,会调用 empty.signal()唤醒生产连贯的线程来生产连贯
boolean needFill = false;
// evictCount 记录须要销毁的连接数
// keepAliveCount 记录须要保活的连接数
int evictCount = 0;
int keepAliveCount = 0;
int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
fatalErrorCountLastShrink = fatalErrorCount;
try {if (!inited) {return;}
// checkCount = 池中已有连接数 - 最小闲暇连接数
// 失常状况下,最多可能将前 checkCount 个连贯进行销毁
final int checkCount = poolingCount - minIdle;
final long currentTimeMillis = System.currentTimeMillis();
// 失常状况下,须要遍历池中所有连贯
// 从前往后遍历,i 为数组索引
for (int i = 0; i < poolingCount; ++i) {DruidConnectionHolder connection = connections[i];
// 如果产生了致命谬误(onFatalError == true)且致命谬误产生工夫(lastFatalErrorTimeMillis)在连贯建设工夫之后
// 把连贯退出到保活连贯数组中
if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {keepAliveConnections[keepAliveCount++] = connection;
continue;
}
if (checkTime) {
// phyTimeoutMillis 示意连贯的物理存活超时工夫,默认值是 -1
if (phyTimeoutMillis > 0) {
// phyConnectTimeMillis 示意连贯的物理存活工夫
long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
// 连贯的物理存活工夫大于 phyTimeoutMillis,则将这个连贯放入 evictConnections 数组
if (phyConnectTimeMillis > phyTimeoutMillis) {evictConnections[evictCount++] = connection;
continue;
}
}
// idleMillis 示意连贯的闲暇工夫
long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
// minEvictableIdleTimeMillis 示意连贯容许的最小闲暇工夫,默认是 30 分钟
// keepAliveBetweenTimeMillis 示意保活间隔时间,默认是 2 分钟
// 如果连贯的闲暇工夫小于 minEvictableIdleTimeMillis 且还小于 keepAliveBetweenTimeMillis
// 则 connections 数组中以后连贯之后的连贯都会满足闲暇工夫小于 minEvictableIdleTimeMillis 且还小于 keepAliveBetweenTimeMillis
// 此时跳出遍历,不再查看其余的连贯
if (idleMillis < minEvictableIdleTimeMillis
&& idleMillis < keepAliveBetweenTimeMillis
) {break;}
// 连贯的闲暇工夫大于等于容许的最小闲暇工夫
if (idleMillis >= minEvictableIdleTimeMillis) {if (checkTime && i < checkCount) {
// i < checkCount 这个条件的了解如下:// 每次 shrink()办法执行时,connections 数组中只有索引 0 到 checkCount- 1 的连贯才容许被销毁
// 这样能力保障销毁完连贯后,connections 数组中至多还有 minIdle 个连贯
evictConnections[evictCount++] = connection;
continue;
} else if (idleMillis > maxEvictableIdleTimeMillis) {
// 如果闲暇工夫过久,曾经大于了容许的最大闲暇工夫(默认 7 小时)// 那么无论如何都要销毁这个连贯
evictConnections[evictCount++] = connection;
continue;
}
}
// 如果开启了保活机制,且连贯闲暇工夫大于等于了保活间隔时间
// 此时将连贯退出到保活连贯数组中
if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {keepAliveConnections[keepAliveCount++] = connection;
}
} else {
// checkTime 为 false,那么前 checkCount 个连贯间接进行销毁,不再判断这些连贯的闲暇工夫是否超过阈值
if (i < checkCount) {evictConnections[evictCount++] = connection;
} else {break;}
}
}
// removeCount = 销毁连接数 + 保活连接数
// removeCount 示意本次从 connections 数组中拿掉的连接数
// 注:肯定是从返回后拿,失常状况下最初 minIdle 个连贯是平安的
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {// [0, 1, 2, 3, 4, null, null, null] -> [3, 4, 2, 3, 4, null, null, null]
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
// [3, 4, 2, 3, 4, null, null, null] -> [3, 4, null, null, null, null, null, null, null]
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
// 更新池中连接数
poolingCount -= removeCount;
}
keepAliveCheckCount += keepAliveCount;
// 如果池中连接数加上沉闷连接数(借出去的连贯)小于最小闲暇连接数
// 则将 needFill 设为 true,后续须要唤醒生产连贯的线程来生产连贯
if (keepAlive && poolingCount + activeCount < minIdle) {needFill = true;}
} finally {lock.unlock();
}
if (evictCount > 0) {
// 遍历 evictConnections 数组,销毁其中的连贯
for (int i = 0; i < evictCount; ++i) {DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCountUpdater.incrementAndGet(this);
}
Arrays.fill(evictConnections, null);
}
if (keepAliveCount > 0) {
// 遍历 keepAliveConnections 数组,对其中的连贯做可用性校验
// 校验通过连贯就放入 connections 数组,没通过连贯就销毁
for (int i = keepAliveCount - 1; i >= 0; --i) {DruidConnectionHolder holer = keepAliveConnections[i];
Connection connection = holer.getConnection();
holer.incrementKeepAliveCheckCount();
boolean validate = false;
try {this.validateConnection(connection);
validate = true;
} catch (Throwable error) {if (LOG.isDebugEnabled()) {LOG.debug("keepAliveErr", error);
}
}
boolean discard = !validate;
if (validate) {holer.lastKeepTimeMillis = System.currentTimeMillis();
boolean putOk = put(holer, 0L, true);
if (!putOk) {discard = true;}
}
if (discard) {
try {connection.close();
} catch (Exception e) { }
lock.lock();
try {
discardCount++;
if (activeCount + poolingCount <= minIdle) {emptySignal();
}
} finally {lock.unlock();
}
}
}
this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
Arrays.fill(keepAliveConnections, null);
}
// 如果 needFill 为 true 则唤醒生产连贯的线程来生产连贯
if (needFill) {lock.lock();
try {
// 计算须要生产连贯的个数
int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
for (int i = 0; i < fillCount; ++i) {emptySignal();
}
} finally {lock.unlock();
}
} else if (onFatalError || fatalErrorIncrement > 0) {lock.lock();
try {emptySignal();
} finally {lock.unlock();
}
}
}
在 DruidDataSource#shrink
办法中,外围逻辑是遍历 connections 数组中的连贯,并判断这些连贯是须要销毁还是须要保活。通常状况下,connections数组中的前 checkCount(checkCount = poolingCount – minIdle) 个连贯是“危险”的,因为这些连贯只有满足了:闲暇工夫 >= minEvictableIdleTimeMillis(容许的最小闲暇工夫),那么就须要被销毁,而 connections 数组中的最初 minIdle 个连贯是“绝对平安”的,因为这些连贯只有在满足:闲暇工夫 > maxEvictableIdleTimeMillis(容许的最大闲暇工夫)时,才会被销毁。这么判断的起因,次要就是须要让连接池里可能保障至多有 minIdle 个闲暇连贯能够让利用线程获取。
当确定好了须要销毁和须要保活的连贯后,此时会先将 connections 数组清理,只保留平安的连贯,这个过程示意图如下。
最初,会遍历 evictConnections 数组,销毁数组中的连贯,遍历 keepAliveConnections 数组,对其中的每个连贯做可用性校验,如果校验可用,那么就从新放回 connections 数组,否则销毁。
四. DruidDataSource 连贯获取
DruidDataSource
获取连贯的入口办法是 DruidDataSource#getConnection
办法,实现如下。
public DruidPooledConnection getConnection() throws SQLException {
// maxWait 示意获取连贯时最大等待时间,单位毫秒,默认值为 -1
return getConnection(maxWait);
}
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
// 首次获取连贯时触发数据库连接池初始化
init();
if (filters.size() > 0) {FilterChainImpl filterChain = new FilterChainImpl(this);
return filterChain.dataSource_connect(this, maxWaitMillis);
} else {
// 间接获取连贯
return getConnectionDirect(maxWaitMillis);
}
}
DruidDataSource#getConnection
办法会调用到 DruidDataSource#getConnectionDirect
办法来获取连贯,实现如下所示。
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
int notFullTimeoutRetryCnt = 0;
for (; ;) {
DruidPooledConnection poolableConnection;
try {
// 从连接池拿到连贯
poolableConnection = getConnectionInternal(maxWaitMillis);
} catch (GetConnectionTimeoutException ex) {
// 拿连贯时有异样,能够重试
// 重试次数由 notFullTimeoutRetryCount 指定
if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
notFullTimeoutRetryCnt++;
if (LOG.isWarnEnabled()) {LOG.warn("get connection timeout retry :" + notFullTimeoutRetryCnt);
}
continue;
}
throw ex;
}
// 如果配置了 testOnBorrow = true,那么每次拿到连贯后,都须要校验这个连贯的有效性
if (testOnBorrow) {boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
// 如果连贯不可用,则销毁连贯,而后从新从池中获取
if (!validate) {if (LOG.isDebugEnabled()) {LOG.debug("skip not validate connection.");
}
discardConnection(poolableConnection.holder);
continue;
}
} else {if (poolableConnection.conn.isClosed()) {discardConnection(poolableConnection.holder);
continue;
}
// 如果配置 testOnBorrow = fasle 但 testWhileIdle = true
// 则判断连贯闲暇工夫是否大于等于 timeBetweenEvictionRunsMillis
// 如果是,则校验连贯的有效性
if (testWhileIdle) {
final DruidConnectionHolder holder = poolableConnection.holder;
long currentTimeMillis = System.currentTimeMillis();
// lastActiveTimeMillis 是连贯最近一次沉闷工夫
// 新建连贯,偿还连贯到连接池,都会更新这个工夫
long lastActiveTimeMillis = holder.lastActiveTimeMillis;
// lastExecTimeMillis 是连贯最近一次执行工夫
// 新建连贯,设置连贯的事务是否主动提交,记录 SQL 到事务信息中,都会更新这个工夫
long lastExecTimeMillis = holder.lastExecTimeMillis;
// lastKeepTimeMillis 是连贯最近一次保活工夫
// 在连贯被保活并放回连接池时,会更新这个工夫
long lastKeepTimeMillis = holder.lastKeepTimeMillis;
// 如果配置 checkExecuteTime 为 true,则最近沉闷工夫取值为最近执行工夫
if (checkExecuteTime
&& lastExecTimeMillis != lastActiveTimeMillis) {lastActiveTimeMillis = lastExecTimeMillis;}
// 如果连贯最近一次做的操作是保活,那么最近沉闷工夫取值为最近保活工夫
if (lastKeepTimeMillis > lastActiveTimeMillis) {lastActiveTimeMillis = lastKeepTimeMillis;}
// 计算闲暇工夫
long idleMillis = currentTimeMillis - lastActiveTimeMillis;
// testWhileIdle 为 true 时的判断工夫距离
long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;
if (timeBetweenEvictionRunsMillis <= 0) {
// timeBetweenEvictionRunsMillis 如果小于等于 0,那么重置为 60 秒
timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
}
// 如果闲暇工夫大于等于 timeBetweenEvictionRunsMillis,则执行连贯的有效性校验
if (idleMillis >= timeBetweenEvictionRunsMillis
|| idleMillis < 0
) {boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate) {if (LOG.isDebugEnabled()) {LOG.debug("skip not validate connection.");
}
discardConnection(poolableConnection.holder);
continue;
}
}
}
}
// 如果设置 removeAbandoned 为 true
// 则将连贯放到 activeConnections 沉闷连贯 map 中
if (removeAbandoned) {StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
poolableConnection.connectStackTrace = stackTrace;
poolableConnection.setConnectedTimeNano();
poolableConnection.traceEnable = true;
activeConnectionLock.lock();
try {activeConnections.put(poolableConnection, PRESENT);
} finally {activeConnectionLock.unlock();
}
}
if (!this.defaultAutoCommit) {poolableConnection.setAutoCommit(false);
}
return poolableConnection;
}
}
DruidDataSource#getConnectionDirect
办法中会先调用 getConnectionInternal()
办法从连接池中拿连贯,而后如果开启了 testOnBorrow,则校验一下连贯的有效性,如果有效则从新调用getConnectionInternal()
办法拿连贯,直到拿到的连贯通过校验。如果没有开启 testOnBorrow 然而开启了 testWhileIdle,则会判断连贯的闲暇工夫是否大于等于timeBetweenEvictionRunsMillis 参数,如果满足则校验一下连贯的有效性,若没有通过校验,那么须要从新调用 getConnectionInternal()
办法拿连贯,直到拿到的连贯通过校验或者连贯的闲暇工夫小于timeBetweenEvictionRunsMillis。
上面看一下理论从连接池拿连贯的 getConnectionInternal()
办法的实现,如下所示。
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
......
final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
final int maxWaitThreadCount = this.maxWaitThreadCount;
DruidConnectionHolder holder;
// 在死循环中从连接池拿连贯
// 一开始 createDirect 为 false,示意先从池子中拿
for (boolean createDirect = false; ;) {if (createDirect) {
// createDirect 为 true 示意间接创立连贯
createStartNanosUpdater.set(this, System.nanoTime());
// creatingCount 为 0 示意以后没有其它连贯正在被创立
if (creatingCountUpdater.compareAndSet(this, 0, 1)) {
// 创立物理连贯
PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection();
holder = new DruidConnectionHolder(this, pyConnInfo);
holder.lastActiveTimeMillis = System.currentTimeMillis();
creatingCountUpdater.decrementAndGet(this);
directCreateCountUpdater.incrementAndGet(this);
......
boolean discard;
lock.lock();
try {
// 如果以后正在应用的连接数未达到最大连接数
// 则以后正在应用的连接数加 1
// 否则销毁刚刚创立进去的连贯
if (activeCount < maxActive) {
activeCount++;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();}
break;
} else {discard = true;}
} finally {lock.unlock();
}
if (discard) {JdbcUtils.close(pyConnInfo.getPhysicalConnection());
}
}
}
// 上锁
try {lock.lockInterruptibly();
} catch (InterruptedException e) {connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("interrupt", e);
}
try {
// maxWaitThreadCount 示意容许的最大期待连贯的利用线程数
// notEmptyWaitThreadCount 示意正在期待连贯的利用线程数
// 期待连贯的利用线程数达到最大值时,抛出异样
if (maxWaitThreadCount > 0
&& notEmptyWaitThreadCount >= maxWaitThreadCount) {connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("maxWaitThreadCount" + maxWaitThreadCount + ", current wait Thread count"
+ lock.getQueueLength());
}
// 产生了致命谬误,且设置了致命谬误数最大值大于 0,且正在应用的连接数大于等于致命谬误数最大值
if (onFatalError
&& onFatalErrorMaxActive > 0
&& activeCount >= onFatalErrorMaxActive) {
// 拼接异样并抛出
......
throw new SQLException(errorMsg.toString(), lastFatalError);
}
connectCount++;
// 如果配置的创立连贯的线程池是一个定时线程池
// 且连接池曾经没有可用连贯,// 且以后借出的连接数未达到容许的最大连接数
// 且以后没有其它线程(利用线程,创立连贯的线程,创立连贯的线程池里的线程)在创立连贯
// 此时将 createDirect 置为 true,让以后利用线程间接创立连贯
if (createScheduler != null
&& poolingCount == 0
&& activeCount < maxActive
&& creatingCountUpdater.get(this) == 0
&& createScheduler instanceof ScheduledThreadPoolExecutor) {ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
if (executor.getQueue().size() > 0) {
createDirect = true;
continue;
}
}
if (maxWait > 0) {// 如果设置了期待连贯的最大等待时间,则调用 pollLast()办法来拿连贯
// pollLast()办法执行时如果池中没有连贯,则利用线程会在 notEmpty 上最多期待 maxWait 的工夫
holder = pollLast(nanos);
} else {// 调用 takeLast()办法拿连贯时,如果池中没有连贯,则会在 notEmpty 上始终期待,直到池中有连贯
holder = takeLast();}
if (holder != null) {if (holder.discard) {continue;}
// 正在应用的连接数加 1
activeCount++;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();}
}
} catch (InterruptedException e) {connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException(e.getMessage(), e);
} catch (SQLException e) {connectErrorCountUpdater.incrementAndGet(this);
throw e;
} finally {lock.unlock();
}
break;
}
// 如果拿到的连贯为 null,阐明拿连贯时期待超时了
// 此时抛出连贯超时异样
if (holder == null) {
......
final Throwable createError;
try {lock.lock();
......
createError = this.createError;
} finally {lock.unlock();
}
......
if (createError != null) {throw new GetConnectionTimeoutException(errorMessage, createError);
} else {throw new GetConnectionTimeoutException(errorMessage);
}
}
holder.incrementUseCount();
DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
return poolalbeConnection;
}
getConnectionInternal()
办法中拿到连贯的形式有三种,如下所示。
- 间接创立连贯。须要满足配置的创立连贯的线程池是一个定时线程池,且连接池曾经没有可用连贯,且以后借出的连接数未达到容许的最大连接数,且以后没有其它线程在创立连贯;
- 从池中拿连贯,并最多期待 maxWait 的工夫。须要设置了maxWait;
- 从池中拿连贯,并始终期待直到拿到连贯。
上面最初看一下超时期待拿连贯的 DruidDataSource#pollLast
办法的实现。
private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
long estimate = nanos;
for (; ;) {if (poolingCount == 0) {
// 如果池中曾经没有连贯,则唤醒在 empty 上期待的创立连接线程来创立连贯
emptySignal();
if (failFast && isFailContinuous()) {throw new DataSourceNotAvailableException(createError);
}
// 等待时间耗尽,返回 null
if (estimate <= 0) {waitNanosLocal.set(nanos - estimate);
return null;
}
// 利用线程行将在上面的 notEmpty 上期待
// 这里先把期待获取连贯的利用线程数加 1
notEmptyWaitThreadCount++;
if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {notEmptyWaitThreadPeak = notEmptyWaitThreadCount;}
try {
long startEstimate = estimate;
// 利用线程在 notEmpty 上期待
// 有连贯被创立或者被偿还时,会唤醒在 notEmpty 上期待的利用线程
estimate = notEmpty.awaitNanos(estimate);
notEmptyWaitCount++;
notEmptyWaitNanos += (startEstimate - estimate);
if (!enable) {connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null) {throw disableException;}
throw new DataSourceDisableException();}
} catch (InterruptedException ie) {notEmpty.signal();
notEmptySignalCount++;
throw ie;
} finally {notEmptyWaitThreadCount--;}
if (poolingCount == 0) {if (estimate > 0) {
// 若唤醒后池中还是没有连贯,且此时等待时间还有残余
// 则从新在 notEmpty 上期待
continue;
}
waitNanosLocal.set(nanos - estimate);
return null;
}
}
// poolingCount--
decrementPoolingCount();
// 从池中拿到连贯
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
long waitNanos = nanos - estimate;
last.setLastNotEmptyWaitNanos(waitNanos);
return last;
}
}
五. DruidDataSource 连贯偿还
Druid
数据库连接池中,每一个物理连贯都会被包装成 DruidConnectionHolder
,在提供给利用线程前,还会将DruidConnectionHolder
包装成DruidPooledConnection
,类图如下所示。
利用线程中应用连贯结束后,会调用 DruidPooledConnection
的close()
办法来偿还连贯,也就是将连贯放回连接池。DruidPooledConnection#close
办法如下所示。
public void close() throws SQLException {if (this.disable) {return;}
DruidConnectionHolder holder = this.holder;
if (holder == null) {if (dupCloseLogEnable) {LOG.error("dup close");
}
return;
}
DruidAbstractDataSource dataSource = holder.getDataSource();
// 判断偿还连贯的线程和获取连贯的线程是否是同一个线程
boolean isSameThread = this.getOwnerThread() == Thread.currentThread();
// 如果不是同一个线程,则设置 asyncCloseConnectionEnable 为 true
if (!isSameThread) {dataSource.setAsyncCloseConnectionEnable(true);
}
// 如果开启了 removeAbandoned 机制
// 或者 asyncCloseConnectionEnable 为 true
// 则调用 syncClose()办法来偿还连贯
// syncClose()办法中会先加锁,而后调用 recycle()办法来回收连贯
if (dataSource.isAsyncCloseConnectionEnable()) {syncClose();
return;
}
if (!CLOSING_UPDATER.compareAndSet(this, 0, 1)) {return;}
try {for (ConnectionEventListener listener : holder.getConnectionEventListeners()) {listener.connectionClosed(new ConnectionEvent(this));
}
List<Filter> filters = dataSource.getProxyFilters();
if (filters.size() > 0) {FilterChainImpl filterChain = new FilterChainImpl(dataSource);
filterChain.dataSource_recycle(this);
} else {
// 回收连贯
recycle();}
} finally {CLOSING_UPDATER.set(this, 0);
}
this.disable = true;
}
在 DruidPooledConnection#close
办法中,会先判断本次偿还连贯的线程和获取连贯的线程是否是同一个线程,如果不是,则先加锁而后再调用 recycle()
办法来回收连贯,如果是则间接调用 recycle()
办法来回收连贯。当开启了 removeAbandoned 机制时,就可能会呈现偿还连贯的线程和获取连贯的线程不是同一个线程的状况,这是因为一旦开启了 removeAbandoned 机制,那么每一个被借出的连贯都会被放到 activeConnections 沉闷连贯 map 中,并且在销毁连贯的线程 DestroyConnectionThread
中会每距离 timeBetweenEvictionRunsMillis 的工夫就遍历一次 activeConnections 沉闷连贯 map,一旦有沉闷连贯被借出的工夫大于了removeAbandonedTimeoutMillis,那么销毁连贯的线程DestroyConnectionThread
就会被动去回收这个连贯,以避免 连贯透露。
上面看一下 DruidPooledConnection#recycle
办法的实现。
public void recycle() throws SQLException {if (this.disable) {return;}
DruidConnectionHolder holder = this.holder;
if (holder == null) {if (dupCloseLogEnable) {LOG.error("dup close");
}
return;
}
if (!this.abandoned) {DruidAbstractDataSource dataSource = holder.getDataSource();
// 调用 DruidAbstractDataSource#recycle 回收以后连贯
dataSource.recycle(this);
}
this.holder = null;
conn = null;
transactionInfo = null;
closed = true;
}
在 DruidPooledConnection#recycle
办法中会调用到 DruidDataSource#recycle
办法来回收连贯。DruidDataSource#recycle
办法实现如下所示。
protected void recycle(DruidPooledConnection pooledConnection) throws SQLException {
final DruidConnectionHolder holder = pooledConnection.holder;
......
final boolean isAutoCommit = holder.underlyingAutoCommit;
final boolean isReadOnly = holder.underlyingReadOnly;
final boolean testOnReturn = this.testOnReturn;
try {
// 如果是非主动提交且存在事务
// 则回滚事务
if ((!isAutoCommit) && (!isReadOnly)) {pooledConnection.rollback();
}
// 重置连贯信息(配置还原为默认值,敞开 Statement,革除连贯的 Warnings 等)boolean isSameThread = pooledConnection.ownerThread == Thread.currentThread();
if (!isSameThread) {
final ReentrantLock lock = pooledConnection.lock;
lock.lock();
try {holder.reset();
} finally {lock.unlock();
}
} else {holder.reset();
}
......
// 开启了 testOnReturn 机制,则校验连贯有效性
if (testOnReturn) {boolean validate = testConnectionInternal(holder, physicalConnection);
// 校验不通过则敞开物理连贯
if (!validate) {JdbcUtils.close(physicalConnection);
destroyCountUpdater.incrementAndGet(this);
lock.lock();
try {if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
} finally {lock.unlock();
}
return;
}
}
......
lock.lock();
try {
// 连贯行将放回连接池,须要将 active 设置为 false
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
// 将连贯放到 connections 数组的 poolingCount 地位
// 而后 poolingCount 加 1
// 而后唤醒在 notEmpty 上期待连贯的一个利用线程
result = putLast(holder, currentTimeMillis);
recycleCount++;
} finally {lock.unlock();
}
if (!result) {JdbcUtils.close(holder.conn);
LOG.info("connection recyle failed.");
}
} catch (Throwable e) {......}
}
DruidDataSource#recycle
办法中会先重置连贯信息,行将连贯的一些配置重置为默认值,而后敞开连贯的 Statement
和Warnings,如果开启了 testOnReturn 机制,则还须要校验一下连贯的有效性,校验不通过则间接敞开物理连贯,最初,将连贯放回到 connections 数组的 poolingCount 地位,而后唤醒一个在 notEmpty 上期待连贯的利用线程。
六. removeAbandoned 机制
Druid
数据库连接池提供了 removeAbandoned 机制来避免连贯透露。要开启 removeAbandoned 机制,须要设置如下参数。
参数 | 阐明 |
---|---|
removeAbandoned | 产生连贯透露时,是否须要回收透露的连贯。默认为false,示意不回收。 |
removeAbandonedTimeoutMillis | 判断产生连贯透露的超时工夫。默认为 300 秒。 |
上面将对开启 removeAbandoned 机制后,如何回收产生了透露的连贯进行阐明。当利用线程从连接池获取到一个连贯后,如果开启了 removeAbandoned 机制,那么会将这个连贯放到 activeConnections 沉闷连贯 map 中,对应的办法为DruidDataSource#getConnectionDirect
,源码片段如下所示。
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
int notFullTimeoutRetryCnt = 0;
for (; ;) {
DruidPooledConnection poolableConnection;
......
if (removeAbandoned) {StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
poolableConnection.connectStackTrace = stackTrace;
// 设置 connectedTimeNano,用于后续判断连贯借出工夫是否大于 removeAbandonedTimeoutMillis
poolableConnection.setConnectedTimeNano();
poolableConnection.traceEnable = true;
activeConnectionLock.lock();
try {
// 将从连接池获取到的连贯放到 activeConnections 中
activeConnections.put(poolableConnection, PRESENT);
} finally {activeConnectionLock.unlock();
}
}
if (!this.defaultAutoCommit) {poolableConnection.setAutoCommit(false);
}
return poolableConnection;
}
}
又已知 Druid
数据库连接池有一个销毁连贯的线程会每距离 timeBetweenEvictionRunsMillis 执行一次 DestroyTask#run
办法来销毁连贯,DestroyTask#run
办法如下所示。
public void run() {shrink(true, keepAlive);
// 如果开启了 removeAbandoned 机制
// 则执行 removeAbandoned()办法来检测产生了透露的连贯并回收
if (isRemoveAbandoned()) {removeAbandoned();
}
}
DestroyTask#run
办法的最初会判断是否开启了 removeAbandoned 机制,如果开启了则会执行 DruidDataSource#removeAbandoned
办法来检测哪些连贯产生了透露,并被动回收这些连贯。DruidDataSource#removeAbandoned
办法如下所示。
public int removeAbandoned() {
int removeCount = 0;
long currrentNanos = System.nanoTime();
List<DruidPooledConnection> abandonedList = new ArrayList<DruidPooledConnection>();
activeConnectionLock.lock();
try {Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator();
for (; iter.hasNext(); ) {DruidPooledConnection pooledConnection = iter.next();
// 运行中的连贯不会被断定为产生了透露
if (pooledConnection.isRunning()) {continue;}
long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000);
// 判断连贯借出工夫是否达到连贯透露的超时工夫
if (timeMillis >= removeAbandonedTimeoutMillis) {
// 将产生了透露的连贯从 activeConnections 中移除
iter.remove();
pooledConnection.setTraceEnable(false);
// 将产生了泄露的连贯增加到 abandonedList 汇合中
abandonedList.add(pooledConnection);
}
}
} finally {activeConnectionLock.unlock();
}
if (abandonedList.size() > 0) {
// 遍历 abandonedList 汇合
// 被动调用每个产生了透露的 DruidPooledConnection 的 close()办法来回收连贯
for (DruidPooledConnection pooledConnection : abandonedList) {
final ReentrantLock lock = pooledConnection.lock;
lock.lock();
try {if (pooledConnection.isDisable()) {continue;}
} finally {lock.unlock();
}
JdbcUtils.close(pooledConnection);
pooledConnection.abandond();
removeAbandonedCount++;
removeCount++;
......
}
}
return removeCount;
}
DruidDataSource#removeAbandoned
办法中次要实现的事件就是将每个产生了透露的连贯从 activeConnections 中挪动到 abandonedList 中,而后遍历 abandonedList 中的每个连贯并调用 DruidPooledConnection#close
办法,最终实现透露连贯的回收。
总结
Druid
数据库连接池中,利用线程向连接池获取连贯时,如果池中没有连贯,则利用线程会在 notEmpty 上期待,同时 Druid
数据库连接池中有一个创立连贯的线程,会继续的向连接池创立连贯,如果连接池已满,则创立连贯的线程会在 empty 上期待。
当有连贯被生产,或者有连贯被偿还,会唤醒在 notEmpty 上期待的利用线程,同理有连贯被销毁时,会唤醒在 empty 上期待的生产连贯的线程。
Druid
数据库连接池中还有一个销毁连贯的线程,会每距离 timeBetweenEvictionRunsMillis 的工夫执行一次 DestroyTask
工作来销毁连贯,这些被销毁的连贯能够是存活工夫达到最大值的连贯,也能够是闲暇工夫达到指定值的连贯。如果还开启了保活机制,那么闲暇工夫大于 keepAliveBetweenTimeMillis 的连贯都会被校验一次有效性,校验不通过的连贯会被销毁。
最初,Druid
数据库连接池提供了 removeAbandoned 机制来避免连贯透露,当开启了 removeAbandoned 机制时,每一个被利用线程获取的连贯都会被增加到 activeConnections 沉闷连贯 map 中,如果这个连贯在利用线程中应用结束后没有被敞开,那么 Druid
数据库连接池会从 activeConnections 中将其辨认进去并被动回收。