乐趣区

关于java:拜托不要再问我线程池啦

Java 提供了几种便捷的办法创立线程池,通过这些内置的 api 就可能很轻松的创立线程池。在 java.util.concurrent 包中的 Executors 类,其中的静态方法就是用来创立线程池的:

  • newFixedThreadPool():创立一个固定线程数量的线程池,而且线程池中的工作全副执行实现后,闲暇的线程也不会被敞开。
  • newSingleThreadExecutor():创立一个只有一个线程的线程池,闲暇时也不会被敞开。
  • newCachedThreadPool():创立一个可缓存的线程池,线程的数量为 Integer.MAX_VALUE,闲暇线程会长期缓存下来,线程会期待60s 还是没有工作退出的话就会被敞开。

Executors类中还有一些创立线程池的办法(jdk8 新加的),然而当初这个触极到我的常识盲区了~~

下面那几个办法,其实都是创立了一个 ThreadPoolExecutor 对象作为返回值,要搞清楚线程池的原理次要还是要剖析 ThreadPoolExecutor 这个类。

ThreadPoolExecutor的构造方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {...}

ThreadPoolExecutor的构造方法蕴含以下几个参数:

  • corePoolSize:外围线程数量,常驻线程池中的线程,即时线程池中没有工作可执行,也不会被敞开。
  • maximumPoolSize:最大线程数量
  • keepAliveTime:闲暇线程存活工夫
  • unit:闲暇线程存活工夫的单位
  • workQueue:工作队列,线程池一下忙不过来,那新来的工作就须要排队,排除中的工作就会放在 workQueue 中
  • threadFactory:线程工厂,创立线程用的
  • handler:RejectedExecutionHandler实例用于在线程池中没有闲暇线程可能执行工作,并且 workQueue 中也容不下工作时回绝工作时的策略。

ThreadPoolExecutor中的线程统称为工作线程,但有一个小概念是 外围线程 ,外围线程由参数corePoolSize 指定,如 corePoolSize 设置 5,那线程池中就会有 5 条线程常驻线程池中,不会被回收掉,然而也会有例外,如果 allowCoreThreadTimeOuttrue闲暇一段时间后,也会被敞开。

线程的状态和工作线程数量

线程中的状态和工作线程和数量都是由 ctl 示意,是一个 AtomicInteger 类型的属性:

 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl 的高四位为线程的状态,其余位数为工作线程的数量,所以线程中最大的工作线程数量为(2^29)-1

线程池中的状态有五种:

  • RUNNING:接管新的工作和解决队列中的工作
  • SHUTDOWN:不能新增工作,然而会持续解决曾经增加的工作
  • STOP:不能新增工作,不会持续解决曾经增加工作
  • TIDYING:所有的工作曾经被终止,工作线程为 0
  • TERMINATED:terminated()办法执行实现

状态码的定义如下:

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

创立线程池

如果有面试官问:如何正确的创立线程池?千万不要说应用 Executors 创立线程,尽管 Executors 能很不便的创立线程池,然而他提供的动态创立办法会有一些坑。

次要的起因是:maximumPoolSizeworkQueue 这两个参数

Executors静态方法在创立线程池时,如果 maximumPoolSize 设置为Integer.MAX_VALUE,这样会导致线程池能够始终要以接管运行工作,可能导致 cpu 负载过高。

workQueue是一个阻塞队列的实例,用于搁置正在期待执行的工作。如果在创立线程种时 workQueue 实例没有指定工作的容量,那么期待队列中能够始终增加工作,极有可能导致oom

所以创立线程,最好是依据线程池的用处,而后本人创立线程

增加工作

调用线程池的 execute 并不是立刻执行工作,线程池外部用通过一顿操作,如:判断外围线程数、是否须要增加到期待队列中。

