Executor线程池原理与源码解读

28次阅读

共计 4421 个字符,预计需要花费 12 分钟才能阅读完成。

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。

线程实现方式

Thread、Runnable、Callable

// 实现 Runnable 接口的类将被 Thread 执行,表示一个基本任务
public interface Runnable {
    //run 方法就是它所有内容,就是实际执行的任务
    public abstract void run();}
//Callable 同样是任务,与 Runnable 接口的区别在于它接口泛型,同时它执行任务候带有返回值;//Callable 的使用通过外层封装成 Future 来使用
public interface Callable<V> {
    // 相对于 run 方法,call 方法带有返回值
    V call() throws Exception;}

注意:启动 Thread 线程只能用 start(JNI 方法)来启动,start 方法通知虚拟机,虚拟机通过调用器映射到底层操作系统,通过操作系统来创建线程来执行当前任务的 run 方法

Executor 框架

Executor 接口是线程池框架中最基础的部分,定义了一个用于执行 Runnable 的 execute 方法。
从图中可以看出 Exectuor 下有一个重要的子接口 ExecutorService,其中定义了线程池的具体行为:

  • execute(Runnable runnable):执行 Runnable 类型的任务
  • submit(task):用来提交 Callable 或者 Runnable 任务,并返回代表此任务的 Future 对象
  • shutdown():在完成已经提交的任务后封闭办事,不在接管新的任务
  • shutdownNow():停止所有正在履行的任务并封闭办事
  • isTerminated():是一个钩子函数,测试是否所有任务都履行完毕了
  • isShutdown():是一个钩子函数,测试是否该 ExecutorService 是否被关闭

ExecutorService 中的重点属性:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

ctl:对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它包含两部分信息:线程池的运行状态 (runState) 和线程池内有效线程的数量(workerCount)。

这里可以看到,使用 Integer 类型来保存,高 3 位保存 runState,低 29 位保存 workerCount。COUNT_BITS 就是 29,CAPACITY 就是 1 左移 29 位减 1(29 个 1),这个常量表示 workerCount 的上限值,大约是 5 亿。

ctl 相关方法:

// 获取运行状态
private static int runStateOf(int c)     {return c & ~CAPACITY;}
// 获取活动线程数
private static int workerCountOf(int c)  {return c & CAPACITY;}
// 获取运行状态和活动线程数的值
private static int ctlOf(int rs, int wc) {return rs | wc;}

线程池的状态:

RUNNING = ­1 << COUNT_BITS; // 高 3 位为 111
SHUTDOWN = 0 << COUNT_BITS; // 高 3 位为 000
STOP = 1 << COUNT_BITS; // 高 3 位为 001
TIDYING = 2 << COUNT_BITS; // 高 3 位为 010
TERMINATED = 3 << COUNT_BITS; // 高 3 位为 011

1、RUNNING

  • 状态说明:线程池处于 RUNNING 状态,能够接收新任务,以及对已添加的任务进行处理。
  • 状态切换:线程池的初始化状态是 RUNNING。换句话说,线程池一旦被创建,就处于 RUNNING 状态,并且线程池中的任务数为 0。

2、SHUTDOWN

  • 状态说明:线程池处于 SHUTDOWN 状态,不接收新任务,能够处理已经添加的任务。
  • 状态切换:调用 shutdown()方法时,线程池由 RUNNING -> SHUTDOWN。

3、STOP

  • 状态说明:线程池处于 STOP 状态,不接收新任务,不处理已提交的任务,并且会中断正在处理的任务。
  • 状态切换:调用线程池中的 shutdownNow()方法时,线程池由(RUNNING or SHUTDOWN) -> STOP。

4、TIDYING

  • 状态说明:当所有的任务已经停止,ctl 记录“任务数量”为 0,线程池会变为 TIDYING 状态。当线程池处于 TIDYING 状态时,会执行钩子函数 terminated()。terminated()在 ThreadPoolExecutor 类中是空,的,若用户想在线程池变为 TIDYING 时,进行相应处理,可以通过重载 terminated()函数来实现。
  • 状态切换:当线程池在 SHUTDOWN 状态下,阻塞队列为空并且线程池中执行任务也为空时,就会由 SHUTDOWN -> TIDYING。当线程池在 STOP 状态下,线程池中执行的任务为空时,就会由 STOP-> TIDYING。

5、TERMINATED

  • 状态说明:线程池线程池彻底停止,线程池处于 TERMINATED 状态,
  • 状态切换:线程池处于 TIDYING 状态时,执行完 terminated()之后,就会由 TIDYING->TERMINATED。

线程池使用

RunTask 类:

public class RunTask implements Runnable {public void run() {System.out.println("Thread name:"+Thread.currentThread().getName());
    }
}

ExecutorSample 类:

public class ExecutorSample {public static void main(String[] args) {ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i=0;i<20;i++){
            // 提交任务无返回值
            executor.execute(new RunTask());
            // 任务执行完成后有返回值
            Future<Object> future = executor.submit(new RunTask());
        }
    }
}

线程池的具体使用:

  • ThreadPoolExecutor 默认线程池
  • ScheduledThreadPoolExecutor 定时线程池

ThreadPoolExecutor

线程池的创建:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}
  • corePoolSize:线程池中的核心线程数。当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于 corePoolSize;如果当前线程数为 corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的 prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
  • maximumPoolSize:线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于 maximumPoolSize。
  • keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的线程数量大于 corePoolSize 时候,如果这时候没有新的任务提交,核心线程外的线程不会立即被销毁,而是会等待,直到等待的时间超过了 keepAliveTime

unit:keepAliveTime 的单位时间

  • workQueue:用于保存等待被执行的任务的阻塞队列,且任务必须实现 Runnable 接口,在 JDK 中提供了如下阻塞队列:

ArrayBlockingQueue:基于数组结构的有界阻塞队列,按 FIFO 排序任务。
LinkedBlockingQueue:基于链表结构的阻塞队列,按 FIFO 排序任务,吞吐量通常要高于 ArrayBlockingQueue。
SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常高于 LinkedBlockingQueue。

  • PriorityBlockingQueue:具有优先级的无界阻塞队列。
  • threadFactory:ThreadFactory 类型的变量,用来创建新线程。默认使用 ThreadFactory.defaultThreadFactory 来创建线程,会使新创建线程具有相同的 NORM_PRIORITY 优先级并且都是非守护线程,同时也设置了线程名称。
  • handler:线程池的饱和策略。当阻塞队列满了,且没有空闲的工作队列,如果继续提交任务,必须采用一种策略处理该任务.

线程池的监控:

public long getTaskCount() // 线程池已执行与未执行的任务总数
public long getCompletedTaskCount() // 已完成的任务数
public int getPoolSize() // 线程池当前的线程数
public int getActiveCount() // 线程池中正在执行任务的线程数量

线程池的原理

  • 如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(注意这一个步骤需要获取全局锁)。
  • 如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue。
  • 如果无法将任务加入 BlockingQueue(队列已满),则创建新的线程来处理任务(注意这一个步骤需要获取全局锁)。
  • 如果创建的新线程将使当前运行的线程超出 maximumPoolSize,任务将被执行饱和策略。

ThreadPoolExecutor 采用上述的设计思路,是为执行 execute()方法时,尽可能避免获取全局锁 (一个严重的可伸缩瓶颈)。在 ThreadPoolExecutor 完成预热之后,几乎所有的 execute() 方法调用都是在执行步骤 2,而步骤 2 不需要获取全局锁。

正文完
 0