关于thread:线程池-ThreadPoolExecutor-详解

7次阅读

共计 5477 个字符,预计需要花费 14 分钟才能阅读完成。

一 为什么要应用线程池
对于操作系统而言,创立一个线程的代价是非常低廉的, 须要给它分配内存、列入调度,同时在线程切换时要执行内存换页,清空 CPU 缓存,切换回来时还要从新从内存中读取信息,毁坏了数据的局部性。因而在并发编程中,当线程创立过多时,会影响程序性能,甚至引起程序解体。
而线程池属于池化管理模式,具备以下长处:

升高资源耗费:通过反复利用已创立的线程升高线程创立和销毁造成的性能耗费。
进步响应速度:当工作达到时,工作能够不须要等到线程创立就能立刻执行。
进步线程的可管理性:可能对线程进行统一分配、调优和监控。

二 线程池原理详解
2.1 线程池外围组成
线程池蕴含 3 个外围局部:

线程汇合:外围线程和工作线程
阻塞队列:用于待执行工作排队
回绝策略处理器:阻塞队列满后,对工作解决进行

2.2 Execute 原理
当一个新工作提交至线程池之后,线程池的解决流程如下:

首先判断以后运行的线程数量是否小于 corePoolSize。如果是,则创立一个工作线程来执行工作;如果都在执行工作,则进入步骤 2。
判断 BlockingQueue 是否曾经满了,若没满,则将工作放入 BlockingQueue;若满了,则进入步骤 3。
判断以后运行的总线程数量是否小于 maximumPoolSize,如果是则创立一个新的工作线程来执行工作。
否则交给 RejectedExecutionHandler 来解决工作。

当 ThreadPoolExecutor 创立新线程时,通过 CAS 来更新线程池的状态 ctl。

三 线程池的应用
线程池的应用次要分为以下三个步骤:

3.1 创立线程池
3.1.1 自定义线程池
线程池的真正实现类是 ThreadPoolExecutor,其构造方法有如下 4 种:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
 
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}
 
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}
 
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
复制代码
上面具体来看构造函数须要传入的重点参数:

corePoolSize(必须)线程池中的外围线程数,当提交一个工作时,线程池创立一个新线程执行工作,直到以后线程数等于 corePoolSize, 即便有其余闲暇线程可能执行新来的工作, 也会持续创立线程;如果以后线程数为 corePoolSize,持续提交的工作被保留到阻塞队列中,期待被执行;如果执行了线程池的 prestartAllCoreThreads()办法,线程池会提前创立并启动所有外围线程。
workQueue(必须)用来保留期待被执行的工作的阻塞队列。

ArrayBlockingQueue: 基于数组构造的有界阻塞队列,按 FIFO 排序工作;
LinkedBlockingQueue: 基于链表构造的阻塞队列,按 FIFO 排序工作,吞吐量通常要高于 ArrayBlockingQueue;
SynchronousQueue: 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作始终处于阻塞状态,吞吐量通常要高于 LinkedBlockingQueue;
PriorityBlockingQueue: 具备优先级的无界阻塞队列;

maximumPoolSize(必须)线程池中能包容的最大线程数。如果以后阻塞队列满了,且持续提交工作,则创立新的线程执行工作,前提是以后线程数小于 maximumPoolSize;当阻塞队列是无界队列, 则 maximumPoolSize 则不起作用, 因为无奈提交至外围线程池的线程会始终继续地放入 workQueue。
keepAliveTime(必须)线程闲置超时时长。如果超过该时长,非核心线程就会被回收。如果将 allowCoreThreadTimeout 设置为 true 时,外围线程也会超时回收。
unit(必须)keepAliveTime 的单位,罕用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)
threadFactory(可选)创立线程的工厂,通过自定义的线程工厂能够给每个新建的线程设置一个具备辨认度的线程名。默认为 DefaultThreadFactory
handler(可选)线程池的饱和策略,当阻塞队列满了,且没有闲暇的工作线程,如果持续提交工作,必须采取一种策略解决该工作,线程池提供了 4 种策略:

