关于spring:连接池-HikariPool-二-获取及关闭连接

获取数据库连贯是通过DataSource发动的,如果利用应用HikariPool作为连接池的话,须要配置DataSource为HikariDataSource,利用通过调用HikariDataSource的getConnection办法获取数据库连贯。

敞开数据库连贯是间接调用获取到的数据库连贯对象(Connection对象)的close办法实现的。

明天要钻研的课题:获取及敞开数据库连贯,获取数据库连贯其实就是从HikariDataSource的getConnection办法动手,直到getConnection办法获取到数据库连贯对象。而后再钻研数据库连贯对象的close办法。

获取数据库连贯

间接看HikariDataSource的getConnection办法:

 public Connection getConnection() throws SQLException
   {
      if (isClosed()) {
         throw new SQLException("HikariDataSource " + this + " has been closed.");
      }

      if (fastPathPool != null) {
         return fastPathPool.getConnection();
      }

      // See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
      HikariPool result = pool;
      if (result == null) {
         synchronized (this) {
            result = pool;
            if (result == null) {
               validate();
               LOGGER.info("{} - Starting...", getPoolName());
               try {
                  pool = result = new HikariPool(this);
                  this.seal();
               }
               catch (PoolInitializationException pie) {
                  if (pie.getCause() instanceof SQLException) {
                     throw (SQLException) pie.getCause();
                  }
                  else {
                     throw pie;
                  }
               }
               LOGGER.info("{} - Start completed.", getPoolName());
            }
         }
      }

fastPathPool在HikariDataSource实例化的时候被初始化为HikariPool对象。如果fastPathPool在getConnection办法调用的时候尚未初始化,就在getConnection办法内实现初始化。之后再调用HikariPool对象的getConnection办法。

HikariPool#getConnection办法

间接看代码:

       public Connection getConnection() throws SQLException
   {
      return getConnection(connectionTimeout);
   }

间接调用了getConnection(connectionTimeout)办法:

public Connection getConnection(final long hardTimeout) throws SQLException
   {
      suspendResumeLock.acquire();
      final long startTime = currentTime();

      try {
         long timeout = hardTimeout;
         do {
            PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
            if (poolEntry == null) {
               break; // We timed out... break and throw exception
            }

            final long now = currentTime();
            if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) {
               closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
               timeout = hardTimeout - elapsedMillis(startTime);
            }
            else {
               metricsTracker.recordBorrowStats(poolEntry, startTime);
               return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
            }
         } while (timeout > 0L);

         metricsTracker.recordBorrowTimeoutStats(startTime);
         throw createTimeoutException(startTime);
      }
      catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
      }
      finally {
         suspendResumeLock.release();
      }
   }

首先调用suspendResumeLock.acquire();办法,suspendResumeLock的次要作用是限流,初始化的时候设置最大并发线程数为MAX_PERMITS = 10000,如果同时获取连贯的线程未超过限度数量的话,容许获取连贯,否则,挂起、排队(外部通过Semaphore类实现,Semaphore是AQS的扩大实现)期待其余线程获取到连贯、开释suspendResumeLock之后,能力持续获取到数据库连贯。

而后调用connectionBag的borrow办法(稍后剖析源码),从数据库连接池中获取连贯。

如果没能从数据库连接池获取到连贯的话,抛异样。

查看从连接池获取到的连贯,如果连贯被标记为Evicted(该连贯因为超时或其余起因应该被回收),或者最初一次拜访工夫到以后工夫之间的时间差超过了aliveBypassWindowMs的设置并且尝试后发现该连贯曾经生效(连不上了),则敞开该连贯,timeout尚未超时的话从新调用connectionBag的borrow办法获取连贯,直到超时或获取到可用的连贯。

如果连贯可用,则调用metricsTracker.recordBorrowStats,之后返回:

  return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now); 

留神最终返回的并不是java.sql.Connection对象,而是通过poolEnry.createProxyConnection办法创立的代理对象!

最终通过JavassistProxyFactory创立代理对象HikariProxyConnection。

返回代理数据库连贯对象的同时,创立leakTask。leakTask对象的作用是:如果leakDetectionThreshold不为0的话,创立一个leakTask工作,在leakDetectionThreshold时长之后执行该工作抛出数据库连贯泄露异样。该工作不被执行(不产生数据库泄露异样)的惟一条件就是在leakDetectionThreshold时长范畴内,数据库连贯胜利执行完SQL语句后通过调用代理Connection对象的close办法勾销该工作。

