前言

jdk 1.8 的源码看的差不多了,打算记录一下有点难度的源码了解。

我的 jdk 1.8 源码正文 github 地址

https://github.com/zhangpanqin/fly-jdk8

看源码仁者见仁智者见智,看源码的确能够学到很多货色,不论是实践还是实际。不看源码也不肯定什么都不懂。

技能程度不够,你看源码播种也不会多,有些思维你了解不了。

线程和线程池

在 Linux 下通过零碎调用 fork 能够产生一个子过程,通过给 fork 传递不同的参数能够让子过程共享父过程的内存。

在 Linux 零碎下,java 的线程 Thread 理论就是调用的零碎调用 fork 产生的轻量级子过程,通过共享父过程的内存区域,从而达到多线程的目标。

零碎调用须要 cpu 从用户态切换到内核态,绝对于 cpu 执行工夫来说,这个切换相对来说工夫较长,比拟占用系统资源。所以有了线程池,线程池中理论就是线程创立之后不销毁,run 办法中死循环从阻塞队列拿 Runable 去执行。

// 线程池简化版原理,只为了了解线程池public class ThreadPoolExecutor2 {    private static final BlockingQueue<Runnable> QUEUE = new LinkedBlockingQueue();    public boolean execute(Runnable task) {        return QUEUE.offer(task);    }    static {        new Thread(() -> {            try {                Runnable take;                while (true) {                    take = QUEUE.take();                    if (Objects.nonNull(take)) {                        take.run();                    }                }            } catch (Throwable e) {            }        }).start();    }}

线程池应用

jdk 提供的线程池实现 ThreadPoolExecutor ,咱们日常开发应用最多的也是这个。

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,                          long keepAliveTime,TimeUnit unit,                          BlockingQueue<Runnable> workQueue,                          ThreadFactory threadFactory,                          RejectedExecutionHandler handler) {}
  • corePoolSize 线程池中外围线程数

外围线程是指,当线程闲暇一段时间不会被回收的线程数量。也能够配置参数,让外围线程闲暇也别回收 ThreadPoolExecutor.allowCoreThreadTimeOut

  • maximumPoolSize 线程池中最大线程数量

超过外围线程数量之后,当线程闲暇一段时间会被回收

  • long keepAliveTime,TimeUnit unit 线程闲暇多长时间会被回收
  • workQueue 阻塞队列,承受到的工作会贮存在这外面,为了防止 oom ,肯定要设置队列的大小
  • threadFactory 创立线程的工厂
// 咱们能够在线程工厂中定义线程名称的前缀,不便判断是哪个业务的线程池有问题// 线程池中的线程默认为工作线程,能够设置线程工厂创立的线程为守护线程private static ThreadFactory getThreadFactory() {    final ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();    threadFactoryBuilder.setNameFormat("order-thread-poll-%s");    // 设置线程池中的线程是否为守护线程    threadFactoryBuilder.setDaemon(true);    // 当线程执行产生了异样,jvm 会调用 Thread.dispatchUncaughtException,而后调用设置的 UncaughtExceptionHandler    threadFactoryBuilder.setUncaughtExceptionHandler((thread, throwable) -> {        System.out.println(StrUtil.format("线程执行产生了异样,名称为: {}", thread.getName()));        System.out.println(StrUtil.format("线程执行产生了异样,异样信息为: {}", throwable.getMessage()));    });    return threadFactoryBuilder.build();}
  • handler 工作不能被线程池承受解决时的回绝策略

队列中的工作须要内存,因为内存无限,咱们不能无限度接受任务,当工作不能被线程池承受时,须要依据策略来执行应该怎么回绝这个工作或者执行这个工作。

AbortPolicy: 调用 execute 时抛出异样CallerRunsPolicy: 在调用者线程中执行这个工作。就是同步调用 execute 时,理论执行这个 Runable 的 run 办法。DiscardOldestPolicy: 摈弃队列中最久的工作,而后再次调用这个线程池的 execute(Runable)DiscardPolicy: 不解决,抛弃掉这个工作。调用者感知不到

线程池源码

线程有线程的状态。线程池也有线程池的状态。

