关于java:线程池从设计思想到源码解读

48次阅读

共计 29598 个字符,预计需要花费 74 分钟才能阅读完成。

明天说一说,线程池,从设计思维到源码解析。

前言

各位小伙伴儿,春节曾经完结了,在此献上一篇肝了一个春节假期的迟来的拜年之作,心愿读者敌人们都能有播种。多多点赞、评论、珍藏!

初识线程池

咱们晓得,线程的创立和销毁都须要映射到操作系统,因而其代价是比拟昂扬的。出于防止频繁创立、销毁线程以及不便线程治理的须要,线程池应运而生。

线程池劣势

  • 「升高资源耗费」:线程池通常会保护一些线程(数量为 corePoolSize),这些线程被重复使用来执行不同的工作,工作实现后不会销毁。在待处理任务量很大的时候,通过对线程资源的复用,防止了线程的频繁创立与销毁,从而升高了系统资源耗费。
  • 「进步响应速度」:因为线程池保护了一批 alive 状态的线程,当工作达到时,不须要再创立线程,而是间接由这些线程去执行工作,从而缩小了工作的等待时间。
  • 「进步线程的可管理性」:应用线程池能够对线程进行对立的调配,调优和监控。

线程池设计思路

有句话叫做艺术来源于生存,编程语言也是如此,很多设计思维能映射到日常生活中,比方面向对象思维、封装、继承,等等。明天咱们要说的线程池,它同样能够在事实世界找到对应的实体——工厂。

先假想一个工厂的生产流程:

线程池设计思路

工厂中有固定的一批工人,称为正式工人,工厂接管的订单由这些工人去实现。当订单减少,正式工人曾经忙不过来了,工厂会将生产原料临时沉积在仓库中,等有闲暇的工人时再解决(因为工人闲暇了也不会被动解决仓库中的生产工作,所以须要调度员实时调度)。仓库沉积满了后,订单还在减少怎么办?工厂只能长期扩招一批工人来应答生产顶峰,而这批工人顶峰完结后是要清退的,所以称为临时工。过后临时工也以招满后(受限于工位限度,临时工数量有下限),前面的订单只能忍痛回绝了。

咱们做如下一番映射:

  • 工厂——线程池
  • 订单——工作(Runnable)
  • 正式工人——外围线程
  • 临时工——一般线程
  • 仓库——工作队列
  • 调度员——getTask()

getTask()是一个办法,将工作队列中的任务调度给闲暇线程,在解读线程池有具体介绍

映射后,造成线程池流程图如下,两者是不是有殊途同归之妙?

线程池流程图

这样,线程池的工作原理或者说流程就很好了解了,提炼成一个简图:

线程池的工作原理

深刻线程池

那么接下来,问题来了,线程池是具体如何实现这套工作机制的呢?从 Java 线程池 Executor 框架体系能够看出:线程池的真正实现类是ThreadPoolExecutor,因而咱们接下来重点钻研这个类。

线程池工作机制

构造方法

钻研一个类,先从它的构造方法开始。ThreadPoolExecutor提供了 4 个有参构造方法:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

解释一下构造方法中波及到的参数:

  • 「corePoolSize」(必须):外围线程数。即池中始终放弃存活的线程数,即便这些线程处于闲暇。然而将 allowCoreThreadTimeOut 参数设置为 true 后,外围线程处于闲暇一段时间以上,也会被回收。
  • 「maximumPoolSize」(必须):池中容许的最大线程数。当外围线程全副忙碌且工作队列打满之后,线程池会长期追加线程,直到总线程数达到 maximumPoolSize 这个下限。
  • 「keepAliveTime」(必须):线程闲暇超时工夫。当非核心线程处于闲暇状态的工夫超过这个工夫后,该线程将被回收。将 allowCoreThreadTimeOut 参数设置为 true 后,外围线程也会被回收。
  • 「unit」(必须):keepAliveTime参数的工夫单位。有:TimeUnit.DAYS(天)、TimeUnit.HOURS(小时)、TimeUnit.MINUTES(分钟)、「TimeUnit.SECONDS(秒)」「TimeUnit.MILLISECONDS(毫秒)」、TimeUnit.MICROSECONDS(微秒)、TimeUnit.NANOSECONDS(纳秒)
  • 「workQueue」(必须):工作队列,采纳阻塞队列实现。当外围线程全副忙碌时,后续由 execute 办法提交的 Runnable 将寄存在工作队列中,期待被线程解决。
  • 「threadFactory」(可选):线程工厂。指定线程池创立线程的形式。
  • 「handler」(可选):回绝策略。当线程池中线程数达到 maximumPoolSizeworkQueue打满时,后续提交的工作将被回绝,handler能够指定用什么形式回绝工作。

