关于java:连接池-Druid-四-连接归还

46次阅读

共计 7485 个字符,预计需要花费 19 分钟才能阅读完成。

驾轻就熟,连贯偿还是通过 Connection 的代理对象重写 close 办法实现的, 通过后面的学习咱们曾经晓得 Connectin 的代理对象是 DruidPooledConnection,所以咱们间接看 DruidPooledConnection 的 close 办法。

DruidPooledConnection#close

间接上代码:

   public void close() throws SQLException {if (this.disable) {return;}

        DruidConnectionHolder holder = this.holder;
        if (holder == null) {if (dupCloseLogEnable) {LOG.error("dup close");
            }
            return;
        }

        DruidAbstractDataSource dataSource = holder.getDataSource();
        boolean isSameThread = this.getOwnerThread() == Thread.currentThread();

        if (!isSameThread) {dataSource.setAsyncCloseConnectionEnable(true);
        }
        if (dataSource.isAsyncCloseConnectionEnable()) {syncClose();
            return;
        }

判断以后 Connection(DruidPooledConnection)的状态为 disbale、或者 connectionHolder(DruidConnectionHolder)为 null(阐明连贯已敞开了),则间接返回。

而后判断连贯的 ownerThread(获取 connection 的线程)与以后线程不是同一线程的话,则设置异步敞开 asyncCloseConnectionEnable=true。调用 syncClose() 办法。

syncClose() 办法和同线程敞开形式的代码逻辑基本一致,只不过 syncClose() 整个办法须要加锁,同线程敞开则不须要。

所以咱们就不贴出 syncClose() 办法的源码了。

在什么场景下数据库连贯会跨线程敞开?一个线程获取到数据库连贯,而后会交给另外一个线程,由另外一个线程执行连贯敞开?应用层这么做是为了实现多线程之间的事务处理?

接下来:

       if (!CLOSING_UPDATER.compareAndSet(this, 0, 1)) {return;}

        try {for (ConnectionEventListener listener : holder.getConnectionEventListeners()) {listener.connectionClosed(new ConnectionEvent(this));
            }

            List<Filter> filters = dataSource.getProxyFilters();
            if (filters.size() > 0) {FilterChainImpl filterChain = new FilterChainImpl(dataSource);
                filterChain.dataSource_recycle(this);
            } else {recycle();
            }
        } finally {CLOSING_UPDATER.set(this, 0);
        }

        this.disable = true;
    }

CAS 形式批改以后对象的 closing 属性值为 1(0->1),如果批改不胜利,则阐明有其余线程正在试图敞开以后连贯,间接返回。

获取 ConnectionHolder 的所有 ConnectionEventListener 对象,给所有监听对象发送连贯敞开告诉。

而后,获取 dataSource 的 ProxyFilters,不空的话调用 filterChain 的 dataSource_recycle 办法,无关 dataSource 过滤器咱们仍然临时不论。没有 filter 的话,调用 recycle() 回收连贯,之后 CAS 办法批改 closing 属性值为 0,并设置以后连贯对象的 disable=true。

所以,咱们发现连贯偿还应该是 recycle() 办法中实现的。

DruidPooledConnection#recycle()

连贯(DruidPooledConnection)的 recycle 办法:

   public void recycle() throws SQLException {if (this.disable) {return;}

        DruidConnectionHolder holder = this.holder;
        if (holder == null) {if (dupCloseLogEnable) {LOG.error("dup close");
            }
            return;
        }

        if (!this.abandoned) {DruidAbstractDataSource dataSource = holder.getDataSource();
            dataSource.recycle(this);
        }

        this.holder = null;
        conn = null;
        transactionInfo = null;
        closed = true;
    }

如果以后连贯没有被遗弃(abandoned)的话,调用 dataSource 的 recycle 办法回收,之后清理相干对象、设置 close 为 true。

abandoned 是在 removeAbandoned 参数关上、连贯执行时长超过设定的超时时长 removeAbandonedTimeoutMillis 之后设置的。无关 removeAbandoned 咱们前面会专门进行剖析,此处略过。

那接下来就是 DataSource 的 recycle 办法了。

DruidDataSrouce#recycle