下来的代码是 execute 的源码,代码很简洁只有 2 个 if 语句:

 public void execute(Runnable command) {int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
            return;
        c = ctl.get();}
    if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
  1. 第一个 if,如果以后线程池中的工作线程数量小于corePoolSize,间接创立一个工作线程执行工作
  2. 第二个 if,当线程池处于运行状态,调用 workQueue.offer(command) 办法将工作增加到 workQueue,否则调用addWorker(command, false) 尝试去增加一个工作线程。

整顿了一张图,把线程池分为三局部Core WorkerWorkerworkQueue

换一种说法,在调用 execute 办法时,工作首先会放在 Core Worker 内,而后才是workQueue,最初才会思考Worker

这样做的起因能够保障 Core Worker 中的工作执行实现后,能立刻从 workQueue 获取下一个工作,而不须要启动别的工作线程,用起码的工作线程办更多的事。

创立工作线程

execute 办法中,有三个中央调用了 addWorkeraddWorker 办法能够分为二局部:

  1. 减少工作线程数量
  2. 启动工作线程

addWorker的办法签名如下:

private boolean addWorker(Runnable firstTask, boolean core) 
  • firstTask:第一个运行的工作,能够为空。如果为空工作会从 workQueue 中获取。
  • core:是否是外围工作线程

减少工作线程数量

    retry:
    for (;;) {int c = ctl.get();
        int rs = runStateOf(c);

             ....

        for (;;) {int wc = workerCountOf(c);
            
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
                
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

下面代码省略了一部分代码,次要代码都在 for 循环中,利用 CAS 锁,平安的实现线程池状态的查看与减少工作线程的数量。其中的 compareAndIncrementWorkerCount(c) 调用就是将工作线程数量 +1。

启动工作线程

减少工作线程的数量后,紧接着就会启动工作线程:

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {mainLock.unlock();
        }
        if (workerAdded) {t.start();
            workerStarted = true;
        }
    }
} finally {if (! workerStarted)
        addWorkerFailed(w);
}

启动工作线程的流程:

  • 创立一个 Worker 实例, Worker构造方法会应用 ThreadFactory 创立一个线程
w = new Worker(firstTask);
final Thread t = w.thread;

就不说 Worker 类的实现了,间接给出构造方法来细品:

Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
  • 如果线程池状态是在运行中,或者曾经敞开,但工作线程要从 workQueue 中获取工作,能力增加工作线程
if (rs < SHUTDOWN ||
        (rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startable
            throw new IllegalThreadStateException();
        workers.add(w);
        int s = workers.size();
        if (s > largestPoolSize)
            largestPoolSize = s;
        workerAdded = true;
    }

留神::当线程池处于 SHUTDOWN 状态时,它不能接管新的工作,然而能够继续执行未实现的工作。工作是否从 workQueue 中获取,是依据 firstTask 判断,每个 Worker 实例都有一个 firstTask 属性,如果这个值为 null,工作线程启动的时候就会从workQueue 中获取工作,否则会执行firstTask

  • 启动线程

调用线程的 start 办法,启动线程。

 if (workerAdded) {t.start();
    workerStarted = true;
}

执行工作

回过头来看一个 Worker 类的定义:

 private final class Worker extends AbstractQueuedSynchronizer
    implements Runnable{Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
      ...
}

Worker类实现了 Runnable 接口,同时在构造方法中会将 this 传递给线程,到这里你就晓得了 Worker 实例中有 run 办法,它会在线程启动后执行:

public void run() {runWorker(this);
}

run办法外部接着调用 runWorker 办法运行工作,在这里才是真正的开始运行工作了:

 final void runWorker(Worker w) {Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {while (task != null || (task = getTask()) != null) {w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {task.run();
                    } catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                    } finally {afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();}
            }
            completedAbruptly = false;
        } finally {processWorkerExit(w, completedAbruptly);
        }
    }
  • 获取工作

首先将 firstTask 传递给 task 长期变量:

Runnable task = w.firstTask;

而后循环查看 task 或者从 workQueue 中获取工作:

while (task != null || (task = getTask()) != null) {...}

getTask()稍后再做剖析。

  • 运行工作

