Druid连接池只存储在connections数组中,所以获取连贯的逻辑应该比HikariPool简略一些:间接从connectoins获取即可。
DruidDataSource.getConnection
间接上代码:
@Override public DruidPooledConnection getConnection() throws SQLException { return getConnection(maxWait); }
调用了getConnection(maxWait),maxWait是参数设定的获取连贯的最长等待时间,超过该时长还没有获取到连贯的话,抛异样。
看getConnection(maxWait)代码:
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException { init(); if (filters.size() > 0) { FilterChainImpl filterChain = new FilterChainImpl(this); return filterChain.dataSource_connect(this, maxWaitMillis); } else { return getConnectionDirect(maxWaitMillis); } }
先调用init,init办法会判断连接池是否曾经实现了初始化,如果没有实现初始化则首先进行初始化,初始化的代码咱们上一篇文章曾经剖析过了。
之后判断是否有filters,filters的内容咱们先放放,临时不论,间接看没有filters的状况下,调用getConnectionDirect办法。
getConnectionDirect
办法比拟长,咱们还是老办法,分段剖析:
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException { int notFullTimeoutRetryCnt = 0; for (;;) { // handle notFullTimeoutRetry DruidPooledConnection poolableConnection; try { poolableConnection = getConnectionInternal(maxWaitMillis); } catch (GetConnectionTimeoutException ex) { if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) { notFullTimeoutRetryCnt++; if (LOG.isWarnEnabled()) { LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt); } continue; } throw ex; }
上来之后首先有限for循环,目标是从连接池获取到连贯之后,依据参数设定可能会做必要的查看,如果查看不通过(比方连贯不可用、连贯已敞开等等)的话循环从新获取。
而后调用getConnectionInternal获取连贯,getConnectionInternal办法应该是咱们明天文章的配角,咱们略微放一放,为了文章的可读性,先剖析完getConnectionDirect办法。
咱们假如通过调用getConnectionInternal办法获取到一个连贯(留神获取到的连贯对象是DruidPooledConnection,不是Connection对象,这个也不难想象,连接池获取到的连贯肯定是数据库物理连贯的代理对象(或者叫封装对象,封装了数据库物理连贯Connection对象的对象,这个原理咱们在剖析HikariPool的时候曾经说过了。这个DruidPooledConnection对象咱们也临时放一放,前面剖析)。
调用getConnectionInternal办法如果返回超时异样,判断:如果以后连接池没满,而且获取连贯超时重试次数小于参数notFullTimeoutRetryCount设定的次数的话,则continue,从新获取连贯。否则,抛出超时异样。
接下来:
if (testOnBorrow) { boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn); if (!validate) { if (LOG.isDebugEnabled()) { LOG.debug("skip not validate connection."); } discardConnection(poolableConnection.holder); continue; } } else { if (poolableConnection.conn.isClosed()) { discardConnection(poolableConnection.holder); // 传入null,防止反复敞开 continue; }
testOnBorrow参数的目标是:获取连贯后是否要做连贯可用性测试,如果设定为true的话,调用testConnectionInternal测试连贯的可用性,testConnectionInternal办法上一篇文章剖析连贯回收的时候、解决keepAlive的过程中就碰到过,就是执行配置好的sql语句测试连贯可用性,如果测试不通过的话则调用discardConnection敞开连贯,continue从新获取连贯。
否则,如果testOnBorrow参数没有关上的话,查看以后连贯如果曾经敞开,则调用discardConnection敞开连贯(没太明确连贯既然曾经是敞开状态,为啥还须要调用?),continue从新获取连贯。
不倡议关上testOnBorrow参数,因为连接池都会有连贯回收机制,比方上一篇文章讲过的Druid的DestroyConnectionThread & DestroyTask,回收参数配置失常的话,回收机制根本能够确保连贯的可用性。关上testOnBorrow参数会导致每次获取连贯之后都测试连贯的可用性,重大影响零碎性能。
接下来:
if (testWhileIdle) { final DruidConnectionHolder holder = poolableConnection.holder; long currentTimeMillis = System.currentTimeMillis(); long lastActiveTimeMillis = holder.lastActiveTimeMillis; long lastExecTimeMillis = holder.lastExecTimeMillis; long lastKeepTimeMillis = holder.lastKeepTimeMillis; if (checkExecuteTime && lastExecTimeMillis != lastActiveTimeMillis) { lastActiveTimeMillis = lastExecTimeMillis; } if (lastKeepTimeMillis > lastActiveTimeMillis) { lastActiveTimeMillis = lastKeepTimeMillis; } long idleMillis = currentTimeMillis - lastActiveTimeMillis; long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis; if (timeBetweenEvictionRunsMillis <= 0) { timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS; } if (idleMillis >= timeBetweenEvictionRunsMillis || idleMillis < 0 // unexcepted branch ) { boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn); if (!validate) { if (LOG.isDebugEnabled()) { LOG.debug("skip not validate connection."); } discardConnection(poolableConnection.holder); continue; } } } }
这段代码的逻辑是:参数testWhileIdle设置为true的话,查看以后链接的闲暇时长如果大于timeBetweenEvictionRunsMillis(默认60秒)的话,则调用testConnectionInternal测试连贯可用性,连贯不可用则敞开连贯,continue从新获取连贯。
而后:
if (removeAbandoned) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); poolableConnection.connectStackTrace = stackTrace; poolableConnection.setConnectedTimeNano(); poolableConnection.traceEnable = true; activeConnectionLock.lock(); try { activeConnections.put(poolableConnection, PRESENT); } finally { activeConnectionLock.unlock(); } }
呈现了一个removeAbandoned参数,这个参数的意思是移除被遗弃的连贯对象,如果关上的话就把以后连贯放到activeConnections中,篇幅无限,这部分内容就不开展了,前面咱们会专门写一篇文章介绍removeAbandoned参数。
剩下的一小部分代码,很简略,依据参数设置连贯的autoCommit,之后返回连贯poolableConnection。
if (!this.defaultAutoCommit) { poolableConnection.setAutoCommit(false); } return poolableConnection; } }
getConnectionDirect办法源码剖析实现了,上面咱们要看一下getConnectionInternal办法,这是真正从连接池中获取连贯的办法。
getConnectionInternal
间接看代码:
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException { if (closed) { connectErrorCountUpdater.incrementAndGet(this); throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis)); } if (!enable) { connectErrorCountUpdater.incrementAndGet(this); if (disableException != null) { throw disableException; } throw new DataSourceDisableException(); } final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait); final int maxWaitThreadCount = this.maxWaitThreadCount; DruidConnectionHolder holder;
查看连接池状态如果曾经disable或cloesed的话,抛异样。
接下来:
for (boolean createDirect = false;;) { if (createDirect) { createStartNanosUpdater.set(this, System.nanoTime()); if (creatingCountUpdater.compareAndSet(this, 0, 1)) { PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection(); holder = new DruidConnectionHolder(this, pyConnInfo); holder.lastActiveTimeMillis = System.currentTimeMillis(); creatingCountUpdater.decrementAndGet(this); directCreateCountUpdater.incrementAndGet(this); if (LOG.isDebugEnabled()) { LOG.debug("conn-direct_create "); } boolean discard = false; lock.lock(); try { if (activeCount < maxActive) { activeCount++; holder.active = true; if (activeCount > activePeak) { activePeak = activeCount; activePeakTime = System.currentTimeMillis(); } break; } else { discard = true; } } finally { lock.unlock(); } if (discard) { JdbcUtils.close(pyConnInfo.getPhysicalConnection()); } } }
初始化createDirect变量为false之后启动有限循环,意思是一直循环直到获取到连贯、或超时等其余异常情况产生。
紧接着的这段代码是createDirect=true的状况下执行的,createDirect是在上面循环体中查看如果:createScheduler不为空、连接池空、流动连接数小于设定的最大流动连接数maxActive、并且createScheduler的队列中排队期待创立连贯的线程大于0的状况下,设置createDirect为true的,以上这些条件如果成立的话,大概率表明createScheduler中的创立线程出问题了、所以createScheduler大概率指望不上了,所以要间接创立连贯了。
间接创立的代码也很容易了解,调用createPhysicalConnection创立物理连贯,创立DruidConnectionHolder封装该物理连贯,创立之后获取锁资源,查看activeCount < maxActive则表明创立连贯胜利、完结for循环,否则,activeCount >= maxActive则阐明违反了准则(间接创立连贯的过程中createScheduler可能复活了、又创立进去连贯放入连接池中了),所以,敞开锁资源之后,将刚创立进去的连贯敞开。
而后:
try { lock.lockInterruptibly(); } catch (InterruptedException e) { connectErrorCountUpdater.incrementAndGet(this); throw new SQLException("interrupt", e); } try { if (maxWaitThreadCount > 0 && notEmptyWaitThreadCount >= maxWaitThreadCount) { connectErrorCountUpdater.incrementAndGet(this); throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count " + lock.getQueueLength()); } if (onFatalError && onFatalErrorMaxActive > 0 && activeCount >= onFatalErrorMaxActive) { connectErrorCountUpdater.incrementAndGet(this); StringBuilder errorMsg = new StringBuilder(); errorMsg.append("onFatalError, activeCount ") .append(activeCount) .append(", onFatalErrorMaxActive ") .append(onFatalErrorMaxActive); if (lastFatalErrorTimeMillis > 0) { errorMsg.append(", time '") .append(StringUtils.formatDateTime19( lastFatalErrorTimeMillis, TimeZone.getDefault())) .append("'"); } if (lastFatalErrorSql != null) { errorMsg.append(", sql \n") .append(lastFatalErrorSql); } throw new SQLException( errorMsg.toString(), lastFatalError); } connectCount++; if (createScheduler != null && poolingCount == 0 && activeCount < maxActive && creatingCountUpdater.get(this) == 0 && createScheduler instanceof ScheduledThreadPoolExecutor) { ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler; if (executor.getQueue().size() > 0) { createDirect = true; continue; } }
获取锁资源,查看期待获取连贯的线程数如果大于参数设置的最大期待线程数,抛异样。
查看并解决异样。
累加connectCount。
之后是下面提到过的对createDirect的解决。
接下来到了最为要害的局部,个别状况下createDirect为false,不会间接创立连贯,逻辑会走到上面这部分代码中,从连接池中获取连贯:
if (maxWait > 0) { holder = pollLast(nanos); } else { holder = takeLast(); } if (holder != null) { if (holder.discard) { continue; } activeCount++; holder.active = true; if (activeCount > activePeak) { activePeak = activeCount; activePeakTime = System.currentTimeMillis(); } } } catch (InterruptedException e) { connectErrorCountUpdater.incrementAndGet(this); throw new SQLException(e.getMessage(), e); } catch (SQLException e) { connectErrorCountUpdater.incrementAndGet(this); throw e; } finally { lock.unlock(); }
如果参数设置了maxWait,则调用pollLast限时获取,否则调用takeLast获取连贯,这两个办法稍后剖析。
之后查看获取到的连贯曾经被discard的话,continue从新获取连贯。
开释锁资源。
从连接池中获取到了连贯,完结for循环。
如果takeLast或poolLast返回的DruidConnectionHolder为null的话(调用poolLast超时),处理错误信息,抛GetConnectionTimeoutException超时异样(这部分代码没有贴出,省略了......感兴趣的童鞋本人关上源码看一下)。
否则,用DruidConnectionHolder封装创立DruidPooledConnection后返回。
takeLast & pollLast(nanos)
这两个办法的逻辑其实差不多,次要区别一个是限时,一个不限时,两个办法都是在锁状态下执行。
具体调用哪一个办法取决于参数maxWait,默认值为-1,默认状况下会调用takeLast,获取连贯的时候不限时。
倡议设置maxWait,否则在非凡状况下如果创立连贯失败、会导致应用层线程挂起,获取不到任何返回的状况呈现。如果设置了maxWait,getConnection办法会调用pollLast(nanos),获取不到连贯后,应用层会失去连贯超时的反馈。
先看takeLast办法:
takeLast() throws InterruptedException, SQLException { try { while (poolingCount == 0) { emptySignal(); // send signal to CreateThread create connection if (failFast && isFailContinuous()) { throw new DataSourceNotAvailableException(createError); } notEmptyWaitThreadCount++; if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) { notEmptyWaitThreadPeak = notEmptyWaitThreadCount; } try { notEmpty.await(); // signal by recycle or creator } finally { notEmptyWaitThreadCount--; } notEmptyWaitCount++; if (!enable) { connectErrorCountUpdater.incrementAndGet(this); if (disableException != null) { throw disableException; } throw new DataSourceDisableException(); } } } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread notEmptySignalCount++; throw ie; } decrementPoolingCount(); DruidConnectionHolder last = connections[poolingCount]; connections[poolingCount] = null; return last; }
如果连接池为空(poolingCount == 0)的话,有限循环。
调用emptySignal(),告诉创立连接线程,有人在期待获取连贯,抓紧时间创立连贯。
而后调用notEmpty.await(),期待创立连接线程在实现创立、或者有连贯偿还到连接池中后唤醒告诉。
如果产生异样,调用一下notEmpty.signal()告诉其余获取连贯的线程,没准本人没能获取胜利、其余线程能获取胜利。
上面的代码,线程池肯定不空了。
线程池的线程数量减1(decrementPoolingCount),而后获取connections的最初一个元素返回。
pollLast办法的代码逻辑和takeLast的相似,只不过线程池空的话,以后线程会限时挂起期待,超时依然不能获取到连贯的话,间接返回null。
Druid连接池获取连贯代码剖析结束!
小结
Druid连接池的连贯获取过程的源码剖析结束,前面还有连贯偿还过程,下一篇文章持续剖析。
Thanks a lot!
上一篇 连接池 Druid (二) - 连贯回收 DestroyConnectionThread & DestroyTask