public class ThreadPoolExecutor extends AbstractExecutorService {    /**     * 示意线程池的状态和线程池中线程数量     * int 占四个字节,32 bit     * 高三位示意线程池的状态,后 29 示意线程的数量     */    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));    // COUNT_BITS 为 29    private static final int COUNT_BITS = Integer.SIZE - 3;    /**     * 能够承受新的工作,也能够解决阻塞队列里的工作     * 前三位为 111     */    private static final int RUNNING = -1 << COUNT_BITS;    /**     * 不承受新的工作,然而能够解决阻塞队列里的工作     * 前三位为 000     */    private static final int SHUTDOWN = 0 << COUNT_BITS;    /**     * 不承受新的工作,不解决阻塞队列列的工作,中断正在解决的工作     * 前三位为 001     */    private static final int STOP = 1 << COUNT_BITS;    /**     * 过渡状态,也就是说所有的工作都执行完了,以后线程池曾经没有无效的线程,     * 这个时候线程池的状态将会TIDYING,并且将要调用 terminated 办法     * 前三位为 010     */    private static final int TIDYING = 2 << COUNT_BITS;    /**     * 线程池调用了 terminated 办法,资源曾经开释完     * 前三位为 011     */    private static final int TERMINATED = 3 << COUNT_BITS;       /**     * 获取线程池的状态     */    private static int runStateOf(int c) {        return c & ~CAPACITY;    }    /**     * 获取工作线程的数量     */    private static int workerCountOf(int c) {        return c & CAPACITY;    }}

打断线程其实就是调用了线程的 Thread.interrupt(),只是标记了线程被打断,不会影响程序运行,打断的线程调用 Thread.isInterrupted() 返回 true。当线程阻塞期待时被打断,会抛出异样 InterruptedException ,在线程 run 办法中如果捕捉解决这个异样,线程就会退出。

// 线程是停不下来的,因而线程也停不下来。public static void main1(String[] args) {    THREAD_POOL_EXECUTOR.execute(() -> {        while (true) {        }    });    THREAD_POOL_EXECUTOR.shutdownNow();}// 当捕捉到打断异样抛出,而后线程没有解决异样,导致线程退出,线程池也退出了public static void main2(String[] args) {    THREAD_POOL_EXECUTOR.execute(() -> {        while (true) {            try {                Thread.sleep(1);            } catch (InterruptedException e) {                e.printStackTrace();                throw new RuntimeException(e);            }        }    });    THREAD_POOL_EXECUTOR.shutdownNow();}

execute