去掉一些状态查看、异样捕捉、和勾子办法调用后,保留最重要的调用task.run()

 while (task != null || (task = getTask()) != null) {
     ...          
     task.run();
     ...
 }

task其实就是通过调用 execute 办法传递进来的 Runnable 实例,也就是你的工作。只不过它可能保留在 Worker.firstTask 中,或者在 workQueue 中,保留在哪里在后面的 工作增加程序 中曾经阐明。

从 workQueue 中获取工作

试想一下如果每个工作执行实现,就敞开掉一个线程那有多浪费资源,这样应用线程池也没有多大的意义。所以线程的次要的性能就是线程复用,一旦工作执行实现间接去获取下一个工作,或者挂起线程期待下一个提交的工作,而后期待一段时间后还是没有工作提交,而后才思考是否敞开局部闲暇的线程。

runWorker中会循环的获取工作:

 while (task != null || (task = getTask()) != null) {
     ...          
     task.run();
     ...
 }

下面的代码 getTask() 就是从 workQueue 中获取工作:

  private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
                    ...
             int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                 ...
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {timedOut = false;}
        }
    }

获取工作的时候会有两种形式:

  1. 超时期待获取工作
  2. 始终期待工作,直到有新工作

如果 allowCoreThreadTimeOuttruecorePoolSize指定的外围线程数量会被疏忽,间接应用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 获取工作,否则的话会依据当前工作线程的数量,如果 wc > corePoolSizefalse则以后会被认为是外围线程,调用 workQueue.take() 始终期待工作。

工作线程的敞开

还是在 runWorker 办法中:

 final void runWorker(Worker w) {Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {while (task != null || (task = getTask()) != null) {task.run();
                       
            }
                 
            completedAbruptly = false;   
        } finally {processWorkerExit(w, completedAbruptly);
        }
    }
  • completedAbruptly 变量:标记当前工作线程是失常执行实现,还是异样实现的。completedAbruptly 为 false 能够确定线程池中没有可执行的工作了。

下面代码是简洁后的代码,一个 while 循环保障不间断的获取工作,没有工作能够执行(task 为 null)退出循环,最初再才会调用 processWorkerExit 办法:

 private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

processWorkerExit接管一个 Worker 实例与 completedAbruptly 变量。processWorkerExit 的大抵工作流程:

  • 判断当前工作线程是否异样实现,如果是间接缩小工作线程的数量,简略的说就是校对一下工作线程的数量。
  • 减少实现的工作数量,将 Workerworkers中移除
  • tryTerminate() 查看线程池状态,因为线程池能够提早敞开,如果你调用 shutdown 办法后不会立刻敞开,要期待所有的工作执行实现,所以这里调用 tryTerminate()办法,尝试去调用 terminated 办法。

工作线程实现策略

如果某个工作线程实现,线程池外部会判断是否须要重新启动一个:

// 判断线程池状态
if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {
            // 获取最小工作线程数量
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        // 如果最小工作线程数量为 0,然而 workQueue 中还有工作,那重置最小工作线程数量 1
        if (min == 0 && ! workQueue.isEmpty())
            min = 1;
        // 如果当前工作线程数数量大于或等于最小工作线程数量,则不须要启动新的工作线程
        if (workerCountOf(c) >= min)
            return; // replacement not needed
    }
    
    // 启动一个新的工作线程
    addWorker(null, false);
}

工作线程实现后有两种解决策略:

  1. 对于异样实现的工作线程,间接启动一个新的替换
  2. 对于失常实现的工作线程,判断当前工作线程是否足够,如果足够则不须要新启动工作线程

留神:这里的实现,示意工作线程的工作执行实现,workQueue中也没有工作能够获取了。

线程池的敞开

敞开线程池有能够通过 shutdown 办法:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();
    }
    tryTerminate();}

