Java并发3线程池应用以及原理剖析

20次阅读

共计 6528 个字符,预计需要花费 17 分钟才能阅读完成。

线程池应用以及原理剖析

1. 线程池的意义

  我们为什么要用线程池?讨论这个问题之前,应该先说明为什么使用多线程。使用多线程,本质上就是提高程序性能。我们经常说,多线程能让程序更快,快在哪里?所以应该说明一下,如何衡量性能的高低。

  性能的核心指标有 2 个,吞吐量和延迟。吞吐量是指,单位时间内能处理的请求的数量,吞吐量越大,意味着能处理的请求越多。延迟是指,从发出请求到收到响应的间隔时间,延迟越小,说明速度越快。这 2 个指标内部有一定的联系,同等条件下,延迟越小,吞吐量越大。但是不能相互转换,因为是不同维度的概念,延迟是时间维度,吞吐量是空间维度。

  那我们想要提升程序的性能,就得从这 2 个方面入手。降低延迟,提高吞吐量。基本上有 2 种手段,一是发挥算法的优势,二是发挥硬件的优势。后者与并发编程息息相关。那从硬件的角度来说,提高硬件的效率,无非是提高 CPU 的使用率和 IO 设备的使用率。如果只有 1 个线程工作,那么这个线程使用 CPU 的时候,IO 设备肯定是空闲的,使用 IO 设备的时候,CPU 肯定是空闲的。这就存在了资源的浪费。假设一个接口,CPU 处理时间 100 毫秒,IO 时间 100 毫秒,那么单线程环境下,1 秒中能响应 5 个请求。如果有 2 个线程,一个线程在使用 CPU 的时候,另一个使用 IO,这种情况下,CPU 和 IO 的利用率就是 100%,200 毫秒能处理 2 个请求(忽略线程切换成本),1 秒中处理 10 个请求,吞吐量多了一倍。

  好了,我们梳理完为什么使用多线程,再来看看为什么用线程池。首先,线程不能无节制的创建,线程在操作系统中是一种资源,创建销毁都需要时间空间。其次,根据 JVM 规范,一个线程默认的最大栈空间是 1M,这个空间是从系统内存中分配的,线程太多会占用大量的内存。再次,线程过多,会频繁的切换上下文,这是多余的损耗。还有,如果我们创建销毁一个线程总共需要 10ms,而任务只需要执行 5ms,那么按照传统的 创建线程 -> 执行任务 -> 销毁线程 这个流程,实际干活仅花了小部分时间,不如不销毁线程,因此出现了线程池:创建一定数量的线程执行任务,任务来的时候,直接从线程池取线程,任务结束了就把线程归还到线程池中。这样大大增加了线程的使用效率,不用耗费资源在创建销毁线程上。

2. 线程池的组成

  • 线程池管理器:创建并管理线程池。例如创建线程池、销毁线程池、添加新任务等等。
  • 工作线程:线程池中的工作线程。空闲时处于空闲状态,可以循环的执行任务。
  • 任务接口:每个任务必须实现任务接口,以供工作线程调度任务的执行。
  • 任务队列:存放待处理任务的队列。

3.Java 线程池 API 简介

最基础的接口和实现类

  • 接口:Executor,最上层的接口,只定义了执行任务的方法 execute(Runnable command)
  • 接口:ExecutorService,继承 Executor 接口,拓展了一些方法。
  • 接口:ScheduledExecutorService,继承 ExecutorService 接口,拓展了定时任务相关的一些方法。
  • 类:ThreadPoolExecutor,实现了 ExecutorService 接口,是最基础最标准的线程池实现类。
  • 类:ScheduledThreadPoolExecutor,继承 ThreadPoolExecutor,实现了 ScheduledExecutorService 接口,拥有定时任务相关功能。

下面介绍一下 ExecutorService 接口中定义的方法。