放到一起再看一下:

工厂与线程池

工作队列

应用 ThreadPoolExecutor 须要指定一个实现了 BlockingQueue 接口的工作期待队列。在 ThreadPoolExecutor 线程池的 API 文档中,一共举荐了三种期待队列,它们是:SynchronousQueueLinkedBlockingQueueArrayBlockingQueue

  1. 「SynchronousQueue」:同步队列。这是一个外部没有任何容量的阻塞队列,任何一次插入操作的元素都要期待绝对的删除 / 读取操作,否则进行插入操作的线程就要始终期待,反之亦然。
  2. 「LinkedBlockingQueue」:无界队列(严格来说并非无界,下限是Integer.MAX_VALUE),基于链表构造。应用无界队列后,当外围线程都忙碌时,后续工作能够有限退出队列,因而线程池中线程数不会超过外围线程数。这种队列能够进步线程池吞吐量,但代价是就义内存空间,甚至会导致内存溢出。另外,应用它时能够指定容量,这样它也就是一种有界队列了。
  3. 「ArrayBlockingQueue」:有界队列,基于数组实现。在线程池初始化时,指定队列的容量,后续无奈再调整。这种有界队列有利于避免资源耗尽,但可能更难调整和管制。

另外,Java 还提供了另外 4 种队列:

  1. 「PriorityBlockingQueue」:反对优先级排序的无界阻塞队列。寄存在 PriorityBlockingQueue 中的元素必须实现 Comparable 接口,这样能力通过实现 compareTo() 办法进行排序。优先级最高的元素将始终排在队列的头部;PriorityBlockingQueue不会保障优先级一样的元素的排序,也不保障以后队列中除了优先级最高的元素以外的元素,随时处于正确排序的地位。
  2. 「DelayQueue」:提早队列。基于二叉堆实现,同时具备:无界队列、阻塞队列、优先队列的特色。DelayQueue提早队列中寄存的对象,必须是实现 Delayed 接口的类对象。通过执行时延从队列中提取工作,工夫没到工作取不进去。更多内容请见 DelayQueue:面试官:谈谈 Java 中的阻塞提早队列 DelayQueue 原理和用法
  3. 「LinkedBlockingDeque」:双端队列。基于链表实现,既能够从尾部插入 / 取出元素,还能够从头部插入元素 / 取出元素。
  4. 「LinkedTransferQueue」:由链表构造组成的无界阻塞队列。这个队列比拟特地的时,采纳一种预占模式,意思就是消费者线程取元素时,如果队列不为空,则间接取走数据,若队列为空,那就生成一个节点(节点元素为 null)入队,而后消费者线程被期待在这个节点上,前面生产者线程入队时发现有一个元素为 null 的节点,生产者线程就不入队了,间接就将元素填充到该节点,并唤醒该节点期待的线程,被唤醒的消费者线程取走元素。

回绝策略

线程池有一个重要的机制:回绝策略。当线程池 workQueue 已满且无奈再创立新线程池时,就要回绝后续工作了。回绝策略须要实现 RejectedExecutionHandler 接口,不过 Executors 框架曾经为咱们实现了 4 种回绝策略:

  1. 「AbortPolicy」(默认):抛弃工作并抛出 RejectedExecutionException 异样。
  2. 「CallerRunsPolicy」:间接运行这个工作的 run 办法,但并非是由线程池的线程解决,而是交由工作的调用线程解决。
  3. 「DiscardPolicy」:间接抛弃工作,不抛出任何异样。
  4. 「DiscardOldestPolicy」:将以后处于期待队列列头的期待工作强行取出,而后再试图将以后被回绝的工作提交到线程池执行。

线程工厂指定创立线程的形式,这个参数不是必选项,Executors类曾经为咱们十分贴心地提供了一个默认的线程工厂:

/**
 * The default thread factory
 */
static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

线程池状态

线程池有 5 种状态:

volatile int runState;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

runState示意以后线程池的状态,它是一个 volatile 变量用来保障线程之间的可见性。

上面的几个 static final 变量示意 runState 可能的几个取值,有以下几个状态:

  • 「RUNNING」:当创立线程池后,初始时,线程池处于 RUNNING 状态;
  • 「SHUTDOWN」:如果调用了 shutdown() 办法,则线程池处于 SHUTDOWN 状态,此时线程池不可能承受新的工作,它会期待所有工作执行结束;
  • 「STOP」:如果调用了 shutdownNow()办法,则线程池处于 STOP 状态,此时线程池不能承受新的工作,并且会去尝试终止正在执行的工作;
  • 「TERMINATED」:当线程池处于 SHUTDOWNSTOP状态,并且所有工作线程曾经销毁,工作缓存队列曾经清空或执行完结后,线程池被设置为 TERMINATED 状态。