shutdown办法,第一步就是先扭转线程池的状态,调用 advanceRunState(SHUTDOWN) 办法,将线程池以后状态更改为SHUTDOWN,advanceRunState 代码如下:

 private void advanceRunState(int targetState) {for (;;) {int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

而后立刻调用 interruptIdleWorkers() 办法,interruptIdleWorkers()外部会调用它的重载办法 interruptIdleWorkers(boolean onlyOne) 同时 onlyOne 参数传递的 false 来敞开闲暇的线程:

private void interruptIdleWorkers() {interruptIdleWorkers(false);
}
    
private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {t.interrupt();
                    } catch (SecurityException ignore) { } finally {w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {mainLock.unlock();
        }
    }

以上代码会遍历 workers 中的 Worker 实例,而后调用线程的 interrupt() 办法。

什么样的线程才是闲暇工作线程?

后面提到过在 getTask() 中,线程从 workQueue 中获取工作时会阻塞,被阻塞的线程就是闲暇的

再次回到 getTask() 的代码中:

  private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
        
              // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();
                    return null;
                }
                    ...
             int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                 ...
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {timedOut = false;}
        }
    }

再次剖析 getTask() 中的代码中有一段捕捉 InterruptedException 的代码块,interruptIdleWorkers 办法中断线程后,getTask()会捕捉中断异样,因为里面是一个 for 循环,随后代码走到判断线程池状态的中央:

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();
                    return null;
}

下面的代码的会判断以后线程池状态,如果状态大于 STOP 或者状态等于 SHUTDOWN 并且 workQueue 为空时则返回 nullgetTask() 返回空那么在 runWorker 中循环就会退出,当前工作线程的工作就实现了,能够退出了:

 final void runWorker(Worker w) {Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {while (task != null || (task = getTask()) != null) {task.run();
                       
            }
                 
            completedAbruptly = false;   
        } finally {processWorkerExit(w, completedAbruptly);
        }
    }

shutdownNow

除了 shutdown 办法能敞开线程池,还有 shutdownNow 也能够敞开线程池。它两的区别在于:

  • shutdownNow会清空 workQueue 中的工作
  • shutdownNow还会停止以后正在运行的工作
  • shutdownNow会使线程进入 STOP 状态,而 shutdown()SHUTDOWN状态
public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();} finally {mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

下面代码根本流程:

  • advanceRunState(STOP): 使线程池进行 STOP 状态,与 shutdown() 中的统一,只是应用的状态码是STOP
  • interruptWorkers():与 shutdown() 中的统一
  • drainQueue(): 清空队列

工作是中止执行还是继续执行?

调用 shutdownNow()后线程池处于 STOP 状态,紧接着所有的工作线程都会被调用 interrupt 办法,如果此时 runWorker 还在运行会产生什么?

runWorker 有一段代码,就是工作线程停止的重要代码:

 final void runWorker(Worker w) {
        ...
        
         while (task != null || (task = getTask()) != null) {if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                    
                  task.run();}
        ...
}

重点关注:

if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();

这个 if 看起来有点难了解,了解下来大抵意思是:如果线程池状态大于等于 STOP,立刻中断线程,否则革除线程的中断标记,也就是说当线程池状态为RUNNINGSHUTDOWN时,线程的中断标记会被革除(线程的中断代码在 interruptWorkers 办法中),能够继续执行工作。

以上代码执行实现后,紧接着就会调用 task.run() 办法,这外面咱们本人就能够依据线程的中断标记来判断工作是否被中断。

总结

集体程度无限,文中如有谬误,谢谢大家斧正。

本文从线程池的源码动手,剖析线程池的创立、增加工作、运行工作等流程,整个剖析下来基本上大多数公司对于线程池面试的问题都能够答复得上来,当然还有一些小细节如:Worker类是继承 AQS 的,为什么这么做其实源码中都有一些苗头,Worker在运行时会锁住运行的代码块,而 shutdown 在敞开闲暇的 Worker 时,首先就要去获取 Worker 的同步锁能力持续操作,这样能力平安的敞开工作线程。

欢送关注我的公众号:架构文摘,取得独家整顿 120G 的收费学习资源助力你的架构师学习之路!

公众号后盾回复 arch028 获取材料:

退出移动版