前言

ThreadPoolExecutor置信大家都很相熟:线程池的实现类。明天咱们就来看看它外部是怎么实现的。

实现原理

先来看看它的类构造:

public class ThreadPoolExecutor extends AbstractExecutorService {}
public abstract class AbstractExecutorService implements ExecutorService {}
public interface ExecutorService extends Executor {    void shutdown();    <T> Future<T> submit(Callable<T> task);    // ...}
public interface Executor {     void execute(Runnable command);}

再来看看它的要害属性:

// ctl高3位示意线程池的运行状态,低29位示意线程个数private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 线程个数掩码,Integer位数-3,与具体平台Integer位数无关,大部分是32-3=29private static final int COUNT_BITS = Integer.SIZE - 3;// 线程最大个数private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits// 线程池状态private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN =  0 << COUNT_BITS;private static final int STOP =  1 << COUNT_BITS;private static final int TIDYING =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;// 互斥锁private final ReentrantLock mainLock = new ReentrantLock();// 工作线程汇合private final HashSet<Worker> workers = new HashSet<Worker>();// 线程池终止条件private final Condition termination = mainLock.newCondition();// 线程池外围参数// 阻塞队列private final BlockingQueue<Runnable> workQueue;// 线程工厂private volatile ThreadFactory threadFactory;// 回绝策略private volatile RejectedExecutionHandler handler;// 线程闲置时长private volatile long keepAliveTime;// 外围线程数private volatile int corePoolSize;// 最大线程数private volatile int maximumPoolSize;// 默认回绝策略:AbortPolicy抛出异样private static final RejectedExecutionHandler defaultHandler =    new AbortPolicy();

其中Worker是它的外部类,代表工作线程。

private final class Worker    extends AbstractQueuedSynchronizer    implements Runnable {}

它继承了AQS,实现了Runnable接口

再来看要害办法:
execute()

public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();    int c = ctl.get();    if (workerCountOf(c) < corePoolSize) {        if (addWorker(command, true))            return;        c = ctl.get();    }    if (isRunning(c) && workQueue.offer(command)) {        int recheck = ctl.get();        if (!isRunning(recheck) && remove(command))            reject(command);        else if (workerCountOf(recheck) == 0)            addWorker(null, false);    }    else if (!addWorker(command, false))        reject(command);}

其中addWorker()办法如下:

private boolean addWorker(Runnable firstTask, boolean core) {    retry:    for (;;) {        int c = ctl.get();        int rs = runStateOf(c);        // Check if queue empty only if necessary.        if (rs >= SHUTDOWN &&            ! (rs == SHUTDOWN &&                firstTask == null &&                !workQueue.isEmpty()))              return false;        for (;;) {            int wc = workerCountOf(c);            if (wc >= CAPACITY ||                 wc >= (core ? corePoolSize : maximumPoolSize))                 return false;            if (compareAndIncrementWorkerCount(c))                break retry;            c = ctl.get(); // Re-read ctl            if (runStateOf(c) != rs)                continue retry;            // else CAS failed due to workerCount change; retry inner loop       }    }    boolean workerStarted = false;    boolean workerAdded = false;    Worker w = null;    try {        w = new Worker(firstTask);        final Thread t = w.thread;        if (t != null) {           final ReentrantLock mainLock = this.mainLock;           mainLock.lock();           try {                       // Recheck while holding lock.        // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get());            if (rs < SHUTDOWN ||              (rs == SHUTDOWN && firstTask == null)) {                if (t.isAlive()) // precheck that t is startable                    throw new IllegalThreadStateException();                workers.add(w);                int s = workers.size();                if (s > largestPoolSize)                    largestPoolSize = s;                workerAdded = true;            }          } finally {                mainLock.unlock();          }          if (workerAdded) {             t.start();             workerStarted = true;          }       }    } finally {        if (! workerStarted)            addWorkerFailed(w);    }    return workerStarted;}

我滴个乖乖,addWorker()这么简单,今天再来吧。
晚安全世界!