Preface

Druid is a database connection pool open-sourced by Alibaba. It started as a by-product of Alibaba's monitoring system Dragoon, and it provides strong monitoring capabilities together with Filter-Chain based extensibility.

This article analyzes how the Druid connection pool creates and destroys connections. Before reading the source code, a few concepts need to be clear.

  1. The available connections in the Druid pool are stored in an array named connections;
  2. Concurrency control in the Druid pool relies mainly on a single reentrant lock and two Condition objects bound to that lock:

public DruidAbstractDataSource(boolean lockFair) {
    lock = new ReentrantLock(lockFair);
    notEmpty = lock.newCondition();
    empty = lock.newCondition();
}

  3. When the pool has no available connection, application threads wait on notEmpty; when the pool is full, the connection-creating thread waits on empty;
  4. Keep-alive means that, at a fixed interval, every connection whose idle time has reached the keep-alive interval is validated: invalid connections can be destroyed, and the check also prevents a connection from going too long without communicating with the database server.
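To make the roles of the lock, notEmpty and empty concrete, here is a minimal sketch of that coordination model for a simplified bounded pool. This is illustrative code only, not Druid's implementation; the class and method names are invented for the example.

// Minimal sketch: one ReentrantLock plus two Conditions coordinating
// application threads (consumers) and a single creator thread (producer).
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SimplePool<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // application threads wait here
    private final Condition empty = lock.newCondition();    // the creator thread waits here
    private final Object[] connections;
    private int poolingCount;

    SimplePool(int maxActive) {
        this.connections = new Object[maxActive];
    }

    // Called by application threads: wait until a connection is available.
    @SuppressWarnings("unchecked")
    T take() throws InterruptedException {
        lock.lock();
        try {
            while (poolingCount == 0) {
                empty.signal();   // ask the creator thread to produce a connection
                notEmpty.await(); // wait until put() signals notEmpty
            }
            return (T) connections[--poolingCount];
        } finally {
            lock.unlock();
        }
    }

    // Called by the creator thread: wait while the pool is full, then add and signal.
    void put(T conn) throws InterruptedException {
        lock.lock();
        try {
            while (poolingCount == connections.length) {
                empty.await();    // pool is full, nothing to produce
            }
            connections[poolingCount++] = conn;
            notEmpty.signal();    // wake one application thread waiting for a connection
        } finally {
            lock.unlock();
        }
    }
}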

Druid version: 1.2.11
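For reference, the pool parameters discussed in this article (minIdle, maxActive, timeBetweenEvictionRunsMillis, minEvictableIdleTimeMillis, maxEvictableIdleTimeMillis, keepAlive, and so on) are ordinary DruidDataSource properties. A typical programmatic setup might look roughly like the following; the connection details and all values are illustrative only.

import com.alibaba.druid.pool.DruidDataSource;

public class DruidConfigExample {
    public static DruidDataSource buildDataSource() {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl("jdbc:mysql://localhost:3306/test"); // illustrative URL
        dataSource.setUsername("root");
        dataSource.setPassword("password");

        dataSource.setInitialSize(5); // connections created at initialization
        dataSource.setMinIdle(5);     // minimum idle connections the pool tries to keep
        dataSource.setMaxActive(20);  // upper bound: activeCount + poolingCount <= maxActive

        // How often DestroyConnectionThread runs DestroyTask (shrink / removeAbandoned)
        dataSource.setTimeBetweenEvictionRunsMillis(60 * 1000L);
        // Idle-time thresholds used by shrink() to decide eviction
        dataSource.setMinEvictableIdleTimeMillis(30 * 60 * 1000L);
        dataSource.setMaxEvictableIdleTimeMillis(7 * 60 * 60 * 1000L);
        // Keep-alive validation for idle connections
        dataSource.setKeepAlive(true);
        dataSource.setKeepAliveBetweenTimeMillis(2 * 60 * 1000L);
        dataSource.setValidationQuery("SELECT 1");
        return dataSource;
    }
}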

Main Text

1. DruidDataSource Connection Creation

Connection creation in DruidDataSource is handled by the CreateConnectionThread thread; its run() method is shown below.

public void run() {
    initedLatch.countDown();
    long lastDiscardCount = 0;
    int errorCount = 0;
    for (; ; ) {
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e2) {
            break;
        }
        long discardCount = DruidDataSource.this.discardCount;
        boolean discardChanged = discardCount - lastDiscardCount > 0;
        lastDiscardCount = discardCount;
        try {
            // emptyWait == true means the creator thread should wait and does not need to create a connection
            boolean emptyWait = true;
            // A creation error occurred, the pool has no connections, and the discard count has not changed:
            // in this case the creator thread must create a connection
            if (createError != null
                    && poolingCount == 0
                    && !discardChanged) {
                emptyWait = false;
            }
            if (emptyWait
                    && asyncInit && createCount < initialSize) {
                emptyWait = false;
            }
            if (emptyWait) {
                // The number of pooled connections is >= the number of application threads waiting for one,
                // this is not a keepAlive scenario,
                // and creation is not failing continuously,
                // so the creator thread waits on empty.
                // When keepAlive && activeCount + poolingCount < minIdle, shrink() triggers emptySignal() to add connections.
                // isFailContinuous() returns true when physical connection creation has failed several times in a row (2 by default).
                if (poolingCount >= notEmptyWaitThreadCount
                        && (!(keepAlive && activeCount + poolingCount < minIdle))
                        && !isFailContinuous()
                ) {
                    empty.await();
                }
                // Prevent creating more than maxActive connections
                if (activeCount + poolingCount >= maxActive) {
                    empty.await();
                    continue;
                }
            }
        } catch (InterruptedException e) {
            // omitted
        } finally {
            lock.unlock();
        }
        PhysicalConnectionInfo connection = null;
        try {
            connection = createPhysicalConnection();
        } catch (SQLException e) {
            LOG.error("create connection SQLException, url: " + jdbcUrl
                    + ", errorCode " + e.getErrorCode()
                    + ", state " + e.getSQLState(), e);
            errorCount++;
            if (errorCount > connectionErrorRetryAttempts
                    && timeBetweenConnectErrorMillis > 0) {
                // Creation has failed repeatedly
                setFailContinuous(true);
                // If fail-fast is configured, wake up all application threads waiting on notEmpty
                if (failFast) {
                    lock.lock();
                    try {
                        notEmpty.signalAll();
                    } finally {
                        lock.unlock();
                    }
                }
                if (breakAfterAcquireFailure) {
                    break;
                }
                try {
                    Thread.sleep(timeBetweenConnectErrorMillis);
                } catch (InterruptedException interruptEx) {
                    break;
                }
            }
        } catch (RuntimeException e) {
            LOG.error("create connection RuntimeException", e);
            setFailContinuous(true);
            continue;
        } catch (Error e) {
            LOG.error("create connection Error", e);
            setFailContinuous(true);
            break;
        }
        if (connection == null) {
            continue;
        }
        // Add the connection to the pool
        boolean result = put(connection);
        if (!result) {
            JdbcUtils.close(connection.getPhysicalConnection());
            LOG.info("put physical connection to pool failed.");
        }
        errorCount = 0;
        if (closing || closed) {
            break;
        }
    }
}

CreateConnectionThread's run() method is essentially an infinite loop that repeatedly waits, gets woken up, and then creates connections. Once a physical connection has been created, DruidDataSource#put is called to place it into the connections pool. The source of put() is shown below.

protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) {
    DruidConnectionHolder holder = null;
    try {
        holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);
    } catch (SQLException ex) {
        // omitted
        return false;
    }
    return put(holder, physicalConnectionInfo.createTaskId, false);
}

private boolean put(DruidConnectionHolder holder,
                    long createTaskId, boolean checkExists) {
    // Any operation that changes the number of connections in the pool must hold the lock
    lock.lock();
    try {
        if (this.closing || this.closed) {
            return false;
        }
        // If the pool already holds maxActive or more connections,
        // do not add this connection and simply return false
        if (poolingCount >= maxActive) {
            if (createScheduler != null) {
                clearCreateTask(createTaskId);
            }
            return false;
        }
        // Check for duplicate additions
        if (checkExists) {
            for (int i = 0; i < poolingCount; i++) {
                if (connections[i] == holder) {
                    return false;
                }
            }
        }
        // Put the connection into the pool
        connections[poolingCount] = holder;
        // poolingCount++
        incrementPoolingCount();
        if (poolingCount > poolingPeak) {
            poolingPeak = poolingCount;
            poolingPeakTime = System.currentTimeMillis();
        }
        // Wake up an application thread waiting on notEmpty for a connection
        notEmpty.signal();
        notEmptySignalCount++;
        if (createScheduler != null) {
            clearCreateTask(createTaskId);
            if (poolingCount + createTaskCount < notEmptyWaitThreadCount
                    && activeCount + poolingCount + createTaskCount < maxActive) {
                emptySignal();
            }
        }
    } finally {
        lock.unlock();
    }
    return true;
}

put() first extracts the physical connection from the PhysicalConnectionInfo and wraps it in a DruidConnectionHolder; a DruidConnectionHolder is what the Druid pool stores as a connection. The newly added connection is placed at index poolingCount of the connections array, and then poolingCount is incremented by 1. In other words, poolingCount represents the number of connections currently available to be taken from the pool.

2. DruidDataSource Connection Destruction

Connection destruction in DruidDataSource is handled by the DestroyConnectionThread thread; its run() method is shown below.

public void run() {
    // As soon as run() starts, count down initedLatch
    initedLatch.countDown();
    for (; ; ) {
        // Run DestroyTask's run() method once every timeBetweenEvictionRunsMillis
        try {
            if (closed || closing) {
                break;
            }
            if (timeBetweenEvictionRunsMillis > 0) {
                Thread.sleep(timeBetweenEvictionRunsMillis);
            } else {
                Thread.sleep(1000);
            }
            if (Thread.interrupted()) {
                break;
            }
            // Execute DestroyTask#run to destroy the connections that need to be destroyed
            destroyTask.run();
        } catch (InterruptedException e) {
            break;
        }
    }
}

DestroyConnectionThread's run() method is an infinite loop that executes DestroyTask's run() method once every timeBetweenEvictionRunsMillis.
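This dedicated sleep-loop thread behaves like a fixed-delay periodic task. As a rough illustration (not Druid's code), the same schedule could be expressed with a ScheduledExecutorService, which is also the kind of executor that shows up in the pool as the createScheduler field seen in put() above; all names and values below are made up for the example.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class EvictionScheduleSketch {
    public static void main(String[] args) {
        long timeBetweenEvictionRunsMillis = 60_000L; // illustrative value
        Runnable destroyTask = () -> System.out.println("shrink + removeAbandoned would run here");

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Fixed delay mirrors "sleep, then run destroyTask" in DestroyConnectionThread#run
        scheduler.scheduleWithFixedDelay(
                destroyTask,
                timeBetweenEvictionRunsMillis,
                timeBetweenEvictionRunsMillis,
                TimeUnit.MILLISECONDS);
    }
}

The implementation of DestroyTask#run, which the loop invokes each round, is shown below.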

public void run() {
    // Decide, based on a series of conditions, which connections to destroy, then destroy them
    shrink(true, keepAlive);
    // RemoveAbandoned mechanism
    if (isRemoveAbandoned()) {
        removeAbandoned();
    }
}

DestroyTask#run calls DruidDataSource#shrink, which uses the configured conditions to decide which connections should be destroyed and which should be kept alive. DruidDataSource#shrink is shown below.

// The checkTime parameter indicates whether a connection's idle time must be checked before it is destroyed
public void shrink(boolean checkTime, boolean keepAlive) {
    // Acquire the lock
    try {
        lock.lockInterruptibly();
    } catch (InterruptedException e) {
        return;
    }
    // needFill = keepAlive && poolingCount + activeCount < minIdle
    // When needFill is true, empty.signal() is called later to wake the creator thread to create connections
    boolean needFill = false;
    // evictCount records the number of connections to destroy
    // keepAliveCount records the number of connections to keep alive
    int evictCount = 0;
    int keepAliveCount = 0;
    int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
    fatalErrorCountLastShrink = fatalErrorCount;
    try {
        if (!inited) {
            return;
        }
        // checkCount = number of pooled connections - minimum idle connections
        // Normally, at most the first checkCount connections may be destroyed
        final int checkCount = poolingCount - minIdle;
        final long currentTimeMillis = System.currentTimeMillis();
        // Normally every connection in the pool is examined,
        // iterating from front to back; i is the array index
        for (int i = 0; i < poolingCount; ++i) {
            DruidConnectionHolder connection = connections[i];
            // If a fatal error has occurred (onFatalError == true) and it happened
            // (lastFatalErrorTimeMillis) after the connection was established,
            // add the connection to the keep-alive array
            if ((onFatalError || fatalErrorIncrement > 0)
                    && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
                keepAliveConnections[keepAliveCount++] = connection;
                continue;
            }
            if (checkTime) {
                // phyTimeoutMillis is the physical lifetime limit of a connection; the default is -1
                if (phyTimeoutMillis > 0) {
                    // phyConnectTimeMillis is how long the connection has physically existed
                    long phyConnectTimeMillis = currentTimeMillis
                            - connection.connectTimeMillis;
                    // If the physical lifetime exceeds phyTimeoutMillis, put the connection into evictConnections
                    if (phyConnectTimeMillis > phyTimeoutMillis) {
                        evictConnections[evictCount++] = connection;
                        continue;
                    }
                }
                // idleMillis is how long the connection has been idle
                long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
                // minEvictableIdleTimeMillis is the minimum idle time before eviction; the default is 30 minutes
                // keepAliveBetweenTimeMillis is the keep-alive interval; the default is 2 minutes
                // If this connection's idle time is below both minEvictableIdleTimeMillis and keepAliveBetweenTimeMillis,
                // every connection after it in the connections array will also be below both thresholds,
                // so break out of the loop and skip the remaining connections
                if (idleMillis < minEvictableIdleTimeMillis
                        && idleMillis < keepAliveBetweenTimeMillis
                ) {
                    break;
                }
                // The idle time is greater than or equal to the minimum allowed idle time
                if (idleMillis >= minEvictableIdleTimeMillis) {
                    if (checkTime && i < checkCount) {
                        // The i < checkCount condition can be understood as follows:
                        // each time shrink() runs, only the connections at indexes 0 to checkCount-1 may be destroyed,
                        // which guarantees that at least minIdle connections remain in the connections array afterwards
                        evictConnections[evictCount++] = connection;
                        continue;
                    } else if (idleMillis > maxEvictableIdleTimeMillis) {
                        // If the connection has been idle longer than the maximum allowed idle time (default 7 hours),
                        // it must be destroyed no matter what
                        evictConnections[evictCount++] = connection;
                        continue;
                    }
                }
                // If keep-alive is enabled and the idle time has reached the keep-alive interval,
                // add the connection to the keep-alive array
                if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
                    keepAliveConnections[keepAliveCount++] = connection;
                }
            } else {
                // When checkTime is false, the first checkCount connections are destroyed directly,
                // without checking whether their idle time exceeds the thresholds
                if (i < checkCount) {
                    evictConnections[evictCount++] = connection;
                } else {
                    break;
                }
            }
        }
        // removeCount = number of evicted connections + number of keep-alive connections
        // removeCount is the number of connections removed from the connections array this round
        // Note: removals always come from the front of the array; normally the last minIdle connections are safe
        int removeCount = evictCount + keepAliveCount;
        if (removeCount > 0) {
            // [0, 1, 2, 3, 4, null, null, null] -> [3, 4, 2, 3, 4, null, null, null]
            System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
            // [3, 4, 2, 3, 4, null, null, null] -> [3, 4, null, null, null, null, null, null]
            Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
            // Update the number of connections in the pool
            poolingCount -= removeCount;
        }
        keepAliveCheckCount += keepAliveCount;
        // If the pooled connections plus the active (borrowed) connections are fewer than minIdle,
        // set needFill to true so the creator thread is woken up later to create connections
        if (keepAlive && poolingCount + activeCount < minIdle) {
            needFill = true;
        }
    } finally {
        lock.unlock();
    }
    if (evictCount > 0) {
        // Iterate over evictConnections and destroy every connection in it
        for (int i = 0; i < evictCount; ++i) {
            DruidConnectionHolder item = evictConnections[i];
            Connection connection = item.getConnection();
            JdbcUtils.close(connection);
            destroyCountUpdater.incrementAndGet(this);
        }
        Arrays.fill(evictConnections, null);
    }
    if (keepAliveCount > 0) {
        // Iterate over keepAliveConnections and validate every connection in it:
        // if validation passes, the connection goes back into connections; otherwise it is destroyed
        for (int i = keepAliveCount - 1; i >= 0; --i) {
            DruidConnectionHolder holer = keepAliveConnections[i];
            Connection connection = holer.getConnection();
            holer.incrementKeepAliveCheckCount();
            boolean validate = false;
            try {
                this.validateConnection(connection);
                validate = true;
            } catch (Throwable error) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("keepAliveErr", error);
                }
            }
            boolean discard = !validate;
            if (validate) {
                holer.lastKeepTimeMillis = System.currentTimeMillis();
                boolean putOk = put(holer, 0L, true);
                if (!putOk) {
                    discard = true;
                }
            }
            if (discard) {
                try {
                    connection.close();
                } catch (Exception e) {
                }
                lock.lock();
                try {
                    discardCount++;
                    if (activeCount + poolingCount <= minIdle) {
                        emptySignal();
                    }
                } finally {
                    lock.unlock();
                }
            }
        }
        this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
        Arrays.fill(keepAliveConnections, null);
    }
    // If needFill is true, wake the creator thread to create connections
    if (needFill) {
        lock.lock();
        try {
            // Compute how many connections need to be created
            int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
            for (int i = 0; i < fillCount; ++i) {
                emptySignal();
            }
        } finally {
            lock.unlock();
        }
    } else if (onFatalError || fatalErrorIncrement > 0) {
        lock.lock();
        try {
            emptySignal();
        } finally {
            lock.unlock();
        }
    }
}

The core logic of DruidDataSource#shrink is to iterate over the connections in the connections array and decide whether each one should be destroyed or kept alive. Normally the first checkCount (checkCount = poolingCount - minIdle) connections in the array are at risk: they are destroyed as soon as their idle time >= minEvictableIdleTimeMillis (the minimum allowed idle time). The last minIdle connections in the array are relatively safe, because they are destroyed only when their idle time > maxEvictableIdleTimeMillis (the maximum allowed idle time). The reason for this distinction is to guarantee that the pool keeps at least minIdle idle connections available for application threads.
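As a concrete illustration of the checkCount rule, the toy program below simulates only the idle-time branch of shrink() (fatal errors and phyTimeoutMillis are ignored). All numbers are made up; it assumes 8 pooled connections with minIdle = 3, so checkCount = 5.

// Toy simulation of shrink()'s idle-time classification: indexes 0..checkCount-1 may be
// evicted once idle >= minEvictable; connections beyond checkCount are evicted only when
// idle > maxEvictable, otherwise at most sent to keep-alive validation.
public class ShrinkClassificationDemo {
    public static void main(String[] args) {
        long minEvictable = 30L * 60 * 1000;     // 30 minutes
        long maxEvictable = 7L * 60 * 60 * 1000; // 7 hours
        long keepAliveBetween = 2L * 60 * 1000;  // 2 minutes
        boolean keepAlive = true;

        // Idle times (ms) of the 8 pooled connections, longest idle first (front of the array).
        long[] idleMillis = {8L * 3600 * 1000, 40L * 60 * 1000, 35L * 60 * 1000,
                5L * 60 * 1000, 3L * 60 * 1000, 90L * 1000, 60L * 1000, 10L * 1000};
        int minIdle = 3;
        int checkCount = idleMillis.length - minIdle; // 5

        for (int i = 0; i < idleMillis.length; i++) {
            long idle = idleMillis[i];
            if (idle < minEvictable && idle < keepAliveBetween) {
                System.out.println("stop at index " + i + ": the remaining connections are fresh enough");
                break;
            }
            if (idle >= minEvictable && i < checkCount) {
                System.out.println(i + " -> evict (idle >= minEvictable, inside checkCount)");
            } else if (idle > maxEvictable) {
                System.out.println(i + " -> evict (idle > maxEvictable, even beyond checkCount)");
            } else if (keepAlive && idle >= keepAliveBetween) {
                System.out.println(i + " -> keep-alive validation");
            } else {
                System.out.println(i + " -> keep");
            }
        }
    }
}

With these numbers, indexes 0 to 2 are evicted, indexes 3 and 4 go to keep-alive validation, and the loop stops at index 5, leaving the freshest connections untouched.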

Once the connections to destroy and the connections to keep alive have been determined, the connections array is compacted first so that only the safe connections remain; this compaction is illustrated by the sketch below.
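The following minimal demonstration shows the System.arraycopy + Arrays.fill compaction used above; plain strings stand in for DruidConnectionHolder, and removeCount = 3 is an arbitrary example value.

import java.util.Arrays;

// Demonstrates how shrink() compacts the connections array after removing the first
// removeCount entries (the evicted / keep-alive candidates), keeping the "safe" tail.
public class CompactionDemo {
    public static void main(String[] args) {
        String[] connections = {"c0", "c1", "c2", "c3", "c4", null, null, null};
        int poolingCount = 5;
        int removeCount = 3; // evictCount + keepAliveCount

        // Shift the surviving tail (c3, c4) to the front of the array
        System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
        // Null out the now-stale slots
        Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
        poolingCount -= removeCount;

        // Prints: [c3, c4, null, null, null, null, null, null], poolingCount = 2
        System.out.println(Arrays.toString(connections) + ", poolingCount = " + poolingCount);
    }
}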

Finally, the evictConnections array is traversed and every connection in it is destroyed; then the keepAliveConnections array is traversed and every connection in it is validated: if validation succeeds, the connection is put back into the connections array, otherwise it is destroyed.
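Conceptually, the keep-alive validation step boils down to something like the sketch below; Druid's real validateConnection() also supports driver-level ping and other strategies, so running the validationQuery is just the common case, and the class and method names here are invented for illustration.

import java.sql.Connection;
import java.sql.Statement;

// Rough sketch of keep-alive validation: run the validation query; a connection that
// throws is treated as dead and will be discarded instead of returned to the pool.
public class KeepAliveValidationSketch {
    static boolean roughlyValid(Connection connection, String validationQuery) {
        try (Statement stmt = connection.createStatement()) {
            stmt.execute(validationQuery); // e.g. "SELECT 1"
            return true;
        } catch (Throwable error) {
            return false; // validation failed -> the connection will be discarded
        }
    }
}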

Summary

Connection creation is handled by a thread called CreateConnectionThread. The overall flow is an infinite loop that repeatedly waits, gets woken up, and then creates connections. Each physical connection (java.sql.Connection) that is created gets wrapped in a DruidConnectionHolder and stored in the connections array.

Connection destruction is handled by a thread called DestroyConnectionThread. Its core logic is to periodically iterate over the connections in the connections array and decide whether each one should be destroyed or kept alive. Connections marked for destruction are physically closed; connections marked for keep-alive go through a validation check and are physically closed if that check fails.