Java多线程Executor框架

33次阅读

共计 4956 个字符,预计需要花费 13 分钟才能阅读完成。

前言

Java 线程的创建与销毁需要一定的开销,因此为每一个任务创建一个新线程来执行,线程的创建与开销将浪费大量计算资源。而且,如果不对创建线程的数量做限制,可能会导致系统负荷太高而崩溃。Java 的线程既是工作单元,也是执行机制。JDK1.5 之后,工作单元与执行机制分离,工作单元包括 Runnable 和 Callable,执行机制由 Executor 框架执行。

1.Executor 框架简介

Executor 框架的两级调度模型

    在 HotSpot 线程模型中,Java 线程被一对一的映射为本地操作系统线程。(Java 线程启动时,会创建一个本地操作系统线程,当 Java 线程终止时,对应的操作系统线程也会回收)而操作系统会调度所有的线程分配给可用的 CPU。
    Java 多线程程序通常把应用分解成若干个任务。然后使用 Executor 框架将这些任务映射为固定数量的线程。
    这种二级调度模型如下图所示:

Executor 框架的结构与成员

Executor 的框架结构由三大部分构成

  1. 任务。

      被执行任务需要实现的接口:Runnable 接口或 Callable 接口。

  1. 任务的执行

      包括任务执行机制的接口核心接口 Executor 以及继承自 Executor 的 ExecutorService 接口。

  1. 异步计算的结果

      包括接口 Future 和实现 FutureTask 接口的 FutureTask 类。
Executor 框架的主要成员

  • ThreadPoolExcutor

      线程池的核心实现类,用来执行被提交的任务。使用工厂类 Executor 来创建。Executor 可以创建三种类型的 ThreadPoolExcutor。如下:
     1.FixedThreadPoolExcutor:重用指定数目(nThreads)的线程,其背后使用的是无界的工作队列,任何时候最多有 nThreads 个工作线程是活动的。这意味着,如果任务数量超过了活动队列数目,将在工作队列中等待空闲线程出现;如果有工作线程退出,将会有新的工作线程被创建,以补足指定的数目 nThreads。适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。
     2.SingleThreadPoolExcutor:它的特点在于工作线程数目被限制为 1,操作一个无界的工作队列,所以它保证了所有任务的都是被顺序执行,最多会有一个任务处于活动状态,并且不允许使用者改动线程池实例,因此可以避免其改变线程数目。。适用于需要保证顺序地执行各个任务;并且在任意时间点不会有多个线程的应用场景。
     3.CachedThreadPoolExcutor: 它是一种用来处理大量短时间工作任务的线程池,具有几个鲜明特点:它会试图缓存线程并重用,当无缓存线程可用时,就会创建新的工作线程;如果线程闲置的时间超过 60 秒,则被终止并移出缓存;长时间闲置时,这种线程池,不会消耗什么资源。其内部使用 SynchronousQueue 作为工作队列。适用于执行很多短期异步任务的小程序或者是负载较轻的服务器。

  • ScheduledThreadPoolExcutor

      可以在给定的延迟后运行命令,或者定期执行命令。他可以创建两种类型的 ScheduledThreadPoolExcutor。
      ScheduledThreadPool 和 SingleThreadScheduledExecutor,可以进行定时或周期性的工作调度,区别在区别于单一工作线程还是多个工作线程。

  • Future 接口

      Future 接口和实现 Future 接口的 FutureTask 类用来表示异步计算的结果。当把 Runnable 接口或 Callable 接口的实现类提交给 ThreadPoolExecutor 时,ThreadPoolExecutor 会向我们返回一个 FutureTask 对象。
— Runnable 接口和 Callable 接口
      Runnable 接口和 Callable 接口的实现类,都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。他们的区别是 Runnable 不会返回结果,而 Callable 可以返回结果。除了可以自己创建实习 Callable 接口的对象外,还可以使用工厂类 Executors 来把一个 Runnable 包装成一个 Callable。
各成员工作方式如下:

2.ThreadPoolExecutor 详解

关于 ThreadPoolExecutor 的工作原理参考之前的博客:线程池工作原理
Executor 框架最核心的类是 ThreadPoolExecutor。它是线程池的实现类,先看一下它的源码:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

