HikariPool之后,明天钻研另一支流连接池Druid。

对于数据库连接池的根本认知

先对数据库连接池的根本工作原理做个理解,不论是HikariPool、还是druid,所有的数据库连接池应该都是依照这个基本原理工作和实现的,带着这个思路去学习数据库连接池,防止盲人摸象。

数据库连接池肯定会蕴含以下根本逻辑:

  1. 创立连贯并池化:初始化的时候创立、或者是在利用获取连贯的过程中创立,连贯创立好之后放在连接池(内存中的容器,比方List)中保留。
  2. 获取数据库连贯:接管了获取数据库连贯的办法,从连接池中获取、而不是创立连贯。
  3. 敞开数据库连贯:接管了敞开数据库连贯的办法,将连贯偿还到连接池、而不是真正的敞开数据库连贯。
  4. 连接池保护:连接池容量、连贯超时清理等工作。

带着这个思路钻研HikariPool的源码,会有事倍功半的效用。

意识Druid的构造

包含以下几个局部:

  1. DruidConnectionHolder
  2. Connections
  3. evictConnections
  4. keepAliveConnections
  5. destroyConnectionThread/destroySchedulerFuture
  6. createConnectionThread/createSchedulerFuture

DruidConnectionHolder

HikariPool通过poolEntry持有数据库连贯,Druid通过DruidConnectionHolder持有数据库连贯。

DruidConnectionHolder持有物理数据库连贯Connectin对象,以及该连贯的相干属性,比方connectTimeMillis、lastActiveTimeMillis、lastExecTimeMillis,以及underlyingReadOnly、underlyingAutoCommit、underlyingTransactionIsolation等等。连接池能够依据这些属性以及相干参数执行相应的houseKeep。

Connections

Connections是DruidConnectionHolder组成的数组,是Druid连接池中惟一存储可用连贯的中央,看起来会比HikariPool简略许多(HikariPool有三个存储连贯的中央),然而这可能也是Druid在性能上稍逊于HikariPool的起因之一。

evictConnections

存储须要被回收的连贯的数组,在连接池进行清理的时候用来存储须要被敞开的连贯。

keepAliveConnections

存储放弃流动的连贯的数组。

createConnectionThread/createSchedulerFuture

Druid依据配置,能够通过createConnectionThread线程、或者createSchedulerFuture线程工作创立数据库连贯并退出连接池。

Druid兴许并没有默认的createSchedulerFuture的实现,如果要启用createSchedulerFuture,须要配置createSchedulerFuture的实现类。

createConnectionThread是Druid默认的创立连贯的线程,负责获取物理连贯、组装物理连贯为DruidConnectionHolder并退出到connections数组中。

destroyConnectionThread/destroySchedulerFuture

与创立连贯的形式相似,Druid提供两种不同的形式销毁(或者敞开)过期的数据库连贯。默认实现是destroyConnectionThread。

好了,Druid的根底构造理解完了,咱们采纳和HikariPool齐全一样的剖析套路,接下来要进入源码剖析了,次要包含:

  1. Druid连接池的初始化
  2. 获取数据库连贯 - getConnection办法
  3. 敞开数据库连贯 - close办法

Druid的初始化

Druid的初始化过程貌似和HikariPool稍有不同,因为HikariPool默认的在获取连贯之前的HikariPool实例化过程中就实现了连接池的初始化。

所谓实现连接池的初始化,指的是依照参数的设定,实现了数据库连贯的创立和池化,也就是说连接池曾经筹备好了,利用在通过getConnecton办法获取连贯的时候,间接从连接池中borrow就能够了。

Druid貌似不这样。咱们看一下DruidDataSource的实例化办法:

public DruidDataSource(){        this(false);    }    public DruidDataSource(boolean fairLock){        super(fairLock);        configFromPropety(System.getProperties());    }

super指的是DruidAbstractDataSource,他的构造方法:

public DruidAbstractDataSource(boolean lockFair){        lock = new ReentrantLock(lockFair);        notEmpty = lock.newCondition();        empty = lock.newCondition();    }

只初始化了ReentrantLock,以及他的两个Condition:empty和notEmpty。

而configFromPropety只是负责把参数从配置文件中读入,不做其余的事件。

所以,连接池的初始化过程没有放在DruidDataSource的创立过程中。

既然构造方法中没有实现连接池的初始化,咱们自然而然的就想到去看看getConnection办法,不做初始化、怎么能获取到数据库连贯?

果然,getConnectin办法的第一行代码就是:调用init()办法。

init的办法很长,不过有很多代码都是查看参数合理性的,这部分代码咱们间接跳过:

            ...疏忽n多行代码            connections = new DruidConnectionHolder[maxActive];            evictConnections = new DruidConnectionHolder[maxActive];            keepAliveConnections = new DruidConnectionHolder[maxActive];

