关于线程池:Java-JUC-ThreadPoolExecutor解析

35次阅读

共计 11784 个字符,预计需要花费 30 分钟才能阅读完成。

线程池 ThreadPoolExecutor

介绍

线程池次要解决两个问题:一是当执行大量异步工作时线程池可能提供较好的性能。在不应用线程池时,每当须要执行工作时就须要 new 一个线程来执行,频繁的创立与销毁十分耗费性能。而线程池中的线程是能够复用的,不须要在每次须要执行工作时候都从新创立和销毁。二是线程池提供了资源限度和治理的伎俩,比方能够限度线程个数,动静减少线程等。

另外,线程池也提供了许多可调参数和可扩展性接口,以满足不同状况下的须要,咱们能够应用更不便的 Executors 的工厂办法,来创立不同类型的线程池,也能够本人自定义线程池。

线程池的工作机制

  1. 线程池刚创立的时候没有任何线程,当来了新的申请的时候才会创立 外围线程 去解决对应的申请
  2. 当解决实现之后,外围线程并不会回收
  3. 在外围线程达到指定的数量之前,每一个申请都会在线程池中创立一个新的外围线程
  4. 当外围线程全都被占用的时候,新来的申请会放入工作队列中。工作队列实质上是一个 阻塞队列
  5. 当工作队列被占满,再来的新申请会交给长期线程来解决
  6. 长期线程在应用实现之后会持续存活一段时间,直到没有申请解决才会被销毁

类图介绍

如上类图所示,Executors 是一个工具类,提供了多种静态方法,依据咱们抉择的不同提供不同的线程池实例。

ThreadPoolExecutor 继承了 AbstractExecutorService 抽象类,在 ThreadPoolExecutor 中成员变量 ctl 是一个 Integer 的原子性变量,用来记录线程池的状态和线程中线程个数,相似于 ReentrantReadWriteLock 应用一个变量来保留两种信息一样。

假如 Integer 类型是 32 位二进制示意,则其中高 3 位示意线程池的状态,后 29 为示意线程池线程数量。

// 默认 RUNNING 状态,线程个数为 0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

获取高 3 位,运行状态

private static int runStateOf(int c)     {return c & ~CAPACITY;}

获取低 29 位,线程个数

private static int workerCountOf(int c)  {return c & CAPACITY;}

线程池状态含意如下:

  • RUNNING:承受新工作并且解决阻塞队列中的工作
  • SHUTDOWN:回绝新工作然而解决阻塞队列中的工作
  • STOP:回绝新工作并且摈弃阻塞队列里的工作,同时中断正在解决的工作
  • TIDYING:所有工作都执行完(包含阻塞队列中的工作)后以后线程池流动线程数量为 0,将调用 terminated 办法
  • TERMINATED:终止状态。terminated 调用实现办法后的状态

线程池状态转换如下:

  • RUNNING -> SHUTDOWN:显式调用 shutdown 办法,或者隐式调用了 finalize 办法外面的 shutdown 办法
  • RUNNING 或 SHUTDOWN -> STOP:显式调用 shutdownNow 办法时
  • SHUTDOWN -> TIDYING:当线程池和工作队列都为空时
  • STOP -> TIDYING:当线程池为空时
  • TIDYING -> TERMINATED:当 terminated hook 办法执行实现时

线程池参数如下:

参数名 类型 含意
corePoolSize int 外围线程数
maxPoolSize int 最大线程数
keepAliveTime long 放弃存活工夫
workQueue BlockingQueue 工作存储队列
threadFactory ThreadFactory 当线程池须要新的线程时,应用 ThreadFactory 来创立新的线程
Handler RejectedExecutionHandler 因为线程池无奈承受所提交的工作所给出的回绝策略
  • corePoolSize:指的是外围线程数,线程池初始化实现后,默认状况下,线程池并没有任何线程,线程池会期待工作到来时,再创立新的线程去执行工作。
  • maxPoolSize:线程池有可能会在外围线程数上,额定减少一些线程,然而这些新减少的线程有一个下限,最大不能超过 maxPoolSize。

    • 如果线程数小于 corePoolSize,即便其余工作线程处于闲暇状态,也会创立一个新的线程来运行工作。
    • 如果线程数大于等于 corePoolSize 但少于 maxPoolSize,则将工作放进工作队列中。
    • 如果队列已满,并且线程数小于 maxPoolSize,则创立一个新线程来运行工作。
    • 如果队列已满,并且线程数曾经大于等于 maxPoolSize,则应用回绝策略来回绝该工作。
  • keepAliveTime:一个线程如果处于闲暇状态,并且以后的线程数量大于 corePoolSize,那么在指定工夫后,这个闲暇线程会被销毁,这里的指定工夫由 keepAliveTime 来设定。
  • workQueue:新工作被提交后,会先进入到此工作队列中,任务调度时再从队列中取出工作。jdk 中提供了四种工作队列:

    • ArrayBlockingQueue:基于数组的 有界阻塞队列,按 FIFO 排序。新工作进来后,会放到该队列的队尾,有界的数组能够避免资源耗尽问题。当线程池中线程数量达到 corePoolSize 后,再有新工作进来,则会将工作放入该队列的队尾,期待被调度。如果队列曾经是满的,则创立一个新线程,如果线程数量曾经达到 maxPoolSize,则会执行回绝策略。
    • LinkedBlockingQueue:基于链表的 无界阻塞队列(其实最大容量为 Interger.MAX),依照 FIFO 排序。因为该队列的近似无界性,当线程池中线程数量达到 corePoolSize 后,再有新工作进来,会始终存入该队列,而不会去创立新线程直到 maxPoolSize,因而应用该工作队列时,参数 maxPoolSize 其实是不起作用的。
    • SynchronousQueue:一个 不缓存工作的阻塞队列,生产者放入一个工作必须等到消费者取出这个工作。也就是说新工作进来时,不会缓存,而是间接被调度执行该工作,如果没有可用线程,则创立新线程,如果线程数量达到 maxPoolSize,则执行回绝策略。
    • PriorityBlockingQueue:具备优先级的 无界阻塞队列,优先级通过参数 Comparator 实现。
    • delayQueue:具备优先级的 延时无界阻塞队列
    • LinkedTransferQueue:基于链表的 无界阻塞队列
    • LinkedBlockingDeque:基于链表的 双端阻塞队列
  • threadFactory:创立一个新线程时应用的工厂,能够用来设定线程名、是否为 daemon 线程等等
  • handler:当工作队列中的工作已达到最大限度,并且线程池中的线程数量也达到最大限度,这时如果有新工作提交进来,就会执行回绝策略。

如上 ThreadPoolExecutor 类图所示,其中 mainLock 是独占锁,用来管制新增 Worker 线程操作的原子性。termination 是该锁对应的条件队列,在线程调用 awaitTermination 时用来寄存阻塞的线程。

private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();
private final Condition termination = mainLock.newCondition();

Worker 类继承 AQS 和 Runnable 接口,是具体承载工作的对象。Worker 继承了 AQS,实现了简略的不可重入的独占锁,state = 0示意锁未被获取,state = 1则示意锁曾经被获取,state = -1是创立 Worker 时默认的状态,创立时状态设置为 -1 是为了防止该线程在运行 runWorker 办法前被中断。其中变量 firstTask 记录该工作线程执行的第一个工作,thread 是具体执行工作的线程。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    final Thread thread;
    Runnable firstTask;
    //...

DefaultThreadFactory 是线程工厂,newThread 办法是对线程的一个润饰。其中 poolNumber 是个动态的原子变量,用来统计线程工厂的个数,threadNumber 用来记录每个线程工厂创立了多少线程,这两个值也作为线程池和线程的名称的一部分。

源码解析

execute 办法

execute 办法次要作用就是提交工作 command 到线程池中进行执行。

该图能够看到,ThreadPoolExecutor 实现其实就是一个 生产者消费者模型,当用户增加工作到线程池相当于生产者生产元素,workers 线程中的线程间接执行工作或者从工作队列外面获取工作则相当于是消费者生产元素。

具体代码如下:

public void execute(Runnable command) {
        //1. 校验工作是否为 null
        if (command == null)
            throw new NullPointerException();
        //2. 获取以后线程池的状态 + 线程个数的组合值
        int c = ctl.get();
        //3. 判断线程池中线程个数是否小于 corePoolSize,小则开启新线程(core 线程)运行
        if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
                return;
            c = ctl.get();}
        //4. 如果线程池处于 RUNNING 状态,则增加工作到阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
            //4.1 二次查看
            int recheck = ctl.get();
            //4.2 如果以后线程池状态不是 RUNNING 则从队列中删除工作,并执行回绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //4.3 如果以后线程池为空,则增加一个线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //5. 如果队列满,则新增线程,新增失败则执行回绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

如果以后线程池线程个数大于等于 corePoolSize 则执行代码(4),如果以后线程处于 RUNNING 则增加到工作队列。

须要留神的是这里判断线程池状态是因为有可能线程池曾经处于非 RUNNING 状态,在非 RUNNING 状态下是要摈弃新工作的。

如果工作增加胜利,则执行代码(4.2)进行二次校验,因为在执行 4.2 之前可能线程池的状态发送变动,如果线程池状态不是 RUNNING 则把工作从工作队列中移除,而后执行回绝策略;如果二次校验通过,则从新判断线程池里是否还有线程,没有则新增一个线程。

