关于druid:搞懂Druid之连接创建和销毁

8次阅读

共计 10268 个字符,预计需要花费 26 分钟才能阅读完成。

前言

Druid是阿里开源的数据库连接池,是阿里监控零碎 Dragoon 的副产品,提供了弱小的可监控性和基于 Filter-Chain 的可扩展性。

本篇文章将对 Druid 数据库连接池的 连贯创立 销毁 进行剖析。剖析 Druid 数据库连接池的源码前,须要明确几个概念。

  1. Druid数据库连接池中可用的连贯寄存在一个数组 connections 中;
  2. Druid数据库连接池做并发管制,次要靠一把可重入锁以及和这把锁关联的两个 Condition 对象;
public DruidAbstractDataSource(boolean lockFair) {lock = new ReentrantLock(lockFair);

   notEmpty = lock.newCondition();
   empty = lock.newCondition();}
  1. 连接池没有可用连贯时,利用线程会在 notEmpty 上期待,连接池已满时,生产连贯的线程会在 empty 上期待;
  2. 对连贯保活,就是每距离肯定工夫,对达到了保活距离周期的连贯进行有效性校验,能够将有效连贯销毁,也能够避免连贯长时间不与数据库服务端通信。

Druid版本:1.2.11

注释

一. DruidDataSource 连贯创立

DruidDataSource连贯的创立由 CreateConnectionThread 线程实现,其run() 办法如下所示。

public void run() {initedLatch.countDown();

    long lastDiscardCount = 0;
    int errorCount = 0;
    for (; ;) {
        try {lock.lockInterruptibly();
        } catch (InterruptedException e2) {break;}

        long discardCount = DruidDataSource.this.discardCount;
        boolean discardChanged = discardCount - lastDiscardCount > 0;
        lastDiscardCount = discardCount;

        try {
            // emptyWait 为 true 示意生产连接线程须要期待,无需生产连贯
            boolean emptyWait = true;

            // 产生了创立谬误,且池中已无连贯,且抛弃连贯的统计没有扭转
            // 此时生产连接线程须要生产连贯
            if (createError != null
                    && poolingCount == 0
                    && !discardChanged) {emptyWait = false;}

            if (emptyWait
                    && asyncInit && createCount < initialSize) {emptyWait = false;}

            if (emptyWait) {
                // 池中已有连接数大于等于正在期待连贯的利用线程数
                // 且以后是非 keepAlive 场景
                // 且以后是非间断失败
                // 此时生产连贯的线程在 empty 上期待
                // keepAlive && activeCount + poolingCount < minIdle 时会在 shrink()办法中触发 emptySingal()来增加连贯
                // isFailContinuous()返回 true 示意间断失败,即屡次(默认 2 次)创立物理连贯失败
                if (poolingCount >= notEmptyWaitThreadCount
                        && (!(keepAlive && activeCount + poolingCount < minIdle))
                        && !isFailContinuous()) {empty.await();
                }

                // 避免创立超过 maxActive 数量的连贯
                if (activeCount + poolingCount >= maxActive) {empty.await();
                    continue;
                }
            }

        } catch (InterruptedException e) {// 省略} finally {lock.unlock();
        }

        PhysicalConnectionInfo connection = null;

        try {connection = createPhysicalConnection();
        } catch (SQLException e) {
            LOG.error("create connection SQLException, url:" + jdbcUrl
                    + ", errorCode" + e.getErrorCode()
                    + ", state" + e.getSQLState(), e);

            errorCount++;
            if (errorCount > connectionErrorRetryAttempts
                    && timeBetweenConnectErrorMillis > 0) {
                // 屡次创立失败
                setFailContinuous(true);
                // 如果配置了疾速失败,就唤醒所有在 notEmpty 上期待的利用线程
                if (failFast) {lock.lock();
                    try {notEmpty.signalAll();
                    } finally {lock.unlock();
                    }
                }

                if (breakAfterAcquireFailure) {break;}

                try {Thread.sleep(timeBetweenConnectErrorMillis);
                } catch (InterruptedException interruptEx) {break;}
            }
        } catch (RuntimeException e) {LOG.error("create connection RuntimeException", e);
            setFailContinuous(true);
            continue;
        } catch (Error e) {LOG.error("create connection Error", e);
            setFailContinuous(true);
            break;
        }

        if (connection == null) {continue;}

        // 把连贯增加到连接池
        boolean result = put(connection);
        if (!result) {JdbcUtils.close(connection.getPhysicalConnection());
            LOG.info("put physical connection to pool failed.");
        }

        errorCount = 0;

        if (closing || closed) {break;}
    }
}