AbortPolicy: 间接抛出异样,默认策略;
CallerRunsPolicy: 用调用者所在的线程来执行工作;
DiscardOldestPolicy: 抛弃阻塞队列中靠最前的工作,并执行当前任务;
DiscardPolicy: 间接抛弃工作;
也能够依据利用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或长久化存储不能解决的工作。

3.1.2 性能线程池
除了调用 ThreadPoolExecutor 自定义线程池的形式,其实 Executors 也曾经为咱们封装好了 4 种常见的性能线程池,如下:

定长线程池(FixedThreadPool)
定时线程池(ScheduledThreadPool)
可缓存线程池(CachedThreadPool)
单线程化线程池(SingleThreadExecutor)

定长线程池(FixedThreadPool)

特点:只有外围线程,线程数量固定,执行完立刻回收,工作队列为链表构造的有界队列。
利用场景:控制线程最大并发数。
应用示例:

// 1. 创立定长线程池对象 & 设置线程池线程数量固定为 3
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
// 2. 创立好 Runnable 类线程对象 & 需执行的工作
Runnable task = new Runnable(){
  public void run() {
     …// 待执行的耗时工作
  }
};
// 3. 向线程池提交工作
fixedThreadPool.execute(task);
复制代码
定时线程池(ScheduledThreadPool)

特点:外围线程数量固定,非核心线程数量有限,执行完闲置 10ms 后回收,工作队列为延时阻塞队列。
利用场景:执行定时或周期性的工作。
应用示例:

// 1. 创立定时线程池对象 & 设置线程池线程数量固定为 5
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
// 2. 创立好 Runnable 类线程对象 & 需执行的工作
Runnable task =new Runnable(){
  public void run() {
     …// 待执行的耗时工作
  }
};
// 3. 向线程池提交工作
scheduledThreadPool.schedule(task, 1, TimeUnit.SECONDS); // 提早 1s 后执行工作
复制代码
可缓存线程池(CachedThreadPool)

特点:无外围线程,非核心线程数量有限,执行完闲置 60s 后回收,工作队列为不存储元素的阻塞队列。
利用场景:执行大量、耗时少的工作。
应用示例:

// 1. 创立可缓存线程池对象
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 2. 创立好 Runnable 类线程对象 & 需执行的工作
Runnable task =new Runnable(){
  public void run() {
     …// 待执行的耗时工作
  }
};
// 3. 向线程池提交工作
cachedThreadPool.execute(task);
复制代码
单线程化线程池(SingleThreadExecutor)

特点:只有 1 个外围线程,无非外围线程,执行完立刻回收,工作队列为链表构造的有界队列。
利用场景:不适宜并发但可能引起 IO 阻塞性及影响 UI 线程响应的操作,如数据库操作、文件操作等。
应用示例:

// 1. 创立单线程化线程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// 2. 创立好 Runnable 类线程对象 & 需执行的工作
Runnable task =new Runnable(){
  public void run() {
     …// 待执行的耗时工作
  }
};
// 3. 向线程池提交工作
singleThreadExecutor.execute(task);
复制代码
3.1.3 性能线程池存在的问题
目前已不举荐应用性能线程池,而是通过自定义 ThreadPoolExecutor 的形式。因为间接应用性能线程池具备资源耗尽的危险。

newFixedThreadPool 和 newSingleThreadExecutor: 次要问题是沉积的申请解决队列均采纳 LinkedBlockingQueue,可能会消耗十分大的内存,甚至 OOM。
newCachedThreadPool 和 newScheduledThreadPool: 次要问题是线程数最大数是 Integer.MAX_VALUE,可能会创立数量十分多的线程,甚至 OOM。

3.2 向线程池提交工作
向线程池提交工作的流程非常简单,只须要向线程池的 execute 办法传入 Runnable 对象即可。
// 向线程池提交工作
threadPool.execute(new Runnable() {
    @Override
    public void run() {
        … // 待执行的工作
    }
});

正文完
 0