public void execute(Runnable command) {    if (command == null) {        throw new NullPointerException();    }    int c = ctl.get();    /**      * 线程池中线程数量少于外围线程数量,创立新的线程执行工作,创立新的线程执行工作胜利,return。      */    if (workerCountOf(c) < corePoolSize) {        if (addWorker(command, true)) {            return;        }        c = ctl.get();    }    /**      * 线程池中,线程数量大于外围线程数,将工作增加至队列中去,期待被执行。      * 如果工作增加队列失败,如果没有达到最大线程数量,开启新的线程执行工作;达到最大线程数量,执行回绝策略。      */    if (isRunning(c) && workQueue.offer(command)) {        int recheck = ctl.get();        // 再次查看线程池状态,如果线程敞开,从队列中移除这个工作        if (!isRunning(recheck) && remove(command)) {            reject(command);            // 如果线程池在运行状态,然而没有工作过程。增加一个工作线程,这个线程会从队列那工作执行        } else if (workerCountOf(recheck) == 0) {            addWorker(null, false);        }    } else if (!addWorker(command, false)) {        reject(command);    }}

addWorker

// 创立新的线程,并调用这个线程的 start 办法,返回 trueprivate boolean addWorker(Runnable firstTask, boolean core) {        retry:        /**         * 双层 for 循环为了判断线程池的状态是否正在运行和线程数量是否满足定义         */        for (; ; ) {            int c = ctl.get();            /**             * rs 为线程池运行状态             */            int rs = runStateOf(c);            /**             * 1.当线程池 shutdown 之后,工作是不能增加的.当存在工作时,返回 false             * 2.当线程池 shutdown 之后,当工作队列为空时也返回 false             */            if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {                return false;            }            for (; ; ) {                // 判断线程池中线程数量是否满足定义                int wc = workerCountOf(c);                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) {                    return false;                }                // cas 怎么工作线程数量                if (compareAndIncrementWorkerCount(c)) {                    break retry;                }                c = ctl.get();  // Re-read ctl                if (runStateOf(c) != rs) {                    continue retry;                }                // else CAS failed due to workerCount change; retry inner loop            }        }        // worker 中的 线程是否调用了 start 办法        boolean workerStarted = false;        // 是否将这个 worker 增加到 workers 这个 HashSet 中去        boolean workerAdded = false;        Worker w = null;        try {            // 创立新的线程            w = new Worker(firstTask);            final Thread t = w.thread;            if (t != null) {                final ReentrantLock mainLock = this.mainLock;                mainLock.lock();                try {                    int rs = runStateOf(ctl.get());                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {                        if (t.isAlive()) {                            throw new IllegalThreadStateException();                        }                        workers.add(w);                        int s = workers.size();                        if (s > largestPoolSize) {                            largestPoolSize = s;                        }                        workerAdded = true;                    }                } finally {                    mainLock.unlock();                }                // 将 worker 增加到 workers 中去,阐明这个 worker 第一次应用.要启动这个线程 start                if (workerAdded) {                    t.start();                    workerStarted = true;                }            }        } finally {            if (!workerStarted) {                addWorkerFailed(w);            }        }        return workerStarted;    }

Worker.run

// 理论调用 runWorkerpublic void run() {    runWorker(this);}
final void runWorker(Worker w) {    Thread wt = Thread.currentThread();    Runnable task = w.firstTask;    w.firstTask = null;    w.unlock(); // allow interrupts    // 线程执行是否因为异样导致的,true 代表异样退出了    boolean completedAbruptly = true;    try {        // 线程中不停的获取队列头部的工作去执行        // getTask 理论是调用阻塞队列的workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)        // 当线程池中线程数量大于外围线程数,getTask 因为超时返回了 null 线程执行退出。开释掉了线程        while (task != null || (task = getTask()) != null) {            w.lock();            // If pool is stopping, ensure thread is interrupted;            // if not, ensure thread is not interrupted.  This            // requires a recheck in second case to deal with            // shutdownNow race while clearing interrupt            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) {                wt.interrupt();            }            try {                // 工作执行之前的钩子函数                beforeExecute(wt, task);                Throwable thrown = null;                try {                    task.run();                } catch (RuntimeException x) {                    thrown = x;                    throw x;                } catch (Error x) {                    thrown = x;                    throw x;                } catch (Throwable x) {                    thrown = x;                    throw new Error(x);                } finally {                    // 工作执行之后的钩子函数                    afterExecute(task, thrown);                }            } finally {                task = null;                w.completedTasks++;                w.unlock();            }        }        completedAbruptly = false;    } finally {        processWorkerExit(w, completedAbruptly);    }}

tryTerminate

tryTerminate 尝试敞开线程池。

     /**     * 当波及移除 work 时,都要尝试判断线程池是否能退出了     */    final void tryTerminate() {        for (; ; ) {            int c = ctl.get();            if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) {                return;            }            /**             * 如果工作线程不为 0 ,打断一个线程             */            if (workerCountOf(c) != 0) {                interruptIdleWorkers(ONLY_ONE);                return;            }            /**             * 走到这里,工作线程为 0 了,并且队列中工作也为 0 ,设置线程池状态为 TIDYING             * 设置线程池状态为 TIDYING 并调用 terminated() ,调用 terminated() 办法之后设置线程池状态为 TERMINATED             */            final ReentrantLock mainLock = this.mainLock;            mainLock.lock();            try {                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {                    try {                        terminated();                    } finally {                        ctl.set(ctlOf(TERMINATED, 0));                        termination.signalAll();                    }                    return;                }            } finally {                mainLock.unlock();            }        }    }

本文由 张攀钦的博客 http://www.mflyyou.cn/ 创作。 可自在转载、援用,但需署名作者且注明文章出处。

如转载至微信公众号,请在文末增加作者公众号二维码。微信公众号名称:Mflyyou