创立了咱们后面说过的存储数据库连贯的connections(其实就是池),以及另外两个辅助数组evictConnections和keepAliveConnections,连接池的大小初始化为maxActive。

接下来:

            if (createScheduler != null && asyncInit) {                for (int i = 0; i < initialSize; ++i) {                    submitCreateTask(true);                }            } else if (!asyncInit) {                // init connections                while (poolingCount < initialSize) {                    try {                        PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();                        DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);                        connections[poolingCount++] = holder;                    } catch (SQLException ex) {                        LOG.error("init datasource error, url: " + this.getUrl(), ex);                        if (initExceptionThrow) {                            connectError = ex;                            break;                        } else {                            Thread.sleep(3000);                        }                    }                }                if (poolingCount > 0) {                    poolingPeak = poolingCount;                    poolingPeakTime = System.currentTimeMillis();                }            }

查看createScheduler不空、并且参数设置为asyncInit(异步初始化)的话,则提交initialSize个连贯创立工作。咱们说过createScheduler并非Druid的默认实现,所以咱们临时不论这部分代码。

上面的代码逻辑是:如果不是异步初始化的话,那就是要同步初始化,也就是以后线程要负责实现连接池的初始化,则循环创立initialSize个物理连贯、封装为DruidConnectionHolder后间接退出到connections中。

这个过程就相当于实现了数据库连接池的初始化,然而倡议设置asyncInit参数为true:异步初始化。因为如果同步初始化、并且initialSize设置比拟大的话,利用的首个getConnection办法必定会耗时比拟长,用户体验不好。

持续看代码,如果asyncInit设置为true,异步初始化的话:

            createAndLogThread();            createAndStartCreatorThread();            createAndStartDestroyThread();

从办法名上看,应该是启动了3个线程,别离是createAndLogThread线程、负责线程创立连贯的线程和负责销毁(敞开)连贯的线程。

createAndLogThread线程负责定期收集Druid连接池的状态并通过log打印进去,是Druid统计分析的一部分。

createAndStartCreatorThread是启动连贯创立线程,异步初始化的状况下,连接池就是通过createAndStartCreatorThread线程创立的。

createAndStartDestroyThread是启动连贯销毁的线程,作用相似于HikariPool的houseKeep线程。

createAndStartCreatorThread办法以及连贯创立线程、reateAndStartDestroyThread办法以及连贯销毁线程的源码咱们先放放,稍后剖析。

咱们先一鼓作气实现init()办法源码的残余局部:

            initedLatch.await();            init = true;            initedTime = new Date();            registerMbean();            if (connectError != null && poolingCount == 0) {                throw connectError;            }

initedLatch是数量=2的CountDownLatch,在DruidDataSource类成员变量初始化的时候定义好的,这里initedLatch.await();的意思是期待连贯创立线程和连贯销毁线程实现启动。

之后,打标签init=true,表明初始化已实现,进行异样解决、锁开释等开头工作。

初始化实现!

createAndStartCreatorThread

创立并启动“创立连接线程”:

   protected void createAndStartCreatorThread() {        if (createScheduler == null) {            String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this);            createConnectionThread = new CreateConnectionThread(threadName);            createConnectionThread.start();            return;        }        initedLatch.countDown();    }

代码非常简单,就是创立并启动createConnectionThread。createConnectionThread线程负责物理连贯的创立、以及连贯的池化。

createConnectionThread#run