CreateConnectionThreadrun() 办法整体就是在一个死循环中一直的期待,被唤醒,而后创立线程。当一个物理连贯被创立进去后,会调用DruidDataSource#put 办法将其放到连接池 connections 中,put() 办法源码如下所示。

protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) {
    DruidConnectionHolder holder = null;
    try {holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);
    } catch (SQLException ex) {
        // 省略
        return false;
    }

    return put(holder, physicalConnectionInfo.createTaskId, false);
}

private boolean put(DruidConnectionHolder holder,
                    long createTaskId, boolean checkExists) {
    // 波及到连接池中连贯数量扭转的操作,都须要加锁
    lock.lock();
    try {if (this.closing || this.closed) {return false;}

        // 池中已有连接数曾经大于等于最大连接数,则不再把连贯加到连接池并间接返回 false
        if (poolingCount >= maxActive) {if (createScheduler != null) {clearCreateTask(createTaskId);
            }
            return false;
        }

        // 查看反复增加
        if (checkExists) {for (int i = 0; i < poolingCount; i++) {if (connections[i] == holder) {return false;}
            }
        }

        // 连贯放入连接池
        connections[poolingCount] = holder;
        // poolingCount++
        incrementPoolingCount();

        if (poolingCount > poolingPeak) {
            poolingPeak = poolingCount;
            poolingPeakTime = System.currentTimeMillis();}

        // 唤醒在 notEmpty 上期待连贯的利用线程
        notEmpty.signal();
        notEmptySignalCount++;

        if (createScheduler != null) {clearCreateTask(createTaskId);

            if (poolingCount + createTaskCount < notEmptyWaitThreadCount
                    && activeCount + poolingCount + createTaskCount < maxActive) {emptySignal();
            }
        }
    } finally {lock.unlock();
    }
    return true;
}

put() 办法会先将物理连贯从 PhysicalConnectionInfo 中获取进去并封装成一个 DruidConnectionHolderDruidConnectionHolder 就是 Druid 连接池中的连贯。新增加的连贯会寄存在连接池数组 connectionspoolingCount地位,而后 poolingCount 会加 1,也就是 poolingCount 代表着连接池中能够获取的连贯的数量。

二. DruidDataSource 连贯销毁

DruidDataSource连贯的销毁由 DestroyConnectionThread 线程实现,其run() 办法如下所示。

public void run() {// run()办法只有执行了,就调用 initedLatch#countDown
    initedLatch.countDown();

    for (; ;) {// 每距离 timeBetweenEvictionRunsMillis 执行一次 DestroyTask 的 run()办法
        try {if (closed || closing) {break;}

            if (timeBetweenEvictionRunsMillis > 0) {Thread.sleep(timeBetweenEvictionRunsMillis);
            } else {Thread.sleep(1000);
            }

            if (Thread.interrupted()) {break;}

            // 执行 DestroyTask 的 run()办法来销毁须要销毁的连贯
            destroyTask.run();} catch (InterruptedException e) {break;}
    }
}

DestroyConnectionThreadrun() 办法就是在一个死循环中每距离timeBetweenEvictionRunsMillis 的工夫就执行一次 DestroyTaskrun() 办法。DestroyTask#run办法实现如下所示。

public void run() {
    // 依据一系列条件判断并销毁连贯
    shrink(true, keepAlive);

    // RemoveAbandoned 机制
    if (isRemoveAbandoned()) {removeAbandoned();
    }
}

DestroyTask#run 办法中会调用 DruidDataSource#shrink 办法来依据设定的条件来判断出须要销毁和保活的连贯。DruidDataSource#shrink办法如下所示。

