一、线程
1、什么是线程
线程(thread)是操作系统可能进行运算调度的最小单位。它被蕴含在过程之中,是过程中的理论 运作单位。一条线程指的是过程中一个繁多程序的控制流,一个过程中能够并发多个线程,每条线 程并行执行不同的工作。
2、如何创立线程
2.1、JAVA中创立线程
/** * 继承Thread类,重写run办法 */class MyThread extends Thread { @Override public void run() { System.out.println("myThread..." + Thread.currentThread().getName());} }/** * 实现Runnable接口,实现run办法 */class MyRunnable implements Runnable { @Override public void run() { System.out.println("MyRunnable..." + Thread.currentThread().getName());} }/** * 实现Callable接口,指定返回类型,实现call办法 */class MyCallable implements Callable<String> { @Override public String call() throws Exception { return "MyCallable..." + Thread.currentThread().getName();} }
2.2、测试一下
public static void main(String[] args) throws Exception { MyThread thread = new MyThread(); thread.run(); //myThread...main thread.start(); //myThread...Thread-0 MyRunnable myRunnable = new MyRunnable(); Thread thread1 = new Thread(myRunnable); myRunnable.run(); //MyRunnable...main thread1.start(); //MyRunnable...Thread-1 MyCallable myCallable = new MyCallable(); FutureTask<String> futureTask = new FutureTask<>(myCallable); Thread thread2 = new Thread(futureTask); thread2.start(); System.out.println(myCallable.call()); //MyCallable...main System.out.println(futureTask.get()); //MyCallable...Thread-2}
2.3、问题
既然咱们创立了线程,那为何咱们间接调用办法和咱们调用start()办法的后果不同?new Thread() 是否实在创立了线程?
2.4、问题剖析
咱们间接调用办法,能够看到是执行的主线程,而调用start()办法就是开启了新线程,那阐明new Thread()并没有创立线程,而是在start()中创立了线程。
那咱们看下Thread类start()办法:
class Thread implements Runnable { //Thread类实现了Runnalbe接口,实现了run()办法 private Runnable target; public synchronized void start() { ... boolean started = false; try { start0(); //能够看到,start()办法实在的调用时start0()办法 started = true; } finally { ... } } private native void start0(); //start0()是一个native办法,由JVM调用底层操作系统,开启一个线程,由操作系统过对立调度 @Override public void run() { if (target != null) { target.run(); //操作系统在执行新开启的线程时,回调Runnable接口的run()办法,执行咱们预设的线程工作 } } }
2.5、总结
- JAVA不能间接创立线程执行工作,而是通过创立Thread对象调用操作系统开启线程,在由操作系 统回调Runnable接口的run()办法执行工作;
- 实现Runnable的形式,将线程理论要执行的回调工作独自提出来了,实现线程的启动与回调工作 解耦;
- 实现Callable的形式,通过Future模式岂但将线程的启动与回调工作解耦,而且能够在执行实现后 获取到执行的后果;
二、多线程
1、什么是多线程
多线程(multithreading),是指从软件或者硬件上实现多个线程并发执行的技术。同一个线程只 能解决完一个工作在解决下一个工作,有时咱们须要多个工作同时解决,这时,咱们就须要创立多 个线程来同时解决工作。
2、多线程有什么益处
2.1、串行解决
public static void main(String[] args) throws Exception { System.out.println("start..."); long start = System.currentTimeMillis(); for (int i = 0; i < 5; i++) { Thread.sleep(2000); //每个工作执行2秒 System.out.println("task done..."); //解决执行后果 } long end = System.currentTimeMillis(); System.out.println("end...,time = " + (end - start));}//执行后果start...task done...task done...task done...task done...task done... end...,time = 10043
2.2、并行处理
public static void main(String[] args) throws Exception { System.out.println("start..."); long start = System.currentTimeMillis(); List<Future> list = new ArrayList<>(); for (int i = 0; i < 5; i++) { Callable<String> callable = new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(2000); //每个工作执行2秒 return "task done..."; } }; FutureTask task = new FutureTask(callable); list.add(task); new Thread(task).start(); } list.forEach(future -> { try { System.out.println(future.get()); //解决执行后果 } catch (Exception e) { } }); long end = System.currentTimeMillis(); System.out.println("end...,time = " + (end - start));} //执行后果 start... task done... task done... task done... task done... task done... end...,time = 2005
2.3、总结
- 多线程能够把一个工作拆分为几个子工作,多个子工作能够并发执行,每一个子工作就是一个线程。
- 多线程是为了同步实现多项工作,不是为了进步运行效率,而是为了进步资源应用效率来进步零碎 的效率。
2.4、多线程的问题
下面示例中咱们能够看到,如果每来一个工作,咱们就创立一个线程,有很多工作的状况下,咱们 会创立大量的线程,可能会导致系统资源的耗尽。同时,咱们晓得线程的执行是须要抢占CPU资源 的,那如果有太多的线程,就会导致大量工夫用在线程切换的开销上。
再有,每来一个工作都须要创立一个线程,而创立一个线程须要调用操作系统底层办法,开销较 大,而线程执行实现后就被回收了。在须要大量线程的时候,创立线程的工夫就破费不少了。
三、线程池
1、如何设计一个线程池
因为多线程的开发存在上述的一些问题,那咱们是否能够设计一个货色来防止这些问题呢?当然能够! 线程池就是为了解决这些问题而生的。那咱们该如何设计一个线程池来解决这些问题呢?或者说,一个线程池该具备什么样的性能?
1.1、线程池基本功能
- 多线程会创立大量的线程耗尽资源,那线程池应该对线程数量有所限度,能够保障不会耗尽零碎资 源;
- 每次创立新的线程会减少创立时的开销,那线程池应该缩小线程的创立,尽量复用已创立好的线 程;
1.2、线程池面临问题
- 咱们晓得线程在执行完本人的工作后就会被回收,那咱们如何复用线程?
- 咱们指定了线程的最大数量,当工作数超出线程数时,咱们该如何解决?
1.3、翻新源于生存
先假如一个场景:假如咱们是一个物流公司的管理人员,要配送的货物就是咱们的工作,货车就是 咱们配送工具,咱们当然不能有多少货物就筹备多少货车。那当顾客源源不断的将货物交给咱们配 送,咱们该如何治理能力让公司经营的最好呢?
- 最开始货物来的时候,咱们还没有货车,每批要运输的货物咱们都要购买一辆车来运输;
- 当货车运输实现后,临时还没有下一批货物达到,那货车就在仓库停着,等有货物来了立马就能够 运输;
- 当咱们有了肯定数量的车后,咱们认为曾经够用了,那前面就不再买车了,这时要是由新的货物来 了,咱们就会让货物先放仓库,等有车回来在配送;
- 当618大促来袭,要配送的货物太多,车都在路上,仓库也都放满了,那怎么办呢?咱们就抉择临 时租一些车来帮忙配送,进步配送的效率;
- 然而货物还是太多,咱们减少了长期的货车,仍旧配送不过去,那这时咱们就没方法了,只能让发 货的客户排队等待或者罗唆不承受了;
- 大促圆满完成后,累计的货物曾经配送实现了,为了降低成本,咱们就将长期租的车都还了;
1.4、技术源于翻新
基于上述场景,物流公司就是咱们的线程池、货物就是咱们的线程工作、货车就是咱们的线程。我 们如何设计公司的治理货车的流程,就应该如何设计线程池治理线程的流程。
- 当工作进来咱们还没有线程时,咱们就该创立线程执行工作;
- 当线程工作执行实现后,线程不开释,等着下一个工作进来后接着执行;
- 当创立的线程数量达到一定量后,新来的工作咱们存起来期待闲暇线程执行,这就要求线程池有个 存工作的容器;
- 当容器存满后,咱们须要减少一些长期的线程来进步解决效率;
- 当减少长期线程后仍旧解决不了的工作,那就应该将此工作回绝;
- 当所有工作执行实现后,就应该将长期的线程开释掉,免得减少不必要的开销;
2、线程池具体分析
上文中,咱们讲了该如何设计一个线程池,上面咱们看看大神是如何设计的;
2.1、 JAVA中的线程池是如何设计的
2.1.1、 线程池设计
看下线程池中的属性,理解线程池的设计。
public class ThreadPoolExecutor extends AbstractExecutorService { //线程池的打包管制状态,用高3位来示意线程池的运行状态,低29位来示意线程池中工作线程的数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //值为29,用来示意偏移量 private static final int COUNT_BITS = Integer.SIZE - 3; //线程池的最大容量 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //线程池的运行状态,总共有5个状态,用高3位来示意 private static final int RUNNING = -1 << COUNT_BITS; //承受新工作并解决阻塞队列中的工作 private static final int SHUTDOWN = 0 << COUNT_BITS; //不承受新工作但会解决阻塞队列中的工作 private static final int STOP = 1 << COUNT_BITS; //不会承受新工作,也不会解决阻塞队列中的工作,并且中断正在运行的工作 private static final int TIDYING = 2 << COUNT_BITS; //所有工作都已终止, 工作线程数量为0,行将要执行terminated()钩子办法 private static final int TERMINATED = 3 << COUNT_BITS; // terminated()办法曾经执行完结 //工作缓存队列,用来寄存期待执行的工作 private final BlockingQueue<Runnable> workQueue; //全局锁,对线程池状态等属性批改时须要应用这个锁 private final ReentrantLock mainLock = new ReentrantLock(); //线程池中工作线程的汇合,拜访和批改须要持有全局锁 private final HashSet<Worker> workers = new HashSet<Worker>(); // 终止条件 private final Condition termination = mainLock.newCondition(); //线程池中已经呈现过的最大线程数 private int largestPoolSize; //已实现工作的数量 private long completedTaskCount; //线程工厂 private volatile ThreadFactory threadFactory; //工作回绝策略 private volatile RejectedExecutionHandler handler; //线程存活工夫 private volatile long keepAliveTime; //是否容许外围线程超时 private volatile boolean allowCoreThreadTimeOut; //外围池大小,若allowCoreThreadTimeOut被设置,外围线程全副闲暇超时被回收的状况下会为0 private volatile int corePoolSize; //最大池大小,不得超过CAPACITY private volatile int maximumPoolSize; //默认的工作回绝策略 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); //运行权限相干 private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); ... }
小结一下:以上线程池的设计能够看出,线程池的性能还是很欠缺的。
- 提供了线程创立、数量及存活工夫等的治理;
- 提供了线程池状态流转的治理;
- 提供了工作缓存的各种容器;
- 提供了多余工作的解决机制;
- 提供了简略的统计性能;
2.1.2、线程池构造函数
//构造函数 public ThreadPoolExecutor(int corePoolSize, //外围线程数 int maximumPoolSize, //最大容许线程数 long keepAliveTime, //线程存活工夫 TimeUnit unit, //存活工夫单位 BlockingQueue<Runnable> workQueue, //工作缓存队列 ThreadFactory threadFactory, //线程工厂 RejectedExecutionHandler handler) { //回绝策略 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}
小结一下:
- 构造函数通知了咱们能够怎么去实用线程池,线程池的哪些个性是咱们能够管制的;
2.1.3、线程池执行
2.1.3.1、提交工作办法
• public void execute(Runnable command);
• Future<?> submit(Runnable task);
• Future submit(Runnable task, T result);
• Future submit(Callable task);
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask;}
能够看到submit办法的底层调用的也是execute办法,所以咱们这里只剖析execute办法;
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //第一步:创立外围线程 if (workerCountOf(c) < corePoolSize) { //worker数量小于corePoolSize if (addWorker(command, true)) //创立worker return; c = ctl.get(); } //第二步:退出缓存队列 if (isRunning(c) && workQueue.offer(command)) { //线程池处于RUNNING状态,将工作退出workQueue工作缓存队列 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) //双重查看,若线程池状态敞开了,移除工作 reject(command); else if (workerCountOf(recheck) == 0) //线程池状态失常,然而没有线程了,创立worker addWorker(null, false); } //第三步:创立长期线程 else if (!addWorker(command, false)) reject(command); }
小结一下:execute()办法次要性能:
- 外围线程数量有余就创立外围线程;
- 外围线程满了就退出缓存队列;
- 缓存队列满了就减少非核心线程;
- 非核心线程也满了就回绝工作;
2.1.3.2、创立线程
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); //等价于:rs>=SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()) //线程池已敞开,并且无需执行缓存队列中的工作,则不创立 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) //CAS减少线程数 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //下面的流程走完,就能够实在开始创立线程了 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); //这里创立了线程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); //这里将线程退出到线程池中 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); //增加胜利,启动线程 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); //增加线程失败操作 } return workerStarted; }
小结:addWorker()办法次要性能;
- 减少线程数;
- 创立线程Worker实例退出线程池;
- 退出实现开启线程;
- 启动失败则回滚减少流程;
2.1.3.3、工作线程的实现
private final class Worker //Worker类是ThreadPoolExecutor的外部类 extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; //持有理论线程 Runnable firstTask; //worker所对应的第一个工作,可能为空 volatile long completedTasks; //记录执行工作数 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); //以后线程调用ThreadPoolExecutor中的runWorker办法,在这里实现的线程复用 } ...继承AQS,实现了不可重入锁... }
小结:工作线程Worker类次要性能;
- 此类持有一个工作线程,一直解决拿到的新工作,持有的线程即为可复用的线程;
- 此类可看作一个适配类,在run()办法中实在调用runWorker()办法一直获取新工作,实现线程复用;
2.1.3.4、线程的复用
final void runWorker(Worker w) { //ThreadPoolExecutor中的runWorker办法,在这里实现的线程复用 Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; //标识线程是否异样终止 try { while (task != null || (task = getTask()) != null) { //这里会一直从工作队列获取工作并执行 w.lock(); //线程是否须要中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); //执行工作前的Hook办法,可自定义 Throwable thrown = null; try { task.run(); //执行理论的工作 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); //执行工作后的Hook办法,可自定义 } } finally { task = null; //执行实现后,将以后线程中的工作制空,筹备执行下一个工作 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); //线程执行实现后的清理工作 } }
小结:runWorker()办法次要性能;
- 循环从缓存队列中获取新的工作,直到没有工作为止;
- 应用worker持有的线程实在执行工作;
- 工作都执行实现后的清理工作;
2.1.3.5、队列中获取待执行工作
private Runnable getTask() { boolean timedOut = false; //标识以后线程是否超时未能获取到task对象 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) //若线程存活工夫超时,则CAS减去线程数量 return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //容许超时回收则阻塞期待 workQueue.take(); //不容许则间接获取,没有就返回null if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
小结:getTask()办法次要性能;
- 理论在缓存队列中获取待执行的工作;
- 在这里治理线程是否要阻塞期待,控制线程的数量;
2.1.3.6、清理工作
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); //移除执行实现的线程 } finally { mainLock.unlock(); } tryTerminate(); //每次回收完一个线程后都尝试终止线程池 int c = ctl.get(); if (runStateLessThan(c, STOP)) { //到这里阐明线程池没有终止 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); //异样终止线程的话,须要在常见一个线程 } }
小结:processWorkerExit()办法次要性能;
- 实在实现线程池线程的回收;
- 调用尝试终止线程池;
- 保障线程池失常运行;
2.1.3.7、尝试终止线程池
final void tryTerminate() { for (;;) { int c = ctl.get(); //若线程池正在执行、线程池已终止、线程池还须要执行缓存队列中的工作时,返回 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //执行到这里,线程池为SHUTDOWN且无待执行工作 或 STOP 状态 if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); //只中断一个线程 return; } //执行到这里,线程池曾经没有可用线程了,能够终止了 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //CAS设置线程池终止 try { terminated(); //执行钩子办法 } finally { ctl.set(ctlOf(TERMINATED, 0)); //这里将线程池设为终态 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
小结:tryTerminate()办法次要性能;
- 理论尝试终止线程池;
- 终止胜利则调用钩子办法,并且将线程池置为终态。
2.2、JAVA线程池总结
以上通过对JAVA线程池的具体分析咱们能够看出,尽管流程看似简单,但其实有很多内容都是状态反复校验、线程平安的保障等内容,其次要的性能与咱们后面所提出的设计性能统一,只是额定减少了一些扩大,上面咱们简略整顿下线程池的性能;
2.2.1、次要性能
- 线程数量及存活工夫的治理;
- 待处理工作的存储性能;
- 线程复用机制性能;
- 工作超量的回绝性能;
2.2.2、扩大性能
- 简略的执行后果统计性能;
- 提供线程执行异样解决机制;
- 执行前后解决流程自定义;
- 提供线程创立形式的自定义;
2.2.3、流程总结
以上通过对JAVA线程池工作提交流程的剖析咱们能够看出,线程池执行的简略流程如下图所示;
2.3、JAVA线程池应用
线程池根本应用验证上述流程:
public static void main(String[] args) throws Exception { //创立线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 5, 10, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(5)); //退出4个工作,小于外围线程,应该只有4个外围线程,队列为0 for (int i = 0; i < 4; i++) { threadPoolExecutor.submit(new MyRunnable()); } System.out.println("worker count = " + threadPoolExecutor.getPoolSize()); //worker count = 4 System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 0 //再加4个工作,超过外围线程,然而没有超过外围线程 + 缓存队列容量,应该5个外围线程,队列为3 for (int i = 0; i < 4; i++) { threadPoolExecutor.submit(new MyRunnable()); } System.out.println("worker count = " + threadPoolExecutor.getPoolSize()); //worker count = 5 System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 3 //再加4个工作,队列满了,应该5个热外围线程,队列5个,非核心线程2个 for (int i = 0; i < 4; i++) { threadPoolExecutor.submit(new MyRunnable()); } System.out.println("worker count = " + threadPoolExecutor.getPoolSize()); //worker count = 7 System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 5 //再加4个工作,外围线程满了,应该5个热外围线程,队列5个,非核心线程5个,最初一个回绝 for (int i = 0; i < 4; i++) { try { threadPoolExecutor.submit(new MyRunnable()); } catch (Exception e) { e.printStackTrace(); //java.util.concurrent.RejectedExecutionException } } System.out.println("worker count = " + threadPoolExecutor.getPoolSize()); //worker count = 10 System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 5 System.out.println(threadPoolExecutor.getTaskCount()); //共执行15个工作 //执行实现,休眠15秒,非核心线程开释,应该5个外围线程,队列为0 Thread.sleep(1500); System.out.println("worker count = " + threadPoolExecutor.getPoolSize()); //worker count = 5 System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 0 //敞开线程池 threadPoolExecutor.shutdown(); }
作者:京东批发 秦浩然
起源:京东云开发者社区 转载请注明起源