关于java:连接池-Druid-三-获取连接-getConnection

3次阅读

共计 10131 个字符,预计需要花费 26 分钟才能阅读完成。

Druid 连接池只存储在 connections 数组中,所以获取连贯的逻辑应该比 HikariPool 简略一些:间接从 connectoins 获取即可。

DruidDataSource.getConnection

间接上代码:

 @Override
    public DruidPooledConnection getConnection() throws SQLException {return getConnection(maxWait);
    }

调用了 getConnection(maxWait),maxWait 是参数设定的获取连贯的最长等待时间,超过该时长还没有获取到连贯的话,抛异样。

看 getConnection(maxWait)代码:

   public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {init();

        if (filters.size() > 0) {FilterChainImpl filterChain = new FilterChainImpl(this);
            return filterChain.dataSource_connect(this, maxWaitMillis);
        } else {return getConnectionDirect(maxWaitMillis);
        }
    }

先调用 init,init 办法会判断连接池是否曾经实现了初始化,如果没有实现初始化则首先进行初始化,初始化的代码咱们上一篇文章曾经剖析过了。

之后判断是否有 filters,filters 的内容咱们先放放,临时不论,间接看没有 filters 的状况下,调用 getConnectionDirect 办法。

getConnectionDirect

办法比拟长,咱们还是老办法,分段剖析:

   public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
        int notFullTimeoutRetryCnt = 0;
        for (;;) {
            // handle notFullTimeoutRetry
            DruidPooledConnection poolableConnection;
            try {poolableConnection = getConnectionInternal(maxWaitMillis);
            } catch (GetConnectionTimeoutException ex) {if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
                    notFullTimeoutRetryCnt++;
                    if (LOG.isWarnEnabled()) {LOG.warn("get connection timeout retry :" + notFullTimeoutRetryCnt);
                    }
                    continue;
                }
                throw ex;
            }

上来之后首先有限 for 循环,目标是从连接池获取到连贯之后,依据参数设定可能会做必要的查看,如果查看不通过(比方连贯不可用、连贯已敞开等等)的话循环从新获取。

而后调用 getConnectionInternal 获取连贯,getConnectionInternal 办法应该是咱们明天文章的配角,咱们略微放一放,为了文章的可读性,先剖析完 getConnectionDirect 办法。

