关于面试:面试官-线程池是如何做到线程复用的有了解过吗说说看

2次阅读

共计 5613 个字符,预计需要花费 15 分钟才能阅读完成。

ThreadPoolExecutor 中是如何做到线程复用的?

咱们晓得,一个线程在创立的时候会指定一个线程工作,当执行完这个线程工作之后,线程主动销毁。然而线程池却能够复用线程,一个线程执行完线程工作后不销毁,继续执行另外一个线程工作。那么它是如何做到的?这得从 addWorker() 说起

addWorker()

  • 先看上半局部 addWorker()

    private boolean addWorker(Runnable firstTask, boolean core) {
      retry:
      for (;;) {int c = ctl.get();
          int rs = runStateOf(c);
    
          // 对边界设定的查看
          if (rs >= SHUTDOWN &&
              ! (rs == SHUTDOWN &&
                  firstTask == null &&
                  ! workQueue.isEmpty()))
              return false;
    
          for (;;) {int wc = workerCountOf(c);
              if (wc >= CAPACITY ||
                  wc >= (core ? corePoolSize : maximumPoolSize))
                  return false;
              if (compareAndIncrementWorkerCount(c))
                  break retry;
              c = ctl.get();  // Re-read ctl
              if (runStateOf(c) != rs)
                  continue retry;
              // else CAS failed due to workerCount change; retry inner loop
          }
    }

    retry: 可能有些同学没用过,它只是一个标记,它的下一个标记就是 for 循环, 在 for 循环外面调用 continue/break 再紧接着 retry 标记时,就示意从这个中央开始执行 continue/break 操作,但这不是咱们关注的重点。

从下面的代码,咱们能够看出,ThreadPoolExecutor 在创立线程时,会将线程封装成工作线程 worker, 并放入工作线程组中,而后这个 worker 重复从阻塞队列中拿工作去执行。这个 addWorker 是 excute 办法中调用的

  • 咱们接着看下半局部

    private boolean addWorker(Runnable firstTask, boolean core) {
          // 上半局部
          retry:
          for (;;) {int c = ctl.get();
              int rs = runStateOf(c);
    
              // Check if queue empty only if necessary.
              if (rs >= SHUTDOWN &&
                  ! (rs == SHUTDOWN &&
                     firstTask == null &&
                     ! workQueue.isEmpty()))
                  return false;
    
              for (;;) {int wc = workerCountOf(c);
                  // core 是 ture, 须要创立的线程为外围线程,则先判断以后线程是否大于外围线程
                  // 如果 core 是 false, 证实须要创立的是非核心线程,则先判断以后线程数是否大于总线程数
                  // 如果不小于,则返回 false
                  if (wc >= CAPACITY ||
                      wc >= (core ? corePoolSize : maximumPoolSize))
                      return false;
                  if (compareAndIncrementWorkerCount(c))
                      break retry;
                  c = ctl.get();  // Re-read ctl
                  if (runStateOf(c) != rs)
                      continue retry;
                  // else CAS failed due to workerCount change; retry inner loop
              }
          }
    
    
          // 下半局部
          boolean workerStarted = false;
          boolean workerAdded = false;
          Worker w = null;
          try {
              // 创立 worker 对象
              w = new Worker(firstTask);
              final Thread t = w.thread;
              if (t != null) {
                  // 获取线程全局锁
                  final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                  try {int rs = runStateOf(ctl.get());
                      // 判断线程池状态
                      if (rs < SHUTDOWN ||
                          (rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startable
                              throw new IllegalThreadStateException();
                          // 将以后线程增加到线程组    
                          workers.add(w);
                          int s = workers.size();
                          // 如果线程组中的线程数大于最大线程池数 largestPoolSize 赋值 s
                          if (s > largestPoolSize)
                              largestPoolSize = s;
                          // 增加胜利    
                          workerAdded = true;
                      }
                  } finally {mainLock.unlock();
                  }
                  // 增加胜利后执行线程
                  if (workerAdded) {t.start();
                      workerStarted = true;
                  }
              }
          } finally {
              // 增加失败后执行 addWorkerFailed
              if (! workerStarted)
                  addWorkerFailed(w);
          }
          return workerStarted;
      }

    再看 addWorkerFailed(),与上边相同,相当于一个回滚操作,会移除失败的工作线程

    private void addWorkerFailed(Worker w) {
          // 同样须要全局锁
          final ReentrantLock mainLock = this.mainLock;
          mainLock.lock();
          try {if (w != null)
                  workers.remove(w);
              decrementWorkerCount();
              tryTerminate();} finally {mainLock.unlock();
          }
      }

    Worker

咱们接着看 Worker 对象

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {runWorker(this);
        }

        //.....
        // 省略下边代码

    }

Worker 类实现了 Runnable 接口,所以 Worker 也是一个线程工作。在构造方法中,创立了一个线程,回过头想想 addWorker() 里为啥能够 t.start() 应该很分明了吧,并且在构造方法中调用了线程工厂创立了一个线程实例,咱们上节讲过线程工厂。其实这也不是关注的重点,重点是这个 runWorker()

final void runWorker(Worker w) {
        // 获取以后的线程实例
        Thread wt = Thread.currentThread();
        // 间接从第一个工作开始执行 
        Runnable task = w.firstTask;
        // 获取完之后把 worker 的 firstTask 置为 null 避免下次获取到
        w.firstTask = null;
        // 线程启动之后,通过 unlock 办法开释锁
        w.unlock(); // allow interrupts

        // 线程异样退出时 为 true
        boolean completedAbruptly = true;
        try {
            // Worker 执行 firstTask 或从 workQueue 中获取工作,直到工作为空
            while (task != null || (task = getTask()) != null) {
                // 获取锁以避免在工作执行过程中产生中断
                w.lock();
                // 判断边界值 如果线程池中断 则中断线程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 相当于钩子办法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行工作
                        task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                    } finally {afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();}
            }
            completedAbruptly = false;
        } finally {processWorkerExit(w, completedAbruptly);
        }
    }

