ThreadPoolExecutor
ThreadPoolExecutor 的创建
ThreadPoolExecutor 提供了 4 种构造方法,以最多参数的为例
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
序号 | 参数名 | 类型 | 含义 |
---|---|---|---|
1 | corePoolSize | int | 核心线程数 |
2 | maximumPoolSize | int | 最大线程数 |
3 | keepAliveTime | long | 线程最大空闲时间 |
4 | unit | TimeUnit | 空闲时间单位 |
5 | workQueue | BlockingQueue<Runnable> | 工作队列 |
6 | threadFactory | ThreadFactory | 线程工厂 |
7 | handler | RejectedExecutionHandler | 拒绝策略 |
workQueue
移步 BlockingQueue
threadFactory
新的线程通过指定的 ThreadFactory 创建。如果未指定,则使用 Executors.defaultThreadFactory 默认工厂,创建的线程将全部属于同一个 ThreadGroup 中。
DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
所有的线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
通过指定的 ThreadFactory,可以控制线程的名称,线程组,优先级,守护进程状态等。
handler
ThreadPoolExecutor.AbortPolicy 抛出 java.util.concurrent.RejectedExecutionException 异常
ThreadPoolExecutor.CallerRunsPolicy 线程调用运行该任务的 execute 本身运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
ThreadPoolExecutor.DiscardPolicy 默认情况下将丢弃被拒绝的任务
ThreadPoolExecutor.DiscardOldestPolicy 如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序
ThreadPoolExecutor 的工作流程
当一个任务通过 execute(Runnable) 方法欲添加到线程池时
如果此时线程池中的数量小于 corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue 未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于 corePoolSize,缓冲队列 workQueue 满,并且线程池中的数量小于 maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于 corePoolSize,缓冲队列 workQueue 满,并且线程池中的数量等于 maximumPoolSize,就要通过 handler 所指定的策略来处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过 keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
Executors 提供的预定义线程池
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
corePoolSize 与 maximumPoolSize 相等,所有线程都是核心线程,线程池大小固定;
keepAliveTime = 0 该参数无效,因为 FixedThreadPool 全部为核心线程;
workQueue = LinkedBlockingQueue,缺省初始化大小时队列最大长度为 Integer.MAX_VALUE,实际上有内存大小控制队列实际长度,(JVM 线程分配内存 -Xss)如果任务提交速度持续大余任务处理速度,大量线程阻塞在队列中,可能在拒绝策略前 OOM;
LinkedBlockingQueue 中的 putLock 和 takeLock 皆为非公平锁,因此 FixedThreadPool 的任务执行是无序的;
newSingleThreadPool
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
特殊的 FixedThreadPool,线程池大小固定 1,单线程执行
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,执行线程数无限制(实际数量由虚拟机内存限制);
keepAliveTime = 60s,线程空闲 60s 后回收。
workQueue = SynchronousQueue,同步队列,此队列入队必须出队必须同时传递,队列内部不会存储元素,因此 CachedThreadPool 线程实际上不会有队列等待;
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
DelayedWorkQueue 保证添加到队列中的任务,会按照任务的延时时间进行排序,延时时间少的任务首先被获取
newWorkStealingPool
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
newWorkStealingPool 是一个并行的线程池,参数中传入的是一个线程并发的数量,和之前 4 种线程池不同,这个线程池不会保证任务的顺序执行,也就是 WorkStealing(工作窃取)的意思,抢占式的工作会创建一个含有足够多线程的线程池,来维持相应的并行级别,它会通过工作窃取的方式使多核的 CPU 不会闲置,总会有活着的线程让 CPU 运行