ThreadPoolExecutor中是如何做到线程复用的?

咱们晓得,一个线程在创立的时候会指定一个线程工作,当执行完这个线程工作之后,线程主动销毁。然而线程池却能够复用线程,一个线程执行完线程工作后不销毁,继续执行另外一个线程工作。那么它是如何做到的?这得从addWorker()说起

addWorker()

  • 先看上半局部addWorker()

    private boolean addWorker(Runnable firstTask, boolean core) {  retry:  for (;;) {      int c = ctl.get();      int rs = runStateOf(c);      // 对边界设定的查看      if (rs >= SHUTDOWN &&          ! (rs == SHUTDOWN &&              firstTask == null &&              ! workQueue.isEmpty()))          return false;      for (;;) {          int wc = workerCountOf(c);          if (wc >= CAPACITY ||              wc >= (core ? corePoolSize : maximumPoolSize))              return false;          if (compareAndIncrementWorkerCount(c))              break retry;          c = ctl.get();  // Re-read ctl          if (runStateOf(c) != rs)              continue retry;          // else CAS failed due to workerCount change; retry inner loop      }}

    retry:可能有些同学没用过,它只是一个标记,它的下一个标记就是for循环,在for循环外面调用continue/break再紧接着retry标记时,就示意从这个中央开始执行continue/break操作,但这不是咱们关注的重点。

从下面的代码,咱们能够看出,ThreadPoolExecutor在创立线程时,会将线程封装成工作线程worker,并放入工作线程组中,而后这个worker重复从阻塞队列中拿工作去执行。这个addWorker是excute办法中调用的

  • 咱们接着看下半局部

    private boolean addWorker(Runnable firstTask, boolean core) {      // 上半局部      retry:      for (;;) {          int c = ctl.get();          int rs = runStateOf(c);          // Check if queue empty only if necessary.          if (rs >= SHUTDOWN &&              ! (rs == SHUTDOWN &&                 firstTask == null &&                 ! workQueue.isEmpty()))              return false;          for (;;) {              int wc = workerCountOf(c);              // core是ture,须要创立的线程为外围线程,则先判断以后线程是否大于外围线程              // 如果core是false,证实须要创立的是非核心线程,则先判断以后线程数是否大于总线程数              // 如果不小于,则返回false              if (wc >= CAPACITY ||                  wc >= (core ? corePoolSize : maximumPoolSize))                  return false;              if (compareAndIncrementWorkerCount(c))                  break retry;              c = ctl.get();  // Re-read ctl              if (runStateOf(c) != rs)                  continue retry;              // else CAS failed due to workerCount change; retry inner loop          }      }      // 下半局部      boolean workerStarted = false;      boolean workerAdded = false;      Worker w = null;      try {          // 创立worker对象          w = new Worker(firstTask);          final Thread t = w.thread;          if (t != null) {              // 获取线程全局锁              final ReentrantLock mainLock = this.mainLock;              mainLock.lock();              try {                                    int rs = runStateOf(ctl.get());                  // 判断线程池状态                  if (rs < SHUTDOWN ||                      (rs == SHUTDOWN && firstTask == null)) {                      if (t.isAlive()) // precheck that t is startable                          throw new IllegalThreadStateException();                      // 将以后线程增加到线程组                          workers.add(w);                      int s = workers.size();                      // 如果线程组中的线程数大于最大线程池数 largestPoolSize赋值s                      if (s > largestPoolSize)                          largestPoolSize = s;                      // 增加胜利                          workerAdded = true;                  }              } finally {                  mainLock.unlock();              }              // 增加胜利后执行线程              if (workerAdded) {                  t.start();                  workerStarted = true;              }          }      } finally {          // 增加失败后执行 addWorkerFailed          if (! workerStarted)              addWorkerFailed(w);      }      return workerStarted;  }

    再看 addWorkerFailed(),与上边相同,相当于一个回滚操作,会移除失败的工作线程

    private void addWorkerFailed(Worker w) {      // 同样须要全局锁      final ReentrantLock mainLock = this.mainLock;      mainLock.lock();      try {          if (w != null)              workers.remove(w);          decrementWorkerCount();          tryTerminate();      } finally {          mainLock.unlock();      }  }

    Worker

咱们接着看Worker对象

private final class Worker        extends AbstractQueuedSynchronizer        implements Runnable    {        /**         * This class will never be serialized, but we provide a         * serialVersionUID to suppress a javac warning.         */        private static final long serialVersionUID = 6138294804551838833L;        /** Thread this worker is running in.  Null if factory fails. */        final Thread thread;        /** Initial task to run.  Possibly null. */        Runnable firstTask;        /** Per-thread task counter */        volatile long completedTasks;        Worker(Runnable firstTask) {            setState(-1); // inhibit interrupts until runWorker            this.firstTask = firstTask;            this.thread = getThreadFactory().newThread(this);        }        /** Delegates main run loop to outer runWorker  */        public void run() {            runWorker(this);        }        //.....        // 省略下边代码    }

Worker类实现了Runnable接口,所以Worker也是一个线程工作。在构造方法中,创立了一个线程,回过头想想addWorker()里为啥能够t.start()应该很分明了吧, 并且在构造方法中调用了线程工厂创立了一个线程实例,咱们上节讲过线程工厂。其实这也不是关注的重点,重点是这个runWorker()

final void runWorker(Worker w) {        // 获取以后的线程实例        Thread wt = Thread.currentThread();        // 间接从第一个工作开始执行         Runnable task = w.firstTask;        // 获取完之后把worker的firstTask置为null 避免下次获取到        w.firstTask = null;        // 线程启动之后,通过unlock办法开释锁        w.unlock(); // allow interrupts        // 线程异样退出时 为 true        boolean completedAbruptly = true;        try {            // Worker执行firstTask或从workQueue中获取工作,直到工作为空            while (task != null || (task = getTask()) != null) {                // 获取锁以避免在工作执行过程中产生中断                w.lock();                // 判断边界值 如果线程池中断 则中断线程                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                    // 相当于钩子办法                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {                        // 执行工作                        task.run();                    } catch (RuntimeException x) {                        thrown = x; throw x;                    } catch (Error x) {                        thrown = x; throw x;                    } catch (Throwable x) {                        thrown = x; throw new Error(x);                    } finally {                        afterExecute(task, thrown);                    }                } finally {                    task = null;                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;        } finally {            processWorkerExit(w, completedAbruptly);        }    }

首先去执行创立这个worker时就有的工作,当执行完这个工作后,worker的生命周期并没有完结,在while循环中,worker会一直地调用getTask办法从阻塞队列中获取工作而后调用task.run()执行工作,从而达到复用线程的目标。只有getTask办法不返回null,此线程就不会退出。

咱们接着看getTask()

private Runnable getTask() {        boolean timedOut = false; // Did the last poll() time out?        for (;;) {            int c = ctl.get();            int rs = runStateOf(c);            // Check if queue empty only if necessary.            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {                decrementWorkerCount();                return null;            }            int wc = workerCountOf(c);            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;            // 如果运行线程数超过了最大线程数,然而缓存队列曾经空了,这时递加worker数量。            // 如果有设置容许线程超时或者线程数量超过了外围线程数量,并且线程在规定工夫内均未poll到工作且队列为空则递加worker数量            if ((wc > maximumPoolSize || (timed && timedOut))                && (wc > 1 || workQueue.isEmpty())) {                if (compareAndDecrementWorkerCount(c))                    return null;                continue;            }            try {            // 如果timed为true,则会调用workQueue的poll办法获取工作.            // 超时工夫是keepAliveTime。如果超过keepAliveTime时长,            // 如果timed为false, 则会调用workQueue的take办法阻塞在以后。            // 队列中有工作退出时,线程被唤醒,take办法返回工作,并执行。                Runnable r = timed ?                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                    workQueue.take();                if (r != null)                    return r;                timedOut = true;            } catch (InterruptedException retry) {                timedOut = false;            }        }    }

大家有没有想过这里为啥要用take和poll,它们都是出队的操作,这么做有什么益处?

take & poll

咱们说take()办法会将外围线程阻塞挂起,这样一来它就不会占用太多的cpu资源,直到拿到Runnable 而后返回。

如果allowCoreThreadTimeOut设置为true,那么外围线程就会去调用poll办法,因为poll可能会返回null,所以这时候外围线程满足超时条件也会被销毁

非核心线程会workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),如果超时还没有拿到,下一次循环判断compareAndDecrementWorkerCount就会返回null,Worker对象的run()办法循环体的判断为null,工作完结,而后线程被零碎回收 。

再回头看一下runWorker()是不是设计的很奇妙~

结束语

本节内容不是很好了解,想持续探讨的同学能够持续浏览它的源码,这部分内容理解一下就好,其实咱们从源码中能够看到大量的线程状态查看,代码写的很强壮,能够从中学习一下