代码比拟长,咱们还是分段剖析:


    /**
     * 回收连贯
     */
    protected void recycle(DruidPooledConnection pooledConnection) throws SQLException {
        final DruidConnectionHolder holder = pooledConnection.holder;

        if (holder == null) {LOG.warn("connectionHolder is null");
            return;
        }

        if (logDifferentThread //
            && (!isAsyncCloseConnectionEnable()) //
            && pooledConnection.ownerThread != Thread.currentThread()//) {LOG.warn("get/close not same thread");
        }

        final Connection physicalConnection = holder.conn;

        if (pooledConnection.traceEnable) {
            Object oldInfo = null;
            activeConnectionLock.lock();
            try {if (pooledConnection.traceEnable) {oldInfo = activeConnections.remove(pooledConnection);
                    pooledConnection.traceEnable = false;
                }
            } finally {activeConnectionLock.unlock();
            }
            if (oldInfo == null) {if (LOG.isWarnEnabled()) {LOG.warn("remove abandonded failed. activeConnections.size" + activeConnections.size());
                }
            }
        }
      

吐槽一句,终于呈现了哪怕是一句话的 javaDoc 阐明:回收连贯。整个 Druid 连接池的源码中,都十分悭吝,少有正文。

判断以后连贯 DruidPooledConnection 的 traceEnable 属性为 true 的话,加 activeConnectionLock 锁之后,将以后连贯从 activeConnections 中移除。

traceEnable 属性其实反馈的还是 removeAbandoned 参数(这个参数尽管在正式环境不倡议关上,然而代码中阴魂不散,到处都有)。removeAbandoned 参数关上的状况下,获取连贯的时候会将连贯放入 activeConnections 中,并同时设置连贯的 traceEnable 为 true。 所以咱们其实能够认为这个 traceEnable 其实就是 removeAbandoned 参数,换了个名字而已。

所以这里的逻辑是:removeAbandoned 参数关上的话,连贯偿还的时候如果该连贯没有被 abandoned 的话,在偿还连贯时会将以后连贯从 activeConnections 中移除,该连贯就会防止被 abandoned 解决。

接下来:

         final boolean isAutoCommit = holder.underlyingAutoCommit;
        final boolean isReadOnly = holder.underlyingReadOnly;
        final boolean testOnReturn = this.testOnReturn;

        try {
            // check need to rollback?
            if ((!isAutoCommit) && (!isReadOnly)) {pooledConnection.rollback();
            }   
           boolean isSameThread = pooledConnection.ownerThread == Thread.currentThread();
            if (!isSameThread) {
                final ReentrantLock lock = pooledConnection.lock;
                lock.lock();
                try {holder.reset();
                } finally {lock.unlock();
                }
            } else {holder.reset();
            }

            if (holder.discard) {return;}
           if (phyMaxUseCount > 0 && holder.useCount >= phyMaxUseCount) {discardConnection(holder);
                return;
            }

            if (physicalConnection.isClosed()) {lock.lock();
                try {if (holder.active) {
                        activeCount--;
                        holder.active = false;
                    }
                    closeCount++;
                } finally {lock.unlock();
                }
                return;
            }

对没有提交的事务做回滚解决。之后调用 DruidConnectionHolder 的 reset 办法,从新设置连贯属性为默认值,因为连贯获取之后利用从依据须要可能对连贯属性从新进行了设置,偿还的时候从新设置会默认值,以便每一次利用获取连贯之后都能拿到各项属性设置为默认值的连贯、而不是不确定。

这里对没有提交的事务的判断条件是!isAutoCommit,获取的是物理连贯 Connection 的 isAutocommit 属性。咱们晓得如果利用开启事务管理的话,获取连贯之后会设置连贯的 autoCommit 为 false,事务提交或回滚之后,个别状况下利用也会复原连贯的默认设置,这个时候 autoCommit 就会被复原为 true。比方 Spring 的事务框架在事务提交之后会通过 TransactionManager 的 cleanupAfterCompletion->doCleanupAfterCompletion 设置 autoCommit 为 true。所以失常来讲,利用曾经提交事务之后,autoCommit 就会变为 true,也就不须要 Druid 在偿还连贯的时候解决回滚了。

Druid 减少这个判断可能是为了给利用擦屁股(猜想而已),应用层没有启用事务框架的状况下手动开启了事务、设置 autoCommit 为 false,执行实现之后没有重置 autoCommit,这个时候利用可能 commit、也可能没有 commit 事务,Durid 的解决形式是:一律回滚。

仔细检查了 HikariPool 的连贯回收过程,并没有这个回滚解决。集体不赞成(基于猜想而已………)Druid 的这个回滚解决,因为事务提交还是回滚究竟还是利用应该关注的事件,应用层为了简化解决逻辑能够把事务管理交给框架,比方 Spring 事务框架来解决。也就是说,要么是应用层本人解决、要么交给事务框架解决事务的提交或回滚是比拟正当的抉择。

持续。

查看如果 DruidConnectionHolder 曾经被弃用(连贯曾经被弃用),间接返回,不偿还。

查看如果物理连贯曾经被敞开,则加锁批改以后 holder 的 active 状态为 false,间接返回,不偿还。

而后:

           if (testOnReturn) {boolean validate = testConnectionInternal(holder, physicalConnection);
                if (!validate) {JdbcUtils.close(physicalConnection);

                    destroyCountUpdater.incrementAndGet(this);

                    lock.lock();
                    try {if (holder.active) {
                            activeCount--;
                            holder.active = false;
                        }
                        closeCount++;
                    } finally {lock.unlock();
                    }
                    return;
                }
            }
            if (holder.initSchema != null) {holder.conn.setSchema(holder.initSchema);
                holder.initSchema = null;
            }
            if (!enable) {discardConnection(holder);
                return;
            }

查看 testOnReturn 参数如果设置为 true,则测试连贯可用性,如果连贯不可用,解决形式同上,批改 holder 的 active=false,间接返回,不偿还。

testOnReturn 参数也没有必要设置,情理和 testOnBorrow 参数一样,会影响性能。这两个参数的默认设置都是 false,不关上。

之后查看以后 dataSrouce 如果不可用,则调用 discardConnection 敞开连贯,不偿还。

而后:

            boolean result;
            final long currentTimeMillis = System.currentTimeMillis();

            if (phyTimeoutMillis > 0) {
                long phyConnectTimeMillis = currentTimeMillis - holder.connectTimeMillis;
                if (phyConnectTimeMillis > phyTimeoutMillis) {discardConnection(holder);
                    return;
                }
            }

            lock.lock();
            try {if (holder.active) {
                    activeCount--;
                    holder.active = false;
                }
                closeCount++;

                result = putLast(holder, currentTimeMillis);
                recycleCount++;
            } finally {lock.unlock();
            }
           if (!result) {JdbcUtils.close(holder.conn);
                LOG.info("connection recyle failed.");
            }
        } catch (Throwable e) {holder.clearStatementCache();

            if (!holder.discard) {discardConnection(holder);
                holder.discard = true;
            }

            LOG.error("recyle error", e);
            recycleErrorCountUpdater.incrementAndGet(this);
        }
    }

