乐趣区

ThreadPoolExecutor-源码解读

ThreadPoolExecutor 源码解读

傻瓜源码 - 内容简介

傻瓜源码 - 内容简介
????【职场经验】(持续更新)
精编短文:如何成为值钱的 Java 开发 - 指南

如何日常学习、如何书写简历、引导面试官、系统准备面试、选择 offer、提高绩效、晋升 TeamLeader…..
????【源码解读】(持续更新) <br/> 1. 源码选材:Java 架构师必须掌握的所有框架和类库源码<br/>2. 内容大纲:按照“企业应用 Demo”讲解执行源码:总纲“阅读指南”、第一章“源码基础”、第二章“相关 Java 基础”、第三章“白话讲源码”、第四章“代码解读”、第五章“设计模式”、第六章“附录 - 面试习题、相关 JDK 方法、中文注释可运行源码项目”
3. 读后问题:粉丝群答疑解惑
已收录:HashMap、ReentrantLock、ThreadPoolExecutor、《Spring 源码解读》、《Dubbo 源码解读》…..
????【面试题集】(持续更新)<br/>1. 面试题选材:Java 面试常问的所有面试题和必会知识点 <br/>2. 内容大纲:第一部分”注意事项“、第二部分“面试题解读”(包括:”面试题“、”答案“、”答案详解“、“实际开发解说”)
3. 深度 / 广度:面试题集中的答案和答案详解,都是对齐一般面试要求的深度和广度
4. 读后问题:粉丝群答疑解惑
已收录:Java 基础面试题集、Java 并发面试题集、JVM 面试题集、数据库 (Mysql) 面试题集、缓存 (Redis) 面试题集 …..
????【粉丝群】(持续更新) <br/>收录:阿里、字节跳动、京东、小米、美团、哔哩哔哩等大厂内推
???? 作者介绍:Spring 系源码贡献者、世界五百强互联网公司、TeamLeader、Github 开源产品作者
???? 作者微信:wowangle03(企业内推联系我)

  加入我的粉丝社群,阅读更多内容。从学习到面试,从面试到工作,从 coder 到 TeamLeader,每天给你答疑解惑,还能有第二份收入!