如果代码(4)增加工作失败,则阐明队列已满,随后执行代码(5)尝试新增线程也就是上图中的 thread3,thread4 线程来执行工作,如果以后线程池个数 > maximumPoolSize 则执行回绝策略。

咱们接下来看 addWorker 办法:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
   // 每次 for 循环都须要获取最新的 ctl 值
    for (;;) {int c = ctl.get();
        int rs = runStateOf(c);
                //1. 查看队列是否只在必要时为空
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
              //2. 循环 CAS 减少线程个数
        for (;;) {int wc = workerCountOf(c);
            //2.1 如果线程个数超限度则返回 false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //2.2 通过 CAS 减少线程个数
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //2.3 CAS 失败后,查看线程状态是否发生变化,如果变动则跳转到外层循环从新尝试获取线程池状态,否则内层循环从新进行 CAS
            c = ctl.get();
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

      //3. 执行到这一步阐明 CAS 胜利
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //3.1 创立 Worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //3.2 减少独占锁,实现同步,因为可能多个线程同时调用线程池的 execute 办法
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //3.3 从新查看线程池状态,防止在获取锁前调用了 shutdown
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //3.4 增加工作
                    workers.add(w);
                  // 更新以后最大线程数量 maximumPoolSize 和 corePoolSize 能够在线程池创立之后动静批改的
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {mainLock.unlock();
            }
            //3.5 增加胜利后启动工作
            if (workerAdded) {t.start();
                workerStarted = true;
            }
        }
    } finally {// 如果没有执行过 t.start() 就要把这个 woker 从 workers 外面删除,并且 ctl 外面 worker 数量减 1
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

首先明确第一局部双重循环目标是通过 CAS 操作进行增加线程数,第二局部次要通过 ReentrantLock 平安的将工作增加到 workers 里,随后启动工作。

首先看第一局部代码(1)。

if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN && //(1)
               firstTask == null && //(2)
               ! workQueue.isEmpty())) //(3)

代码(1)中会在上面三种状况返回 false:

  • (1)以后线程池状态为 STOP、TIDYING 或 TERMINATED
  • (2)以后线程池状态为 SHUTDOWN 并且曾经有了第一个工作
  • (3)以后线程池状态为 SHUTDOWN 并且工作队列为空

代码(2)内层循环的作用是应用 CAS 操作减少线程数量。

执行到代码(8)时,阐明曾经通过 CAS 胜利减少了线程个数,当初工作还没有开始执行,所以这部分代码通过全局锁管制来减少 Worker 到工作汇合 workers 中。

工作线程 Worker 的执行

用户线程提交到线程池之后,由 Worker 来执行,上面是 Worker 的构造函数。

Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);// 创立一个线程
}

在构造函数内首先设置 Worker 的状态为 -1,为了防止以后 Worker 在调用 runWorker 办法前被中断(当其余线程调用了线程池的 shutdownNow 时,如果 Worker 状态 >=0 则会中断该线程)。这里设置了线程的状态为 -1,所以该线程就不会被中断了。

在 runWorker 代码中,运行代码(1)时会调用 unlock 办法,该办法把 status 设置为了 0,所以这时候调用 shutdownNow 会中断 Worker 线程。

final void runWorker(Worker w) {Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); //1. 将 state 设置为 0,容许中断
    boolean completedAbruptly = true;
    try {
        //2.
        while (task != null || (task = getTask()) != null) {
            //2.1
            w.lock();
            //// 如果状态值大于等于 STOP 且以后线程还没有被中断,则被动中断线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //2.2 执行工作前解决操作,默认是一个空实现;在子类中能够通过重写来扭转工作执行前的解决行为
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //2.3 执行工作
                    task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                } finally {
                    //2.4 工作之后解决,同 beforeExecute
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                //2.5 统计以后 worker 实现了多少个工作
                w.completedTasks++;
                w.unlock();}
        }
        // 设置为 false,示意工作失常解决实现
        completedAbruptly = false;
    } finally {
        //3. 清理工作
        processWorkerExit(w, completedAbruptly);
    }
}

这里在执行具体任务期间加锁,是为了防止在工作运行期间,其余线程调用了 shutdown 后正在执行的工作被中断(shutdown 只会中断以后被阻塞挂起的线程)