初始化 & 容量调整 & 敞开

「1、线程初始化」

默认状况下,创立线程池之后,线程池中是没有线程的,须要提交工作之后才会创立线程。

在理论中如果须要线程池创立之后立刻创立线程,能够通过以下两个办法办到:

  • 「prestartCoreThread()」:boolean prestartCoreThread(),初始化一个外围线程
  • 「prestartAllCoreThreads()」:int prestartAllCoreThreads(),初始化所有外围线程,并返回初始化的线程数
public boolean prestartCoreThread() {return addIfUnderCorePoolSize(null); // 留神传进去的参数是 null
}

public int prestartAllCoreThreads() {
    int n = 0;
    while (addIfUnderCorePoolSize(null))// 留神传进去的参数是 null
        ++n;
    return n;
}

「2、线程池敞开」

ThreadPoolExecutor提供了两个办法,用于线程池的敞开:

  • 「shutdown()」:不会立刻终止线程池,而是要等所有工作缓存队列中的工作都执行完后才终止,但再也不会承受新的工作
  • 「shutdownNow()」:立刻终止线程池,并尝试打断正在执行的工作,并且清空工作缓存队列,返回尚未执行的工作

「3、线程池容量调整」

ThreadPoolExecutor提供了动静调整线程池容量大小的办法:

  • 「setCorePoolSize」:设置外围池大小
  • 「setMaximumPoolSize」:设置线程池最大能创立的线程数目大小

当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立刻创立新的线程来执行工作。

应用线程池

ThreadPoolExecutor

通过构造方法应用 ThreadPoolExecutor 是线程池最间接的应用形式,上面看一个实例:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyTest {public static void main(String[] args) {
  // 创立线程池
  ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 5, 5, TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(5));
  // 向线程池提交工作
  for (int i = 0; i < threadPool.getCorePoolSize(); i++) {threadPool.execute(new Runnable() {
    @Override
    public void run() {for (int x = 0; x < 2; x++) {System.out.println(Thread.currentThread().getName() + ":" + x);
      try {Thread.sleep(2000);
      } catch (InterruptedException e) {e.printStackTrace();
      }
     }
    }
   });
  }

  // 敞开线程池
  threadPool.shutdown(); // 设置线程池的状态为 SHUTDOWN,而后中断所有没有正在执行工作的线程
  // threadPool.shutdownNow(); // 设置线程池的状态为 STOP,而后尝试进行所有的正在执行或暂停工作的线程,并返回期待执行工作的列表,该办法要慎用,容易造成不可控的结果}
}

运行后果:

pool-1-thread-2:0
pool-1-thread-1:0
pool-1-thread-3:0
pool-1-thread-2:1
pool-1-thread-3:1
pool-1-thread-1:1

Executors 封装线程池

另外,Executors封装好了 4 种常见的性能线程池(还是那么地贴心):

「1、FixedThreadPool」

固定容量线程池。其特点是最大线程数就是外围线程数,意味着线程池只能创立外围线程,keepAliveTime为 0,即线程执行完工作立刻回收。工作队列未指定容量,代表应用默认值Integer.MAX_VALUE。实用于须要管制并发线程的场景。

// 应用默认线程工厂
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
// 须要自定义线程工厂
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

应用示例:

// 1. 创立线程池对象,设置外围线程和最大线程数为 5
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
// 2. 创立 Runnable(工作)Runnable task =new Runnable(){public void run() {System.out.println(Thread.currentThread().getName() + "---> 运行");
  }
};
// 3. 向线程池提交工作
fixedThreadPool.execute(task);

「2、SingleThreadExecutor」

单线程线程池。特点是线程池中只有一个线程(外围线程),线程执行完工作立刻回收,应用有界阻塞队列(容量未指定,应用默认值Integer.MAX_VALUE

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
// 为节俭篇幅,省略了自定义线程工厂形式的源码

应用示例:

// 1. 创立单线程线程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// 2. 创立 Runnable(工作)Runnable task = new Runnable(){public void run() {System.out.println(Thread.currentThread().getName() + "---> 运行");
  }
};
// 3. 向线程池提交工作
singleThreadExecutor.execute(task);

「3、ScheduledThreadPool」

定时线程池。指定外围线程数量,一般线程数量有限,线程执行完工作立刻回收,工作队列为延时阻塞队列。这是一个比拟特地的线程池,实用于「执行定时或周期性的工作」

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}

