深度分析Java并发编程之线程池技术看完面试这个再也不慌了

48次阅读

共计 10955 个字符,预计需要花费 28 分钟才能阅读完成。

线程池的好处

Java 中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池,相对于单线程串行处理(Serial Processing)和为每一个任务分配一个新线程(One Task One New Thread)的做法能够带来 3 个好处。

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用线程池,必须对其实现原理了如指掌。

线程池的实现原理

下面所有的介绍都是基于 JDK 1.8 源码。

架构设计

Java 中的线程池核心实现类是 ThreadPoolExecutor。这个类的设计是继承了 AbstractExecutorService 抽象类和实现了 ExecutorService,Executor 两个接口,关系大致如下图所示:

下面将从顶向下逐个介绍这个 4 个接口与类。

Executor

顶层接口 Executor 提供了一种将 任务提交 每个任务的执行机制 (包括线程使用的细节以及线程调度等) 解耦分开的方法。使用 Executor 可以避免显式的创建线程。例如,对于一系列的任务,你可能会使用下列这种方式来代替 new Thread(new(RunnableTask())).start() 的方式:

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

Executor 接口提供了一个接口方法,用来在未来的某段时间执行指定的任务。指定的任务

  1. 可能由一个新创建的线程执行;
  2. 可能由一个线程池中空闲的线程执行;
  3. 也可能由方法的调用线程执行。

这些可能执行方式都取决于 Executor 接口实现类的设计或实现方式。

public interface Executor {void execute(Runnable command);
}

Serial Processing

事实上,Executor 接口并没有严格的要求线程的执行需要异步进行。最简单的接口实现方法是,将所有的任务以调用方法的线程执行。

class DirectExecutor implements Executor {public void execute(Runnable r) {r.run();
   }
}

这种实际上就是上面提到的 Serial Processing 的方式。假设,我们现在以这种方式去实现一个响应请求的服务器应用。那么,这种实现方式虽然在理论上是正确的。

  1. 但是其性能却非常差,因为它每次只能响应处理一个请求。如果有大量请求则只能串行响应。
  2. 同时,如果服务器响应逻辑里面有文件 I / O 或者数据库操作,服务器需要等待这些操作完成才能继续执行。这个时候如果阻塞的时间过长,服务器资源利用率就很低。这样,在等待过程中,服务器 CPU 将处于空闲状态。

综上,这种 Serial Processing 的方式方式就会有 无法快速响应问题 低吞吐率 问题。

One Task One New Thread

不过,更典型的实现方式是,任务由一些其他的线程执行而不是方法调用的线程执行。例如,下面的 Executor 的实现方法是对于每一个任务都新建一个线程去执行。

class ThreadPerTaskExecutor implements Executor {public void execute(Runnable r) {new Thread(r).start();}
}

这种方式实际上就是上面提到的 One Task One New Thread 的方式,这种无限创建线程的方法也有很多问题。

  1. 线程生命周期的开销非常高。如果有大量任务需要执行,那么就需要创建大量线程。这样就会造成线程生命周期的创建和销毁的开销非常大。
  2. 资源消耗。活跃的线程会消耗系统资源,尤其是内存。如果,已经有足够多的线程使所有的 CPU 保持忙碌状态,那么在创建更多的线程反而会降低性能。最简单的例子是,一个 4 核的 CPU 机器,对于 100 个任务创建 100 个线程去执行。
  3. 稳定性。可创建线程的数量上存在一个限制。这个限制受 JVM 启动参数,栈大小以及底层操作系统对线程的限制等因素。超过了这个限制,就可能抛出 OutOfMemoryError 异常。

ExecutorService

ExecutorService 接口是继承自 Executor 接口,并增加了一些接口方法。接口也可以继承?以前没注意,现在学习到了。这里介绍下 接口继承的语义

  1. 接口 Executor 有 execute(Runnable)方法,接口 ExecutorService 继承 Executor,不用复写 Executor 的方法。只需要,写自己的方法 (业务) 即可。
  2. 当一个类 ThreadPoolExecutor 要实现 ExecutorService 接口的时候,需要实现 ExecutorService 和 Executor 两个接口的方法。