调用该方法时要传入的参数有以下几个

  • corePoolSize:核心线程池的大小。
  • maximumPoolSize:最大线程池的大小。
  • unit:keepAliveTime 对应的时间单位。
  • keepAliveTime:线程池的工作线程空闲后,保存存活的时间。默认对核心线程池的线程不生效。
  • workQueue:用来暂时保存任务的工作队列。

方法内部自动调用的参数

  • RejectedExecutionHandle(饱和策略)当 ThreadPoolExecutor 已经关闭或饱和时(达到最大线程池大小且工作队列已满),executor 方法将要调用的 Handler。
  • ThreadFactory 用于创建线程的工厂。

补充
饱和策略有以下几种:

  • AbortPolicy: 直接抛出异常。
  • CallerRunsPolicy: 使用调用者所在线程来运行任务。
  • DiscardOldestPolicy: 丢弃队列里最近的一个任务,并执行当前任务。
  • DiscardPolicy: 不处理,丢弃掉。

存放线程的任务队列有以下几种:

  • ArrayBlockingQueue: 基于数组结构的有界阻塞队列,遵循 FIFO。
  • LinkedBlockingQueue:基于链表结构的无界阻塞队列,遵循 FIFO。吞吐量比 ArrayBlockingQueue 高。
  • SynchronousQueue: 不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。吞吐量比 LinkedBlockingQueue 高。
  • PriorityBlockingQueue:一个具有优先级的无阻塞队列。

下面详细介绍几种 ThreadPoolExecutor

1.FixedThreadPool

    可重用固定线程数的线程池。它的源码如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    } 

通过它的传参可以看到以下信息:
1.corePoolSize 和 maximumPoolSize 都被设置成指定大小 nThread。
2.keepAliveTime 被设置成了 0。
3. 使用的无界队列 LinkedBlockingQueue。
结合线程池的工作原理可以知道 FixedThreadPool 的工作流程如下:
1. 如果当前运行的线程数少于 corePoolSize, 则创建新线程来执行任务。
2. 当前运行的线程数大于 corePoolSize 后(完成预热),将任务加入无界队列中。
3. 线程执行完当前的任务后,会循环反复的从 LinkedBlockingQueue 中获取任务来执行。
由于使用了无界队列,当线程池中的任务达到 corePoolSize 后,新任务可以一直加入到队列中,不存在队列满的情况,因此也不会执行的后续的操作(队列满了后判断当前线程数是否大于 maximumPoolSize, 以及执行饱和策略),所以 maximumPoolSize,keepAliveTime 以及 defaultHandler 都是无效参数。

2.SingleThreadPool

      它的源码如下:

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

通过构造参数可以得到如下信息:
1.corePoolSize 和 maximumPoolSize 都被设置成指定为 1。
2. 其他同 FixedThreadPool。
结合线程池的工作原理可以知道 SingleThreadPool 的工作流程如下:
1. 如果线程池当前无运行的线程,则创建一个新线程。
2. 在线程池预热后(当前线程池有一个运行的线程),将任务将入 LinkedBlockingQueue。
3. 线程执行任务后,会循环反复的从队列中获取新任务来执行。
从上面分析可知,SingleThreadPool 能满足任务按照顺序执行的场景。

3.CachedThreadPoolExecutor

CachedThreadPoolExecutor 是一个会根据需要创建新线程的线程池。它的源码如下:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

通过构造参数可以得到如下信息:
1.corePoolSize 设置为 0,,maximumPoolSize 都被设置成 Integer.MAX_VALUE。
2.keepAliveTime 被设置成了 60s。
3. 使用了没有容量的 SynchronousQueue 作为线程池的工作队列。
这种线程池的工作流程如下:
1. 首先执行 SynchronousQueue.offer(Runnable task)。如果当前 maximumPool 中有空闲线程正在执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS), 那么主线程执行 offer 操作与空闲线程执行的 poll 操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成; 否则执行第二步。
2. 如果当前 maximumPool 中为空或没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,CachedThreadPool 会创建一个新线程执行任务,execute()方法执行完成。
3. 在步骤 2 中新建线程将任务执行完成后,会执行 poll,这个 poll 操作会让空闲线程最多在 SynchronousQueue 等待 60 秒,如果 60 秒内主线程提交了一个新任务(执行步骤 1),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。因此长时间保持空闲的 CachedThreadPool 不会使用任何 cpu 资源。
过程有点小复杂,画个图表示一下:

待补充:ScheduledThreadPoolExecutor 及 Future 详解。

正文完
 0