// 继承了 ThreadPoolExecutor
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
        implements ScheduledExecutorService {
    // 构造函数,省略了自定义线程工厂的构造函数
 public ScheduledThreadPoolExecutor(int corePoolSize) {
     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
           new DelayedWorkQueue());
 }
 
 // 延时执行工作
 public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {...}
 // 定时执行工作
 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {...}
}

应用示例:

// 1. 创立定时线程池
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
// 2. 创立 Runnable(工作)Runnable task = new Runnable(){public void run() {System.out.println(Thread.currentThread().getName() + "---> 运行");
  }
};
// 3. 向线程池提交工作
scheduledThreadPool.schedule(task, 2, TimeUnit.SECONDS); // 提早 2s 后执行工作
scheduledThreadPool.scheduleAtFixedRate(task,50,2000,TimeUnit.MILLISECONDS);// 提早 50ms 后、每隔 2000ms 执行工作

「4、CachedThreadPool」

缓存线程池。没有外围线程,一般线程数量为 Integer.MAX_VALUE(能够了解为有限),线程闲置 60s 后回收,工作队列应用SynchronousQueue 这种无容量的同步队列。实用于 「任务量大但耗时低」 的场景。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

应用示例:

// 1. 创立缓存线程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 2. 创立 Runnable(工作)Runnable task = new Runnable(){public void run() {System.out.println(Thread.currentThread().getName() + "---> 运行");
  }
};
// 3. 向线程池提交工作
cachedThreadPool.execute(task);

解读线程池

OK,置信后面内容浏览起来还算轻松愉悦吧,那么从这里开始就进入深水区了,如果前面内容能吃透,那么线程池常识就真的被你把握了。

咱们晓得,向线程池提交工作是用 ThreadPoolExecutorexecute()办法,但在其外部,线程工作的解决其实是相当简单的,波及到 ThreadPoolExecutorWorkerThread 三个类的 6 个办法:

向线程池提交工作

execute()

ThreadPoolExecutor 类中,工作提交办法的入口是 execute(Runnable command) 办法(submit()办法也是调用了 execute()),该办法其实只在尝试做一件事:通过各种校验之后,调用 addWorker(Runnable command,boolean core) 办法为线程池创立一个线程并执行工作,与之绝对应,execute() 的后果有两个:

「参数阐明:」

  1. 「Runnable command」:待执行的工作

「执行流程:」

1、通过 ctl.get() 失去线程池的以后线程数,如果线程数小于 corePoolSize,则调用 addWorker(commond,true) 办法创立新的线程执行工作,否则执行步骤 2;

2、步骤 1 失败,阐明曾经无奈再创立新线程,那么思考将工作放入阻塞队列,期待执行完工作的线程来解决。基于此,判断线程池是否处于 Running 状态(只有 Running 状态的线程池能够承受新工作),如果工作增加到工作队列胜利则进入步骤 3,失败则进入步骤 4;

3、来到这一步须要阐明工作曾经退出工作队列,这时要二次校验线程池的状态,会有以下情景:

  • 线程池不再是 Running 状态了,须要将工作从工作队列中移除,如果移除胜利则回绝本次工作
  • 线程池是 Running 状态,则判断线程池工作线程是否为 0,是则调用 addWorker(commond,true)增加一个没有初始工作的线程(这个线程将去获取曾经退出工作队列的本次工作并执行),否则进入步骤 4;
  • 线程池不是 Running 状态,但从工作队列移除工作失败(可能已被某线程获取?),进入步骤 4;

4、将线程池扩容至 maximumPoolSize 并调用 addWorker(commond,false)办法创立新的线程执行工作,失败则回绝本次工作。

「流程图:」

创立新的线程执行工作

「源码详读:」

/**
 * 在未来的某个时候执行给定的工作。工作能够在新线程中执行,也能够在现有的池线程中执行。* 如果因为此执行器已敞开或已达到其容量而无奈提交工作以供执行,则由以后的 {@code RejectedExecutionHandler} 解决该工作。* 
 * @param command the task to execute  待执行的工作命令
 */