ExecutorService 大致新增了 2 类接口方法:

  1. ExecutorService 的关闭方法。对于线程池实现,这些方法的具体实现在 ThreadPoolExecutor 里面。
  2. 扩充异步执行任务的方法。对于线程池实现,用的这类方法都是 AbstractExecutorService 抽象类里面实现的模板方法。

AbstractExecutorService

抽象类 AbstractExecutorService 提供了 ExecutorService 接口类中各种 submit 异步执行方法的实现 ,这些方法与 Executor.execute(Runnable) 相比,它们都是有返回值的。同时,这些方法的实现的最终都是调用 ThreadPoolExecutor 类中实现的 execute(Runnable)方法。

尽管说 submit 方法能提供线程执行的返回值,但只有实现了 Callable 才会有返回值,而实现 Runnable 的返回值是 null。

    public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

除此之外,这个抽象类中还有 ExecutorService 接口类中 invokeAny 和 invokeAll 方法的实现。这里就只是简单介绍下这 2 个种方法的语义。

invokeAny

  1. invokeAny() 接收一个包含 Callable 对象的集合 作为参数。调用该方法不会返回 Future 对象,而是返回集合中 某一个 Callable 对象的运行结果
  2. 这个方法没法保证调用之后返回的结果是哪一个 Callable,只知道它是这些 Callable 中一个执行结束的 Callable 对象。

invokeAll

  1. invokeAll 接受一个包含 Callable 对象的集合 作为参数。调用该方法会返回一个 Future 对象的列表,对应输入的 Callable 对象的集合的运行结果。
  2. 这里 提交的任务容器列表和返回的 Future 列表存在顺序对应的关系

ThreadPoolExecutor

execute(Runnable)方法

线程池是如何执行输入的任务,这个整个线程池实现的核心逻辑,我们从这个方法开始学习。其代码如下所示:

    public void execute(Runnable command) {if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
                return;
            c = ctl.get();}
        if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

可以发现,当提交一个新任务到线程池时,线程池的处理流程如下:

  1. 判断线程池中工作的线程数是否小于核心线程数(corePoolSize)。如果是,则新建一个新的工作线程来执行任务(需要获取全局锁)。否则,进入下个流程。
  2. 判断线程池的工作队列 (BlockingQeue) 是否已满。如果未满,将新加的任务存储在工作队列中。否则,进入下个流程。
  3. 判断线程池中工作的线程数是否小于最大线程数(maximumPoolSize)。如果小于,则新建一个工作线程来执行任务(需要获取全局锁)。
  4. 如果大于或者等于,则交给饱和策略处理这个任务。

新提交任务处理流程图

以流程图来说明的话,线程池处理一个新提交的任务的流程如下图所示:

ThreadPoolExecutor 执行示意图

从上面的内容,我们可以发现线程池对于一个新任务有 4 种处理的可能,分别对应于上面处理流程的 4 个步骤。

ThreadPoolExecutor 采取上述步骤的总体设计思路,是为了在执行 execute()方法时,尽可能地 避免获取全局锁 (那将会是一个严重的可伸缩瓶颈)。在 ThreadPoolExecutor 完成预热之后(当前运行的线程数大于等于 corePoolSize),几乎所有的 execute()方法调用都是执行步骤 2,而步骤 2 不需要获取全局锁。

工作线程

从上面 execute(Runnable)的代码我们可以发现,线程池创建线程时,会将线程封装成工作线程 Worker,Worker 在执行完任务后,还会循环获取工作队列里的任务来执行。

ThreadPoolExecutor 中线程执行任务的示意图如下所示:

线程池中的线程执行任务分两种情况:

  1. 在 execute()方法中创建一个线程时,会让这个线程执行当前任务。
  2. 这个线程执行完上图中 1 的任务后,会反复从 BlockingQueue 获取任务来执行。

ThreadPoolExecutor 的 ctl 变量

ctl 是一个 AtomicInteger 的类,保存的 int 变量的更新都是原子操作,保证线程安全。它的 前面 3 位用来表示线程池状态,后面 29 位用来表示工程线程数量

ThreadPoolExecutor 的状态