第 1 章 阅读指南

  • 本文基于 open-jdk 1.8 版本。
  • 本文根据”Demo“解读源码。
  • 本文建议分为两个学习阶段,掌握了第一阶段,再进行第二阶段;

    • 第一阶段,理解章节“源码解读”前的所有内容。即掌握 IT 技能:熟悉 ThreadPoolExecutor 原理。
    • 第二阶段,理解章节“源码解读”(包括源码解读)之后的内容。即掌握 IT 技能:精读 ThreadPoolExecutor 源码。
  • 建议按照本文内容顺序阅读(内容前后顺序存在依赖关系)。
  • 阅读过程中,如果遇到问题,记下来,后面不远的地方肯定有解答,或者在粉丝群里提问。
  • 阅读章节“源码解读”时,建议获得中文注释源码项目配合本文,Debug 进行阅读学习。
  • 源码项目中的注释含义:

    • “Demo”在源码中,会标注“// ThreadPoolExecutor Demo”。
    • 在源码中的不易定位到的主线源码,会标注“// tofix 主线”。
  • 以下注释的源码,暂时不深入讲解:

    • 在执行“Demo”过程中,没有被执行到的源码(由于遍历空集合、if 判断),会标注“/* Demo 不涉及 /”。

第 2 章 Demo

// ThreadPoolExecutor Demo
public class ThreadPoolExecutorTest {

    @Test
    public void testThreadPoolExecutor() {
        // 初始化线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1,              // 指定 corePoolSize(核心线程数)1,              // 指定的 maximumPoolSize(线程池可以创建的最大线程数)0,             // 指定的 keepAliveTime(线程空闲的情况下,可以存活的时间)TimeUnit.SECONDS, // keepAliveTime 的时间单位
                new LinkedBlockingQueue<Runnable>()// 指定的 workQueue(装任务的队列));
        // 初始化任务
        Task task = new Task();
        // 向线程池提交任务
        executor.execute(task);
        
        // 关闭线程池有以下两种方法:// 关闭线程池:// 1. 线程池不会接受新的任务,执行拒绝任务处理器(默认是抛出异常)// 2. 线程池执行完已经提交了的任务再结束
        executor.shutdown();
        
        // 关闭线程池:// 1. 线程池不会接受新的任务,执行拒绝任务处理器(默认是抛出异常)// 2. 正在执行任务的线程不会被终止,直到任务处理完毕;// 3. 任务队列中没有线程处理的等待任务,会直接被终止;// 4. 返回未执行的任务
        executor.shutdownNow();}

}

class Task implements Runnable {

    @Override
    public void run() {System.out.println("任务被执行!");
    }
}

第 3 章 相关 Java 基础

3.1 二进制数

  在计算机中,所有的数字都是以二进制的形式存在,只用 0 和 1 两个数码来表示。为了区分正数和负数的二进制数,用最高位表示符号位(0 代表正数 /1 代表负数)。

  以 int 类型的二进制为例;int 类型的十进制整数转换成二进制,最多能够转换成 32 位的二进制数(前 16 位称为高 16 位,后 16 位称为低 16 位);正整数二进制数第 32 位为 0;负整数二进制数第 32 位为 1。

  在后文的讲解中,遇到类似这种的 int 二进制数:0000 0000 0000 0000 0000 0000 0010 1010,就直接省略为 0010 1010。

3.2 正整数十进制转二进制

  以 42 为例,转化为二进制等于 0010 1010。

3.3 二进制转正整数十进制

  以 0001 0100 为例,转换为整数十进制等于 20。

3.4 二进制加法

  把二进制数从右对齐,根据“逢二进一”规则计算;二进制数加法的法则:

0+0=0
0+1=1+0=1
1+1=0(进位为 1)1+1+1=1(进位为 1)

  例如:1110 和 1011 相加过程如下:

3.5 二进制减法

  把二进制数从右对齐,根据“借一有二”的规则计算;二进制数减法的法则:

0-0=0
1-1=0
1-0=1
0-1=1(借位为 1)

  例如:1101 和 1011 相减过程如下:

3.6 原码、反码、补码

  以负数 -5 为例:(扩展:正数的二进制反码、补码都是原码本身)

  • 1000 0000 0000 0000 0000 0000 0000 0101(-5 的二进制数;这样的编码方式,称为二进制“原码”)
  • 1111 1111 1111 1111 1111 1111 1111 1010(-5 的二进制原码符号位不变,其余位取反;这样的编码方式,称为二进制“反码”)
  • 1111 1111 1111 1111 1111 1111 1111 1011(-5 的二进制反码 + 1;这样的编码方式,称为二进制“补码”)

3.7 n<<w(有符号左移)

  以 5<<2 为例;就是把 5 的二进制位往左移两位,右边补 0,得出的十进制结果为 20。(符号位也会跟着左移运算移动)

  • 0000 0101(5 的二进制数)
  • 0001 0100(5 的二进制数向往左移动两位,右边补 0,得出的十进制结果为 20)

  在计算机中,负数都是以二进制补码的形式表示。以 -5<<2 为例;就是把 -5 的二进制补码位往左移两位,右边补 0;得出的结果仍是二进制补码的形式(转换成十进制是 -20)。如下:

  • 1000 0000 0000 0000 0000 0000 0000 0101(-5 的二进制原码)
  • 1111 1111 1111 1111 1111 1111 1111 1010(-5 的二进制原码符号位不变,其余位取反,得到二进制反码)
  • 1111 1111 1111 1111 1111 1111 1111 1011(-5 的二进制反码 + 1,得到二进制补码)
  • ========= -5<<2 运算开始 ==========
  • 1111 1111 1111 1111 1111 1111 1110 1100(-5 的二进制补码向左移动两位,右边补 0,得到二进制补码形式的结果)
  • ========= -5<<2 运算结束 ==========
  • 1111 1111 1111 1111 1111 1111 1110 1011(二进制补码结果 – 1,得到二进制反码)
  • 1000 0000 0000 0000 0000 0000 0001 0100(二进制反码符号位不变,其余位再取反,得到二进制原码,转化为十进制是 -20)

3.8 a&b(按位与)

  就是将 a 和 b 先转换为二进制数,然后相同位比较,只有两个都为 1,结果才为 1,否则为 0。

  以 3&5=1 为例:

  • 0000 0011(3 的二进制数)
  • 0000 0101(5 的二进制数)
  • 0000 0001(3&5(011 & 101)的结果,得出的十进制结果是 1)

3.9 a|b(按位或)

  就是将 a 和 b 先转换为二进制数,然后相同位比较,只要一个为 1,结果即为 1,否则为 0。

  以 6|2=6 为例:

  • 0000 0110(6 的二进制数)
  • 0000 0010(2 的二进制数)
  • 0000 0110(6|2(110|010)的结果,得出的十进制结果是 6)

3.10 跳出多重循环

代码示例 跳出多重循环

    @Test
    public void testRetry() {
        retry:
        for (int i = 0; i < 3; i++) {for (int j = 0; j < 3; j++) {if (i == 1) {continue retry;}
                System.out.println("i=" + i + ",j =" + j);
            }
        }

        System.out.println("------ 分割线 ------");

        retry:
        for (int i = 0; i < 3; i++) {for (int j = 0; j < 3; j++) {if (i == 1) {break retry;}
                System.out.println("i=" + i + ",j =" + j);
            }
        }
    }

  输出如下:
i=0,j = 0
i=0,j = 1
i=0,j = 2
i=2,j = 0
i=2,j = 1
i=2,j = 2
—— 分割线 ——
i=0,j = 0
i=0,j = 1
i=0,j = 2

3.11 Thread

  void interrupt():在一个线程中调用另一个线程的 interrupt() 方法,会将那个线程设置成线程中断状态,而不会真的中断线程。

  boolean isInterrupted(boolean ClearInterrupted):返回当前线程是否处于中断状态,ClearInterrupted 为 true,则返回的同时清除中断状态,反之则保留中断状态。

  boolean interrupted():就是封装了 isInterrupted(boolean ClearInterrupted) 方法,参数为 true。

  boolean interrupted():就是封装了 isInterrupted(boolean ClearInterrupted) 方法,参数为 false。

  boolean isInterrupted():就是封装了 isInterrupted(boolean ClearInterrupted) 方法,参数为 false。

interrupt() 代码示例

class ThreadTest {public static void main(String[] args) throws Exception {MyThread thread = new MyThread();
        thread.start();
        
        // 主线程通过调用 thread 对象的 interrupt() 方法,将 thread 线程设置成线程中断状态,但不会真的中断线程。thread.interrupt();

        System.out.println("主线程执行结束!");
    }
}

class MyThread extends Thread {
    @Override
    public void run() {System.out.println("test thread!");
    }
}
// 打印结果:// 主线程执行结束!// test thread!

3.12 LinkedBlockingQueue

  void offer(E e):向队尾添加元素,若队列已经满了,则直接返回 false。

  E poll(long timeout, TimeUnit unit):移除并返回 BlockingQueue 头部的元素,如果队列为空,则阻塞 timeout 个 unit 单位时间;如果规定时间内,仍然没有数据可取,则返回 null。如果线程被标记为中断状态(对线程调用 interrupt() 方法),则会抛出中断异常。

  E take():移除并返回 BlockingQueue 头部的元素,如果队列为空,则阻塞。如果线程被标记为中断状态(对线程调用 interrupt() 方法),则会抛出中断异常。

  int drainTo(Collection<? super E> c):从 BlockingQueue 移除所有数据,并放到 c 集合里。

  boolean remove(Object o):在 LinkedBlockingQueue 中移除 o。

3.13 原子性

  满足以下几个特点,我们就说这个操作支持原子性,线程安全:

  1. 原子操作中的所有子操作,要不全成功、要不全失败;
  2. 线程执行原子操作过程中,不会受到其它线程的任何影响;
  3. 其它线程只能感知到原子操作开始前和结束后的变化。

解释:

  包含多个操作单元,但仍支持原子性,通常都是由锁实现的。

代码示例:

class Test {
    int x = 0;
    int y = 0;

    public void test() {
        // 原子操作
        x = 10; 

        // 大致分为两步:1)获取 x 的值到缓存里;2)取出缓存里的值,赋值给 y
        // 不支持原子性;获取 x 的值到缓存里之后,其它线程可能修改 x 的值,导致 y 值错误
        y = x; 

        // 大致分为三步:1)获取 x 的值到缓存里;2)取出缓存里的值加一;3)赋值给 x
        // 不支持原子性;原理类似 y = x;
        x++; 

    }

}

3.14 Cas

  Cas 是 Compare-and-swap(比较并替换)的缩写,是支持原子性的操作;在 Java 中,底层是 native 方法实现,通过 CPU 提供的 lock 信号保证的原子性。

  想要将数据 V 的原值 O 替换为新值 N,执行 Cas 操作:

  1. 预先读取数据 V 的值 O 作为预期值;
  2. 执行 Cas 操作:

    1. 比较当前数据 V 的值是否是 O;

      1. 如果是,则替换为 N,返回执行成功;
      2. 如果不是,则不替换,返回执行失败;

例子:

  以 java.util.concurrent.atomic 包中的 AtomicInteger 为例;

    public static void main(String[] args) {AtomicInteger atomicInteger = new AtomicInteger(100);
        // 将 AtomicInteger 的值,从 100 替换为 200
        Boolean b = atomicInteger.compareAndSet(100, 200);
        // 返回 true,替换成功
        System.out.println(b);
    }

3.15 volatile

  用于修饰变量,可以保证被修饰变量的操作支持可见性和有序性,但不支持原子性。详见”Java 并发面试题集“

第 4 章 源码基础

4.1 导读

   1. 工作线程定义

  文中的工作线程是指线程池创建的存活线程。

4.2 ThreadPoolExecutor

代码示例 ThreadPoolExecutor 重要成员变量

public class ThreadPoolExecutor extends AbstractExecutorService {
    
   /**
    * 可通过构造函数指定的成员变量
    */

    // 创建工作线程的工厂类
    private volatile ThreadFactory threadFactory;

    // 核心线程数。用于控制线程池逻辑:当用户提交任务时,只要工作线程数小于 corePoolSize 时,就会创建工作线程执行任务;如果等于 corePoolSize,就不会创建工作线程,而是将任务放到 workQueue 队列对象里。// 也就是说核心线程只是一个概念,在代码中并没有标记某个线程是否是核心线程;也就是说当工作线程数小于等于 corePoolSize 时,线程池中这部分工作线程就是核心线程。private volatile int corePoolSize;

    // 线程存活时间。指线程空闲的情况下,可以存活的时间;核心线程不受 keepAliveTime 控制;除非 allowCoreThreadTimeOut 置为 true
    private volatile long keepAliveTime;
    
    // 是否允许核心线程空闲超时,默认为 false。如果配置 true,核心线程也会受 keepAliveTime 控制
    private volatile boolean allowCoreThreadTimeOut;
    
    // 当工作线程数等于 corePoolSize 时,提交的任务就会放在 workQueue 指定的队列对象里。private final BlockingQueue<Runnable> workQueue;
    
    // 表示线程池中最多能够容纳多少工作线程数。当工作线程数等于配置的 corePoolSize,并且 workQueue 已满时,线程池会创建新的非核心线程, 直到线程池中的工作线程总数等于 maximumPoolSize;private volatile int maximumPoolSize;
    
    // 拒绝任务处理器,默认是抛出异常;如果【任务队列已满,并且工作线程总数等于 maximumPoolSize】或者【已经调用 shutdown() 方法或 shutdownNow() 方法】,向线程池提交任务,则会执行 handler 
    private volatile RejectedExecutionHandler handler;

   /**
    * 供内部使用的成员变量
    */

    // ctl 二进制值的低 29 位,表示工作线程数
    // ctl 二进制值的高 3 位,表示线程池运行状态
    // ctl 是 AtomicInteger 类型,也就是说基于 CAS,支持原子操作,private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // CAPACITY 表示工作线程数容量(十进制值为 536870911;二进制值为 0001 1111 1111 1111 1111 1111 1111 1111),用于参与 ctl 的计算,获取运行状态以及工作线程数;也是比 maximumPoolSize 优先级更高的工作线程数上限
    private static final int COUNT_BITS = Integer.SIZE - 3; // 29
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;

    // workers 就是存放 worker(工作线程执行者,每个对象持有一个 thread 线程对象,代理了任务的执行)的容器
    private final HashSet<Worker> workers = new HashSet<Worker>();

    // largestPoolSize 记录 workers 里保存 worker 对象最多时的数量
    private int largestPoolSize;
    
    // completedTasks 表示线程池累积处理的任务数量
    volatile long completedTasks;
    
   /**
    * 线程池状态
    */
    
    // 状态变化:// 运行 -> 关闭(调用 shutdown 方法)-> 整理 -> 终止(RUNING->SHUTDOWN->TIDYING->TERMINATED)// 运行 -> 停止(调用 shutdownNow 方法)-> 整理 -> 终止(RUNING->STOP->TIDYING->TERMINATED)// RUNNING 表示运行状态(初始状态);(十进制值为 -536870912; 二进制值为 1110 0000 0000 0000 0000 0000 0000 0000)private static final int RUNNING = -1 << COUNT_BITS;

    // SHUTDOWN 表示关闭状态(十进制值为 0;二进制值为 0000 0000 0000 0000 0000 0000 0000 0000);调用 shutdown() 方法时,会将线程池的状态改为 SHUTDOWN;private static final int SHUTDOWN = 0 << COUNT_BITS;

    // STOP 表示停止状态(十进制值为 536870912;二进制值为 0010 0000 0000 0000 0000 0000 0000 0000);调用 shutdownNow() 方法时,会将线程池的状态改为 STOP;private static final int STOP = 1 << COUNT_BITS;

    // TIDYING 表示整理状态(十进制值为 1073741824;二进制值为 0100 0000 0000 0000 0000 0000 0000 0000),只有当线程池处于【【SHUTDOWN 状态并且 workQueue 队列不存在任务】或者【STOP 状态】】时, 并且所有工作线程都被销毁,才会将线程池状态改为 TERMINATED,在改为 TERMINATED 状态前,会先改为 TIDYING,增加这个状态为了区分是否执行 terminated() 方法(该方法默认为空方法,模版模式,用于子类实现扩展),执行 terminated() 方法前为 TIDYING 状态,执行后为 TERMINATED 状态
    private static final int TIDYING = 2 << COUNT_BITS;

    // TERMINATED 表示终止状态(十进制值为 1610612736;二进制值为 0110 0000 0000 0000 0000 0000 0000 0000);只有当线程池处于【【SHUTDOWN 状态并且 workQueue 队列不存在任务】或者【STOP 状态】】,并且所有工作线程都被销毁,并且执行完 terminated() 方法后,线程池才会设置为 TERMINATED 状态
    private static final int TERMINATED = 3 << COUNT_BITS;

4.3 Worker

  ThreadPoolExecutor 内部类 Worker。顾名思义,就是目标任务的执行者,每个对象里持有一个 thread 线程对象;基于代理者模式,控制目标任务的访问与执行。

代码示例 Worker 重要成员变量

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable {
        
        // 持有的工作线程对象(thread 是初始化 Worker 时,通过 ThreadPoolExecutor 的 threadFactory 创建出来的,用于执行 firstTask)final Thread thread;

        // 指定的第一个目标任务;(是初始化 worker 时,指定的;如果为空,则说明可能创建时就未指定或者指定的第一个任务已经处理完了,Worker 就开始从 workQueue 取任务执行)Runnable firstTask;

        // 当前 Worker 对象处理的任务数量总和
        volatile long completedTasks;

<br/>

加入我的粉丝社群,阅读全部内容

  从学习到面试,从面试到工作,从 coder 到 TeamLeader,每天给你答疑解惑,还能有第二份收入,这样的知识星球,难道你还要犹豫!

退出移动版