On druid: a look at druid's borrow behavior

This article looks at how druid borrows a connection from the pool.

getConnection

com/alibaba/druid/pool/DruidDataSource.java

    public DruidPooledConnection getConnection() throws SQLException {
        return getConnection(maxWait);
    }

    public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
        init();

        if (filters.size() > 0) {
            FilterChainImpl filterChain = new FilterChainImpl(this);
            return filterChain.dataSource_connect(this, maxWaitMillis);
        } else {
            return getConnectionDirect(maxWaitMillis);
        }
    }

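DruidDataSource's getConnection method first calls init(). With no filters configured it calls getConnectionDirect(maxWaitMillis) directly; with filters it goes through FilterChainImpl.dataSource_connect, which as far as I can tell also ends up in getConnectionDirect.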

getConnectionDirect

com/alibaba/druid/pool/DruidDataSource.java

    public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
        int notFullTimeoutRetryCnt = 0;
        for (; ;) {
            // handle notFullTimeoutRetry
            DruidPooledConnection poolableConnection;
            try {
                poolableConnection = getConnectionInternal(maxWaitMillis);
            } catch (GetConnectionTimeoutException ex) {
                if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
                    notFullTimeoutRetryCnt++;
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("get connection timeout retry :" + notFullTimeoutRetryCnt);
                    }
                    continue;
                }
                throw ex;
            }

            if (testOnBorrow) {
                boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
                if (!validate) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("skip not validate connection.");
                    }

                    discardConnection(poolableConnection.holder);
                    continue;
                }
            } else {
                if (poolableConnection.conn.isClosed()) {
                    // original comment: "pass null to avoid closing twice" (the code now passes the holder)
                    discardConnection(poolableConnection.holder);
                    continue;
                }

                if (testWhileIdle) {
                    final DruidConnectionHolder holder = poolableConnection.holder;
                    long currentTimeMillis = System.currentTimeMillis();
                    long lastActiveTimeMillis = holder.lastActiveTimeMillis;
                    long lastExecTimeMillis = holder.lastExecTimeMillis;
                    long lastKeepTimeMillis = holder.lastKeepTimeMillis;

                    if (checkExecuteTime
                            && lastExecTimeMillis != lastActiveTimeMillis) {
                        lastActiveTimeMillis = lastExecTimeMillis;
                    }

                    if (lastKeepTimeMillis > lastActiveTimeMillis) {
                        lastActiveTimeMillis = lastKeepTimeMillis;
                    }

                    long idleMillis = currentTimeMillis - lastActiveTimeMillis;

                    long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;

                    if (timeBetweenEvictionRunsMillis <= 0) {
                        timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
                    }

                    if (idleMillis >= timeBetweenEvictionRunsMillis
                            || idleMillis < 0 // unexcepted branch
                    ) {
                        boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
                        if (!validate) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("skip not validate connection.");
                            }

                            discardConnection(poolableConnection.holder);
                            continue;
                        }
                    }
                }
            }

            if (removeAbandoned) {
                StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
                poolableConnection.connectStackTrace = stackTrace;
                poolableConnection.setConnectedTimeNano();
                poolableConnection.traceEnable = true;

                activeConnectionLock.lock();
                try {
                    activeConnections.put(poolableConnection, PRESENT);
                } finally {
                    activeConnectionLock.unlock();
                }
            }

            if (!this.defaultAutoCommit) {
                poolableConnection.setAutoCommit(false);
            }

            return poolableConnection;
        }
    }

    public boolean isFull() {
        lock.lock();
        try {
            return this.poolingCount + this.activeCount >= this.maxActive;
        } finally {
            lock.unlock();
        }
    }

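getConnectionDirect acquires the connection inside a for loop. It first calls getConnectionInternal(maxWaitMillis). If that throws GetConnectionTimeoutException, and the pool is not full (!isFull()) and notFullTimeoutRetryCnt is less than or equal to this.notFullTimeoutRetryCount, it increments notFullTimeoutRetryCnt and continues the loop; otherwise it rethrows the GetConnectionTimeoutException, which ends the loop. Because the comparison is <=, a timeout can be retried up to notFullTimeoutRetryCount + 1 times.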
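The retry rule above can be isolated into a small sketch. This is not druid code: BorrowTimeoutException, BorrowRetrySketch and the functional parameters are hypothetical stand-ins for GetConnectionTimeoutException, getConnectionInternal and isFull().

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

/** Hypothetical stand-in for druid's GetConnectionTimeoutException. */
class BorrowTimeoutException extends RuntimeException {
    BorrowTimeoutException(String message) {
        super(message);
    }
}

class BorrowRetrySketch {
    /**
     * Calls borrowOnce until it returns. A timeout is retried only while the
     * retry counter is <= retryCount and the pool reports that it is not full;
     * otherwise the timeout is rethrown to the caller.
     */
    static <T> T borrow(Supplier<T> borrowOnce, BooleanSupplier isFull, int retryCount) {
        int retries = 0;
        for (;;) {
            try {
                return borrowOnce.get();
            } catch (BorrowTimeoutException ex) {
                if (retries <= retryCount && !isFull.getAsBoolean()) {
                    retries++;
                    continue; // try to borrow again
                }
                throw ex;
            }
        }
    }
}
```

With retryCount = 1 and a pool that never reports full, a borrow that times out twice still succeeds on the third attempt, which mirrors the <= comparison in the original loop.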

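Once a connection has been obtained, the method checks testOnBorrow. If it is enabled, testConnectionInternal runs, and a connection that fails validation is passed to discardConnection before the loop continues. If testOnBorrow is disabled, the method checks whether conn is closed; a closed connection is discarded and the loop continues, while an open one enters the testWhileIdle logic, which validates the connection only when it has been idle for at least timeBetweenEvictionRunsMillis (or the computed idle time is negative). I find it a little baffling that druid runs testWhileIdle directly inside getConnection.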

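Last comes removeAbandoned: when it is enabled, the method records the borrowing stack trace and connectedTimeNano and puts the current connection into activeConnections. If defaultAutoCommit is false, it also calls setAutoCommit(false) before returning the connection.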
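The idle-time decision in the testWhileIdle branch described above is pure arithmetic, so it can be sketched as a standalone function. The class and parameter names are hypothetical, and the 60-second fallback is my assumption about the value of DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS.

```java
class IdleCheckSketch {
    // Assumed fallback; stands in for DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS.
    static final long DEFAULT_INTERVAL_MILLIS = 60_000L;

    /** Returns true when a borrowed connection should be validated before use. */
    static boolean needsValidation(long now, long lastActive, long lastExec, long lastKeep,
                                   boolean checkExecuteTime, long evictionIntervalMillis) {
        // Optionally measure idleness from the last executed statement.
        if (checkExecuteTime && lastExec != lastActive) {
            lastActive = lastExec;
        }
        // A more recent keep-alive check also counts as activity.
        if (lastKeep > lastActive) {
            lastActive = lastKeep;
        }
        long idleMillis = now - lastActive;
        long interval = evictionIntervalMillis <= 0 ? DEFAULT_INTERVAL_MILLIS : evictionIntervalMillis;
        // A negative idle time (clock moved backwards) also forces validation.
        return idleMillis >= interval || idleMillis < 0;
    }
}
```

For example, a connection last active 90 seconds ago with a 60-second interval is validated, while one that was kept alive 5 seconds ago is handed out without a check.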

getConnectionInternal

com/alibaba/druid/pool/DruidDataSource.java

    private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {if (closed) {connectErrorCountUpdater.incrementAndGet(this);
            throw new DataSourceClosedException("dataSource already closed at" + new Date(closeTimeMillis));
        }

        if (!enable) {connectErrorCountUpdater.incrementAndGet(this);

            if (disableException != null) {throw disableException;}

            throw new DataSourceDisableException();}

        final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
        final int maxWaitThreadCount = this.maxWaitThreadCount;

        DruidConnectionHolder holder;

        for (boolean createDirect = false; ;) {if (createDirect) {createStartNanosUpdater.set(this, System.nanoTime());
                if (creatingCountUpdater.compareAndSet(this, 0, 1)) {PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection();
                    holder = new DruidConnectionHolder(this, pyConnInfo);
                    holder.lastActiveTimeMillis = System.currentTimeMillis();

                    creatingCountUpdater.decrementAndGet(this);
                    directCreateCountUpdater.incrementAndGet(this);

                    if (LOG.isDebugEnabled()) {LOG.debug("conn-direct_create");
                    }

                    boolean discard;
                    lock.lock();
                    try {if (activeCount < maxActive) {
                            activeCount++;
                            holder.active = true;
                            if (activeCount > activePeak) {
                                activePeak = activeCount;
                                activePeakTime = System.currentTimeMillis();}
                            break;
                        } else {discard = true;}
                    } finally {lock.unlock();
                    }

                    if (discard) {JdbcUtils.close(pyConnInfo.getPhysicalConnection());
                    }
                }
            }

            try {lock.lockInterruptibly();
            } catch (InterruptedException e) {connectErrorCountUpdater.incrementAndGet(this);
                throw new SQLException("interrupt", e);
            }

            try {
                if (maxWaitThreadCount > 0
                        && notEmptyWaitThreadCount >= maxWaitThreadCount) {connectErrorCountUpdater.incrementAndGet(this);
                    throw new SQLException("maxWaitThreadCount" + maxWaitThreadCount + ", current wait Thread count"
                            + lock.getQueueLength());
                }

                if (onFatalError
                        && onFatalErrorMaxActive > 0
                        && activeCount >= onFatalErrorMaxActive) {connectErrorCountUpdater.incrementAndGet(this);

                    StringBuilder errorMsg = new StringBuilder();
                    errorMsg.append("onFatalError, activeCount")
                            .append(activeCount)
                            .append(", onFatalErrorMaxActive")
                            .append(onFatalErrorMaxActive);

                    if (lastFatalErrorTimeMillis > 0) {errorMsg.append(", time'")
                                .append(StringUtils.formatDateTime19(lastFatalErrorTimeMillis, TimeZone.getDefault()))
                                .append("'");
                    }

                    if (lastFatalErrorSql != null) {errorMsg.append(", sql \n")
                                .append(lastFatalErrorSql);
                    }

                    throw new SQLException(errorMsg.toString(), lastFatalError);
                }

                connectCount++;

                if (createScheduler != null
                        && poolingCount == 0
                        && activeCount < maxActive
                        && creatingCountUpdater.get(this) == 0
                        && createScheduler instanceof ScheduledThreadPoolExecutor) {ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
                    if (executor.getQueue().size() > 0) {
                        createDirect = true;
                        continue;
                    }
                }

                if (maxWait > 0) {holder = pollLast(nanos);
                } else {holder = takeLast();
                }

                if (holder != null) {if (holder.discard) {continue;}

                    activeCount++;
                    holder.active = true;
                    if (activeCount > activePeak) {
                        activePeak = activeCount;
                        activePeakTime = System.currentTimeMillis();}
                }
            } catch (InterruptedException e) {connectErrorCountUpdater.incrementAndGet(this);
                throw new SQLException(e.getMessage(), e);
            } catch (SQLException e) {connectErrorCountUpdater.incrementAndGet(this);
                throw e;
            } finally {lock.unlock();
            }

            break;
        }

        if (holder == null) {long waitNanos = waitNanosLocal.get();

            final long activeCount;
            final long maxActive;
            final long creatingCount;
            final long createStartNanos;
            final long createErrorCount;
            final Throwable createError;
            try {lock.lock();
                activeCount = this.activeCount;
                maxActive = this.maxActive;
                creatingCount = this.creatingCount;
                createStartNanos = this.createStartNanos;
                createErrorCount = this.createErrorCount;
                createError = this.createError;
            } finally {lock.unlock();
            }

            StringBuilder buf = new StringBuilder(128);
            buf.append("wait millis")
                    .append(waitNanos / (1000 * 1000))
                    .append(", active").append(activeCount)
                    .append(", maxActive").append(maxActive)
                    .append(", creating").append(creatingCount);

            if (creatingCount > 0 && createStartNanos > 0) {long createElapseMillis = (System.nanoTime() - createStartNanos) / (1000 * 1000);
                if (createElapseMillis > 0) {buf.append(", createElapseMillis").append(createElapseMillis);
                }
            }

            if (createErrorCount > 0) {buf.append(", createErrorCount").append(createErrorCount);
            }

            List<JdbcSqlStatValue> sqlList = this.getDataSourceStat().getRuningSqlList();
            for (int i = 0; i < sqlList.size(); ++i) {if (i != 0) {buf.append('\n');
                } else {buf.append(",");
                }
                JdbcSqlStatValue sql = sqlList.get(i);
                buf.append("runningSqlCount").append(sql.getRunningCount());
                buf.append(":");
                buf.append(sql.getSql());
            }

            String errorMessage = buf.toString();

            if (createError != null) {throw new GetConnectionTimeoutException(errorMessage, createError);
            } else {throw new GetConnectionTimeoutException(errorMessage);
            }
        }

        holder.incrementUseCount();

        DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
        return poolalbeConnection;
    }

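getConnectionInternal first checks whether the data source is closed and, if so, throws DataSourceClosedException. It then checks enable and, if the data source is disabled, throws disableException or a DataSourceDisableException. Next comes a for loop whose behavior depends on createDirect, which is false on the first pass.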

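When createDirect is false: if maxWaitThreadCount is positive and notEmptyWaitThreadCount is greater than or equal to it, a SQLException is thrown. If a createScheduler exists, poolingCount is 0, activeCount is below maxActive, no connection is currently being created, and the scheduler's queue is not empty, createDirect is set to true and the loop continues. Otherwise the method calls pollLast(nanos) when maxWait is greater than 0, and takeLast() when it is not. If pollLast returns null, the method builds a detailed message and throws GetConnectionTimeoutException.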

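When createDirect is true, the calling thread creates the physical connection itself through DruidDataSource.this.createPhysicalConnection(). If activeCount is still below maxActive, it updates activeCount and breaks out of the loop; otherwise it sets discard to true and closes the new connection with JdbcUtils.close(pyConnInfo.getPhysicalConnection()).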
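The condition that switches the borrower to direct creation can be written out as a predicate. This is a simplified sketch with hypothetical names; it takes plain values instead of reading the data source fields under the lock.

```java
class CreateDirectSketch {
    /**
     * Mirrors the check in getConnectionInternal: the borrower creates the
     * connection itself only when the pool is empty, there is room below
     * maxActive, nothing is being created right now, and create tasks are
     * already queued on the scheduler.
     */
    static boolean shouldCreateDirect(boolean hasScheduler, int poolingCount, int activeCount,
                                      int maxActive, int creatingCount, int queuedCreateTasks) {
        return hasScheduler
                && poolingCount == 0
                && activeCount < maxActive
                && creatingCount == 0
                && queuedCreateTasks > 0;
    }
}
```

Read this way, direct creation looks like a shortcut for the case where the scheduler is backed up: instead of queueing behind pending tasks, the waiting thread does the work itself.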

pollLast

    private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
        long estimate = nanos;

        for (; ;) {if (poolingCount == 0) {emptySignal(); // send signal to CreateThread create connection

                if (failFast && isFailContinuous()) {throw new DataSourceNotAvailableException(createError);
                }

                if (estimate <= 0) {waitNanosLocal.set(nanos - estimate);
                    return null;
                }

                notEmptyWaitThreadCount++;
                if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {notEmptyWaitThreadPeak = notEmptyWaitThreadCount;}

                try {
                    long startEstimate = estimate;
                    estimate = notEmpty.awaitNanos(estimate); // signal by
                    // recycle or
                    // creator
                    notEmptyWaitCount++;
                    notEmptyWaitNanos += (startEstimate - estimate);

                    if (!enable) {connectErrorCountUpdater.incrementAndGet(this);

                        if (disableException != null) {throw disableException;}

                        throw new DataSourceDisableException();}
                } catch (InterruptedException ie) {notEmpty.signal(); // propagate to non-interrupted thread
                    notEmptySignalCount++;
                    throw ie;
                } finally {notEmptyWaitThreadCount--;}

                if (poolingCount == 0) {if (estimate > 0) {continue;}

                    waitNanosLocal.set(nanos - estimate);
                    return null;
                }
            }

            decrementPoolingCount();
            DruidConnectionHolder last = connections[poolingCount];
            connections[poolingCount] = null;

            long waitNanos = nanos - estimate;
            last.setLastNotEmptyWaitNanos(waitNanos);

            return last;
        }
    }

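When poolingCount is 0, pollLast calls emptySignal to request a new connection and then waits on the notEmpty condition with awaitNanos for the remaining time. If the time runs out while the pool is still empty, it records the wait in waitNanosLocal and returns null. Otherwise it decrements poolingCount and takes connections[poolingCount], the last element of the array.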
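The timed wait in pollLast follows the usual Condition.awaitNanos pattern. Below is a minimal sketch of that pattern, not druid's implementation: TimedPoolSketch is a hypothetical class, it keeps items in an ArrayDeque instead of an array plus poolingCount, and it swallows interruption (restoring the flag) to keep the signature simple.

```java
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TimedPoolSketch<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final ArrayDeque<T> idle = new ArrayDeque<>();

    /** Returns an item to the pool and wakes one waiting borrower. */
    void put(T item) {
        lock.lock();
        try {
            idle.addLast(item);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Takes the most recently returned item, or null if none arrives in time. */
    T pollLast(long timeoutMillis) {
        long remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (idle.isEmpty()) {
                if (remaining <= 0) {
                    return null; // timed out; the caller decides how to report it
                }
                // awaitNanos returns an estimate of the time still left to wait.
                remaining = notEmpty.awaitNanos(remaining);
            }
            return idle.pollLast();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // simplification: report as "nothing available"
            return null;
        } finally {
            lock.unlock();
        }
    }
}
```

Taking from the tail means the most recently returned connection is reused first, which matches the connections[poolingCount] access in the original.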

takeLast

    DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
        try {while (poolingCount == 0) {emptySignal(); // send signal to CreateThread create connection

                if (failFast && isFailContinuous()) {throw new DataSourceNotAvailableException(createError);
                }

                notEmptyWaitThreadCount++;
                if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {notEmptyWaitThreadPeak = notEmptyWaitThreadCount;}
                try {notEmpty.await(); // signal by recycle or creator
                } finally {notEmptyWaitThreadCount--;}
                notEmptyWaitCount++;

                if (!enable) {connectErrorCountUpdater.incrementAndGet(this);
                    if (disableException != null) {throw disableException;}

                    throw new DataSourceDisableException();}
            }
        } catch (InterruptedException ie) {notEmpty.signal(); // propagate to non-interrupted thread
            notEmptySignalCount++;
            throw ie;
        }

        decrementPoolingCount();
        DruidConnectionHolder last = connections[poolingCount];
        connections[poolingCount] = null;

        return last;
    }

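takeLast behaves like pollLast without a deadline: while poolingCount is 0 it calls emptySignal and blocks on notEmpty.await() until a connection is recycled or created, and then returns connections[poolingCount].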

emptySignal

    private void emptySignal() {
        if (createScheduler == null) {
            empty.signal();
            return;
        }

        if (createTaskCount >= maxCreateTaskCount) {
            return;
        }

        if (activeCount + poolingCount + createTaskCount >= maxActive) {
            return;
        }
        submitCreateTask(false);
    }

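When createScheduler is null, emptySignal simply calls empty.signal() and returns. Otherwise it returns without doing anything if createTaskCount has reached maxCreateTaskCount, or if activeCount + poolingCount + createTaskCount has reached maxActive; only when both checks pass does it call submitCreateTask(false).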
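The two guards on the scheduler path can be summarized as one predicate. The sketch below uses hypothetical names and plain int parameters in place of the data source fields.

```java
class EmptySignalSketch {
    /**
     * Returns true when a new create task may be submitted: the number of
     * pending tasks is below its cap, and active + pooled + pending
     * connections still leave room below maxActive.
     */
    static boolean shouldSubmitCreateTask(int createTaskCount, int maxCreateTaskCount,
                                          int activeCount, int poolingCount, int maxActive) {
        if (createTaskCount >= maxCreateTaskCount) {
            return false;
        }
        return activeCount + poolingCount + createTaskCount < maxActive;
    }
}
```

Counting pending tasks against maxActive keeps a burst of borrowers from scheduling more creations than the pool could ever hold.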

submitCreateTask

    private void submitCreateTask(boolean initTask) {
        createTaskCount++;
        CreateConnectionTask task = new CreateConnectionTask(initTask);
        if (createTasks == null) {
            createTasks = new long[8];
        }

        boolean putted = false;
        for (int i = 0; i < createTasks.length; ++i) {
            if (createTasks[i] == 0) {
                createTasks[i] = task.taskId;
                putted = true;
                break;
            }
        }
        if (!putted) {
            long[] array = new long[createTasks.length * 3 / 2];
            System.arraycopy(createTasks, 0, array, 0, createTasks.length);
            array[createTasks.length] = task.taskId;
            createTasks = array;
        }

        this.createSchedulerFuture = createScheduler.submit(task);
    }

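submitCreateTask creates a CreateConnectionTask, records its taskId in the first free slot of the createTasks array (growing the array by half when no slot is free), and submits the task to createScheduler.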
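The slot bookkeeping in submitCreateTask is easy to try in isolation. The following sketch reproduces only the array handling; the class and method names are hypothetical, and it assumes task ids are never 0 (0 marks a free slot) and that the array has at least two elements.

```java
class CreateTaskSlotsSketch {
    /**
     * Stores taskId in the first free slot (value 0). If every slot is taken,
     * copies the slots into an array 1.5 times as long and stores the id in
     * the first new position. Returns the array that now holds the id.
     */
    static long[] register(long[] slots, long taskId) {
        for (int i = 0; i < slots.length; i++) {
            if (slots[i] == 0) {
                slots[i] = taskId;
                return slots;
            }
        }
        long[] grown = new long[slots.length * 3 / 2];
        System.arraycopy(slots, 0, grown, 0, slots.length);
        grown[slots.length] = taskId;
        return grown;
    }
}
```

Starting from the initial length of 8, the ninth registration produces an array of length 12 with the new id at index 8.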

CreateConnectionTask

com/alibaba/druid/pool/DruidDataSource.java

    public class CreateConnectionTask implements Runnable {
        private int errorCount;
        private boolean initTask;
        private final long taskId;

        public CreateConnectionTask() {taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
        }

        public CreateConnectionTask(boolean initTask) {taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
            this.initTask = initTask;
        }

        @Override
        public void run() {runInternal();
        }

        private void runInternal() {for (; ;) {
                // addLast
                lock.lock();
                try {if (closed || closing) {clearCreateTask(taskId);
                        return;
                    }

                    boolean emptyWait = true;

                    if (createError != null && poolingCount == 0) {emptyWait = false;}

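                    if (emptyWait) {
                        // only create a connection when some thread is waiting for one
                        if (poolingCount >= notEmptyWaitThreadCount //
                                && (!(keepAlive && activeCount + poolingCount < minIdle)) // must not give up creating in the keepAlive case
                                && (!initTask) // tasks submitted during pool initialization must not give up creating
                                && !isFailContinuous() // must not give up creating while failContinuous, otherwise nothing would ever be created
                                && !isOnFatalError() // must not give up creating while onFatalError, otherwise nothing would ever be created
                        ) {
                            clearCreateTask(taskId);
                            return;
                        }

                        // prevent creating more than maxActive connections
                        if (activeCount + poolingCount >= maxActive) {
                            clearCreateTask(taskId);
                            return;
                        }
                    }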
                } finally {lock.unlock();
                }

                PhysicalConnectionInfo physicalConnection = null;

                try {physicalConnection = createPhysicalConnection();
                } catch (OutOfMemoryError e) {LOG.error("create connection OutOfMemoryError, out memory.", e);

                    errorCount++;
                    if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                        // fail over retry attempts
                        setFailContinuous(true);
                        if (failFast) {lock.lock();
                            try {notEmpty.signalAll();
                            } finally {lock.unlock();
                            }
                        }

                        if (breakAfterAcquireFailure) {lock.lock();
                            try {clearCreateTask(taskId);
                            } finally {lock.unlock();
                            }
                            return;
                        }

                        this.errorCount = 0; // reset errorCount
                        if (closing || closed) {lock.lock();
                            try {clearCreateTask(taskId);
                            } finally {lock.unlock();
                            }
                            return;
                        }

                        createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
                        return;
                    }
                } catch (SQLException e) {LOG.error("create connection SQLException, url:" + jdbcUrl, e);

                    errorCount++;
                    if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                        // fail over retry attempts
                        setFailContinuous(true);
                        if (failFast) {lock.lock();
                            try {notEmpty.signalAll();
                            } finally {lock.unlock();
                            }
                        }

                        if (breakAfterAcquireFailure) {lock.lock();
                            try {clearCreateTask(taskId);
                            } finally {lock.unlock();
                            }
                            return;
                        }

                        this.errorCount = 0; // reset errorCount
                        if (closing || closed) {lock.lock();
                            try {clearCreateTask(taskId);
                            } finally {lock.unlock();
                            }
                            return;
                        }

                        createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
                        return;
                    }
                } catch (RuntimeException e) {LOG.error("create connection RuntimeException", e);
                    // unknow fatal exception
                    setFailContinuous(true);
                    continue;
                } catch (Error e) {lock.lock();
                    try {clearCreateTask(taskId);
                    } finally {lock.unlock();
                    }
                    LOG.error("create connection Error", e);
                    // unknow fatal exception
                    setFailContinuous(true);
                    break;
                } catch (Throwable e) {lock.lock();
                    try {clearCreateTask(taskId);
                    } finally {lock.unlock();
                    }

                    LOG.error("create connection unexecpted error.", e);
                    break;
                }

                if (physicalConnection == null) {continue;}

                physicalConnection.createTaskId = taskId;
                boolean result = put(physicalConnection);
                if (!result) {JdbcUtils.close(physicalConnection.getPhysicalConnection());
                    LOG.info("put physical connection to pool failed.");
                }
                break;
            }
        }
    }

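CreateConnectionTask runs in a for loop. Under the lock it first decides whether creation is still needed: it gives up when no thread is waiting (unless keepAlive with fewer than minIdle connections, an init task, failContinuous, or onFatalError applies) and when activeCount + poolingCount has reached maxActive. It then creates the physical connection through createPhysicalConnection and hands it to put; on repeated SQLException it sets failContinuous and reschedules itself after timeBetweenConnectErrorMillis.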
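The "should this task still create a connection" decision described above can be condensed into a predicate. This is a simplified sketch with hypothetical names; it leaves out the emptyWait = false shortcut that the original takes when createError is set and the pool is empty.

```java
class CreateTaskGuardSketch {
    /** Returns true when the create task should clear itself and return. */
    static boolean shouldSkipCreate(int poolingCount, int waitingThreads, int activeCount,
                                    int minIdle, int maxActive, boolean keepAlive,
                                    boolean initTask, boolean failContinuous, boolean onFatalError) {
        // Nobody needs a connection, and no special case forces creation.
        boolean nothingToDo = poolingCount >= waitingThreads
                && !(keepAlive && activeCount + poolingCount < minIdle)
                && !initTask
                && !failContinuous
                && !onFatalError;
        // Never create beyond maxActive.
        boolean poolIsFull = activeCount + poolingCount >= maxActive;
        return nothingToDo || poolIsFull;
    }
}
```

The sketch makes the precedence visible: the special cases only override the "nobody is waiting" rule, never the maxActive limit.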

createPhysicalConnection

com/alibaba/druid/pool/DruidAbstractDataSource.java

    public PhysicalConnectionInfo createPhysicalConnection() throws SQLException {String url = this.getUrl();
        Properties connectProperties = getConnectProperties();

        String user;
        if (getUserCallback() != null) {user = getUserCallback().getName();} else {user = getUsername();
        }

        String password = getPassword();
        PasswordCallback passwordCallback = getPasswordCallback();

        if (passwordCallback != null) {if (passwordCallback instanceof DruidPasswordCallback) {DruidPasswordCallback druidPasswordCallback = (DruidPasswordCallback) passwordCallback;

                druidPasswordCallback.setUrl(url);
                druidPasswordCallback.setProperties(connectProperties);
            }

            char[] chars = passwordCallback.getPassword();
            if (chars != null) {password = new String(chars);
            }
        }

        Properties physicalConnectProperties = new Properties();
        if (connectProperties != null) {physicalConnectProperties.putAll(connectProperties);
        }

        if (user != null && user.length() != 0) {physicalConnectProperties.put("user", user);
        }

        if (password != null && password.length() != 0) {physicalConnectProperties.put("password", password);
        }

        Connection conn = null;

        long connectStartNanos = System.nanoTime();
        long connectedNanos, initedNanos, validatedNanos;

        Map<String, Object> variables = initVariants
                ? new HashMap<String, Object>()
                : null;
        Map<String, Object> globalVariables = initGlobalVariants
                ? new HashMap<String, Object>()
                : null;

        createStartNanosUpdater.set(this, connectStartNanos);
        creatingCountUpdater.incrementAndGet(this);
        try {conn = createPhysicalConnection(url, physicalConnectProperties);
            connectedNanos = System.nanoTime();

            if (conn == null) {throw new SQLException("connect error, url" + url + ", driverClass" + this.driverClass);
            }

            initPhysicalConnection(conn, variables, globalVariables);
            initedNanos = System.nanoTime();

            validateConnection(conn);
            validatedNanos = System.nanoTime();

            setFailContinuous(false);
            setCreateError(null);
        } catch (SQLException ex) {setCreateError(ex);
            JdbcUtils.close(conn);
            throw ex;
        } catch (RuntimeException ex) {setCreateError(ex);
            JdbcUtils.close(conn);
            throw ex;
        } catch (Error ex) {createErrorCountUpdater.incrementAndGet(this);
            setCreateError(ex);
            JdbcUtils.close(conn);
            throw ex;
        } finally {long nano = System.nanoTime() - connectStartNanos;
            createTimespan += nano;
            creatingCountUpdater.decrementAndGet(this);
        }

        return new PhysicalConnectionInfo(conn, connectStartNanos, connectedNanos, initedNanos, validatedNanos, variables, globalVariables);
    }

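createPhysicalConnection resolves the user and password (through the callbacks if they are set), creates the physical connection inside a try block, and then runs initPhysicalConnection and validateConnection on it. If anything throws, it records the error with setCreateError, closes the connection with JdbcUtils.close(conn), and rethrows; the finally block updates createTimespan and decrements creatingCount.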

testConnectionInternal

    protected boolean testConnectionInternal(DruidConnectionHolder holder, Connection conn) {String sqlFile = JdbcSqlStat.getContextSqlFile();
        String sqlName = JdbcSqlStat.getContextSqlName();

        if (sqlFile != null) {JdbcSqlStat.setContextSqlFile(null);
        }
        if (sqlName != null) {JdbcSqlStat.setContextSqlName(null);
        }
        try {if (validConnectionChecker != null) {boolean valid = validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout);
                long currentTimeMillis = System.currentTimeMillis();
                if (holder != null) {
                    holder.lastValidTimeMillis = currentTimeMillis;
                    holder.lastExecTimeMillis = currentTimeMillis;
                }

                if (valid && isMySql) { // unexcepted branch
                    long lastPacketReceivedTimeMs = MySqlUtils.getLastPacketReceivedTimeMs(conn);
                    if (lastPacketReceivedTimeMs > 0) {
                        long mysqlIdleMillis = currentTimeMillis - lastPacketReceivedTimeMs;
                        if (lastPacketReceivedTimeMs > 0 //
                                && mysqlIdleMillis >= timeBetweenEvictionRunsMillis) {discardConnection(holder);
                            String errorMsg = "discard long time none received connection."
                                    + ", jdbcUrl :" + jdbcUrl
                                    + ", version :" + VERSION.getVersionNumber()
                                    + ", lastPacketReceivedIdleMillis :" + mysqlIdleMillis;
                            LOG.warn(errorMsg);
                            return false;
                        }
                    }
                }

                if (valid && onFatalError) {lock.lock();
                    try {if (onFatalError) {onFatalError = false;}
                    } finally {lock.unlock();
                    }
                }

                return valid;
            }

            if (conn.isClosed()) {return false;}

            if (null == validationQuery) {return true;}

            Statement stmt = null;
            ResultSet rset = null;
            try {stmt = conn.createStatement();
                if (getValidationQueryTimeout() > 0) {stmt.setQueryTimeout(validationQueryTimeout);
                }
                rset = stmt.executeQuery(validationQuery);
                if (!rset.next()) {return false;}
            } finally {JdbcUtils.close(rset);
                JdbcUtils.close(stmt);
            }

            if (onFatalError) {lock.lock();
                try {if (onFatalError) {onFatalError = false;}
                } finally {lock.unlock();
                }
            }

            return true;
        } catch (Throwable ex) {
            // skip
            return false;
        } finally {if (sqlFile != null) {JdbcSqlStat.setContextSqlFile(sqlFile);
            }
            if (sqlName != null) {JdbcSqlStat.setContextSqlName(sqlName);
            }
        }
    }

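testConnectionInternal validates mainly through validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout). If validConnectionChecker is null, it returns false for a closed connection, returns true when no validationQuery is configured, and otherwise executes validationQuery through a JDBC Statement. Any Throwable raised along the way makes it return false.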

discardConnection

    public void discardConnection(DruidConnectionHolder holder) {
        if (holder == null) {
            return;
        }

        Connection conn = holder.getConnection();
        if (conn != null) {
            JdbcUtils.close(conn);
        }

        lock.lock();
        try {
            if (holder.discard) {
                return;
            }

            if (holder.active) {
                activeCount--;
                holder.active = false;
            }
            discardCount++;

            holder.discard = true;

            if (activeCount <= minIdle) {
                emptySignal();
            }
        } finally {
            lock.unlock();
        }
    }

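discardConnection closes the underlying connection and then, under the lock, updates the bookkeeping: if the holder is already marked discard it returns; otherwise it decrements activeCount for an active holder, increments discardCount, and sets discard to true. If activeCount is at or below minIdle, it calls emptySignal to trigger a replacement.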
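The holder.discard flag is what makes a repeated discard harmless. A minimal sketch of that idempotent bookkeeping follows; DiscardSketch and its Holder are hypothetical, and closing the connection and the emptySignal call are left out.

```java
import java.util.concurrent.locks.ReentrantLock;

class DiscardSketch {
    static class Holder {
        boolean active;
        boolean discard;
    }

    private final ReentrantLock lock = new ReentrantLock();
    int activeCount;
    int discardCount;

    /** Hands out a new holder and counts it as active. */
    Holder borrow() {
        lock.lock();
        try {
            Holder holder = new Holder();
            holder.active = true;
            activeCount++;
            return holder;
        } finally {
            lock.unlock();
        }
    }

    /** Updates the counters once per holder, however often it is called. */
    void discard(Holder holder) {
        if (holder == null) {
            return;
        }
        lock.lock();
        try {
            if (holder.discard) {
                return; // already accounted for
            }
            if (holder.active) {
                activeCount--;
                holder.active = false;
            }
            discardCount++;
            holder.discard = true;
        } finally {
            lock.unlock();
        }
    }
}
```

Discarding the same holder twice leaves activeCount at 0 and discardCount at 1, so the counters cannot drift when several code paths discard the same connection.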

Summary

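DruidDataSource's getConnection method ends up calling getConnectionDirect(maxWaitMillis).

getConnectionDirect acquires the connection inside a for loop. It first calls getConnectionInternal(maxWaitMillis); on GetConnectionTimeoutException it increments notFullTimeoutRetryCnt and continues the loop as long as the pool is not full and notFullTimeoutRetryCnt is less than or equal to this.notFullTimeoutRetryCount, and otherwise rethrows the exception.

Once a connection has been obtained, testOnBorrow decides the next step. If it is enabled, testConnectionInternal runs and a connection that fails validation is discarded before the loop continues. If it is disabled, a closed connection is discarded and the loop continues, while an open one enters the testWhileIdle logic.

Last comes removeAbandoned, which records connectedTimeNano and puts the current connection into activeConnections.

Having read through the code, my impression is that druid's implementation is rougher than commons-pool's: the level of abstraction is not high enough, and the code is full of statistics counters and state flags that have to be maintained very carefully. It is also a little baffling that druid runs testWhileIdle directly inside getConnection.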
