对于线程池
线程池就是首先创立一些线程,它们的汇合称为线程池。应用线程池能够很好地进步性能,线程池在系统启动时即创立大量闲暇的线程,程序将一个工作传给线程池,线程池就会启动一条线程来执行这个工作,执行完结当前,该线程并不会死亡,而是再次返回线程池中成为闲暇状态,期待执行下一个工作
为什么要应用线程池
多线程运行工夫,零碎一直的启动和敞开新线程,老本十分高,会过渡耗费系统资源,以及过渡切换线程的危险,从而可能导致系统资源的解体。这时,线程池就是最好的抉择了
ThreadPoolExecutor类
Java外面线程池的顶级接口是Executor,然而严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService
ExecutorService | 真正的线程池接口。 |
---|---|
ScheduledExecutorService | 能和Timer/TimerTask相似,解决那些须要工作反复执行的问题。 |
ThreadPoolExecutor | ExecutorService的默认实现。 |
ScheduledThreadPoolExecutor | 继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。 |
ThreadPoolExecutor
咱们来看一下ThreadPoolExecutor的具体实现:
在ThreadPoolExecutor中有四个构造方法
public class ThreadPoolExecutor extends AbstractExecutorService { /** * Core pool size is the minimum number of workers to keep alive 外围线程数量是维持线程池存活的最小数量,而且不容许超时,除非设置allowCoreThreadTimeOut,在这种状况下最小值为0 * (and not allow to time out etc) unless allowCoreThreadTimeOut * is set, in which case the minimum is zero. */ private volatile int corePoolSize; /** * Maximum pool size. Note that the actual maximum is internally 最大线程数量,留神,理论最大数量受容量限度 * bounded by CAPACITY. */ private volatile int maximumPoolSize; /** * The queue used for holding tasks and handing off to worker 用于保留工作和移交给工作线程 * threads. We do not require that workQueue.poll() returning * null necessarily means that workQueue.isEmpty(), so rely * solely on isEmpty to see if the queue is empty (which we must * do for example when deciding whether to transition from * SHUTDOWN to TIDYING). This accommodates special-purpose * queues such as DelayQueues for which poll() is allowed to * return null even if it may later return non-null when delays * expire. */ private final BlockingQueue<Runnable> workQueue; /** * Timeout in nanoseconds for idle threads waiting for work. 闲暇线程的期待超时工夫,当线程数量超过corePoolSize或者allowCoreThreadTimeOut时应用,否则永远期待新的工作 * Threads use this timeout when there are more than corePoolSize * present or if allowCoreThreadTimeOut. Otherwise they wait * forever for new work. */ private volatile long keepAliveTime; /** * Factory for new threads. All threads are created using this 创立线程的工厂 * factory (via method addWorker). All callers must be prepared * for addWorker to fail, which may reflect a system or user's * policy limiting the number of threads. Even though it is not * treated as an error, failure to create threads may result in * new tasks being rejected or existing ones remaining stuck in * the queue. * * We go further and preserve pool invariants even in the face of * errors such as OutOfMemoryError, that might be thrown while * trying to create threads. Such errors are rather common due to * the need to allocate a native stack in Thread.start, and users * will want to perform clean pool shutdown to clean up. There * will likely be enough memory available for the cleanup code to * complete without encountering yet another OutOfMemoryError. */ private volatile ThreadFactory threadFactory; /** * Handler called when saturated or shutdown in execute. 回绝策略,线程饱和或敞开时调用的处理程序 */ private volatile RejectedExecutionHandler handler; //应用指定参数创立线程池,应用默认线程工厂和回绝策略 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } //应用指定参数创立线程池,应用默认回绝策略 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } //应用指定参数创立线程池,应用默认线程工厂 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } //应用指定参数创立线程池 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }}
- corePoolSize:外围池的大小,这个参数跟前面讲述的线程池的实现原理有十分大的关系。在创立了线程池后,默认状况下,线程池中并没有任何线程,而是期待有工作到来才创立线程去执行工作,除非调用了prestartAllCoreThreads()或者prestartCoreThread()办法,从这2个办法的名字就能够看出,是预创立线程的意思,即在没有工作到来之前就创立corePoolSize个线程或者一个线程。默认状况下,在创立了线程池后,线程池中的线程数为0,当有工作来之后,就会创立一个线程去执行工作,当线程池中的线程数目达到corePoolSize后,就会把达到的工作放到缓存队列当中;
- maximumPoolSize:线程池最大线程数,这个参数也是一个十分重要的参数,它示意在线程池中最多能创立多少个线程;
- keepAliveTime:示意线程没有工作执行时最多放弃多久工夫会终止。默认状况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程闲暇的工夫达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。然而如果调用了allowCoreThreadTimeOut(boolean)办法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
- unit:参数keepAliveTime的工夫单位
workQueue:一个阻塞队列,用来存储期待执行的工作,这个参数的抉择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种抉择
ArrayBlockingQueue;LinkedBlockingQueue;SynchronousQueue;
- threadFactory:线程工厂,次要用来创立线程;
handler:示意当回绝解决工作时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:抛弃工作并抛出RejectedExecutionException异样。 ThreadPoolExecutor.DiscardPolicy:也是抛弃工作,然而不抛出异样。 ThreadPoolExecutor.DiscardOldestPolicy:抛弃队列最后面的工作,而后从新尝试执行工作(反复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程解决该工作
ExecutorService
ThreadPoolExecutor继承自AbstractExecutorService,AbstractExecutorService而实现了ExecutorService,ExecutorService实现了Executor
public interface Executor { /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution * @throws NullPointerException if command is null */ void execute(Runnable command);}
Executor是顶层接口,外面定义了execute(Runnable)办法,返回类型为void,用来执行传入的工作
ExecutorService实现了Executor并申明了一些办法:submit、invokeAll、invokeAny以及shutDown等
AbstractExecutorService实现了ExecutorService并对其中定义的办法做了实现
ThreadPoolExecutor继承自AbstractExecutorService
线程池的实现
线程状态
// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
当创立线程池后,初始时,线程池处于RUNNING状态;
如果调用了shutdown()办法,则线程池处于SHUTDOWN状态,此时线程池不可能承受新的工作,它会期待所有工作执行结束;
如果调用了shutdownNow()办法,则线程池处于STOP状态,此时线程池不能承受新的工作,并且会去尝试终止正在执行的工作;
当线程池处于SHUTDOWN或STOP状态,并且所有工作线程曾经销毁,工作缓存队列曾经清空或执行完结后,线程池被设置为TERMINATED状态。
private final BlockingQueue<Runnable> workQueue; //工作缓存队列,用来寄存期待执行的工作private final ReentrantLock mainLock = new ReentrantLock(); //线程池的次要状态锁,对线程池状态(比方线程池大小 //、runState等)的扭转都要应用这个锁private final HashSet<Worker> workers = new HashSet<Worker>(); //用来寄存工作集 private volatile long keepAliveTime; //线程存货工夫 private volatile boolean allowCoreThreadTimeOut; //是否容许为外围线程设置存活工夫private volatile int corePoolSize; //外围池的大小(即线程池中的线程数目大于这个参数时,提交的工作会被放进工作缓存队列)private volatile int maximumPoolSize; //线程池最大能容忍的线程数 private volatile int poolSize; //线程池中以后的线程数 private volatile RejectedExecutionHandler handler; //工作回绝策略 private volatile ThreadFactory threadFactory; //线程工厂,用来创立线程 private int largestPoolSize; //用来记录线程池中已经呈现过的最大线程数 private long completedTaskCount; //用来记录曾经执行结束的工作个数
举个例子:一个房间能够装15集体,目前有10集体,每个人同时只能做一件事件,那么只有10集体中有闲暇的就能够承受新的工作,如果没有闲暇的那新的工作就要排队期待,如果新增的工作越来越多,那就要思考减少人数到15个,如果还是不够就要思考是否要回绝新的工作或者放弃之前的工作了,当15集体有闲暇的,那就又须要思考缩小人数,因为要发工资的
那么在这个例子中10集体就是corePoolSize,15就是maximumPoolSize,workQueue是没有闲暇人时的期待队列,回绝或者放弃之前工作就是handler,人的闲暇工夫就是keepAliveTime,如果人数超过corePoolSize或者设置了allowCoreThreadTimeOut,那么工夫超过keepAliveTime后就要缩小人数
execute
ThreadPoolExecutor中最外围的办法就是execute
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to 如果正在运行的线程少于corePoolSize线程,尝试应用给定命令作为其第一个工作来启动新线程。 对addWorker的调用从原子性上查看runState和workerCount, * start a new thread with the given command as its first 通过返回false来避免在不应该增加线程的状况下收回虚伪警报。 * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need 如果工作能够胜利排队,那么咱们依然须要仔细检查是否应该增加线程(因为现有线程自上次查看后就死掉了)或自从进入此办法以来该池已敞开。 因而,咱们从新查看状态, * to double-check whether we should have added a thread 并在必要时回滚队列(如果已进行),或者在没有线程的状况下启动新线程。 * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new 如果咱们无奈将工作排队,则尝试增加一个新线程。 如果失败,咱们晓得咱们已敞开或已饱和,因而回绝该工作。 * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get();//ctl是一个AtomicInteger参数,用于判断线程状态 //判断工作线程数量是否小于外围线程数量,如果小于就减少工作线程数量 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //如果工作能够胜利排队 //如果线程正在运行,就尝试将工作增加进缓存队列中 //此时获取到的是一个闲暇的线程,线程运行中并且工作增加缓存队列胜利 if (isRunning(c) && workQueue.offer(command)) { //此时获取到的是一个闲暇的线程,须要再次获取线程状态 int recheck = ctl.get(); //判断闲暇线程状态,如果线程不是running状态且工作曾经remove就执行回绝策略 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //如果无奈从新排队 //增加失败 else if (!addWorker(command, false)) //执行回绝策略 reject(command); }
//增加工作 firstTask 工作 core 是否退出外围线程 private boolean addWorker(Runnable firstTask, boolean core) { retry: //自旋期待 for (;;) { int c = ctl.get(); //获取线执行状态 int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //锁定 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
- 如果以后线程池中的线程数目小于corePoolSize,则每来一个工作,就会创立一个线程去执行这个工作;
- 如果以后线程池中的线程数目>=corePoolSize,则每来一个工作,会尝试将其增加到工作缓存队列当中,若增加胜利,则该工作会期待闲暇线程将其取出去执行;若增加失败(一般来说是工作缓存队列已满),则会尝试创立新的线程去执行这个工作;
- 如果以后线程池中的线程数目达到maximumPoolSize,则会采取工作回绝策略进行解决;
- 如果线程池中的线程数量大于 corePoolSize时,如果某线程闲暇工夫超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果容许为外围池中的线程设置存活工夫,那么外围池中的线程闲暇工夫超过keepAliveTime,线程也会被终止。
罕用的线程池
newSingleThreadExecutor
单个线程的线程池,即线程池中每次只有一个线程工作,单线程串行执行工作
外围池大小1,最大大小1,存活工夫0,即永不过期,应用LinkedBlockingQueue
public class SingleThreadExecutorTest { public static void main(String[] args) { // TODO Auto-generated method stub //创立一个可重用固定线程数的线程池 ExecutorService pool = Executors.newSingleThreadExecutor(); //创立实现了Runnable接口对象,Thread对象当然也实现了Runnable接口; Thread t1 = new MyThread(); Thread t2 = new MyThread(); Thread t3 = new MyThread(); Thread t4 = new MyThread(); Thread t5 = new MyThread(); //将线程放到池中执行; pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); //敞开线程池 pool.shutdown(); }}
newFixedThreadExecutor
固定数量的线程池,没提交一个工作就是一个线程,直到达到线程池的最大数量,而后前面进入期待队列直到后面的工作实现才继续执行
外围池大小nThreads,最大大小nThreads,存活工夫0,永不过期,应用LinkedBlockingQueue
public class FixedThreadExecutorTest { public static void main(String[] args) { // TODO Auto-generated method stub //创立一个可重用固定线程数的线程池 外围池大小为nThreads,keepAliveTime为0,线程永不过期 ExecutorService pool = Executors.newFixedThreadPool(2); //创立实现了Runnable接口对象,Thread对象当然也实现了Runnable接口; Thread t1 = new MyThread(); Thread t2 = new MyThread(); Thread t3 = new MyThread(); Thread t4 = new MyThread(); Thread t5 = new MyThread(); //将线程放到池中执行; pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); //敞开线程池 pool.shutdown(); }}
newCacheThreadExecutor
可缓存线程池, 当线程池大小超过了解决工作所需的线程,那么就会回收局部闲暇(个别是60秒无执行)的线程,当有工作来时,又智能的增加新线程来执行。
外围池大小0,最大大小Integer.MAX_VALUE,存活工夫默认60s,应用SynchronousQueue
public class CachedThreadExecutorTest { public static void main(String[] args) { // TODO Auto-generated method stub //创立一个可重用固定线程数的线程池 外围池大小为0,keepAliveTime为60L,默认60s过期 ExecutorService pool = Executors.newCachedThreadPool(); //创立实现了Runnable接口对象,Thread对象当然也实现了Runnable接口; Thread t1 = new MyThread(); Thread t2 = new MyThread(); Thread t3 = new MyThread(); Thread t4 = new MyThread(); Thread t5 = new MyThread(); //将线程放到池中执行; pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); //敞开线程池 pool.shutdown(); }}
newScheduleThreadExecutor
大小无限度的线程池,反对定时和周期性的执行线程
外围池大小corePoolSize,最大大小Integer.MAX_VALUE,存活工夫0,永不过期,应用DelayedWorkQueue
public class NewScheduledThreadPool { public static void main(String[] args) { // TODO Auto-generated method stub ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1); exec.scheduleAtFixedRate(new Runnable() {//每隔一段时间就触发异样 @Override public void run() { // TODO Auto-generated method stub //throw new RuntimeException(); System.out.println("==================="); } }, 1000, 5000, TimeUnit.MILLISECONDS); exec.scheduleAtFixedRate(new Runnable() {//每隔一段时间打印零碎工夫,证实两者是互不影响的 @Override public void run() { // TODO Auto-generated method stub System.out.println(System.nanoTime()); } }, 1000, 2000, TimeUnit.MILLISECONDS); }}
MyThread
public class MyThread extends Thread { @Override public void run() { // TODO Auto-generated method stub // super.run(); System.out.println(Thread.currentThread().getName() + "正在执行...."); }}
缓存队列
在后面咱们屡次提到了工作缓存队列,即workQueue,它用来寄存期待执行的工作。
workQueue的类型为BlockingQueue<Runnable>,通常能够取上面三种类型:
1)ArrayBlockingQueue:基于数组实现的有界阻塞队列,依照先进先出对数组进行排序,此队列创立时必须指定大小;
2)LinkedBlockingQueue:基于链表的先进先出阻塞队列,依照先进先出对数组进行排序,如果创立时没有指定此队列大小,则默认为Integer.MAX_VALUE;
3)synchronousQueue:这个队列比拟非凡,它不会保留提交的工作,而是将间接新建一个线程来执行新来的工作,不存储元素。
4)DelayedWorkQueue:实现PriorityBlockingQueue实现提早获取的无界队列,创立元素时,能够指定多久能力从队列中获取以后元素,只有延时期满能力从队列中获取元素。
工作回绝策略
当线程池的工作缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有工作到来就会采取工作回绝策略,通常有以下四种策略:
ThreadPoolExecutor.AbortPolicy:抛弃工作并抛出RejectedExecutionException异样。ThreadPoolExecutor.DiscardPolicy:也是抛弃工作,然而不抛出异样。ThreadPoolExecutor.DiscardOldestPolicy:抛弃队列最后面的工作,而后从新尝试执行工作(反复此过程)ThreadPoolExecutor.CallerRunsPolicy:由调用线程解决该工作