OK,getConnection办法源码剖析实现了,目前为止,咱们晓得:

  1. 通过connectionBag.borrow办法从连接池获取连贯数据库连贯的封装对象poolEntry
  2. 最终获取到的数据库连贯是代理对象HikariProxyConnection

上面咱们首先剖析一下connectionBag.borrow办法,borrow办法才是从连接池获取连贯的要害。

connectionBag#borrow办法

上代码,分段剖析,先看第一局部:

public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
   {
      // Try the thread-local list first
      final List<Object> list = threadList.get();
      for (int i = list.size() - 1; i >= 0; i--) {
         final Object entry = list.remove(i);
         @SuppressWarnings("unchecked")
         final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
         if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
         }
      }

首先从threadList中获取,如果能获取到闲暇的数据库连贯、并且能胜利通过CAS操作批改连贯状态(从STATE_NOT_IN_USE批改为STATE_IN_USE)的话,间接返回。

threadList是ThreadLocal对象,以后线程如果曾经获取过连贯,在应用完归还给连接池的过程中,该连贯会写入threadList。如果以后线程再次获取连贯的话,就能够从threadList中获取,必定是性能最好的获取形式。

接下来,如果threadList中没有获取到:

      final int waiting = waiters.incrementAndGet();
      try {
         for (T bagEntry : sharedList) {
            if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               // If we may have stolen another waiter's connection, request another bag add.
               if (waiting > 1) {
                  listener.addBagItem(waiting - 1);
               }
               return bagEntry;
            }
         }

获取到期待获取连贯的线程数waiters,加1。

遍历sharedList,如果sharedList中有闲暇连贯,并且可能胜利批改状态的话,阐明获取胜利,间接返回。

从sharedList获取连贯胜利的话,还要检查一下以后是否还有其余期待获取连贯的线程(waiter>1),如果有的话阐明连接池中的连贯不太够用,则调用listener.addBagItem(waiting – 1)减少连贯。listerner就是HikariPool对象,调用的其实就是HikariPool的addBagItem办法。

addBagItem办法的代码非常简单,就不贴出了,逻辑也很简略:期待获取连贯的线程数如果大于等于期待退出连接池的连接数的话,就通过addConnectionExecutor减少连贯到连接池中,否则,不须要减少,只须要稍等一下,addConnectionExecutor就会把期待队列中的连贯退出池中。

持续看源码,如果从sharedList中也没有获取到连贯的话:

         listener.addBagItem(waiting);

         timeout = timeUnit.toNanos(timeout);
         do {
            final long start = currentTime();
            final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
            if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               return bagEntry;
            }

            timeout -= elapsedNanos(start);
         } while (timeout > 10_000);

         return null;
      }
      finally {
         waiters.decrementAndGet();
      }
   }

调用listener.addBagItem(waiting),办法逻辑曾经剖析过了。

从handoffQueue获取连贯,如果获取到的连贯状不是闲暇状态(其余线程姗姗来迟了),在timeout工夫范畴内持续循环从handoffQueue获取。

如果handoffQueue中没有获取到(此时必定期待超时了),则世界返回null。

否则,如果获取到了闲暇连贯,并且可能胜利批改连贯状态为STATE_IN_USE的话,返回该连贯。

最终,不论获取胜利与否,waiters减1。

borrow办法剖析实现!咱们晓得从连接池获取连贯的根本逻辑是:

  1. 首先从ThreadLocal变量threadList中获取
  2. 如果没有获取到的话,从sharedList中获取
  3. 还是没有获取到的话,最初才从handoffQueue中获取

源码曾经剖析的很分明了,逻辑也解释的清晰明了。然而为什么要这么做?

首先从threadList中获取连贯的逻辑比拟容易了解,如果以后线程曾经获取过连贯,用完之后存入threadList中,再次获取的话间接返回threadList中的连贯,效率必定是最高的,性能最优。

然而从sharedList中获取不到的话,为什么还要再从handoffQueue中获取?什么状况下会产生从sharedList中获取不到、从handoffQueue中能获取到?

仔细分析一下sharedList以及handoffQueue的构造、以及连贯退出连接池的逻辑,就能明确这么做的起因了。