首先去执行创立这个 worker 时就有的工作,当执行完这个工作后,worker 的生命周期并没有完结,在 while 循环中,worker 会一直地调用 getTask 办法从阻塞队列中获取工作而后调用 task.run() 执行工作, 从而达到复用线程的目标。只有 getTask 办法不返回 null, 此线程就不会退出。

咱们接着看 getTask()

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?

        for (;;) {int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // 如果运行线程数超过了最大线程数,然而缓存队列曾经空了,这时递加 worker 数量。// 如果有设置容许线程超时或者线程数量超过了外围线程数量,并且线程在规定工夫内均未 poll 到工作且队列为空则递加 worker 数量
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
            // 如果 timed 为 true, 则会调用 workQueue 的 poll 办法获取工作.
            // 超时工夫是 keepAliveTime。如果超过 keepAliveTime 时长,// 如果 timed 为 false, 则会调用 workQueue 的 take 办法阻塞在以后。// 队列中有工作退出时,线程被唤醒,take 办法返回工作,并执行。Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {timedOut = false;}
        }
    }

大家有没有想过这里为啥要用 take 和 poll, 它们都是出队的操作,这么做有什么益处?

take & poll

咱们说 take() 办法会将外围线程阻塞挂起,这样一来它就不会占用太多的 cpu 资源, 直到拿到 Runnable 而后返回。

如果 allowCoreThreadTimeOut 设置为 true, 那么外围线程就会去调用 poll 办法,因为 poll 可能会返回 null, 所以这时候外围线程满足超时条件也会被销毁

非核心线程会 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),如果超时还没有拿到,下一次循环判断 compareAndDecrementWorkerCount 就会返回 null,Worker 对象的 run() 办法循环体的判断为 null, 工作完结,而后线程被零碎回收。

再回头看一下 runWorker() 是不是设计的很奇妙~

结束语

本节内容不是很好了解,想持续探讨的同学能够持续浏览它的源码,这部分内容理解一下就好,其实咱们从源码中能够看到大量的线程状态查看,代码写的很强壮, 能够从中学习一下

正文完
 0