共计 8826 个字符,预计需要花费 23 分钟才能阅读完成。
HikariPool 之后,明天钻研另一支流连接池 Druid。
对于数据库连接池的根本认知
先对数据库连接池的根本工作原理做个理解,不论是 HikariPool、还是 druid,所有的数据库连接池应该都是依照这个基本原理工作和实现的,带着这个思路去学习数据库连接池,防止盲人摸象。
数据库连接池肯定会蕴含以下根本逻辑:
- 创立连贯并池化:初始化的时候创立、或者是在利用获取连贯的过程中创立,连贯创立好之后放在连接池(内存中的容器,比方 List)中保留。
- 获取数据库连贯:接管了获取数据库连贯的办法,从连接池中获取、而不是创立连贯。
- 敞开数据库连贯:接管了敞开数据库连贯的办法,将连贯偿还到连接池、而不是真正的敞开数据库连贯。
- 连接池保护:连接池容量、连贯超时清理等工作。
带着这个思路钻研 HikariPool 的源码,会有事倍功半的效用。
意识 Druid 的构造
包含以下几个局部:
- DruidConnectionHolder
- Connections
- evictConnections
- keepAliveConnections
- destroyConnectionThread/destroySchedulerFuture
- createConnectionThread/createSchedulerFuture
DruidConnectionHolder
HikariPool 通过 poolEntry 持有数据库连贯,Druid 通过 DruidConnectionHolder 持有数据库连贯。
DruidConnectionHolder 持有物理数据库连贯 Connectin 对象,以及该连贯的相干属性,比方 connectTimeMillis、lastActiveTimeMillis、lastExecTimeMillis,以及 underlyingReadOnly、underlyingAutoCommit、underlyingTransactionIsolation 等等。连接池能够依据这些属性以及相干参数执行相应的 houseKeep。
Connections
Connections 是 DruidConnectionHolder 组成的数组,是 Druid 连接池中惟一存储可用连贯的中央,看起来会比 HikariPool 简略许多(HikariPool 有三个存储连贯的中央),然而这可能也是 Druid 在性能上稍逊于 HikariPool 的起因之一。
evictConnections
存储须要被回收的连贯的数组,在连接池进行清理的时候用来存储须要被敞开的连贯。
keepAliveConnections
存储放弃流动的连贯的数组。
createConnectionThread/createSchedulerFuture
Druid 依据配置,能够通过 createConnectionThread 线程、或者 createSchedulerFuture 线程工作创立数据库连贯并退出连接池。
Druid 兴许并没有默认的 createSchedulerFuture 的实现,如果要启用 createSchedulerFuture,须要配置 createSchedulerFuture 的实现类。
createConnectionThread 是 Druid 默认的创立连贯的线程,负责获取物理连贯、组装物理连贯为 DruidConnectionHolder 并退出到 connections 数组中。
destroyConnectionThread/destroySchedulerFuture
与创立连贯的形式相似,Druid 提供两种不同的形式销毁(或者敞开)过期的数据库连贯。默认实现是 destroyConnectionThread。
好了,Druid 的根底构造理解完了,咱们采纳和 HikariPool 齐全一样的剖析套路,接下来要进入源码剖析了,次要包含:
- Druid 连接池的初始化
- 获取数据库连贯 – getConnection 办法
- 敞开数据库连贯 – close 办法
Druid 的初始化
Druid 的初始化过程貌似和 HikariPool 稍有不同,因为 HikariPool 默认的在获取连贯之前的 HikariPool 实例化过程中就实现了连接池的初始化。
所谓实现连接池的初始化,指的是依照参数的设定,实现了数据库连贯的创立和池化,也就是说连接池曾经筹备好了,利用在通过 getConnecton 办法获取连贯的时候,间接从连接池中 borrow 就能够了。
Druid 貌似不这样。咱们看一下 DruidDataSource 的实例化办法:
public DruidDataSource(){this(false); | |
} | |
public DruidDataSource(boolean fairLock){super(fairLock); | |
configFromPropety(System.getProperties()); | |
} |
super 指的是 DruidAbstractDataSource,他的构造方法:
public DruidAbstractDataSource(boolean lockFair){lock = new ReentrantLock(lockFair); | |
notEmpty = lock.newCondition(); | |
empty = lock.newCondition();} |
只初始化了 ReentrantLock,以及他的两个 Condition:empty 和 notEmpty。
而 configFromPropety 只是负责把参数从配置文件中读入,不做其余的事件。
所以,连接池的初始化过程没有放在 DruidDataSource 的创立过程中。
既然构造方法中没有实现连接池的初始化,咱们自然而然的就想到去看看 getConnection 办法,不做初始化、怎么能获取到数据库连贯?
果然,getConnectin 办法的第一行代码就是:调用 init() 办法。
init 的办法很长,不过有很多代码都是查看参数合理性的,这部分代码咱们间接跳过:
... 疏忽 n 多行代码 | |
connections = new DruidConnectionHolder[maxActive]; | |
evictConnections = new DruidConnectionHolder[maxActive]; | |
keepAliveConnections = new DruidConnectionHolder[maxActive]; |
创立了咱们后面说过的存储数据库连贯的 connections(其实就是池),以及另外两个辅助数组 evictConnections 和 keepAliveConnections,连接池的大小初始化为 maxActive。
接下来:
if (createScheduler != null && asyncInit) {for (int i = 0; i < initialSize; ++i) {submitCreateTask(true); | |
} | |
} else if (!asyncInit) { | |
// init connections | |
while (poolingCount < initialSize) { | |
try {PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection(); | |
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo); | |
connections[poolingCount++] = holder; | |
} catch (SQLException ex) {LOG.error("init datasource error, url:" + this.getUrl(), ex); | |
if (initExceptionThrow) { | |
connectError = ex; | |
break; | |
} else {Thread.sleep(3000); | |
} | |
} | |
} | |
if (poolingCount > 0) { | |
poolingPeak = poolingCount; | |
poolingPeakTime = System.currentTimeMillis();} | |
} |
查看 createScheduler 不空、并且参数设置为 asyncInit(异步初始化)的话,则提交 initialSize 个连贯创立工作。 咱们说过 createScheduler 并非 Druid 的默认实现,所以咱们临时不论这部分代码。
上面的代码逻辑是:如果不是异步初始化的话,那就是要同步初始化,也就是以后线程要负责实现连接池的初始化,则循环创立 initialSize 个物理连贯、封装为 DruidConnectionHolder 后间接退出到 connections 中。
这个过程就相当于实现了数据库连接池的初始化,然而倡议设置 asyncInit 参数为 true:异步初始化。因为如果同步初始化、并且 initialSize 设置比拟大的话,利用的首个 getConnection 办法必定会耗时比拟长,用户体验不好。
持续看代码,如果 asyncInit 设置为 true,异步初始化的话:
createAndLogThread(); | |
createAndStartCreatorThread(); | |
createAndStartDestroyThread(); |
从办法名上看,应该是启动了 3 个线程,别离是 createAndLogThread 线程、负责线程创立连贯的线程和负责销毁(敞开)连贯的线程。
createAndLogThread 线程负责定期收集 Druid 连接池的状态并通过 log 打印进去,是 Druid 统计分析的一部分。
createAndStartCreatorThread 是启动连贯创立线程,异步初始化的状况下,连接池就是通过 createAndStartCreatorThread 线程创立的。
createAndStartDestroyThread 是启动连贯销毁的线程,作用相似于 HikariPool 的 houseKeep 线程。
createAndStartCreatorThread 办法以及连贯创立线程、reateAndStartDestroyThread 办法以及连贯销毁线程的源码咱们先放放,稍后剖析。
咱们先一鼓作气实现 init() 办法源码的残余局部:
initedLatch.await(); | |
init = true; | |
initedTime = new Date(); | |
registerMbean(); | |
if (connectError != null && poolingCount == 0) {throw connectError;} |
initedLatch 是数量 = 2 的 CountDownLatch,在 DruidDataSource 类成员变量初始化的时候定义好的,这里 initedLatch.await(); 的意思是期待连贯创立线程和连贯销毁线程实现启动。
之后,打标签 init=true,表明初始化已实现,进行异样解决、锁开释等开头工作。
初始化实现!
createAndStartCreatorThread
创立并启动“创立连接线程”:
protected void createAndStartCreatorThread() {if (createScheduler == null) {String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this); | |
createConnectionThread = new CreateConnectionThread(threadName); | |
createConnectionThread.start(); | |
return; | |
} | |
initedLatch.countDown();} |
代码非常简单,就是创立并启动 createConnectionThread。createConnectionThread 线程负责物理连贯的创立、以及连贯的池化。
createConnectionThread#run
连贯的创立及池化是在 createConnectionThread 线程的 run 办法中实现的。
public void run() {initedLatch.countDown(); | |
long lastDiscardCount = 0; | |
int errorCount = 0; | |
for (;;) { | |
// addLast | |
try {lock.lockInterruptibly(); | |
} catch (InterruptedException e2) {break;} | |
long discardCount = DruidDataSource.this.discardCount; | |
boolean discardChanged = discardCount - lastDiscardCount > 0; | |
lastDiscardCount = discardCount; | |
try { | |
boolean emptyWait = true; | |
if (createError != null | |
&& poolingCount == 0 | |
&& !discardChanged) {emptyWait = false;} | |
if (emptyWait | |
&& asyncInit && createCount < initialSize) {emptyWait = false;} |
线程启动之后会一直检测是否须要创立连贯,在 run 办法的无条件 for 循环中实现。
首先取得锁。
discardChanged 变量示意在每次循环检测过程中,被 discardCount 的连接数是否有增长。
而后定义了一个 emptyWait 变量,用来示意是否须要暂缓创立连贯、直到期待获取连贯的线程唤醒之后才创立。
对 emptyWait 的解决逻辑是:如果创立连贯产生了谬误并且以后连接池空、并且没有产生 discardChanged,则不期待。或者,如果是异步初始化并且初始化的连接池数量尚未满足 initialSize 的要求,则不期待。
而后:
if (emptyWait) { | |
// 必须存在线程期待,才创立连贯 | |
if (poolingCount >= notEmptyWaitThreadCount // | |
&& (!(keepAlive && activeCount + poolingCount < minIdle)) | |
&& !isFailContinuous()) {empty.await(); | |
} | |
// 避免创立超过 maxActive 数量的连贯 | |
if (activeCount + poolingCount >= maxActive) {empty.await(); | |
continue; | |
} | |
} | |
} catch (InterruptedException e) { | |
lastCreateError = e; | |
lastErrorTimeMillis = System.currentTimeMillis(); | |
if ((!closing) && (!closed)) {LOG.error("create connection Thread Interrupted, url:" + jdbcUrl, e); | |
} | |
break; | |
} finally {lock.unlock(); | |
} |
这段代码是须要期待的状况,判断以后线程池数量满足期待条件(连接池数量大于 notEmptyWaitThreadCount 数量,activeCount + poolingCount 数量大于等于 minIdle 或者是 keepAlive)。或者,activeCount + poolingCount 曾经大于等于 maxActive 了,则调用 empty.await(); 也就是临时不再创立连贯了,期待获取连贯的线程唤醒之后再创立。 当然,期待是须要开释锁资源的。
接下来是不期待的代码:
PhysicalConnectionInfo connection = null; | |
try {connection = createPhysicalConnection(); | |
} catch (SQLException e) {// 上面是一堆创立连贯失败的异样解决,疏忽 |
创立数据库物理连贯。之后:
if (connection == null) {continue;} | |
boolean result = put(connection); | |
if (!result) {JdbcUtils.close(connection.getPhysicalConnection()); | |
LOG.info("put physical connection to pool failed."); | |
} | |
errorCount = 0; // reset errorCount | |
if (closing || closed) {break;} | |
} | |
} | |
} |
如果创立连贯过程中产生异样,connection==null,则 continue,持续创立。
否则,调用 put 办法,将新创建的连贯退出连接池。当然,如果退出失败的话则敞开刚创立好的连贯,免得资源节约。
接下来看一下连贯放入 connections 的 put 办法。
put 办法
protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) { | |
DruidConnectionHolder holder = null; | |
try {holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo); | |
} catch (SQLException ex) {lock.lock(); | |
try {if (createScheduler != null) {clearCreateTask(physicalConnectionInfo.createTaskId); | |
} | |
} finally {lock.unlock(); | |
} | |
LOG.error("create connection holder error", ex); | |
return false; | |
} | |
return put(holder, physicalConnectionInfo.createTaskId); | |
} |
将 connection 封装为 DruidConnectionHolder,之后调用 put:
private boolean put(DruidConnectionHolder holder, long createTaskId) {lock.lock(); | |
try {if (this.closing || this.closed) {return false;} | |
if (poolingCount >= maxActive) {if (createScheduler != null) {clearCreateTask(createTaskId); | |
} | |
return false; | |
} | |
connections[poolingCount] = holder; | |
incrementPoolingCount(); | |
if (poolingCount > poolingPeak) { | |
poolingPeak = poolingCount; | |
poolingPeakTime = System.currentTimeMillis();} | |
notEmpty.signal(); | |
notEmptySignalCount++; | |
if (createScheduler != null) {clearCreateTask(createTaskId); | |
if (poolingCount + createTaskCount < notEmptyWaitThreadCount // | |
&& activeCount + poolingCount + createTaskCount < maxActive) {emptySignal(); | |
} | |
} | |
} finally {lock.unlock(); | |
} | |
return true; | |
} |
首先获取锁资源。
之后判断如果创立的连接数大于最大流动连接数 poolingCount >= maxActive 的话,则不放入连接池间接返回。
接下来,连贯放入连接池 connections 中,poolingCount 加 1。
接下来 notEmpty.signal(); 告诉期待获取连贯的线程。
之后开释锁资源,连贯退出连接池实现!
createAndStartDestroyThread 办法
创立并启动 DestroyThread,间接看 destroyConnectionThread 的 run 办法:
public void run() {initedLatch.countDown(); | |
for (;;) { | |
// 从后面开始删除 | |
try {if (closed || closing) {break;} | |
if (timeBetweenEvictionRunsMillis > 0) {Thread.sleep(timeBetweenEvictionRunsMillis); | |
} else {Thread.sleep(1000); // | |
} | |
if (Thread.interrupted()) {break;} | |
destroyTask.run();} catch (InterruptedException e) {break;} | |
} | |
} | |
} |
查看 timeBetweenEvictionRunsMillis,该参数的意思是执行连贯回收的间隔时间,如果该参数设置为 >0,则线程睡眠 timeBetweenEvictionRunsMillis 之后再执行,否则线程每秒执行一次。
调用 destroyTask.run();-> shrink() 办法执行线程回收。
连贯回收的次要工作是 shrink 办法实现的,篇幅起因,放在下一篇文章钻研。
小结
理解了 Druid 连接池的根底构造及其初始化过程,以及连接池中连贯销毁的发动机制:通过 destroyConnectionThread 线程调用 destroyTask 对闲暇超时的连贯进行回收,确保连接池中的连贯放弃在衰弱状态。连贯回收的次要逻辑是在 shrink 办法中,其实也是 Druid 连接池中比拟要害的一部分,下一篇文章剖析。
Thanks a lot!
上一篇 连接池 HikariPool(二)– 获取及敞开连贯