咱们假如通过调用 getConnectionInternal 办法获取到一个连贯(留神获取到的连贯对象是 DruidPooledConnection,不是 Connection 对象,这个也不难想象,连接池获取到的连贯肯定是数据库物理连贯的代理对象(或者叫封装对象,封装了数据库物理连贯 Connection 对象的对象,这个原理咱们在剖析 HikariPool 的时候曾经说过了。这个 DruidPooledConnection 对象咱们也临时放一放,前面剖析)。

调用 getConnectionInternal 办法如果返回超时异样,判断:如果以后连接池没满,而且获取连贯超时重试次数小于参数 notFullTimeoutRetryCount 设定的次数的话,则 continue,从新获取连贯。否则,抛出超时异样。

接下来:

             if (testOnBorrow) {boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
                if (!validate) {if (LOG.isDebugEnabled()) {LOG.debug("skip not validate connection.");
                    }

                    discardConnection(poolableConnection.holder);
                    continue;
                }
            } else {if (poolableConnection.conn.isClosed()) {discardConnection(poolableConnection.holder); // 传入 null,防止反复敞开
                    continue;
                }          

testOnBorrow 参数的目标是:获取连贯后是否要做连贯可用性测试,如果设定为 true 的话,调用 testConnectionInternal 测试连贯的可用性,testConnectionInternal 办法上一篇文章剖析连贯回收的时候、解决 keepAlive 的过程中就碰到过,就是执行配置好的 sql 语句测试连贯可用性,如果测试不通过的话则调用 discardConnection 敞开连贯,continue 从新获取连贯。

否则,如果 testOnBorrow 参数没有关上的话,查看以后连贯如果曾经敞开,则调用 discardConnection 敞开连贯(没太明确连贯既然曾经是敞开状态,为啥还须要调用?),continue 从新获取连贯。

不倡议关上 testOnBorrow 参数,因为连接池都会有连贯回收机制,比方上一篇文章讲过的 Druid 的 DestroyConnectionThread & DestroyTask,回收参数配置失常的话,回收机制根本能够确保连贯的可用性。关上 testOnBorrow 参数会导致每次获取连贯之后都测试连贯的可用性,重大影响零碎性能。

接下来:

          if (testWhileIdle) {
                    final DruidConnectionHolder holder = poolableConnection.holder;
                    long currentTimeMillis             = System.currentTimeMillis();
                    long lastActiveTimeMillis          = holder.lastActiveTimeMillis;
                    long lastExecTimeMillis            = holder.lastExecTimeMillis;
                    long lastKeepTimeMillis            = holder.lastKeepTimeMillis;

                    if (checkExecuteTime
                            && lastExecTimeMillis != lastActiveTimeMillis) {lastActiveTimeMillis = lastExecTimeMillis;}

                    if (lastKeepTimeMillis > lastActiveTimeMillis) {lastActiveTimeMillis = lastKeepTimeMillis;}

                    long idleMillis                    = currentTimeMillis - lastActiveTimeMillis;

                    long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;

                    if (timeBetweenEvictionRunsMillis <= 0) {timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;}

                    if (idleMillis >= timeBetweenEvictionRunsMillis
                            || idleMillis < 0 // unexcepted branch
                            ) {boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
                        if (!validate) {if (LOG.isDebugEnabled()) {LOG.debug("skip not validate connection.");
                            }

                            discardConnection(poolableConnection.holder);
                             continue;
                        }
                    }
                }
            }          

这段代码的逻辑是:参数 testWhileIdle 设置为 true 的话,查看以后链接的闲暇时长如果大于 timeBetweenEvictionRunsMillis(默认 60 秒)的话,则调用 testConnectionInternal 测试连贯可用性,连贯不可用则敞开连贯,continue 从新获取连贯。

而后:

      if (removeAbandoned) {StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
                poolableConnection.connectStackTrace = stackTrace;
                poolableConnection.setConnectedTimeNano();
                poolableConnection.traceEnable = true;

                activeConnectionLock.lock();
                try {activeConnections.put(poolableConnection, PRESENT);
                } finally {activeConnectionLock.unlock();
                }
            }

呈现了一个 removeAbandoned 参数,这个参数的意思是移除被遗弃的连贯对象,如果关上的话就把以后连贯放到 activeConnections 中,篇幅无限,这部分内容就不开展了,前面咱们会专门写一篇文章介绍 removeAbandoned 参数。

剩下的一小部分代码,很简略,依据参数设置连贯的 autoCommit,之后返回连贯 poolableConnection。

            if (!this.defaultAutoCommit) {poolableConnection.setAutoCommit(false);
            }

            return poolableConnection;
        }
    }

getConnectionDirect 办法源码剖析实现了,上面咱们要看一下 getConnectionInternal 办法,这是真正从连接池中获取连贯的办法。

getConnectionInternal