线程池的状态有 5 种:

  1. Running:线程池处在 Running 的状态时,能够接收新任务,以及对已添加的任务进行处理 。线程池的初始化状态是 RUNNING。换句话说, 线程池被一旦被创建,就处于 Running 状态,并且线程池中的任务数为 0。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

  2. Shutdown: 线程池处在 SHUTDOWN 状态时,不接收新任务,但能处理已添加 (正在运行的以及在 BlockingQueue) 的任务 。调用线程池的 shutdown() 接口时,线程池由 RUNNING -> SHUTDOWN。
  3. Stop: 线程池处在 STOP 状态时,不接收新任务,不处理已添加的任务,并且会中断正在运行的任务 。调用线程池的 shutdownNow() 接口时,线程池由(RUNNING or SHUTDOWN) -> STOP。
  4. Tidying: 当所有的任务已终止,ctl 记录的”任务数量”为 0,线程池会变为 Tidying 状态。当线程池变为 Tidying 状态时,会执行钩子函数 terminated()。terminated()在 ThreadPoolExecutor 类中是空的,若用户想在线程池变为 Tidying 时,进行相应的处理;可以通过重载 terminated()函数来实现。
  5. Terminated: 线程池彻底终止,就变成 Terminated 状态。线程池处在 Tidying 状态时,执行完 terminated()之后,就会由 Tidying -> Terminated。

线程池的使用

线程池的创建

我们可以通过 ThreadPoolExecutor 的构造函数来创建一个线程池。

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  1. corePoolSize(线程池的核心线程数): 线程池要保持的线程数目,即使是他们是空闲也不会停止。当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建 。如果调用了线程池的 prestartAllCoreThreads() 方法,线程池会提前创建并启动所有基本线程。
  2. maximumPoolSize(线程池的最大线程数): 线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了 无界的任务队列这个参数就没什么效果
  3. keepAliveTime(线程活动保持时间): 当线程池中的线程数大于 corePoolSize 时,keepAliveTime 为多余的空闲线程等待新任务的最长保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。
  4. unit(线程活动保持时间的单位) : 可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)。
  5. runnableTaskQueue(任务队列):用于 保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。
  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
  • LinkedBlockingQueue:一个基于链表结构的无界阻塞队列,此队列按 FIFO 排序元素,吞吐量通常要高于 ArrayBlockingQueue。静态工厂方法 Executors.newFixedThreadPool()使用了这个队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 Linked-BlockingQueue,静态工厂方法 Executors.newCachedThreadPool 使用了这个队列。
  • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
  1. ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
  2. RejectedExecutionHandler(饱和策略):当 ThreadPoolExecutor 已经关闭或 ThreadPoolExecutor 已经饱和 时(达到了最大线程池大小且工作队列已满),execute()方法将要调用的 Handler,那么必须采取 一种策略处理提交的新任务。这个策略默认情况下是 AbortPolicy。Java 线程池框架提供了以下 4 种策略:

    • AbortPolicy:直接抛出异常
    • CallerRunsPolicy:只用调用者所在线程来运行任务
    • DiscardOldestPolicy:丢弃队列里最老的一个任务,并执行当前任务
    • DiscardPolicy:不处理,丢弃掉

常用 ThreadPoolExecutor

通过 Executor 框架的工具类 Executors,可以创建以下 3 种类型的 ThreadPoolExecutor。通过源码可以发现这 3 种线程池的本质都是不同输入参数配置的 ThreadPoolExecutor。

FixedThreadPool

FixedThreadPool 被称为 可重用固定线程数的线程池。下面是 FixedThreadPool 的源代码实现。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

注意到,

  1. FixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被设置为创建时的同一个指定的参数 nThreads。
  2. 任务阻塞队列使用的是无界队列 new LinkedBlockingQueue()。
  3. keepAliveTime 设置为 0。
  4. ThreadFactory 和 RejectedExecutionHandler 皆使用的默认值。

FixedThreadPool 的 execute()方法的运行示意图如下所示:

其运行说明:

  1. 如果当前运行的线程数少于 corePoolSize,则创建新线程来执行任务。
  2. 在线程池完成预热之后(当前运行的线程数等于 corePoolSize),将任务加入 LinkedBlockingQueue。
  3. 线程执行完 1 中的任务后,会在循环中反复从 LinkedBlockingQueue 获取任务来执行。

