对于线程池
线程池就是首先创立一些线程,它们的汇合称为线程池。应用线程池能够很好地进步性能,线程池在系统启动时即创立大量闲暇的线程,程序将一个工作传给线程池,线程池就会启动一条线程来执行这个工作,执行完结当前,该线程并不会死亡,而是再次返回线程池中成为闲暇状态,期待执行下一个工作
为什么要应用线程池
多线程运行工夫,零碎一直的启动和敞开新线程,老本十分高,会过渡耗费系统资源,以及过渡切换线程的危险,从而可能导致系统资源的解体。这时,线程池就是最好的抉择了
ThreadPoolExecutor 类
Java 外面线程池的顶级接口是 Executor,然而严格意义上讲 Executor 并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是 ExecutorService
ExecutorService | 真正的线程池接口。 |
---|---|
ScheduledExecutorService | 能和 Timer/TimerTask 相似,解决那些须要工作反复执行的问题。 |
ThreadPoolExecutor | ExecutorService 的默认实现。 |
ScheduledThreadPoolExecutor | 继承 ThreadPoolExecutor 的 ScheduledExecutorService 接口实现,周期性任务调度的类实现。 |
ThreadPoolExecutor
咱们来看一下 ThreadPoolExecutor 的具体实现:
在 ThreadPoolExecutor 中有四个构造方法
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* Core pool size is the minimum number of workers to keep alive 外围线程数量是维持线程池存活的最小数量,而且不容许超时,除非设置 allowCoreThreadTimeOut,在这种状况下最小值为 0
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
*/
private volatile int corePoolSize;
/**
* Maximum pool size. Note that the actual maximum is internally 最大线程数量,留神,理论最大数量受容量限度
* bounded by CAPACITY.
*/
private volatile int maximumPoolSize;
/**
* The queue used for holding tasks and handing off to worker 用于保留工作和移交给工作线程
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
*/
private final BlockingQueue<Runnable> workQueue;
/**
* Timeout in nanoseconds for idle threads waiting for work. 闲暇线程的期待超时工夫,当线程数量超过 corePoolSize 或者 allowCoreThreadTimeOut 时应用,否则永远期待新的工作
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
*/
private volatile long keepAliveTime;
/**
* Factory for new threads. All threads are created using this 创立线程的工厂
* factory (via method addWorker). All callers must be prepared
* for addWorker to fail, which may reflect a system or user's
* policy limiting the number of threads. Even though it is not
* treated as an error, failure to create threads may result in
* new tasks being rejected or existing ones remaining stuck in
* the queue.
*
* We go further and preserve pool invariants even in the face of
* errors such as OutOfMemoryError, that might be thrown while
* trying to create threads. Such errors are rather common due to
* the need to allocate a native stack in Thread.start, and users
* will want to perform clean pool shutdown to clean up. There
* will likely be enough memory available for the cleanup code to
* complete without encountering yet another OutOfMemoryError.
*/
private volatile ThreadFactory threadFactory;
/**
* Handler called when saturated or shutdown in execute. 回绝策略,线程饱和或敞开时调用的处理程序
*/
private volatile RejectedExecutionHandler handler;
// 应用指定参数创立线程池,应用默认线程工厂和回绝策略
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// 应用指定参数创立线程池,应用默认回绝策略
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
// 应用指定参数创立线程池,应用默认线程工厂
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
// 应用指定参数创立线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
- corePoolSize:外围池的大小,这个参数跟前面讲述的线程池的实现原理有十分大的关系。在创立了线程池后,默认状况下,线程池中并没有任何线程,而是期待有工作到来才创立线程去执行工作,除非调用了 prestartAllCoreThreads() 或者 prestartCoreThread() 办法,从这 2 个办法的名字就能够看出,是预创立线程的意思,即在没有工作到来之前就创立 corePoolSize 个线程或者一个线程。默认状况下,在创立了线程池后,线程池中的线程数为 0,当有工作来之后,就会创立一个线程去执行工作,当线程池中的线程数目达到 corePoolSize 后,就会把达到的工作放到缓存队列当中;
- maximumPoolSize:线程池最大线程数,这个参数也是一个十分重要的参数,它示意在线程池中最多能创立多少个线程;
- keepAliveTime:示意线程没有工作执行时最多放弃多久工夫会终止。默认状况下,只有当线程池中的线程数大于 corePoolSize 时,keepAliveTime 才会起作用,直到线程池中的线程数不大于 corePoolSize,即当线程池中的线程数大于 corePoolSize 时,如果一个线程闲暇的工夫达到 keepAliveTime,则会终止,直到线程池中的线程数不超过 corePoolSize。然而如果调用了 allowCoreThreadTimeOut(boolean) 办法,在线程池中的线程数不大于 corePoolSize 时,keepAliveTime 参数也会起作用,直到线程池中的线程数为 0;
- unit:参数 keepAliveTime 的工夫单位
-
workQueue:一个阻塞队列,用来存储期待执行的工作,这个参数的抉择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种抉择
ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue;
- threadFactory:线程工厂,次要用来创立线程;
-
handler:示意当回绝解决工作时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy: 抛弃工作并抛出 RejectedExecutionException 异样。ThreadPoolExecutor.DiscardPolicy:也是抛弃工作,然而不抛出异样。ThreadPoolExecutor.DiscardOldestPolicy:抛弃队列最后面的工作,而后从新尝试执行工作(反复此过程)ThreadPoolExecutor.CallerRunsPolicy:由调用线程解决该工作
ExecutorService
ThreadPoolExecutor 继承自 AbstractExecutorService,AbstractExecutorService 而实现了 ExecutorService,ExecutorService 实现了 Executor
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
Executor 是顶层接口,外面定义了 execute(Runnable) 办法,返回类型为 void,用来执行传入的工作
ExecutorService 实现了 Executor 并申明了一些办法:submit、invokeAll、invokeAny 以及 shutDown 等
AbstractExecutorService 实现了 ExecutorService 并对其中定义的办法做了实现
ThreadPoolExecutor 继承自 AbstractExecutorService
线程池的实现
线程状态
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
当创立线程池后,初始时,线程池处于 RUNNING 状态;
如果调用了 shutdown() 办法,则线程池处于 SHUTDOWN 状态,此时线程池不可能承受新的工作,它会期待所有工作执行结束;
如果调用了 shutdownNow() 办法,则线程池处于 STOP 状态,此时线程池不能承受新的工作,并且会去尝试终止正在执行的工作;
当线程池处于 SHUTDOWN 或 STOP 状态,并且所有工作线程曾经销毁,工作缓存队列曾经清空或执行完结后,线程池被设置为 TERMINATED 状态。
private final BlockingQueue<Runnable> workQueue; // 工作缓存队列,用来寄存期待执行的工作
private final ReentrantLock mainLock = new ReentrantLock(); // 线程池的次要状态锁,对线程池状态(比方线程池大小
//、runState 等)的扭转都要应用这个锁
private final HashSet<Worker> workers = new HashSet<Worker>(); // 用来寄存工作集
private volatile long keepAliveTime; // 线程存货工夫
private volatile boolean allowCoreThreadTimeOut; // 是否容许为外围线程设置存活工夫
private volatile int corePoolSize; // 外围池的大小(即线程池中的线程数目大于这个参数时,提交的工作会被放进工作缓存队列)private volatile int maximumPoolSize; // 线程池最大能容忍的线程数
private volatile int poolSize; // 线程池中以后的线程数
private volatile RejectedExecutionHandler handler; // 工作回绝策略
private volatile ThreadFactory threadFactory; // 线程工厂,用来创立线程
private int largestPoolSize; // 用来记录线程池中已经呈现过的最大线程数
private long completedTaskCount; // 用来记录曾经执行结束的工作个数
举个例子:一个房间能够装 15 集体,目前有 10 集体,每个人同时只能做一件事件,那么只有 10 集体中有闲暇的就能够承受新的工作,如果没有闲暇的那新的工作就要排队期待,如果新增的工作越来越多,那就要思考减少人数到 15 个,如果还是不够就要思考是否要回绝新的工作或者放弃之前的工作了,当 15 集体有闲暇的,那就又须要思考缩小人数,因为要发工资的
那么在这个例子中 10 集体就是 corePoolSize,15 就是 maximumPoolSize,workQueue 是没有闲暇人时的期待队列,回绝或者放弃之前工作就是 handler,人的闲暇工夫就是 keepAliveTime,如果人数超过 corePoolSize 或者设置了 allowCoreThreadTimeOut,那么工夫超过 keepAliveTime 后就要缩小人数
execute
ThreadPoolExecutor 中最外围的办法就是 execute
public void execute(Runnable command) {if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to 如果正在运行的线程少于 corePoolSize 线程,尝试应用给定命令作为其第一个工作来启动新线程。对 addWorker 的调用从原子性上查看 runState 和 workerCount,* start a new thread with the given command as its first 通过返回 false 来避免在不应该增加线程的状况下收回虚伪警报。* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need 如果工作能够胜利排队,那么咱们依然须要仔细检查是否应该增加线程(因为现有线程自上次查看后就死掉了)或自从进入此办法以来该池已敞开。因而,咱们从新查看状态,* to double-check whether we should have added a thread 并在必要时回滚队列(如果已进行),或者在没有线程的状况下启动新线程。* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new 如果咱们无奈将工作排队,则尝试增加一个新线程。如果失败,咱们晓得咱们已敞开或已饱和,因而回绝该工作。* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();//ctl 是一个 AtomicInteger 参数,用于判断线程状态
// 判断工作线程数量是否小于外围线程数量,如果小于就减少工作线程数量
if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
return;
c = ctl.get();}
// 如果工作能够胜利排队
// 如果线程正在运行,就尝试将工作增加进缓存队列中
// 此时获取到的是一个闲暇的线程,线程运行中并且工作增加缓存队列胜利
if (isRunning(c) && workQueue.offer(command)) {
// 此时获取到的是一个闲暇的线程,须要再次获取线程状态
int recheck = ctl.get();
// 判断闲暇线程状态,如果线程不是 running 状态且工作曾经 remove 就执行回绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果无奈从新排队
// 增加失败
else if (!addWorker(command, false))
// 执行回绝策略
reject(command);
}
// 增加工作 firstTask 工作 core 是否退出外围线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 自旋期待
for (;;) {int c = ctl.get();
// 获取线执行状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 锁定
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {mainLock.unlock();
}
if (workerAdded) {t.start();
workerStarted = true;
}
}
} finally {if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {return getState() != 0;
}
protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {t.interrupt();
} catch (SecurityException ignore) {}}
}
}
- 如果以后线程池中的线程数目小于 corePoolSize,则每来一个工作,就会创立一个线程去执行这个工作;
- 如果以后线程池中的线程数目 >=corePoolSize,则每来一个工作,会尝试将其增加到工作缓存队列当中,若增加胜利,则该工作会期待闲暇线程将其取出去执行;若增加失败(一般来说是工作缓存队列已满),则会尝试创立新的线程去执行这个工作;
- 如果以后线程池中的线程数目达到 maximumPoolSize,则会采取工作回绝策略进行解决;
- 如果线程池中的线程数量大于 corePoolSize 时,如果某线程闲暇工夫超过 keepAliveTime,线程将被终止,直至线程池中的线程数目不大于 corePoolSize;如果容许为外围池中的线程设置存活工夫,那么外围池中的线程闲暇工夫超过 keepAliveTime,线程也会被终止。
罕用的线程池
newSingleThreadExecutor
单个线程的线程池,即线程池中每次只有一个线程工作,单线程串行执行工作
外围池大小 1,最大大小 1,存活工夫 0,即永不过期, 应用 LinkedBlockingQueue
public class SingleThreadExecutorTest {public static void main(String[] args) {
// TODO Auto-generated method stub
// 创立一个可重用固定线程数的线程池
ExecutorService pool = Executors.newSingleThreadExecutor();
// 创立实现了 Runnable 接口对象,Thread 对象当然也实现了 Runnable 接口;
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
// 将线程放到池中执行;pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
// 敞开线程池
pool.shutdown();}
}
newFixedThreadExecutor
固定数量的线程池,没提交一个工作就是一个线程,直到达到线程池的最大数量,而后前面进入期待队列直到后面的工作实现才继续执行
外围池大小 nThreads,最大大小 nThreads,存活工夫 0,永不过期, 应用 LinkedBlockingQueue
public class FixedThreadExecutorTest {public static void main(String[] args) {
// TODO Auto-generated method stub
// 创立一个可重用固定线程数的线程池 外围池大小为 nThreads,keepAliveTime 为 0,线程永不过期
ExecutorService pool = Executors.newFixedThreadPool(2);
// 创立实现了 Runnable 接口对象,Thread 对象当然也实现了 Runnable 接口;
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
// 将线程放到池中执行;pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
// 敞开线程池
pool.shutdown();}
}
newCacheThreadExecutor
可缓存线程池,当线程池大小超过了解决工作所需的线程,那么就会回收局部闲暇(个别是 60 秒无执行)的线程,当有工作来时,又智能的增加新线程来执行。
外围池大小 0,最大大小 Integer.MAX_VALUE,存活工夫默认 60s, 应用 SynchronousQueue
public class CachedThreadExecutorTest {public static void main(String[] args) {
// TODO Auto-generated method stub
// 创立一个可重用固定线程数的线程池 外围池大小为 0,keepAliveTime 为 60L,默认 60s 过期
ExecutorService pool = Executors.newCachedThreadPool();
// 创立实现了 Runnable 接口对象,Thread 对象当然也实现了 Runnable 接口;
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
// 将线程放到池中执行;pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
// 敞开线程池
pool.shutdown();}
}
newScheduleThreadExecutor
大小无限度的线程池,反对定时和周期性的执行线程
外围池大小 corePoolSize,最大大小 Integer.MAX_VALUE,存活工夫 0,永不过期,应用 DelayedWorkQueue
public class NewScheduledThreadPool {public static void main(String[] args) {
// TODO Auto-generated method stub
ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);
exec.scheduleAtFixedRate(new Runnable() {// 每隔一段时间就触发异样
@Override
public void run() {
// TODO Auto-generated method stub
//throw new RuntimeException();
System.out.println("===================");
}
}, 1000, 5000, TimeUnit.MILLISECONDS);
exec.scheduleAtFixedRate(new Runnable() {// 每隔一段时间打印零碎工夫,证实两者是互不影响的
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println(System.nanoTime());
}
}, 1000, 2000, TimeUnit.MILLISECONDS);
}
}
MyThread
public class MyThread extends Thread {
@Override
public void run() {
// TODO Auto-generated method stub
// super.run();
System.out.println(Thread.currentThread().getName() + "正在执行....");
}
}
缓存队列
在后面咱们屡次提到了工作缓存队列,即 workQueue,它用来寄存期待执行的工作。
workQueue 的类型为 BlockingQueue<Runnable>,通常能够取上面三种类型:
1)ArrayBlockingQueue:基于数组实现的有界阻塞队列,依照先进先出对数组进行排序,此队列创立时必须指定大小;
2)LinkedBlockingQueue:基于链表的先进先出阻塞队列,依照先进先出对数组进行排序,如果创立时没有指定此队列大小,则默认为 Integer.MAX_VALUE;
3)synchronousQueue:这个队列比拟非凡,它不会保留提交的工作,而是将间接新建一个线程来执行新来的工作,不存储元素。
4)DelayedWorkQueue:实现 PriorityBlockingQueue 实现提早获取的无界队列,创立元素时,能够指定多久能力从队列中获取以后元素,只有延时期满能力从队列中获取元素。
工作回绝策略
当线程池的工作缓存队列已满并且线程池中的线程数目达到 maximumPoolSize,如果还有工作到来就会采取工作回绝策略,通常有以下四种策略:
ThreadPoolExecutor.AbortPolicy: 抛弃工作并抛出 RejectedExecutionException 异样。ThreadPoolExecutor.DiscardPolicy:也是抛弃工作,然而不抛出异样。ThreadPoolExecutor.DiscardOldestPolicy:抛弃队列最后面的工作,而后从新尝试执行工作(反复此过程)ThreadPoolExecutor.CallerRunsPolicy:由调用线程解决该工作