前言
jdk 1.8 的源码看的差不多了,打算记录一下有点难度的源码了解。
我的 jdk 1.8 源码正文 github 地址
https://github.com/zhangpanqin/fly-jdk8
看源码仁者见仁智者见智,看源码的确能够学到很多货色,不论是实践还是实际。不看源码也不肯定什么都不懂。
技能程度不够,你看源码播种也不会多,有些思维你了解不了。
线程和线程池
在 Linux 下通过零碎调用 fork 能够产生一个子过程,通过给 fork 传递不同的参数能够让子过程共享父过程的内存。
在 Linux 零碎下,java 的线程 Thread
理论就是调用的零碎调用 fork 产生的轻量级子过程,通过共享父过程的内存区域,从而达到多线程的目标。
零碎调用须要 cpu 从用户态切换到内核态,绝对于 cpu 执行工夫来说,这个切换相对来说工夫较长,比拟占用系统资源。所以有了线程池
,线程池中理论就是线程创立之后不销毁,run 办法中死循环从阻塞队列拿 Runable
去执行。
// 线程池简化版原理,只为了了解线程池public class ThreadPoolExecutor2 { private static final BlockingQueue<Runnable> QUEUE = new LinkedBlockingQueue(); public boolean execute(Runnable task) { return QUEUE.offer(task); } static { new Thread(() -> { try { Runnable take; while (true) { take = QUEUE.take(); if (Objects.nonNull(take)) { take.run(); } } } catch (Throwable e) { } }).start(); }}
线程池应用
jdk
提供的线程池实现 ThreadPoolExecutor
,咱们日常开发应用最多的也是这个。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {}
corePoolSize
线程池中外围线程数
外围线程是指,当线程闲暇一段时间不会被回收的线程数量。也能够配置参数,让外围线程闲暇也别回收 ThreadPoolExecutor.allowCoreThreadTimeOut
maximumPoolSize
线程池中最大线程数量
超过外围线程数量之后,当线程闲暇一段时间会被回收
long keepAliveTime,TimeUnit unit
线程闲暇多长时间会被回收workQueue
阻塞队列,承受到的工作会贮存在这外面,为了防止 oom ,肯定要设置队列的大小threadFactory
创立线程的工厂
// 咱们能够在线程工厂中定义线程名称的前缀,不便判断是哪个业务的线程池有问题// 线程池中的线程默认为工作线程,能够设置线程工厂创立的线程为守护线程private static ThreadFactory getThreadFactory() { final ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder(); threadFactoryBuilder.setNameFormat("order-thread-poll-%s"); // 设置线程池中的线程是否为守护线程 threadFactoryBuilder.setDaemon(true); // 当线程执行产生了异样,jvm 会调用 Thread.dispatchUncaughtException,而后调用设置的 UncaughtExceptionHandler threadFactoryBuilder.setUncaughtExceptionHandler((thread, throwable) -> { System.out.println(StrUtil.format("线程执行产生了异样,名称为: {}", thread.getName())); System.out.println(StrUtil.format("线程执行产生了异样,异样信息为: {}", throwable.getMessage())); }); return threadFactoryBuilder.build();}
- handler 工作不能被线程池承受解决时的回绝策略
队列中的工作须要内存,因为内存无限,咱们不能无限度接受任务,当工作不能被线程池承受时,须要依据策略来执行应该怎么回绝这个工作或者执行这个工作。
AbortPolicy: 调用 execute 时抛出异样CallerRunsPolicy: 在调用者线程中执行这个工作。就是同步调用 execute 时,理论执行这个 Runable 的 run 办法。DiscardOldestPolicy: 摈弃队列中最久的工作,而后再次调用这个线程池的 execute(Runable)DiscardPolicy: 不解决,抛弃掉这个工作。调用者感知不到
线程池源码
线程有线程的状态。线程池也有线程池的状态。
public class ThreadPoolExecutor extends AbstractExecutorService { /** * 示意线程池的状态和线程池中线程数量 * int 占四个字节,32 bit * 高三位示意线程池的状态,后 29 示意线程的数量 */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // COUNT_BITS 为 29 private static final int COUNT_BITS = Integer.SIZE - 3; /** * 能够承受新的工作,也能够解决阻塞队列里的工作 * 前三位为 111 */ private static final int RUNNING = -1 << COUNT_BITS; /** * 不承受新的工作,然而能够解决阻塞队列里的工作 * 前三位为 000 */ private static final int SHUTDOWN = 0 << COUNT_BITS; /** * 不承受新的工作,不解决阻塞队列列的工作,中断正在解决的工作 * 前三位为 001 */ private static final int STOP = 1 << COUNT_BITS; /** * 过渡状态,也就是说所有的工作都执行完了,以后线程池曾经没有无效的线程, * 这个时候线程池的状态将会TIDYING,并且将要调用 terminated 办法 * 前三位为 010 */ private static final int TIDYING = 2 << COUNT_BITS; /** * 线程池调用了 terminated 办法,资源曾经开释完 * 前三位为 011 */ private static final int TERMINATED = 3 << COUNT_BITS; /** * 获取线程池的状态 */ private static int runStateOf(int c) { return c & ~CAPACITY; } /** * 获取工作线程的数量 */ private static int workerCountOf(int c) { return c & CAPACITY; }}
打断线程其实就是调用了线程的 Thread.interrupt()
,只是标记了线程被打断,不会影响程序运行,打断的线程调用 Thread.isInterrupted()
返回 true。当线程阻塞期待时被打断,会抛出异样 InterruptedException
,在线程 run 办法中如果捕捉解决这个异样,线程就会退出。
// 线程是停不下来的,因而线程也停不下来。public static void main1(String[] args) { THREAD_POOL_EXECUTOR.execute(() -> { while (true) { } }); THREAD_POOL_EXECUTOR.shutdownNow();}// 当捕捉到打断异样抛出,而后线程没有解决异样,导致线程退出,线程池也退出了public static void main2(String[] args) { THREAD_POOL_EXECUTOR.execute(() -> { while (true) { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); throw new RuntimeException(e); } } }); THREAD_POOL_EXECUTOR.shutdownNow();}
execute
public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } int c = ctl.get(); /** * 线程池中线程数量少于外围线程数量,创立新的线程执行工作,创立新的线程执行工作胜利,return。 */ if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) { return; } c = ctl.get(); } /** * 线程池中,线程数量大于外围线程数,将工作增加至队列中去,期待被执行。 * 如果工作增加队列失败,如果没有达到最大线程数量,开启新的线程执行工作;达到最大线程数量,执行回绝策略。 */ if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次查看线程池状态,如果线程敞开,从队列中移除这个工作 if (!isRunning(recheck) && remove(command)) { reject(command); // 如果线程池在运行状态,然而没有工作过程。增加一个工作线程,这个线程会从队列那工作执行 } else if (workerCountOf(recheck) == 0) { addWorker(null, false); } } else if (!addWorker(command, false)) { reject(command); }}
addWorker
// 创立新的线程,并调用这个线程的 start 办法,返回 trueprivate boolean addWorker(Runnable firstTask, boolean core) { retry: /** * 双层 for 循环为了判断线程池的状态是否正在运行和线程数量是否满足定义 */ for (; ; ) { int c = ctl.get(); /** * rs 为线程池运行状态 */ int rs = runStateOf(c); /** * 1.当线程池 shutdown 之后,工作是不能增加的.当存在工作时,返回 false * 2.当线程池 shutdown 之后,当工作队列为空时也返回 false */ if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) { return false; } for (; ; ) { // 判断线程池中线程数量是否满足定义 int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) { return false; } // cas 怎么工作线程数量 if (compareAndIncrementWorkerCount(c)) { break retry; } c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) { continue retry; } // else CAS failed due to workerCount change; retry inner loop } } // worker 中的 线程是否调用了 start 办法 boolean workerStarted = false; // 是否将这个 worker 增加到 workers 这个 HashSet 中去 boolean workerAdded = false; Worker w = null; try { // 创立新的线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) { throw new IllegalThreadStateException(); } workers.add(w); int s = workers.size(); if (s > largestPoolSize) { largestPoolSize = s; } workerAdded = true; } } finally { mainLock.unlock(); } // 将 worker 增加到 workers 中去,阐明这个 worker 第一次应用.要启动这个线程 start if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (!workerStarted) { addWorkerFailed(w); } } return workerStarted; }
Worker.run
// 理论调用 runWorkerpublic void run() { runWorker(this);}
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts // 线程执行是否因为异样导致的,true 代表异样退出了 boolean completedAbruptly = true; try { // 线程中不停的获取队列头部的工作去执行 // getTask 理论是调用阻塞队列的workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) // 当线程池中线程数量大于外围线程数,getTask 因为超时返回了 null 线程执行退出。开释掉了线程 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) { wt.interrupt(); } try { // 工作执行之前的钩子函数 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 工作执行之后的钩子函数 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }}
tryTerminate
tryTerminate
尝试敞开线程池。
/** * 当波及移除 work 时,都要尝试判断线程池是否能退出了 */ final void tryTerminate() { for (; ; ) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) { return; } /** * 如果工作线程不为 0 ,打断一个线程 */ if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; } /** * 走到这里,工作线程为 0 了,并且队列中工作也为 0 ,设置线程池状态为 TIDYING * 设置线程池状态为 TIDYING 并调用 terminated() ,调用 terminated() 办法之后设置线程池状态为 TERMINATED */ final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } } }
本文由 张攀钦的博客 http://www.mflyyou.cn/ 创作。 可自在转载、援用,但需署名作者且注明文章出处。如转载至微信公众号,请在文末增加作者公众号二维码。微信公众号名称:Mflyyou