FixedThreadPool 使用无界队列 LinkedBlockingQueue 作为线程池的工作队列(队列的容量为 Integer.MAX_VALUE)对线程池会带来如下影响:

  1. 当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待。由于无界队列永远不会满,因此线程池中的线程数不会超过 corePoolSize。
  2. 由于 1,使用无界队列时 maximumPoolSize 将是一个无效参数。
  3. 由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数。不会有超过 corePoolSize 的线程数目。
  4. 由于使用无界队列。运行中的 FixedThreadPool(未执行方法 shutdown()或 shutdownNow())不会拒绝任务(不会调用 RejectedExecutionHandler.rejectedExecution 方法)。
SingleThreadExecutor

SingleThreadExecutor 是使用单个 worker 线程的 Executor。SingleThreadExecutor 与 FixedThreadPool 类似,只是它的 corePoolSize 和 maximumPoolSize 被设置为 1。下面是 SingleThreadExecutor 的源代码实现。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
CachedThreadPool

CachedThreadPool 是一个会根据需要创建新线程的线程池。下面是创建 CachedThread-Pool 的源代码。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

注意到:

  1. CachedThreadPool 的 corePoolSize 被设置为 0,即 corePool 为空;maximumPoolSize 被设置为 Integer.MAX_VALUE,即 maximumPool 是无界的。
  2. keepAliveTime 设置为 60L,意味着 CachedThreadPool 中的空闲线程等待新任务的最长时间为 60 秒,空闲线程超过 60 秒后将会被终止。
  3. CachedThreadPool 使用没有容量的 SynchronousQueue 作为线程池的工作队列,但 CachedThreadPool 的 maximumPool 是无界的。这意味着,如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新线程。极端情况下,CachedThreadPool 会因为创建过多线程而耗尽 CPU 和内存资源。

CacheThreadPool 的 execute()方法的执行过程如下图所示:

其执行过程的说明如下:

  1. 首先执行 SynchronousQueue.offer(Runnable task)。如果当前 maximumPool 中有空闲线程正在执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行 offer 操作与空闲线程执行的 poll 操作配对成功,主线程把任务交给空闲线程执行;否则执行下面的步骤 2。
  2. 当初始 maximumPool 为空,或者 maximumPool 中当前没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,CachedThreadPool 将会创建一个新线程执行任务。
  3. 步骤 2 中新创建的线程将任务执行完后,会执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这个 poll 操作会让空闲线程最多在 SynchronousQueue 中等待 60 秒钟。如果 60 秒钟内主线程提交了一个新任务(主线程执行步骤 1),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于空闲 60 秒的空闲线程会被终止,因此长时间保持空闲的 CachedThreadPool 不会使用任何资源。

向线程池提交任务

可以使用两个方法向线程池提交任务,分别为 execute()和 submit()方法。

  1. execute()方法用于提交 不需要返回值的任务 ,所以无法判断任务是否被线程池执行成功。一般 execute() 方法输入的任务是一个 Runnable 类的实例。
  2. submit()方法用于提交 需要返回值的任务 。线程池会返回一个 future 类型的对象,通过这个 future 对象可以判断任务是否执行成功,并且可以通过future 的 get() 方法来获取返回值 get() 方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

关闭线程池

可以通过调用线程池的 shutdown 或者 shutdownNow 方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别。

  1. shutdown 首先将线程池的状态设置成 SHUTDOWN。然后 阻止新提交的任务 ,对于新提交的任务,如果测试到状态不为 RUNNING,则抛出 rejectedExecution。对于 已经提交 (正在运行的以及在任务队列中的) 任务不会产生任何影响 。同时会将那些 闲置的线程 (idleWorkers) 进行中断
  2. shutdownNow 首先将线程池的状态设置成 STOP。然后 阻止新提交的任务 ,对于新提交的任务,如果测试到状态不为 RUNNING,则抛出 rejectedExecution 同时会 中断当前正在运行的线程 。另外它还将 BolckingQueue 中的任务给移除,并将 这些任务添加到列表中进行返回

线程池的监控

可以通过线程池提供的参数进行监控,在监控线程池的时候可以使用以下属性:

  1. taskCount:线程池需要执行的任务数量。
  2. completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于 taskCount。
  3. largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是 否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
  4. getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销 毁,所以这个大小只增不减。
  5. getActiveCount:获取活动的线程数。

另外,通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的 beforeExecute、afterExecute 和 terminated 方法,也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。

正文完
 0