如果设置了 phyTimeoutMillis(默认设置为 -1,不查看)的话,查看以后连贯创立以来的时长是否超过了该参数的设置,超过的话敞开连贯,不偿还。 这个参数也不倡议设置。

之后,加锁,调用 putLast(holder, currentTimeMillis); 偿还连贯,如果偿还失败,则调用 JdbcUtil.close 办法,底层间接敞开连贯。

putLast 是偿还连贯的外围办法。

DruidDataSource#putLast

putLast 是连贯偿还的最初一步,目标是各种查看校验、连贯属性重置之后,最终将连贯放回到连接池 connections 中。

putLast 是在锁状态下执行的。

看代码:

   boolean putLast(DruidConnectionHolder e, long lastActiveTimeMillis) {if (poolingCount >= maxActive || e.discard || this.closed) {return false;}

        e.lastActiveTimeMillis = lastActiveTimeMillis;
        connections[poolingCount] = e;
        incrementPoolingCount();

        if (poolingCount > poolingPeak) {
            poolingPeak = poolingCount;
            poolingPeakTime = lastActiveTimeMillis;
        }

        notEmpty.signal();
        notEmptySignalCount++;

        return true;
    }

查看如果没必要偿还的话,比方连接池数量大于参数设定的 maxActive 数量、或者以后连贯曾经被遗弃、或者以后 dataSource 曾经被敞开,则不须要偿还,间接返回。

将连贯放入到 connections 的尾部,之后减少连接池数量 poolingCount。

调用 notEmpty.signal(); 唤醒期待获取连贯的线程:连接池中有新的连贯退出,能够获取了。

Druid 敞开连贯(偿还连贯)代码剖析结束!

小结

Druid 连接池的初始化、连贯获取以及连贯偿还的源码剖析结束,前面会补充一篇文章剖析连贯遗弃的解决。

Thanks a lot!

上一篇 连接池 Druid(三)– 获取连贯 getConnection

正文完
 0