清理工作代码如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果 completedAbruptly 为 true 则示意工作执行过程中抛出了未解决的异样
    // 所以还没有正确地缩小 worker 计数,这里须要缩小一次 worker 计数
    if (completedAbruptly)
        decrementWorkerCount();

      //1. 统计线程池中实现工作的个数,并从工作汇合外面删除以后 Worker
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {mainLock.unlock();
    }
        //1.2 尝试设置线程池状态为 TERMINATED,在敞开线程池时等到所有 worker 都被回收后再完结线程池
    tryTerminate();
        //1.3 如果线程池状态 < STOP,即 RUNNING 或 SHUTDOWN,则须要思考创立新线程来代替被销毁的线程
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        // 如果 worker 是失常执行完的,则要判断一下是否曾经满足了最小线程数要求
        // 否则间接创立代替线程
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 从新创立一个 worker 来代替被销毁的线程
        addWorker(null, false);
    }
}

在如上代码中,代码(1.1)统计线程池实现工作个数,并且在统计前加了全局锁。把在当前工作线程中实现的工作累加到全局计数器,而后从工作集中删除以后 Worker。

代码(1.2)判断如果以后线程池状态是 SHUTDOWN 并且工作队列为空,或者以后线程池状态是 STOP 并且以后线程池外面没有流动线程,则设置线程池状态为 TERMINATED。如果设置为了 TERMINATED 状态,则还须要调用条件变量 termination 的 signalAll 办法激活所有因为调用线程池的 awaitTermination 办法而被阻塞的线程。

代码(1.3)则判断以后线程池外面线程个数是否小于外围线程个数,如果是则新增一个线程。

shutdown 办法

调用 shutdown 办法后,线程池就不会再承受新的工作了,然而工作队列外面的工作还是要执行的。该办法会立即返回,并不期待队列工作实现再返回。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //1. 权限查看
        checkShutdownAccess();
        //2. 设置以后线程池状态为 SHUTDOWN,如果曾经是 SHUTDOWN 则间接返回
        advanceRunState(SHUTDOWN);
        //3. 设置中断标记
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();
    }
    //4. 尝试将状态改为 TERMINATED
    tryTerminate();}

首先查看以后调用的线程是否有敞开线程的权限。

随后代码(2)的代码如下。如果以后线程池状态 >= SHUTDOWN 则间接返回,否则设置为 SHUTDOWN 状态。

private void advanceRunState(int targetState) {for (;;) {int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

代码(3)的源码如下,其设置所有闲暇线程的中断标记。这里首先加了全局锁,同时只有一个线程能够调用 shutdown 办法设置中断标记。而后尝试获取 Worker 本人的锁,获取胜利则设置中断标记。因为正在执行的工作曾经获取了锁,所以正在执行的工作没有被中断。这里中断的是阻塞到 getTask 办法并希图从队列外面获取工作的线程,也就是闲暇线程。

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {t.interrupt();
                } catch (SecurityException ignore) { } finally {w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {mainLock.unlock();
    }
}

最初尝试将状态改为 TERMINATED,首先应用 CAS 设置以后线程池状态为 TIDYING,如果设置胜利则执行扩大接口 terminated 在线程池状态变为 TERMINATED 前做一些事件,而后设置以后线程池状态为 TERMINATED。最初调用 termination.signalAll 激活因调用条件变量 termination 的 await 系列办法而被阻塞的所有线程。

final void tryTerminate() {for (;;) {int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {terminated();
                } finally {ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();}
                return;
            }
        } finally {mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

shutdownNow 办法

调用 shutdownNow 办法后,线程池就不会再承受新的工作了,并且会抛弃工作队列外面的工作,正在执行的工作会被中断,该办法会立即返回,并不期待激活的工作执行实现。返回值为这时候队列外面被抛弃的工作列表。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //1. 权限查看
        checkShutdownAccess();
        //2. 设置线程池状态为 STOP
        advanceRunState(STOP);
        //3. 中断所有线程
        interruptWorkers();
        //4. 将工作队列挪动到 tasks 中
        tasks = drainQueue();} finally {mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

须要留神的是,中断的所有线程蕴含闲暇线程和正在执行工作的线程。

awaitTermination 办法

当线程调用 awaitTermination 办法后,以后线程会被阻塞,直到线程池状态变为 TERMINATED 才返回,或者等待时间超时才返回。

public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {for (;;) {if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {mainLock.unlock();
        }
}

首先获取独占锁,而后在有限循环外部判断以后线程池状态是否至多是 TERMINATED 状态,如果是则间接返回,否则阐明以后线程池外面还有线程在执行,则看设置的超时工夫 nanos 是否小于 0,小于 0 则阐明不须要期待,那就间接返回,如果大于 0 则调用条件变量 termination 的 awaitNanos 办法期待 nanos 工夫,冀望在这段时间内线程池状态变为 TERMINATED。

总结

线程池奇妙地应用一个 Integer 类型的原子变量来记录线程池状态和线程池中的线程个数。通过线程池状态来管制工作的执行,每个 Worker 线程能够解决多个工作。线程池通过线程的复用缩小了线程创立和销毁的开销。

正文完
 0