// checkTime 参数示意在将一个连贯进行销毁前,是否须要判断一下闲暇工夫
public void shrink(boolean checkTime, boolean keepAlive) {
    // 加锁
    try {lock.lockInterruptibly();
    } catch (InterruptedException e) {return;}

    // needFill = keepAlive && poolingCount + activeCount < minIdle
    // needFill 为 true 时,会调用 empty.signal()唤醒生产连贯的线程来生产连贯
    boolean needFill = false;
    // evictCount 记录须要销毁的连接数
    // keepAliveCount 记录须要保活的连接数
    int evictCount = 0;
    int keepAliveCount = 0;
    int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
    fatalErrorCountLastShrink = fatalErrorCount;

    try {if (!inited) {return;}

        // checkCount = 池中已有连接数 - 最小闲暇连接数
        // 失常状况下,最多可能将前 checkCount 个连贯进行销毁
        final int checkCount = poolingCount - minIdle;
        final long currentTimeMillis = System.currentTimeMillis();
        // 失常状况下,须要遍历池中所有连贯
        // 从前往后遍历,i 为数组索引
        for (int i = 0; i < poolingCount; ++i) {DruidConnectionHolder connection = connections[i];

            // 如果产生了致命谬误(onFatalError == true)且致命谬误产生工夫(lastFatalErrorTimeMillis)在连贯建设工夫之后
            // 把连贯退出到保活连贯数组中
            if ((onFatalError || fatalErrorIncrement > 0)
                    && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {keepAliveConnections[keepAliveCount++] = connection;
                continue;
            }

            if (checkTime) {
                // phyTimeoutMillis 示意连贯的物理存活超时工夫,默认值是 -1
                if (phyTimeoutMillis > 0) {
                    // phyConnectTimeMillis 示意连贯的物理存活工夫
                    long phyConnectTimeMillis = currentTimeMillis
                            - connection.connectTimeMillis;
                    // 连贯的物理存活工夫大于 phyTimeoutMillis,则将这个连贯放入 evictConnections 数组
                    if (phyConnectTimeMillis > phyTimeoutMillis) {evictConnections[evictCount++] = connection;
                        continue;
                    }
                }

                // idleMillis 示意连贯的闲暇工夫
                long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;

                // minEvictableIdleTimeMillis 示意连贯容许的最小闲暇工夫,默认是 30 分钟
                // keepAliveBetweenTimeMillis 示意保活间隔时间,默认是 2 分钟
                // 如果连贯的闲暇工夫小于 minEvictableIdleTimeMillis 且还小于 keepAliveBetweenTimeMillis
                // 则 connections 数组中以后连贯之后的连贯都会满足闲暇工夫小于 minEvictableIdleTimeMillis 且还小于 keepAliveBetweenTimeMillis
                // 此时跳出遍历,不再查看其余的连贯
                if (idleMillis < minEvictableIdleTimeMillis
                        && idleMillis < keepAliveBetweenTimeMillis
                ) {break;}

                // 连贯的闲暇工夫大于等于容许的最小闲暇工夫
                if (idleMillis >= minEvictableIdleTimeMillis) {if (checkTime && i < checkCount) {
                        // i < checkCount 这个条件的了解如下:// 每次 shrink()办法执行时,connections 数组中只有索引 0 到 checkCount- 1 的连贯才容许被销毁
                        // 这样能力保障销毁完连贯后,connections 数组中至多还有 minIdle 个连贯
                        evictConnections[evictCount++] = connection;
                        continue;
                    } else if (idleMillis > maxEvictableIdleTimeMillis) {
                        // 如果闲暇工夫过久,曾经大于了容许的最大闲暇工夫(默认 7 小时)// 那么无论如何都要销毁这个连贯
                        evictConnections[evictCount++] = connection;
                        continue;
                    }
                }

                // 如果开启了保活机制,且连贯闲暇工夫大于等于了保活间隔时间
                // 此时将连贯退出到保活连贯数组中
                if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {keepAliveConnections[keepAliveCount++] = connection;
                }
            } else {
                // checkTime 为 false,那么前 checkCount 个连贯间接进行销毁,不再判断这些连贯的闲暇工夫是否超过阈值
                if (i < checkCount) {evictConnections[evictCount++] = connection;
                } else {break;}
            }
        }

        // removeCount = 销毁连接数 + 保活连接数
        // removeCount 示意本次从 connections 数组中拿掉的连接数
        // 注:肯定是从返回后拿,失常状况下最初 minIdle 个连贯是平安的
        int removeCount = evictCount + keepAliveCount;
        if (removeCount > 0) {// [0, 1, 2, 3, 4, null, null, null] -> [3, 4, 2, 3, 4, null, null, null]
            System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
            // [3, 4, 2, 3, 4, null, null, null] -> [3, 4, null, null, null, null, null, null, null]
            Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
            // 更新池中连接数
            poolingCount -= removeCount;
        }
        keepAliveCheckCount += keepAliveCount;

        // 如果池中连接数加上沉闷连接数(借出去的连贯)小于最小闲暇连接数
        // 则将 needFill 设为 true,后续须要唤醒生产连贯的线程来生产连贯
        if (keepAlive && poolingCount + activeCount < minIdle) {needFill = true;}
    } finally {lock.unlock();
    }

    if (evictCount > 0) {
        // 遍历 evictConnections 数组,销毁其中的连贯
        for (int i = 0; i < evictCount; ++i) {DruidConnectionHolder item = evictConnections[i];
            Connection connection = item.getConnection();
            JdbcUtils.close(connection);
            destroyCountUpdater.incrementAndGet(this);
        }
        Arrays.fill(evictConnections, null);
    }

    if (keepAliveCount > 0) {
        // 遍历 keepAliveConnections 数组,对其中的连贯做可用性校验
        // 校验通过连贯就放入 connections 数组,没通过连贯就销毁
        for (int i = keepAliveCount - 1; i >= 0; --i) {DruidConnectionHolder holer = keepAliveConnections[i];
            Connection connection = holer.getConnection();
            holer.incrementKeepAliveCheckCount();

            boolean validate = false;
            try {this.validateConnection(connection);
                validate = true;
            } catch (Throwable error) {if (LOG.isDebugEnabled()) {LOG.debug("keepAliveErr", error);
                }
            }

            boolean discard = !validate;
            if (validate) {holer.lastKeepTimeMillis = System.currentTimeMillis();
                boolean putOk = put(holer, 0L, true);
                if (!putOk) {discard = true;}
            }

            if (discard) {
                try {connection.close();
                } catch (Exception e) { }

                lock.lock();
                try {
                    discardCount++;

                    if (activeCount + poolingCount <= minIdle) {emptySignal();
                    }
                } finally {lock.unlock();
                }
            }
        }
        this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
        Arrays.fill(keepAliveConnections, null);
    }

    // 如果 needFill 为 true 则唤醒生产连贯的线程来生产连贯
    if (needFill) {lock.lock();
        try {
            // 计算须要生产连贯的个数
            int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
            for (int i = 0; i < fillCount; ++i) {emptySignal();
            }
        } finally {lock.unlock();
        }
    } else if (onFatalError || fatalErrorIncrement > 0) {lock.lock();
        try {emptySignal();
        } finally {lock.unlock();
        }
    }
}

DruidDataSource#shrink 办法中,外围逻辑是遍历 connections 数组中的连贯,并判断这些连贯是须要销毁还是须要保活。通常状况下,connections数组中的前 checkCount(checkCount = poolingCount – minIdle) 个连贯是 危险 的,因为这些连贯只有满足了:闲暇工夫 >= minEvictableIdleTimeMillis(容许的最小闲暇工夫),那么就须要被销毁,而 connections 数组中的最初 minIdle 个连贯是 绝对平安 的,因为这些连贯只有在满足:闲暇工夫 > maxEvictableIdleTimeMillis(容许的最大闲暇工夫) 时,才会被销毁。这么判断的起因,次要就是须要让连接池里可能保障至多有 minIdle 个闲暇连贯能够让利用线程获取。

当确定好了须要销毁和须要保活的连贯后,此时会先将 connections 数组清理,只保留平安的连贯,这个过程示意图如下。

最初,会遍历 evictConnections 数组,销毁数组中的连贯,遍历 keepAliveConnections 数组,对其中的每个连贯做可用性校验,如果校验可用,那么就从新放回 connections 数组,否则销毁。

总结

连贯的创立由一个叫做 CreateConnectionThread 的线程实现,整体流程就是在一个死循环中一直的期待,被唤醒,而后创立连贯。每一个被创立进去的物理连贯 java.sql.Connection 会被封装为一个 DruidConnectionHolder,而后寄存到connections 数组中。

连贯的销毁由一个叫做 DestroyConnectionThread 的线程实现,外围逻辑是周期性的遍历 connections 数组中的连贯,并判断这些连贯是须要销毁还是须要保活,须要销毁的连贯最初会被物理销毁,须要保活的连贯最初会进行一次可用性校验,如果校验不通过,则进行物理销毁。

正文完
 0