public void execute(Runnable command) {if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     * 
     * 1. 如果运行的线程少于 corePoolSize,将尝试以给定的命令作为第一个工作启动新线程。*
     * 2. 如果一个工作能够胜利排队,那么咱们依然须要仔细检查两点,其一,咱们是否应该增加一个线程
     *(因为自从上次查看至今,一些存在的线程曾经死亡),其二,线程池状态此时已扭转成非运行态。因而,咱们从新查看状态,如果查看不通过,则移除曾经入列的工作,如果查看通过且线程池线程数为 0,则启动新线程。* 
     * 3. 如果无奈将工作退出工作队列,则将线程池扩容到极限容量并尝试创立一个新线程,如果失败则回绝工作。*/
    int c = ctl.get();
   
    // 步骤 1:判断线程池以后线程数是否小于线程池大小
    if (workerCountOf(c) < corePoolSize) {
        // 减少一个工作线程并增加工作,胜利则返回,否则进行步骤 2
        // true 代表应用 coreSize 作为边界束缚,否则应用 maximumPoolSize
        if (addWorker(command, true))
            return;
        c = ctl.get();}
    // 步骤 2:不满足 workerCountOf(c) < corePoolSize 或 addWorker 失败,进入步骤 2
    // 校验线程池是否是 Running 状态且工作是否胜利放入 workQueue(阻塞队列)if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
        // 再次校验,如果线程池非 Running 且从工作队列中移除工作胜利,则回绝该工作
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果线程池工作线程数量为 0,则新建一个空工作的线程
        else if (workerCountOf(recheck) == 0)
            // 如果线程池不是 Running 状态,是退出不进去的
            addWorker(null, false);
    }
    // 步骤 3:如果线程池不是 Running 状态或工作入列失败,尝试扩容 maxPoolSize 后再次 addWorker,失败则回绝工作
    else if (!addWorker(command, false))
        reject(command);
}

addWorker()

addWorker(Runnable firstTask, boolean core) 办法,顾名思义,向线程池增加一个带有工作的工作线程。

「参数阐明:」

  1. 「Runnable firstTask」:新创建的线程应该首先运行的工作(如果没有,则为空)。
  2. 「boolean core」:该参数决定了线程池容量的约束条件,即以后线程数量以何值为极限值。参数为 true 则应用corePollSize 作为束缚值,否则应用maximumPoolSize

「执行流程:」

1、外层循环判断线程池的状态是否能够新增工作线程。这层校验基于上面两个准则:

  • 线程池为 Running 状态时,既能够承受新工作也能够解决工作
  • 线程池为敞开状态时只能新增空工作的工作线程(worker)解决工作队列(workQueue)中的工作不能承受新工作

2、内层循环向线程池增加工作线程并返回是否增加胜利的后果。

  • 首先校验线程数是否曾经超限度,是则返回false,否则进入下一步
  • 通过 CAS 使工作线程数 +1,胜利则进入步骤 3,失败则再次校验线程池是否是运行状态,是则持续内层循环,不是则返回外层循环

3、外围线程数量 + 1 胜利的后续操作:增加到工作线程汇合,并启动工作线程

  • 首先获取锁之后,再次校验线程池状态(具体校验规定见代码注解),通过则进入下一步,未通过则增加线程失败
  • 线程池状态校验通过后,再查看线程是否曾经启动,是则抛出异样,否则尝试将线程退出线程池
  • 查看线程是否启动胜利,胜利则返回true,失败则进入 addWorkerFailed 办法

「流程图:」

向线程池增加一个带有工作的工作线程

「源码详读:」