间接看代码:

   private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {if (closed) {connectErrorCountUpdater.incrementAndGet(this);
            throw new DataSourceClosedException("dataSource already closed at" + new Date(closeTimeMillis));
        }

        if (!enable) {connectErrorCountUpdater.incrementAndGet(this);

            if (disableException != null) {throw disableException;}

            throw new DataSourceDisableException();}
        final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
        final int maxWaitThreadCount = this.maxWaitThreadCount;

        DruidConnectionHolder holder;

查看连接池状态如果曾经 disable 或 cloesed 的话,抛异样。

接下来:

       for (boolean createDirect = false;;) {if (createDirect) {createStartNanosUpdater.set(this, System.nanoTime());
                if (creatingCountUpdater.compareAndSet(this, 0, 1)) {PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection();
                    holder = new DruidConnectionHolder(this, pyConnInfo);
                    holder.lastActiveTimeMillis = System.currentTimeMillis();

                    creatingCountUpdater.decrementAndGet(this);
                    directCreateCountUpdater.incrementAndGet(this);

                    if (LOG.isDebugEnabled()) {LOG.debug("conn-direct_create");
                    }

                    boolean discard = false;
                    lock.lock();
                    try {if (activeCount < maxActive) {
                            activeCount++;
                            holder.active = true;
                            if (activeCount > activePeak) {
                                activePeak = activeCount;
                                activePeakTime = System.currentTimeMillis();}
                            break;
                        } else {discard = true;}
                    } finally {lock.unlock();
                    }

                    if (discard) {JdbcUtils.close(pyConnInfo.getPhysicalConnection());
                    }
                }
            }

初始化 createDirect 变量为 false 之后启动有限循环,意思是一直循环直到获取到连贯、或超时等其余异常情况产生。

紧接着的这段代码是 createDirect=true 的状况下执行的,createDirect 是在上面循环体中查看如果:createScheduler 不为空、连接池空、流动连接数小于设定的最大流动连接数 maxActive、并且 createScheduler 的队列中排队期待创立连贯的线程大于 0 的状况下,设置 createDirect 为 true 的,以上这些条件如果成立的话,大概率表明 createScheduler 中的创立线程出问题了、所以 createScheduler 大概率指望不上了,所以要间接创立连贯了。

间接创立的代码也很容易了解,调用 createPhysicalConnection 创立物理连贯,创立 DruidConnectionHolder 封装该物理连贯,创立之后获取锁资源,查看 activeCount < maxActive 则表明创立连贯胜利、完结 for 循环,否则,activeCount >= maxActive 则阐明违反了准则(间接创立连贯的过程中 createScheduler 可能复活了、又创立进去连贯放入连接池中了),所以,敞开锁资源之后,将刚创立进去的连贯敞开。

而后:

try {lock.lockInterruptibly();
            } catch (InterruptedException e) {connectErrorCountUpdater.incrementAndGet(this);
                throw new SQLException("interrupt", e);
            }

            try {
                if (maxWaitThreadCount > 0
                        && notEmptyWaitThreadCount >= maxWaitThreadCount) {connectErrorCountUpdater.incrementAndGet(this);
                    throw new SQLException("maxWaitThreadCount" + maxWaitThreadCount + ", current wait Thread count"
                            + lock.getQueueLength());
                }

                if (onFatalError
                        && onFatalErrorMaxActive > 0
                        && activeCount >= onFatalErrorMaxActive) {connectErrorCountUpdater.incrementAndGet(this);

                    StringBuilder errorMsg = new StringBuilder();
                    errorMsg.append("onFatalError, activeCount")
                            .append(activeCount)
                            .append(", onFatalErrorMaxActive")
                            .append(onFatalErrorMaxActive);

                    if (lastFatalErrorTimeMillis > 0) {errorMsg.append(", time'")
                                .append(StringUtils.formatDateTime19(lastFatalErrorTimeMillis, TimeZone.getDefault()))
                                .append("'");
                    }

                    if (lastFatalErrorSql != null) {errorMsg.append(", sql \n")
                                .append(lastFatalErrorSql);
                    }

                    throw new SQLException(errorMsg.toString(), lastFatalError);
                }

                connectCount++;

                if (createScheduler != null
                        && poolingCount == 0
                        && activeCount < maxActive
                        && creatingCountUpdater.get(this) == 0
                        && createScheduler instanceof ScheduledThreadPoolExecutor) {ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
                    if (executor.getQueue().size() > 0) {
                        createDirect = true;
                        continue;
                    }
                }

获取锁资源,查看期待获取连贯的线程数如果大于参数设置的最大期待线程数,抛异样。

查看并解决异样。

累加 connectCount。

之后是下面提到过的对 createDirect 的解决。

接下来到了最为要害的局部,个别状况下 createDirect 为 false,不会间接创立连贯,逻辑会走到上面这部分代码中,从连接池中获取连贯:

               if (maxWait > 0) {holder = pollLast(nanos);
                } else {holder = takeLast();
                }

                if (holder != null) {if (holder.discard) {continue;}

                    activeCount++;
                    holder.active = true;
                    if (activeCount > activePeak) {
                        activePeak = activeCount;
                        activePeakTime = System.currentTimeMillis();}
                }
            } catch (InterruptedException e) {connectErrorCountUpdater.incrementAndGet(this);
                throw new SQLException(e.getMessage(), e);
            } catch (SQLException e) {connectErrorCountUpdater.incrementAndGet(this);
                throw e;
            } finally {lock.unlock();
            }     

如果参数设置了 maxWait,则调用 pollLast 限时获取,否则调用 takeLast 获取连贯,这两个办法稍后剖析。

之后查看获取到的连贯曾经被 discard 的话,continue 从新获取连贯。

开释锁资源。

从连接池中获取到了连贯,完结 for 循环。

如果 takeLast 或 poolLast 返回的 DruidConnectionHolder 为 null 的话(调用 poolLast 超时),处理错误信息,抛 GetConnectionTimeoutException 超时异样(这部分代码没有贴出,省略了 …… 感兴趣的童鞋本人关上源码看一下)。

否则,用 DruidConnectionHolder 封装创立 DruidPooledConnection 后返回。

takeLast & pollLast(nanos)

这两个办法的逻辑其实差不多,次要区别一个是限时,一个不限时,两个办法都是在锁状态下执行。

具体调用哪一个办法取决于参数 maxWait,默认值为 -1,默认状况下会调用 takeLast,获取连贯的时候不限时。

倡议设置 maxWait,否则在非凡状况下如果创立连贯失败、会导致应用层线程挂起,获取不到任何返回的状况呈现。如果设置了 maxWait,getConnection 办法会调用 pollLast(nanos),获取不到连贯后,应用层会失去连贯超时的反馈。

先看 takeLast 办法:

takeLast() throws InterruptedException, SQLException {
        try {while (poolingCount == 0) {emptySignal(); // send signal to CreateThread create connection

                if (failFast && isFailContinuous()) {throw new DataSourceNotAvailableException(createError);
                }

                notEmptyWaitThreadCount++;
                if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {notEmptyWaitThreadPeak = notEmptyWaitThreadCount;}
                try {notEmpty.await(); // signal by recycle or creator
                } finally {notEmptyWaitThreadCount--;}
                notEmptyWaitCount++;

                if (!enable) {connectErrorCountUpdater.incrementAndGet(this);
                    if (disableException != null) {throw disableException;}

                    throw new DataSourceDisableException();}
            }
        } catch (InterruptedException ie) {notEmpty.signal(); // propagate to non-interrupted thread
            notEmptySignalCount++;
            throw ie;
        }

        decrementPoolingCount();
        DruidConnectionHolder last = connections[poolingCount];
        connections[poolingCount] = null;

        return last;
    }

如果连接池为空 (poolingCount == 0) 的话,有限循环。

调用 emptySignal(),告诉创立连接线程,有人在期待获取连贯,抓紧时间创立连贯。

而后调用 notEmpty.await(),期待创立连接线程在实现创立、或者有连贯偿还到连接池中后唤醒告诉。

如果产生异样,调用一下 notEmpty.signal()告诉其余获取连贯的线程,没准本人没能获取胜利、其余线程能获取胜利。

上面的代码,线程池肯定不空了。

线程池的线程数量减 1(decrementPoolingCount),而后获取 connections 的最初一个元素返回。

pollLast 办法的代码逻辑和 takeLast 的相似,只不过线程池空的话,以后线程会限时挂起期待,超时依然不能获取到连贯的话,间接返回 null。

Druid 连接池获取连贯代码剖析结束!

小结

Druid 连接池的连贯获取过程的源码剖析结束,前面还有连贯偿还过程,下一篇文章持续剖析。

Thanks a lot!

上一篇 连接池 Druid(二)– 连贯回收 DestroyConnectionThread & DestroyTask

正文完
 0