连贯退出连接池的过程为:首先退出sharedList,之后再退出handoffQueue。

假如获取连贯的线程读取sharedList的时候,sharedList没有闲暇的连贯,此时假如有一个新创建的连贯正好刚刚退出到sharedList中。这种状况下获取连贯的线程就从sharedList中完满错过了新退出的连贯。

而后新退出的连贯会退出handoffQueue,handoffQueue是手递手队列,退出连贯的线程会期待获取连贯的线程从队列中拿走该连贯,两个人一拍即合,相互满足需要,实现“手递手”工作。

这个场景下,HikariPool的handoffQueue的设计肯定是性能最好的,必定回比重复遍历sharedList的性能好很多。

从连接池中获取连贯的过程,源码级剖析实现!

敞开连贯 HikariProxyConnection#close办法

开篇曾经说过了,敞开连贯其实就是调用代理连贯对象HikariProxyConnection的close办法。close办法是在他的父类、抽象类ProxyConnection中定义的:

public final void close() throws SQLException
   {
      // Closing statements can cause connection eviction, so this must run before the conditional below
      closeStatements();

      if (delegate != ClosedConnection.CLOSED_CONNECTION) {
         leakTask.cancel();

         try {
            if (isCommitStateDirty && !isAutoCommit) {
               delegate.rollback();
               lastAccess = currentTime();
               LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate);
            }

            if (dirtyBits != 0) {
               poolEntry.resetConnectionState(this, dirtyBits);
               lastAccess = currentTime();
            }

            delegate.clearWarnings();
         }
         catch (SQLException e) {
            // when connections are aborted, exceptions are often thrown that should not reach the application
            if (!poolEntry.isMarkedEvicted()) {
               throw checkException(e);
            }
         }
         finally {
            delegate = ClosedConnection.CLOSED_CONNECTION;
            poolEntry.recycle(lastAccess);
         }
      }
   }

首先调用closeStatements办法,敞开该连贯关上的Statement。

而后,调用leakTask.cancel();勾销连贯泄露查看工作。

之后,调用poolEntry.resetConnectionState重置连贯属性,筹备偿还到连接池。

最初,调用poolEntry.recycle(lastAccess);办法名阐明所有:回收连贯。

poolEntry#recycle(lastAccess)

代码很简略:

   void recycle(final long lastAccessed)
   {
      if (connection != null) {
         this.lastAccessed = lastAccessed;
         hikariPool.recycle(this);
      }
   }

记录lastAccessed后,调用hikariPool.recycle(this)。

hikariPool#recycle

重点来了,调用了connectionBag的requite办法。

   @Override
   void recycle(final PoolEntry poolEntry)
   {
      metricsTracker.recordConnectionUsage(poolEntry);

      connectionBag.requite(poolEntry);
   }

connectionBag#requite

requite负责偿还PoolEntry对象到连接池:

public void requite(final T bagEntry)
   {
      bagEntry.setState(STATE_NOT_IN_USE);

      for (int i = 0; waiters.get() > 0; i++) {
         if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
            return;
         }
         else if ((i & 0xff) == 0xff) {
            parkNanos(MICROSECONDS.toNanos(10));
         }
         else {
            Thread.yield();
         }
      }

首先将poolEntry对象设置为闲暇状态(STATE_NOT_IN_USE)。

之后查看如果有期待获取连贯的线程存在的话,帮忙其疾速拿到连贯:

如果以后对象曾经不再是闲暇状态(曾经被getConnection线程拿走了),则间接返回。

否则如果没被拿走的话,以后连贯对象放入handoffQueue,阻塞期待,如果被getConnection线程从handoffQueue中拿走的话,返回。

否则,循环肯定次数(0xff=256次)后挂起期待10秒,直到连贯被期待获取连贯的线程拿走。(这个中央不是很了解,为什么要这么做?因为敞开连贯的调用方应该是利用线程,做完本人该做的事件、偿还连贯的时候,还要负责把连贯交给下一个期待获取连贯的线程,难道不会影响以后业务的疾速返回吗?)

最初,如果threadList的容量小于50的话,将以后poolEntry对象放入threadList。

小结

HikariPool的初始化、连贯创立及退出连接池、连贯获取、连贯偿还等所有连接池相干的次要逻辑剖析结束。

Thanks a lot!

上一篇 连接池 HikariPool (一) – 根底框架及初始化过程

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理