private boolean addWorker(Runnable firstTask, boolean core) {
    // 外层循环:判断线程池状态
    retry:
    for (;;) {int c = ctl.get();
        int rs = runStateOf(c);

        /** 
         * 1. 线程池为非 Running 状态(Running 状态则既能够新增外围线程也能够接受任务)* 2. 线程为 shutdown 状态且 firstTask 为空且队列不为空
         * 3. 满足条件 1 且条件 2 不满足,则返回 false
         * 4. 条件 2 解读:线程池为 shutdown 状态时且工作队列不为空时,能够新增空工作的线程来解决队列中的工作
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

  // 内层循环:线程池增加外围线程并返回是否增加胜利的后果
        for (;;) {int wc = workerCountOf(c);
            // 校验线程池已有线程数量是否超限:// 1. 线程池最大下限 CAPACITY 
            // 2.corePoolSize 或 maximumPoolSize(取决于入参 core)if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize)) 
                return false;
            // 通过 CAS 操作使工作线程数 +1,跳出外层循环
            if (compareAndIncrementWorkerCount(c)) 
                break retry;
            // 线程 + 1 失败,重读 ctl
            c = ctl.get();   // Re-read ctl
            // 如果此时线程池状态不再是 running,则从新进行外层循环
            if (runStateOf(c) != rs)
                continue retry;
            // 其余 CAS 失败是因为工作线程数量扭转了,持续内层循环尝试 CAS 对线程数 +1
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    /**
     * 外围线程数量 + 1 胜利的后续操作:增加到工作线程汇合,并启动工作线程
     */
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 上面代码须要加锁:线程池主锁
            mainLock.lock(); 
            try {
                // 持锁期间从新查看,线程工厂创立线程失败或获取锁之前敞开的状况产生时,退出
                int c = ctl.get();
                int rs = runStateOf(c);

    // 再次测验线程池是否是 running 状态或线程池 shutdown 但线程工作为空
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 线程曾经启动,则抛出非法线程状态异样
                    // 为什么会存在这种状态呢?未解决
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w); // 退出线程池
                    int s = workers.size();
                    // 如果当前工作线程数超过线程池已经呈现过的最大线程数,刷新后者值
                    if (s > largestPoolSize)
                        largestPoolSize = s; 
                    workerAdded = true;
                }
            } finally {mainLock.unlock();  // 开释锁
            }
            if (workerAdded) { // 工作线程增加胜利,启动该线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 线程启动失败,则进入 addWorkerFailed
        if (! workerStarted) 
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker 类

Worker类是外部类,既实现了Runnable,又继承了AbstractQueuedSynchronizer(以下简称AQS),所以其既是一个可执行的工作,又能够达到锁的成果。

Worker类次要保护正在运行工作的线程的中断管制状态,以及其余主要的记录。这个类适时地继承了 AbstractQueuedSynchronizer 类,以简化获取和开释锁(该锁作用于每个工作执行代码)的过程。这样能够避免去中断正在运行中的工作,只会中断在期待从工作队列中获取工作的线程。

咱们实现了一个简略的不可重入互斥锁,而不是应用可重入锁,因为咱们不心愿工作工作在调用 setCorePoolSize 之类的池管制办法时可能从新获取锁。另外,为了在线程真正开始运行工作之前禁止中断,咱们将锁状态初始化为负值,并在启动时革除它(在 runWorker 中)。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;
 
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread; 
     
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
     
    /** Per-thread task counter */
    volatile long completedTasks;
 
    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    // 通过构造函数初始化,Worker(Runnable firstTask) {
        // 设置 AQS 的同步状态
        // state:锁状态,- 1 为初始值,0 为 unlock 状态,1 为 lock 状态
        setState(-1); // inhibit interrupts until runWorker  在调用 runWorker 前,禁止中断
       
        this.firstTask = firstTask;
        // 线程工厂创立一个线程
        this.thread = getThreadFactory().newThread(this); 
    }
 
    /** Delegates main run loop to outer runWorker  */
    public void run() {runWorker(this); //runWorker()是 ThreadPoolExecutor 的办法}
 
    // Lock methods
    // The value 0 represents the unlocked state. 0 代表“没被锁定”状态
    // The value 1 represents the locked state. 1 代表“锁定”状态
 
    protected boolean isHeldExclusively() {return getState() != 0;
    }
 
    /**
     * 尝试获取锁的办法
     * 重写 AQS 的 tryAcquire(),AQS 原本就是让子类来实现的
     */
    protected boolean tryAcquire(int unused) {
        // 判断原值为 0,且重置为 1,所以 state 为 - 1 时,锁无奈获取。// 每次都是 0 ->1,保障了锁的不可重入性
        if (compareAndSetState(0, 1)) {
            // 设置 exclusiveOwnerThread= 以后线程
            setExclusiveOwnerThread(Thread.currentThread()); 
            return true;
        }
        return false;
    }
 
    /**
     * 尝试开释锁
     * 不是 state-1,而是置为 0
     */
    protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null); 
        setState(0);
        return true;
    }
 
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
 
    /**
     * 中断(如果运行)* shutdownNow 时会循环对 worker 线程执行
     * 且不须要获取 worker 锁,即便在 worker 运行时也能够中断
     */
    void interruptIfStarted() {
        Thread t;
        // 如果 state>=0、t!=null、且 t 没有被中断
        //new Worker()时 state==-1,阐明不能中断
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {t.interrupt();
            } catch (SecurityException ignore) {}}
    }
}

runWorker()

能够说,runWorker(Worker w) 是线程池中真正解决工作的办法,后面的execute() 和 addWorker() 都是在为该办法做筹备和铺垫。

「参数阐明:」

  1. 「Worker w」:封装的 Worker,携带了工作线程的诸多因素,包含Runnable(待处理工作)、lock(锁)、completedTasks(记录线程池已实现工作数)

「执行流程:」

1、判断当前任务或者从工作队列中获取的工作是否不为空,都为空则进入步骤 2,否则进入步骤 3

