共计 20607 个字符,预计需要花费 52 分钟才能阅读完成。
序
本文次要钻研一下 druid 的 borrow 行为
getConnection
com/alibaba/druid/pool/DruidDataSource.java
public DruidPooledConnection getConnection() throws SQLException {return getConnection(maxWait);
}
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {init();
if (filters.size() > 0) {FilterChainImpl filterChain = new FilterChainImpl(this);
return filterChain.dataSource_connect(this, maxWaitMillis);
} else {return getConnectionDirect(maxWaitMillis);
}
}
DruidDataSource 的 getConnection 办法外部调用的是 getConnectionDirect(maxWaitMillis)
getConnectionDirect
com/alibaba/druid/pool/DruidDataSource.java
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
int notFullTimeoutRetryCnt = 0;
for (; ;) {
// handle notFullTimeoutRetry
DruidPooledConnection poolableConnection;
try {poolableConnection = getConnectionInternal(maxWaitMillis);
} catch (GetConnectionTimeoutException ex) {if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
notFullTimeoutRetryCnt++;
if (LOG.isWarnEnabled()) {LOG.warn("get connection timeout retry :" + notFullTimeoutRetryCnt);
}
continue;
}
throw ex;
}
if (testOnBorrow) {boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate) {if (LOG.isDebugEnabled()) {LOG.debug("skip not validate connection.");
}
discardConnection(poolableConnection.holder);
continue;
}
} else {if (poolableConnection.conn.isClosed()) {discardConnection(poolableConnection.holder); // 传入 null,防止反复敞开
continue;
}
if (testWhileIdle) {
final DruidConnectionHolder holder = poolableConnection.holder;
long currentTimeMillis = System.currentTimeMillis();
long lastActiveTimeMillis = holder.lastActiveTimeMillis;
long lastExecTimeMillis = holder.lastExecTimeMillis;
long lastKeepTimeMillis = holder.lastKeepTimeMillis;
if (checkExecuteTime
&& lastExecTimeMillis != lastActiveTimeMillis) {lastActiveTimeMillis = lastExecTimeMillis;}
if (lastKeepTimeMillis > lastActiveTimeMillis) {lastActiveTimeMillis = lastKeepTimeMillis;}
long idleMillis = currentTimeMillis - lastActiveTimeMillis;
long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;
if (timeBetweenEvictionRunsMillis <= 0) {timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;}
if (idleMillis >= timeBetweenEvictionRunsMillis
|| idleMillis < 0 // unexcepted branch
) {boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate) {if (LOG.isDebugEnabled()) {LOG.debug("skip not validate connection.");
}
discardConnection(poolableConnection.holder);
continue;
}
}
}
}
if (removeAbandoned) {StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
poolableConnection.connectStackTrace = stackTrace;
poolableConnection.setConnectedTimeNano();
poolableConnection.traceEnable = true;
activeConnectionLock.lock();
try {activeConnections.put(poolableConnection, PRESENT);
} finally {activeConnectionLock.unlock();
}
}
if (!this.defaultAutoCommit) {poolableConnection.setAutoCommit(false);
}
return poolableConnection;
}
}
public boolean isFull() {lock.lock();
try {return this.poolingCount + this.activeCount >= this.maxActive;} finally {lock.unlock();
}
}
getConnectionDirect 在一个 for 循环外头进行获取连贯,首先执行 getConnectionInternal(maxWaitMillis),若呈现 GetConnectionTimeoutException 异样,则在 notFull 且 notFullTimeoutRetryCnt 小于等于 this.notFullTimeoutRetryCount 时会递增 notFullTimeoutRetryCnt,而后 continue 持续循环,否则间接抛出 GetConnectionTimeoutException 跳出循环
获取到连贯之后,判断是否是 testOnBorrow,如果是则执行 testConnectionInternal,若校验不胜利则执行 discardConnection,而后持续循环;若非 testOnBorrow 则判断 conn 是否 closed,若是则执行 discardConnection,而后持续循环,若非 closed 则进入 testWhileIdle 的逻辑 (
druid 间接在 getConnection 的时候执行 testWhileIdle 有点令人匪夷所思
)最初是 removeAbandoned,保护 connectedTimeNano,将以后连贯放到 activeConnections 中
getConnectionInternal
com/alibaba/druid/pool/DruidDataSource.java
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {if (closed) {connectErrorCountUpdater.incrementAndGet(this);
throw new DataSourceClosedException("dataSource already closed at" + new Date(closeTimeMillis));
}
if (!enable) {connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null) {throw disableException;}
throw new DataSourceDisableException();}
final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
final int maxWaitThreadCount = this.maxWaitThreadCount;
DruidConnectionHolder holder;
for (boolean createDirect = false; ;) {if (createDirect) {createStartNanosUpdater.set(this, System.nanoTime());
if (creatingCountUpdater.compareAndSet(this, 0, 1)) {PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection();
holder = new DruidConnectionHolder(this, pyConnInfo);
holder.lastActiveTimeMillis = System.currentTimeMillis();
creatingCountUpdater.decrementAndGet(this);
directCreateCountUpdater.incrementAndGet(this);
if (LOG.isDebugEnabled()) {LOG.debug("conn-direct_create");
}
boolean discard;
lock.lock();
try {if (activeCount < maxActive) {
activeCount++;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();}
break;
} else {discard = true;}
} finally {lock.unlock();
}
if (discard) {JdbcUtils.close(pyConnInfo.getPhysicalConnection());
}
}
}
try {lock.lockInterruptibly();
} catch (InterruptedException e) {connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("interrupt", e);
}
try {
if (maxWaitThreadCount > 0
&& notEmptyWaitThreadCount >= maxWaitThreadCount) {connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("maxWaitThreadCount" + maxWaitThreadCount + ", current wait Thread count"
+ lock.getQueueLength());
}
if (onFatalError
&& onFatalErrorMaxActive > 0
&& activeCount >= onFatalErrorMaxActive) {connectErrorCountUpdater.incrementAndGet(this);
StringBuilder errorMsg = new StringBuilder();
errorMsg.append("onFatalError, activeCount")
.append(activeCount)
.append(", onFatalErrorMaxActive")
.append(onFatalErrorMaxActive);
if (lastFatalErrorTimeMillis > 0) {errorMsg.append(", time'")
.append(StringUtils.formatDateTime19(lastFatalErrorTimeMillis, TimeZone.getDefault()))
.append("'");
}
if (lastFatalErrorSql != null) {errorMsg.append(", sql \n")
.append(lastFatalErrorSql);
}
throw new SQLException(errorMsg.toString(), lastFatalError);
}
connectCount++;
if (createScheduler != null
&& poolingCount == 0
&& activeCount < maxActive
&& creatingCountUpdater.get(this) == 0
&& createScheduler instanceof ScheduledThreadPoolExecutor) {ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
if (executor.getQueue().size() > 0) {
createDirect = true;
continue;
}
}
if (maxWait > 0) {holder = pollLast(nanos);
} else {holder = takeLast();
}
if (holder != null) {if (holder.discard) {continue;}
activeCount++;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();}
}
} catch (InterruptedException e) {connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException(e.getMessage(), e);
} catch (SQLException e) {connectErrorCountUpdater.incrementAndGet(this);
throw e;
} finally {lock.unlock();
}
break;
}
if (holder == null) {long waitNanos = waitNanosLocal.get();
final long activeCount;
final long maxActive;
final long creatingCount;
final long createStartNanos;
final long createErrorCount;
final Throwable createError;
try {lock.lock();
activeCount = this.activeCount;
maxActive = this.maxActive;
creatingCount = this.creatingCount;
createStartNanos = this.createStartNanos;
createErrorCount = this.createErrorCount;
createError = this.createError;
} finally {lock.unlock();
}
StringBuilder buf = new StringBuilder(128);
buf.append("wait millis")
.append(waitNanos / (1000 * 1000))
.append(", active").append(activeCount)
.append(", maxActive").append(maxActive)
.append(", creating").append(creatingCount);
if (creatingCount > 0 && createStartNanos > 0) {long createElapseMillis = (System.nanoTime() - createStartNanos) / (1000 * 1000);
if (createElapseMillis > 0) {buf.append(", createElapseMillis").append(createElapseMillis);
}
}
if (createErrorCount > 0) {buf.append(", createErrorCount").append(createErrorCount);
}
List<JdbcSqlStatValue> sqlList = this.getDataSourceStat().getRuningSqlList();
for (int i = 0; i < sqlList.size(); ++i) {if (i != 0) {buf.append('\n');
} else {buf.append(",");
}
JdbcSqlStatValue sql = sqlList.get(i);
buf.append("runningSqlCount").append(sql.getRunningCount());
buf.append(":");
buf.append(sql.getSql());
}
String errorMessage = buf.toString();
if (createError != null) {throw new GetConnectionTimeoutException(errorMessage, createError);
} else {throw new GetConnectionTimeoutException(errorMessage);
}
}
holder.incrementUseCount();
DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
return poolalbeConnection;
}
getConnectionInternal 办法先判断是否 closed,如果是则抛出 DataSourceClosedException,接着判断是否 enable,如果不是则抛出 DataSourceDisableException,紧接着 for 循环,它次要依据 createDirect 来执行不同逻辑,第一次默认 createDirect 为 false;
createDirect 为 false,对于 notEmptyWaitThreadCount 大于等于 maxWaitThreadCount 则抛出 SQLException,对于 poolingCount 为 0 且 activeCount 小于 maxActive,createScheduler 的 queue 大小大于 0 的,则设置 createDirect 为 true;否则对于 maxWait 大于 0 的,执行 pollLast(nanos),否则执行 takeLast()
createDirect 为 true,会通过 DruidDataSource.this.createPhysicalConnection() 创立物理连贯,对于 activeCount 小于 maxActive 的,则保护 activeCount 跳出循环,否则标记 discard 为 true,通过 JdbcUtils.close(pyConnInfo.getPhysicalConnection()) 敞开连贯
pollLast
private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
long estimate = nanos;
for (; ;) {if (poolingCount == 0) {emptySignal(); // send signal to CreateThread create connection
if (failFast && isFailContinuous()) {throw new DataSourceNotAvailableException(createError);
}
if (estimate <= 0) {waitNanosLocal.set(nanos - estimate);
return null;
}
notEmptyWaitThreadCount++;
if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {notEmptyWaitThreadPeak = notEmptyWaitThreadCount;}
try {
long startEstimate = estimate;
estimate = notEmpty.awaitNanos(estimate); // signal by
// recycle or
// creator
notEmptyWaitCount++;
notEmptyWaitNanos += (startEstimate - estimate);
if (!enable) {connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null) {throw disableException;}
throw new DataSourceDisableException();}
} catch (InterruptedException ie) {notEmpty.signal(); // propagate to non-interrupted thread
notEmptySignalCount++;
throw ie;
} finally {notEmptyWaitThreadCount--;}
if (poolingCount == 0) {if (estimate > 0) {continue;}
waitNanosLocal.set(nanos - estimate);
return null;
}
}
decrementPoolingCount();
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
long waitNanos = nanos - estimate;
last.setLastNotEmptyWaitNanos(waitNanos);
return last;
}
}
pollLast 办法在 poolingCount 为 0 时执行 emptySignal,另外次要是解决 notEmpty 这个 condition,而后取 connections[poolingCount]
takeLast
DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
try {while (poolingCount == 0) {emptySignal(); // send signal to CreateThread create connection
if (failFast && isFailContinuous()) {throw new DataSourceNotAvailableException(createError);
}
notEmptyWaitThreadCount++;
if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {notEmptyWaitThreadPeak = notEmptyWaitThreadCount;}
try {notEmpty.await(); // signal by recycle or creator
} finally {notEmptyWaitThreadCount--;}
notEmptyWaitCount++;
if (!enable) {connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null) {throw disableException;}
throw new DataSourceDisableException();}
}
} catch (InterruptedException ie) {notEmpty.signal(); // propagate to non-interrupted thread
notEmptySignalCount++;
throw ie;
}
decrementPoolingCount();
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
return last;
}
takeLast 办法在 poolingCount 为 0 的时候执行 emptySignal,而后通过 notEmpty.await() 进行阻塞期待,最初返回 connections[poolingCount]
emptySignal
private void emptySignal() {if (createScheduler == null) {empty.signal();
return;
}
if (createTaskCount >= maxCreateTaskCount) {return;}
if (activeCount + poolingCount + createTaskCount >= maxActive) {return;}
submitCreateTask(false);
}
emptySignal 办法,对于 createScheduler 为 null 的执行 empty.signal(),之后判断 task 数量即 maxActive 判断,最初执行 submitCreateTask(false)
submitCreateTask
private void submitCreateTask(boolean initTask) {
createTaskCount++;
CreateConnectionTask task = new CreateConnectionTask(initTask);
if (createTasks == null) {createTasks = new long[8];
}
boolean putted = false;
for (int i = 0; i < createTasks.length; ++i) {if (createTasks[i] == 0) {createTasks[i] = task.taskId;
putted = true;
break;
}
}
if (!putted) {long[] array = new long[createTasks.length * 3 / 2];
System.arraycopy(createTasks, 0, array, 0, createTasks.length);
array[createTasks.length] = task.taskId;
createTasks = array;
}
this.createSchedulerFuture = createScheduler.submit(task);
}
submitCreateTask 会创立 CreateConnectionTask,而后提交到 createScheduler 执行
CreateConnectionTask
com/alibaba/druid/pool/DruidDataSource.java
public class CreateConnectionTask implements Runnable {
private int errorCount;
private boolean initTask;
private final long taskId;
public CreateConnectionTask() {taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
}
public CreateConnectionTask(boolean initTask) {taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
this.initTask = initTask;
}
@Override
public void run() {runInternal();
}
private void runInternal() {for (; ;) {
// addLast
lock.lock();
try {if (closed || closing) {clearCreateTask(taskId);
return;
}
boolean emptyWait = true;
if (createError != null && poolingCount == 0) {emptyWait = false;}
if (emptyWait) {
// 必须存在线程期待,才创立连贯
if (poolingCount >= notEmptyWaitThreadCount //
&& (!(keepAlive && activeCount + poolingCount < minIdle)) // 在 keepAlive 场景不能放弃创立
&& (!initTask) // 线程池初始化时的工作不能放弃创立
&& !isFailContinuous() // failContinuous 时不能放弃创立,否则会无奈创立线程
&& !isOnFatalError() // onFatalError 时不能放弃创立,否则会无奈创立线程) {clearCreateTask(taskId);
return;
}
// 避免创立超过 maxActive 数量的连贯
if (activeCount + poolingCount >= maxActive) {clearCreateTask(taskId);
return;
}
}
} finally {lock.unlock();
}
PhysicalConnectionInfo physicalConnection = null;
try {physicalConnection = createPhysicalConnection();
} catch (OutOfMemoryError e) {LOG.error("create connection OutOfMemoryError, out memory.", e);
errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
// fail over retry attempts
setFailContinuous(true);
if (failFast) {lock.lock();
try {notEmpty.signalAll();
} finally {lock.unlock();
}
}
if (breakAfterAcquireFailure) {lock.lock();
try {clearCreateTask(taskId);
} finally {lock.unlock();
}
return;
}
this.errorCount = 0; // reset errorCount
if (closing || closed) {lock.lock();
try {clearCreateTask(taskId);
} finally {lock.unlock();
}
return;
}
createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
return;
}
} catch (SQLException e) {LOG.error("create connection SQLException, url:" + jdbcUrl, e);
errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
// fail over retry attempts
setFailContinuous(true);
if (failFast) {lock.lock();
try {notEmpty.signalAll();
} finally {lock.unlock();
}
}
if (breakAfterAcquireFailure) {lock.lock();
try {clearCreateTask(taskId);
} finally {lock.unlock();
}
return;
}
this.errorCount = 0; // reset errorCount
if (closing || closed) {lock.lock();
try {clearCreateTask(taskId);
} finally {lock.unlock();
}
return;
}
createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
return;
}
} catch (RuntimeException e) {LOG.error("create connection RuntimeException", e);
// unknow fatal exception
setFailContinuous(true);
continue;
} catch (Error e) {lock.lock();
try {clearCreateTask(taskId);
} finally {lock.unlock();
}
LOG.error("create connection Error", e);
// unknow fatal exception
setFailContinuous(true);
break;
} catch (Throwable e) {lock.lock();
try {clearCreateTask(taskId);
} finally {lock.unlock();
}
LOG.error("create connection unexecpted error.", e);
break;
}
if (physicalConnection == null) {continue;}
physicalConnection.createTaskId = taskId;
boolean result = put(physicalConnection);
if (!result) {JdbcUtils.close(physicalConnection.getPhysicalConnection());
LOG.info("put physical connection to pool failed.");
}
break;
}
}
}
CreateConnectionTask 通过 for 循环,而后加锁解决 minIdle 及 maxActive,最初通过 createPhysicalConnection 创立物理连贯
createPhysicalConnection
com/alibaba/druid/pool/DruidAbstractDataSource.java
public PhysicalConnectionInfo createPhysicalConnection() throws SQLException {String url = this.getUrl();
Properties connectProperties = getConnectProperties();
String user;
if (getUserCallback() != null) {user = getUserCallback().getName();} else {user = getUsername();
}
String password = getPassword();
PasswordCallback passwordCallback = getPasswordCallback();
if (passwordCallback != null) {if (passwordCallback instanceof DruidPasswordCallback) {DruidPasswordCallback druidPasswordCallback = (DruidPasswordCallback) passwordCallback;
druidPasswordCallback.setUrl(url);
druidPasswordCallback.setProperties(connectProperties);
}
char[] chars = passwordCallback.getPassword();
if (chars != null) {password = new String(chars);
}
}
Properties physicalConnectProperties = new Properties();
if (connectProperties != null) {physicalConnectProperties.putAll(connectProperties);
}
if (user != null && user.length() != 0) {physicalConnectProperties.put("user", user);
}
if (password != null && password.length() != 0) {physicalConnectProperties.put("password", password);
}
Connection conn = null;
long connectStartNanos = System.nanoTime();
long connectedNanos, initedNanos, validatedNanos;
Map<String, Object> variables = initVariants
? new HashMap<String, Object>()
: null;
Map<String, Object> globalVariables = initGlobalVariants
? new HashMap<String, Object>()
: null;
createStartNanosUpdater.set(this, connectStartNanos);
creatingCountUpdater.incrementAndGet(this);
try {conn = createPhysicalConnection(url, physicalConnectProperties);
connectedNanos = System.nanoTime();
if (conn == null) {throw new SQLException("connect error, url" + url + ", driverClass" + this.driverClass);
}
initPhysicalConnection(conn, variables, globalVariables);
initedNanos = System.nanoTime();
validateConnection(conn);
validatedNanos = System.nanoTime();
setFailContinuous(false);
setCreateError(null);
} catch (SQLException ex) {setCreateError(ex);
JdbcUtils.close(conn);
throw ex;
} catch (RuntimeException ex) {setCreateError(ex);
JdbcUtils.close(conn);
throw ex;
} catch (Error ex) {createErrorCountUpdater.incrementAndGet(this);
setCreateError(ex);
JdbcUtils.close(conn);
throw ex;
} finally {long nano = System.nanoTime() - connectStartNanos;
createTimespan += nano;
creatingCountUpdater.decrementAndGet(this);
}
return new PhysicalConnectionInfo(conn, connectStartNanos, connectedNanos, initedNanos, validatedNanos, variables, globalVariables);
}
createPhysicalConnection 通过 try catch 去创立物理连贯,若有异样则会通过 JdbcUtils.close(conn) 去敞开连贯
testConnectionInternal
protected boolean testConnectionInternal(DruidConnectionHolder holder, Connection conn) {String sqlFile = JdbcSqlStat.getContextSqlFile();
String sqlName = JdbcSqlStat.getContextSqlName();
if (sqlFile != null) {JdbcSqlStat.setContextSqlFile(null);
}
if (sqlName != null) {JdbcSqlStat.setContextSqlName(null);
}
try {if (validConnectionChecker != null) {boolean valid = validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout);
long currentTimeMillis = System.currentTimeMillis();
if (holder != null) {
holder.lastValidTimeMillis = currentTimeMillis;
holder.lastExecTimeMillis = currentTimeMillis;
}
if (valid && isMySql) { // unexcepted branch
long lastPacketReceivedTimeMs = MySqlUtils.getLastPacketReceivedTimeMs(conn);
if (lastPacketReceivedTimeMs > 0) {
long mysqlIdleMillis = currentTimeMillis - lastPacketReceivedTimeMs;
if (lastPacketReceivedTimeMs > 0 //
&& mysqlIdleMillis >= timeBetweenEvictionRunsMillis) {discardConnection(holder);
String errorMsg = "discard long time none received connection."
+ ", jdbcUrl :" + jdbcUrl
+ ", version :" + VERSION.getVersionNumber()
+ ", lastPacketReceivedIdleMillis :" + mysqlIdleMillis;
LOG.warn(errorMsg);
return false;
}
}
}
if (valid && onFatalError) {lock.lock();
try {if (onFatalError) {onFatalError = false;}
} finally {lock.unlock();
}
}
return valid;
}
if (conn.isClosed()) {return false;}
if (null == validationQuery) {return true;}
Statement stmt = null;
ResultSet rset = null;
try {stmt = conn.createStatement();
if (getValidationQueryTimeout() > 0) {stmt.setQueryTimeout(validationQueryTimeout);
}
rset = stmt.executeQuery(validationQuery);
if (!rset.next()) {return false;}
} finally {JdbcUtils.close(rset);
JdbcUtils.close(stmt);
}
if (onFatalError) {lock.lock();
try {if (onFatalError) {onFatalError = false;}
} finally {lock.unlock();
}
}
return true;
} catch (Throwable ex) {
// skip
return false;
} finally {if (sqlFile != null) {JdbcSqlStat.setContextSqlFile(sqlFile);
}
if (sqlName != null) {JdbcSqlStat.setContextSqlName(sqlName);
}
}
}
testConnectionInternal 次要通过 validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout) 来校验连贯,如果 validConnectionChecker 为 null 则通过 jdbc 执行 validationQuery 进行校验
discardConnection
public void discardConnection(DruidConnectionHolder holder) {if (holder == null) {return;}
Connection conn = holder.getConnection();
if (conn != null) {JdbcUtils.close(conn);
}
lock.lock();
try {if (holder.discard) {return;}
if (holder.active) {
activeCount--;
holder.active = false;
}
discardCount++;
holder.discard = true;
if (activeCount <= minIdle) {emptySignal();
}
} finally {lock.unlock();
}
}
discardConnection 办法次要是敞开 connection,之后桎梏解决一些统计标记
小结
DruidDataSource 的 getConnection 办法外部调用的是 getConnectionDirect(maxWaitMillis)
getConnectionDirect 在一个 for 循环外头进行获取连贯,首先执行 getConnectionInternal(maxWaitMillis),若呈现 GetConnectionTimeoutException 异样,则在 notFull 且 notFullTimeoutRetryCnt 小于等于 this.notFullTimeoutRetryCount 时会递增 notFullTimeoutRetryCnt,而后 continue 持续循环,否则间接抛出 GetConnectionTimeoutException 跳出循环
获取到连贯之后,判断是否是 testOnBorrow,如果是则执行 testConnectionInternal,若校验不胜利则执行 discardConnection,而后持续循环;若非 testOnBorrow 则判断 conn 是否 closed,若是则执行 discardConnection,而后持续循环,若非 closed 则进入 testWhileIdle 的逻辑
最初是 removeAbandoned,保护 connectedTimeNano,将以后连贯放到 activeConnections 中
整体代码看下来感觉跟 commons-pool 相比,druid 代码的实现感觉有点毛糙,形象层级不够高,代码充斥大量统计标记、状态位的解决,保护起来得很小心,另外 druid 间接在 getConnection 的时候执行 testWhileIdle 有点令人匪夷所思