void shutdown();// 优雅的关闭线程池,之前提交的任务将会执行,不接收新的 submit。等到线程池中的任务执行完毕后,才退出。List<Runnable> shutdownNow();// 关闭线程池,尝试 interrupt 线程池中正在执行的工作线程,但是不保证 interrupt 成功,因为如果线程中没有中断处理的逻辑,interrupt() 方法是无法中断线程的。取消任务队列里的任务,并返回这些任务。boolean isShutdown();// 如果调用了 shutdown 或者 shutdownNow,返回 true。boolean isTerminated();// 如果 shutdown 且所有任务都完成了,返回 true,如果 shutdownNow 且成功退出后,返回 true。boolean awaitTermination(long timeout, TimeUnit unit);// 阻塞当前线程,直到所有任务(正在执行的任务和任务队列里的任务)执行结束,或超时,或当前线程被中断(抛异常),才返回 true。实际作用是监控当前线程池是否已经关闭。<T> Future<T> submit(Callable<T> task);// 提交一个执行任务的 Callable,返回一个 Future 对象,用于获取 Callable 执行结果。<T> Future<T> submit(Runnable task, T result);// 提交一个执行任务的 Runnable,返回一个 Future 对象,执行结果为传入的 result 对象。这个方法感觉不好用。Future<?> submit(Runnable task);// 提交一个执行任务的 Runnable,返回一个 Future 对象,执行结果为 null。<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)// 执行 Callable 任务集合,执行完毕后,返回 Future 对象集合。<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)// 执行 Callable 任务集合,执行完毕或者超时以后,返回 Future 对象集合,其他任务终止。<T> T invokeAny(Collection<? extends Callable<T>> tasks)// 执行 Callable 任务集合,任意一个任务执行成功,返回结果。<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)// 执行 Callable 任务集合,任意一个任务执行成功或者超时,返回结果。

下面介绍一下 ScheduledExecutorService 接口中定义的方法。

ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);// 提交一个延时 Runnable 任务,只会执行一次。public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);// 提交一个延时 Callable 任务,只会执行一次。public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);// 提交一个周期性任务,该任务在 initialDelay 时长后第一次执行任务,任务间隔时长为 period
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);// 提交一个周期性任务,该任务在 initialDelay 时长后第一次执行任务,任务间隔时长为 delay                                           

其中,scheduleAtFixedRate 和 scheduleWithFixedDelay 的区别在于,后者定时任务的间隔时间,是从任务执行结束后,开始累加的,前者不是。

scheduleAtFixedRate 的时间图:

scheduleWithFixedDelay 的时间图:

4. 线程池的使用示例

   在这一小节里,将会有几个线程池参数相关的使用示例,帮助我们更深刻的理解线程池的属性。

  • 初始化线程池,核心线程数量 5 个,最大线程数量 10 个,超出核心线程数的线程存活时间为 5 秒,任务队列使用无界队列,拒绝策略采取默认的。
    public void test1() throws InterruptedException{
        // 初始化线程池:核心线程 5 个;最大数量 10 个;超出核心线程数量的线程存活时间:5 秒;无界阻塞队列;默认拒绝策略
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 
                10, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        testCommon(executor);
    }
    
    /** 公共的测试方法,传入一个线程池
     * @throws InterruptedException */
    public void testCommon(ThreadPoolExecutor threadPoolExecutor) throws InterruptedException{
        // 提交 15 个任务,每个任务执行时间 3 秒
        for(int i = 1;i <= 15; i++){
            int n = i;
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    try {Thread.sleep(3000);
                        System.err.println("任务执行完毕 -" + n);
                    } catch (InterruptedException e) {e.printStackTrace();
                    }
                }
            };
            threadPoolExecutor.submit(task);// 提交任务
            System.out.println("任务" + n + "已提交(可能被拒绝,过会打印执行完毕就是没被拒绝)");
        }
        // 等待 0.5 秒
        Thread.sleep(500);
        // 提交完 15 个任务以后,查看队列的相关信息
        System.out.println("提交 500 毫秒后线程池中的工作线程的数量" + threadPoolExecutor.getPoolSize());
        System.out.println("提交 500 毫秒后线程池中的任务队列的任务数量" + threadPoolExecutor.getQueue().size());
        
        Thread.sleep(15000);
        // 等待 15.5 秒
        System.out.println("提交 15 秒后线程池的数量" + threadPoolExecutor.getPoolSize());
        System.out.println("提交 15 秒后线程池等待线程的数量" + threadPoolExecutor.getQueue().size());
    }

  在一次性提交完 15 个任务过后 500ms,线程池中任务队列的数量为 10 个,工作线程的数量是 5 个。这是为什么呢?原理如下:在刚创建线程池的时候,此时池中的工作线程个数为核心线程个数。如果来了一个任务,而此时有空闲的线程,那么直接交给空闲线程处理。当所有的核心线程都在忙碌,并在此时又来了一个新任务,那么线程池会把任务交给任务队列,只要任务队列没满,新任务就一直放到队列里。在这个例子中,我们使用的是无界队列,上限无限制的,所以就有了开头的现象,任务队列数量为 10 个。那有人可能问,最大线程数是干嘛的?这里说明一下,当核心线程都在忙碌,并且任务队列满了时,这时如果提交新任务,线程池就会判断,工作线程的数量是否达到了最大线程数量,如果没达到,就会创建线程,执行新任务,如果达到了最大线程数量,就会执行拒绝策略。

  • 初始化线程池,核心线程数量 5 个,最大线程数量 10 个,超出核心线程数的线程存活时间为 5 秒,任务队列使用有界队列(长度为 3),拒绝策略就是打印信息,不执行任务。
    public void test2() throws InterruptedException{
        // 初始化线程池:核心线程 5 个;最大数量 10 个;超出核心线程数量的线程存活时间:5 秒;有界队列
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS
                ,new ArrayBlockingQueue<Runnable>(3), new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("线程池满了,执行拒绝策略,不添加任务");
                    }
                });
        
        testCommon(executor);
    }

  这个线程池的执行结果是,前 5 个任务,直接由核心线程执行,第 6 - 8 个任务,被线程池添加到任务队列里,第 9 -13 个任务,线程池会创建新的线程处理,第 14、15 个任务,会拒绝处理。并且由于每个任务实际上是阻塞 3 秒,等同于 1 -5、9-13 这 10 个任务所在的线程,会近乎于同时阻塞 3 秒,等这 10 个线程中的某个线程执行任务完毕后,线程池会将队列中的任务(第 6 - 8 个任务),交给这个空闲线程处理。

  • 初始化线程池,核心线程数量 0 个,最大线程数量 Integer.MAX_VALUE 个,超出核心线程数的线程存活时间为 60 秒,任务队列使用同步队列,默认拒绝策略。
    public void test3() throws InterruptedException{
        // 核心数量为 0,说明线程池一开始没有任何线程。提交的 15 个任务,都会直接进入队列中等待。// 但是,SynchronousQueue 这种队列,实际上不是一个真正的队列,它没有为元素维护存储空间。线程池将任务放到队列的操作就会失败。// 因此,线程池就会创建一个临时线程,来处理这个任务。(因为临时线程的上限是 Integer.MAX_VALUE)// 这种线程池的作用:如果来了一个任务,有空闲线程,就用空闲线程,没有空闲线程,就开一个新线程处理。实际环境不能用 Integer.MAX_VALUE。ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60,
                TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        testCommon(executor);
    }
  • 初始化定时任务类型的线程池,核心线程数为 5, 提交任务 2 秒后执行。提交 100 个任务。
    public void test4() throws InterruptedException{ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5);
        for(int i = 1; i <= 100; i++){executor.schedule(new Runnable() {
                @Override
                public void run() {System.out.println("执行任务:" + Thread.currentThread().getName());
                    try {Thread.sleep(1000);
                    } catch (InterruptedException e) {e.printStackTrace();
                    }
                }
            }, 2, TimeUnit.SECONDS);
        }
    }

  这个线程池的执行结果是,每隔 1 秒钟,控制台打印 5 行 ” 执行任务:线程名 ”。为什么是隔 1 秒,不是隔 2 秒?因为这 100 个任务,实际是阻塞线程 1 秒钟。当 100 个任务提交以后,过了 2 秒后,线程池中的 5 个线程,开始执行 5 个任务,5 个线程执行任务,看上去总共只需要花 1 秒(因为任务实际就是阻塞 1 秒,阻塞完就完了),等到 1 秒过后,只要有线程的任务结束了,就再取新的任务执行。因此是间隔 1 秒。

正文完
 0