2、工作为空,则将 completedAbruptly 置为 false(即线程不是忽然终止),并执行processWorkerExit(w,completedAbruptly) 办法进入线程退出程序

3、工作不为空,则进入循环,并加锁

4、判断是否为线程增加中断标识,以下两个条件满足其一则增加中断标识:

  • 线程池状态 >=STOP, 即 STOPTERMINATED
  • 一开始判断线程池状态 <STOP,接下来查看发现 Thread.interrupted()true,即线程曾经被中断,再次查看线程池状态是否 >=STOP(以打消该霎时 shutdown 办法失效,使线程池处于 STOPTERMINATED

5、执行前置办法 beforeExecute(wt, task)(该办法为空办法,由子类实现)后执行task.run() 办法执行工作(执行不胜利抛出相应异样)

6、执行后置办法 afterExecute(task, thrown)(该办法为空办法,由子类实现)后将线程池已实现的工作数 +1,并开释锁。

7、再次进行循环条件判断。

「流程图:」

线程池流程图

「源码详读:」

final void runWorker(Worker w) {Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // allow interrupts
    // new Worker()是 state==-1,此处是调用 Worker 类的 tryRelease()办法,将 state 置为 0,而 interruptIfStarted()中只有 state>= 0 才容许调用中断
    w.unlock(); 
            
    // 线程退出的起因,true 是工作导致,false 是线程失常退出
    boolean completedAbruptly = true; 
    try {
        // 当前任务和从工作队列中获取的工作都为空,方进行循环
        while (task != null || (task = getTask()) != null) {// 上锁能够避免在 shutdown()时终止正在运行的 worker,而不是应答并发
            w.lock(); 
             
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            /**
             * 判断 1:确保只有在线程处于 stop 状态且 wt 未中断时,wt 才会被设置中断标识
             * 条件 1:线程池状态 >=STOP, 即 STOP 或 TERMINATED
             * 条件 2:一开始判断线程池状态 <STOP,接下来查看发现 Thread.interrupted()为 true,即线程曾经被中断,再次查看线程池状态是否 >=STOP(以打消该霎时 shutdown 办法失效,使线程池处于 STOP 或 TERMINATED),* 条件 1 与条件 2 任意称心一个,且 wt 不是中断状态,则中断 wt,否则进入下一步
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt(); // 以后线程调用 interrupt()中断
             
            try {
                // 执行前(空办法,由子类重写实现)beforeExecute(wt, task);
                 
                Throwable thrown = null;
                try {task.run();
                } 
                catch (RuntimeException x) {thrown = x; throw x;} 
                catch (Error x) {thrown = x; throw x;} 
                catch (Throwable x) {thrown = x; throw new Error(x);
                } 
                finally {
                    // 执行后(空办法,由子类重写实现)afterExecute(task, thrown); 
                }
            } 
            finally {
                task = null; 
                w.completedTasks++; // 实现工作数 +1
                w.unlock(); // 开释锁}
        }
        // 
        completedAbruptly = false;
    } 
    finally {
        // 解决 worker 的退出
        processWorkerExit(w, completedAbruptly);
    }
}

「5、getTask()」

由函数调用关系图可知,在 ThreadPoolExecutor 类的实现中,Runnable getTask() 办法是为 void runWorker(Worker w) 办法服务的,它的作用就是在工作队列(workQueue)中获取 task(Runnable)。

「参数阐明」:无参数

「执行流程」

  1. timedOut(上次获取工作是否超时)置为false(首次执行办法,无上次,天然为false),进入一个有限循环
  2. 如果线程池为 Shutdown 状态且工作队列为空(线程池 shutdown 状态能够解决工作队列中的工作,不再承受新工作,这个是重点)或者线程池为 STOPTERMINATED状态,则意味着线程池不用再获取工作了,当前工作线程数量 - 1 并返回null,否则进入步骤 3
  3. 如果线程池数量超限度或者工夫超限且(工作队列为空或以后线程数 >1),则进入步骤 4,否则进入步骤 5。
  4. 移除工作线程,胜利则返回null,不胜利则进入下轮循环。
  5. 尝试用 poll() 或者 take()(具体用哪个取决于timed 的值)获取工作,如果工作不为空,则返回该工作。如果为空,则将 timeOut 置为 true 进入下一轮循环。如果获取工作过程产生异样,则将 timeOut置为 false 后进入下一轮循环。

「流程图」

线程池流程图

「源码详读:」

private Runnable getTask() {
    // 最新一次 poll 是否超时
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /**
         * 条件 1:线程池状态 SHUTDOWN、STOP、TERMINATED 状态
         * 条件 2:线程池 STOP、TERMINATED 状态或 workQueue 为空
         * 条件 1 与条件 2 同时为 true,则 workerCount-1,并且返回 null
         * 注:条件 2 是思考到 SHUTDOWN 状态的线程池不会接受任务,但仍会解决工作
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        /**
         * 下列两个条件满足任意一个,则给以后正在尝试获取工作的工作线程设置阻塞工夫限度(超时会被销毁?不太确定这点),否则线程能够始终放弃沉闷状态
         * 1.allowCoreThreadTimeOut:以后线程是否以 keepAliveTime 为超时时限期待工作
         * 2. 以后线程数量曾经超过了外围线程数
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
        // 两个条件全副为 true,则通过 CAS 使工作线程数 -1,即剔除工作线程
        // 条件 1:工作线程数大于 maximumPoolSize,或(工作线程阻塞工夫受限且上次在工作队列拉取工作超时)// 条件 2:wc > 1 或工作队列为空
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 移除工作线程,胜利则返回 null,不胜利则进入下轮循环
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

     // 执行到这里,阐明曾经通过后面重重校验,开始真正获取 task 了
        try {// 如果工作线程阻塞工夫受限,则应用 poll(), 否则应用 take()
            // poll()设定阻塞工夫,而 take()无工夫限度,直到拿到后果为止
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // r 不为空,则返回该 Runnable
            if (r != null)
                return r;
            // 没能获取到 Runable,则将最近获取工作是否超时设置为 true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 响应中断,进入下一次循环前将最近获取工作超时状态置为 false
            timedOut = false;
        }
    }
}

processWorkerExit()

processWorkerExit(Worker w, boolean completedAbruptly)执行线程退出的办法

「参数阐明:」

  1. 「Worker w」:要完结的工作线程。
  2. 「boolean completedAbruptly」:是否忽然实现(异样导致),如果工作线程因为用户异样死亡,则 completedAbruptly 参数为 true

「执行流程:」

1、如果 completedAbruptly 为 true,即工作线程因为异样忽然死亡,则执行工作线程 - 1 操作。

2、主线程获取锁后,线程池曾经实现的工作数追加 w(当前工作线程)实现的工作数,并从 workerset汇合中移除以后worker

3、依据线程池状态进行判断是否执行 tryTerminate() 完结线程池。

4、是否须要减少工作线程,如果线程池还没有齐全终止,仍须要放弃肯定数量的线程。

  • 如果以后线程是忽然终止的,调用 addWorker() 创立工作线程
  • 以后线程不是忽然终止,但当前工作线程数量小于线程池须要保护的线程数量,则创立工作线程。须要保护的线程数量为 corePoolSize(取决于成员变量 allowCoreThreadTimeOut 是否为 false)或 1。
  • 源码详读:
/**
 * Performs cleanup and bookkeeping for a dying worker. Called
 * only from worker threads. Unless completedAbruptly is set,
 * assumes that workerCount has already been adjusted to account
 * for exit.  This method removes thread from worker set, and
 * possibly terminates the pool or replaces the worker if either
 * it exited due to user task exception or if fewer than
 * corePoolSize workers are running or queue is non-empty but
 * there are no workers.
 *
 * @param w the worker
 * @param completedAbruptly if the worker died due to user exception
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    /**
     * 1. 工作线程 - 1 操作
     * 1)如果 completedAbruptly 为 true,阐明工作线程产生异样,那么将正在工作的线程数量 -1
     * 2)如果 completedAbruptly 为 false,阐明工作线程无工作能够执行,由 getTask()执行 worker- 1 操作
     */
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    // 2. 从线程 set 汇合中移除工作线程,该过程须要加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 将该 worker 已实现的工作数追加到线程池已实现的工作数
        completedTaskCount += w.completedTasks;
        // HashSet<Worker> 中移除该 worker
        workers.remove(w);
    } finally {mainLock.unlock();
    }
    
 // 3. 依据线程池状态进行判断是否完结线程池
    tryTerminate();
 
 /**
     * 4. 是否须要减少工作线程
     * 线程池状态是 running 或 shutdown
     * 如果以后线程是忽然终止的,addWorker()
     * 如果以后线程不是忽然终止的,但以后线程数量 < 要保护的线程数量,addWorker()
     * 故如果调用线程池 shutdown(),直到 workQueue 为空前,线程池都会维持 corePoolSize 个线程,而后再逐步销毁这 corePoolSize 个线程
     */
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

好啦,以上就是 Java 线程池的全部内容啦,保持读完的伙伴儿们你们播种如何?感觉有帮忙的就棘手点个赞吧,祝大家新年新气象,降级加薪!

正文完
 0