连贯的创立及池化是在createConnectionThread线程的run办法中实现的。

       public void run() {            initedLatch.countDown();            long lastDiscardCount = 0;            int errorCount = 0;            for (;;) {                // addLast                try {                    lock.lockInterruptibly();                } catch (InterruptedException e2) {                    break;                }               long discardCount = DruidDataSource.this.discardCount;                boolean discardChanged = discardCount - lastDiscardCount > 0;                lastDiscardCount = discardCount;                try {                    boolean emptyWait = true;                    if (createError != null                            && poolingCount == 0                            && !discardChanged) {                        emptyWait = false;                    }                    if (emptyWait                            && asyncInit && createCount < initialSize) {                        emptyWait = false;                    }

线程启动之后会一直检测是否须要创立连贯,在run办法的无条件for循环中实现。

首先取得锁。

discardChanged变量示意在每次循环检测过程中,被discardCount的连接数是否有增长。

而后定义了一个emptyWait变量,用来示意是否须要暂缓创立连贯、直到期待获取连贯的线程唤醒之后才创立。

对emptyWait的解决逻辑是:如果创立连贯产生了谬误并且以后连接池空、并且没有产生discardChanged,则不期待。或者,如果是异步初始化并且初始化的连接池数量尚未满足initialSize的要求,则不期待。

而后:

                  if (emptyWait) {                        // 必须存在线程期待,才创立连贯                        if (poolingCount >= notEmptyWaitThreadCount //                                && (!(keepAlive && activeCount + poolingCount < minIdle))                                && !isFailContinuous()                        ) {                            empty.await();                        }                        // 避免创立超过maxActive数量的连贯                        if (activeCount + poolingCount >= maxActive) {                            empty.await();                            continue;                        }                    }                } catch (InterruptedException e) {                    lastCreateError = e;                    lastErrorTimeMillis = System.currentTimeMillis();                    if ((!closing) && (!closed)) {                        LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e);                    }                    break;                } finally {                    lock.unlock();                }

这段代码是须要期待的状况,判断以后线程池数量满足期待条件(连接池数量大于notEmptyWaitThreadCount数量,activeCount + poolingCount数量大于等于minIdle或者是keepAlive)。或者,activeCount + poolingCount曾经大于等于maxActive了,则调用empty.await();也就是临时不再创立连贯了,期待获取连贯的线程唤醒之后再创立。当然,期待是须要开释锁资源的。

接下来是不期待的代码:

               PhysicalConnectionInfo connection = null;                try {                    connection = createPhysicalConnection();                } catch (SQLException e) {         //上面是一堆创立连贯失败的异样解决,疏忽

创立数据库物理连贯。之后:

                if (connection == null) {                    continue;                }                boolean result = put(connection);                if (!result) {                    JdbcUtils.close(connection.getPhysicalConnection());                    LOG.info("put physical connection to pool failed.");                }                errorCount = 0; // reset errorCount                if (closing || closed) {                    break;                }            }        }    }

如果创立连贯过程中产生异样,connection==null,则continue,持续创立。

否则,调用put办法,将新创建的连贯退出连接池。当然,如果退出失败的话则敞开刚创立好的连贯,免得资源节约。

接下来看一下连贯放入connections的put办法。

put办法

   protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) {        DruidConnectionHolder holder = null;        try {            holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);        } catch (SQLException ex) {            lock.lock();            try {                if (createScheduler != null) {                    clearCreateTask(physicalConnectionInfo.createTaskId);                }            } finally {                lock.unlock();            }            LOG.error("create connection holder error", ex);            return false;        }        return put(holder, physicalConnectionInfo.createTaskId);    }

将connection封装为DruidConnectionHolder,之后调用put:

private boolean put(DruidConnectionHolder holder, long createTaskId) {        lock.lock();        try {            if (this.closing || this.closed) {                return false;            }            if (poolingCount >= maxActive) {                if (createScheduler != null) {                    clearCreateTask(createTaskId);                }                return false;            }            connections[poolingCount] = holder;            incrementPoolingCount();            if (poolingCount > poolingPeak) {                poolingPeak = poolingCount;                poolingPeakTime = System.currentTimeMillis();            }            notEmpty.signal();            notEmptySignalCount++;            if (createScheduler != null) {                clearCreateTask(createTaskId);                if (poolingCount + createTaskCount < notEmptyWaitThreadCount //                    && activeCount + poolingCount + createTaskCount < maxActive) {                    emptySignal();                }            }        } finally {            lock.unlock();        }        return true;    }

首先获取锁资源。

之后判断如果创立的连接数大于最大流动连接数poolingCount >= maxActive的话,则不放入连接池间接返回。

接下来,连贯放入连接池connections中,poolingCount加1。

接下来notEmpty.signal();告诉期待获取连贯的线程。

之后开释锁资源,连贯退出连接池实现!

createAndStartDestroyThread办法

创立并启动DestroyThread,间接看destroyConnectionThread的run办法:

        public void run() {            initedLatch.countDown();            for (;;) {                // 从后面开始删除                try {                    if (closed || closing) {                        break;                    }                    if (timeBetweenEvictionRunsMillis > 0) {                        Thread.sleep(timeBetweenEvictionRunsMillis);                    } else {                        Thread.sleep(1000); //                    }                    if (Thread.interrupted()) {                        break;                    }                    destroyTask.run();                } catch (InterruptedException e) {                    break;                }            }        }    }

查看timeBetweenEvictionRunsMillis,该参数的意思是执行连贯回收的间隔时间,如果该参数设置为>0,则线程睡眠timeBetweenEvictionRunsMillis之后再执行,否则线程每秒执行一次。

调用destroyTask.run();-> shrink()办法执行线程回收。

连贯回收的次要工作是shrink办法实现的,篇幅起因,放在下一篇文章钻研。

小结

理解了Druid连接池的根底构造及其初始化过程,以及连接池中连贯销毁的发动机制:通过destroyConnectionThread线程调用destroyTask对闲暇超时的连贯进行回收,确保连接池中的连贯放弃在衰弱状态。连贯回收的次要逻辑是在shrink办法中,其实也是Druid连接池中比拟要害的一部分,下一篇文章剖析。

Thanks a lot!

上一篇 连接池 HikariPool (二) - 获取及敞开连贯