关于java:Java并发线程池篇附场景分析

42次阅读

共计 5695 个字符,预计需要花费 15 分钟才能阅读完成。

作者:汤圆

集体博客:javalover.cc

前言

后面咱们在创立线程时,都是间接 new Thread();

这样短期来看是没有问题的,然而一旦业务量增长,线程数过多,就有可能导致内存异样 OOM,CPU 爆满等问题

侥幸的是,Java 外面有线程池的概念,而线程池的外围框架,就是咱们明天的主题,Executor

接下来,就让咱们一起畅游在 Java 线程池的陆地中吧

本节会用银行办业务的场景来比照介绍线程池的外围概念,这样了解起来会很轻松

简介

Executor 是线程池的外围框架;

和它绝对应的有一个辅助工厂类 Executors,这个类提供了许多工厂办法,用来创立各种各样的线程池,上面咱们先看下几种常见的线程池

// 容量固定的线程池
Executor fixedThreadPool = Executors.newFixedThreadPool(5);
// 容量动静增减的线程池
Executor cachedThreadPool = Executors.newCachedThreadPool();
// 单个线程的线程池
Executor singleThreadExecutor = Executors.newSingleThreadExecutor();
// 基于调度机制的线程池(不同于下面的线程池,这个池创立的工作不会立马执行,而是定期或者延时执行)Executor scheduledThreadPool = Executors.newScheduledThreadPool(5);

下面这些线程池的区别次要就是线程数量的不同以及工作执行的机会

上面让咱们开始吧

文章如果有问题,欢送大家批评指正,在此谢过啦

目录

  1. 线程池的底层类ThreadPoolExecutor
  2. 为啥阿里不倡议应用 Executors 来创立线程池?
  3. 线程池的生命周期 ExecutorService

注释

1. 线程池的底层类 ThreadPoolExecutor

在文章结尾创立的几个线程池,外部都是有调用 ThreadPoolExecutor 这个类的,如下所示

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

这个类是 Exexutor 的一个实现类,关系图如下所示:

  • 其中 Executors 就是下面介绍的辅助工厂类,用来创立各种线程池
  • 接口 ExecutorService 是 Executor 的一个子接口,它对 Executor 进行了扩大,原有的 Executor 只能执行工作,而 ExecutorService 还能够治理线程池的生命周期(上面会介绍)

所以咱们先来介绍下这个底层类,它的残缺结构参数如下所示:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {

在介绍这些参数之前,咱们能够先举个生存中的例子 - 去银行办业务;而后比照着来了解,会比拟清晰

(图中绿色的窗口示意始终开着)

  • corePoolSize:外围线程数 ,就是始终存在的线程(不论用不必);=》 窗口的 1 号窗和 2 号窗
  • maximumPoolSize:最大线程数 ,就是最多能够创立多少个线程;=》 窗口的 1,2,3,4 号窗
  • keepAliveTime:多余的线程 (最大线程数 减去 外围线程数) 闲暇时存活的工夫 ;=》 窗口的 3 号窗和 4 号窗闲暇的工夫, 如果超过 keepAliveTime,还没有人来办业务,那么就会临时敞开 3 号窗和 4 号窗
  • workQueue: 工作队列 ,当外围线程数都在执行工作时,再进来的工作就会增加到工作队列中;=》 椅子,客户期待区
  • threadFactory:线程工厂,用来创立初始的外围线程,上面会有介绍;
  • handler:回绝策略 ,当所有线程都在执行工作,且工作队列也满时,再进来的工作就会被执行回绝策略(比方抛弃);=》 左下角的那个君子

根本的工作流程如下所示:

下面的参数咱们着重介绍下工作队列和回绝策略,线程工厂上面再介绍

工作队列:

  • ArrayBlockingQueue:

    • 数组阻塞队列,这个队列是一个 有界队列,遵循 FIFO,尾部插入,头部获取
    • 初始化时需指定队列的容量 capacity
    • 类比到下面的场景,就是 椅子的数量为初始容量 capacity
  • LinkedBlockingQueue:

    • 链表阻塞队列,这是一个 无界队列,遵循 FIFO,尾部插入,头部获取
    • 初始化时可不指定容量,此时默认的容量为 Integer.MAX_VALUE,基本上相当于无界了,此时队列可始终插入(如果解决工作的速度小于插入的速度,工夫长了就有可能导致 OOM)
    • 类比到下面的场景,就是 椅子的数量为 Integer.MAX_VALUE
  • SynchronousQueue:

    • 同步队列,阻塞队列的非凡版,即没有容量的阻塞队列,随进随出,不做停留
    • 类比到下面的场景,就是 椅子的数量为 0 ,来一个人就去柜台办理,如果柜台满了,就回绝
  • PriorityBlockingQueue

    • 优先级阻塞队列,这是一个无界队列,不遵循 FIFO,而是依据工作本身的优先级程序来执行
    • 初始化可不指定容量,默认 11(既然有容量,怎么还是无界的呢?因为它增加元素时会进行扩容)
    • 类比到下面的场景,就是新来的能够插队办理业务,好比各种会员

回绝策略:

  • AbortPolicy(默认):

    • 中断策略,抛出异样 RejectedExecutionException;
    • 如果线程数达到最大,且工作队列也满,此时再进来工作,则抛出 RejectedExecutionException(零碎会进行运行,然而不会退出)
  • DiscardPolicy:

    • 抛弃策略,丢掉新来的工作
    • 如果线程数达到最大,且工作队列也满,此时再进来工作,则间接丢掉(看工作的重要水平,不重要的工作能够用这个策略)
  • DiscardOldestPolicy:

    • 抛弃最旧策略,丢掉最先进入队列的工作(有点仁慈了),而后再次执行插入操作
    • 如果线程数达到最大,且工作队列也满,此时再进来工作,则间接丢掉队列头部的工作,并再次插入工作
  • CallerRunsPolicy:

    • 回去执行策略,让新来的工作返回到调用它的线程中去执行(比方 main 线程调用了 executors.execute(task),那么就会将 task 返回到 main 线程中去执行)
    • 如果线程数达到最大,且工作队列也满,此时再进来工作,则间接返回该工作,到调用它的线程中去执行

2. 为啥阿里不倡议应用 Executors 来创立线程池?

原话如下:

咱们能够写几个代码来测试一下

先测试 FixedThreadPool,代码如下:

public class FixedThreadPoolDemo {public static void main(String[] args) {
        // 创立一个固定容量为 10 的线程池,外围线程数和最大线程数都为 10
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1_000_000; i++) {
            try{executorService.execute(()->{
                    try {Thread.sleep(1000);
                    } catch (InterruptedException e) {e.printStackTrace();
                    }
                });
            }catch (Exception e){e.printStackTrace();
            }
        }
    }
}

这里咱们需对 VM 参数做一点批改,让问题比拟容易复现

如下所示,咱们增加 -Xmx8m -Xms8m 到 VM option 中(-Xmx8m:JVM 堆的最大内存为 8M,-Xms8m,JVM 堆的初始化内存为 8M):

此时点击运行,就会发现报错如下:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
    at com.jalon.concurrent.chapter6.FixedThreadPoolDemo.main(FixedThreadPoolDemo.java:21)

咱们来剖析下起因

  • 首先,newFixedThreadPool 外部用的工作队列为 LinkedBlockingQueue,这是一个无界队列(容量最大为 Integer.MAX_VALUE,基本上可始终增加工作)
  • 如果工作插入的速度,超过了工作执行的速度,那么队列必定会越来越长,最终导致 OOM

CachedThreadPool 也是相似的起因,只不过它是因为最大线程数为 Integer.MAX_VALUE;

所以当工作插入的速度,超过了工作执行的速度,那么线程的数量会越来越多,最终导致 OOM

那咱们要怎么创立线程池呢?

能够用 ThreadPoolExecutor 来自定义创立,通过为最大线程数和工作队列都设置一个边界,来限度相干的数量,如下所示:

public class ThreadPoolExecutorDemo {public static void main(String[] args) {
        ExecutorService service = new ThreadPoolExecutor(
                1, // 外围线程数
                1, // 最大线程数
                60L, // 闲暇工夫
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), // 数组工作队列,长度 1
                new ThreadPoolExecutor.DiscardPolicy()); // 回绝策略:抛弃
        for (int i = 0; i < 1_000_000; i++) {
            // 通过这里的打印信息,咱们能够晓得循环了 3 次
            // 起因就是第一次的工作在外围线程中执行,第二次的工作放到了工作队列,第三次的工作被拒绝执行
            System.out.println(i);
            service.execute(()->{
                // 这里会报异样,是因为执行了回绝策略(达到了最大线程数,队列也满了,此时新进来的工作就会执行回绝策略)// 这里须要留神的是,抛出异样后,代码并不会退出,而是卡在异样这里,包含主线程也会被卡住(这个是默认的回绝策略)// 咱们能够用其余的回绝策略,比方 DiscardPolicy, 此时代码就会持续往下执行
                System.out.println(Thread.currentThread().getName());
            });
        }
        try {Thread.sleep(1000);
            System.out.println("主线程 sleep");
        } catch (InterruptedException e) {e.printStackTrace();
        }
    }
}

3. 线程池的生命周期 ExecutorService

Executor 接口默认只有一个办法void execute(Runnable command);,用来执行工作

工作一旦开启,咱们就无奈再去插手了,比方进行、监控等

此时就须要 ExecutorService 退场了,它是 Executor 的一个子接口,对其进行了扩大,办法如下:

public interface ExecutorService extends Executor {void shutdown(); // 优雅地敞开,这个敞开会继续一段时间,以期待曾经提交的工作去执行实现(然而在 shutdown 之后提交的工作会被回绝)List<Runnable> shutdownNow(); // 粗犷地敞开,这个敞开会立刻敞开所有正在执行的工作,并返回工作队列中期待的工作

    boolean isShutdown();

    boolean isTerminated();

    // 用来期待线程的执行
    // 如果在 timeout 之内,线程都执行完了,则返回 true;// 如果等了 timeout,还没执行完,则返回 false;// 如果 timeout 之内,线程被中断,则抛出中断异样
    boolean awaitTermination(long timeout, TimeUnit unit) 
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);
   
    <T> Future<T> submit(Runnable task, T result);
}

从下面能够看到,线程池的生命周期分三步:

  1. 运行:创立后就开始运行
  2. 敞开:调用 shutdown 进入敞开状态
  3. 已终止:所有线程执行结束

总结

  1. 线程池的底层类 ThreadPoolExecutor:外围概念就是外围线程数、最大线程数、工作队列、回绝策略
  2. 为啥阿里不倡议应用 Executors 来创立线程池?:因为会导致 OOM,解决办法就是自定义ThreadPoolExecutor,为最大线程数和工作队列设置边界
  3. 线程池的生命周期ExecutorService:运行状态(创立后进入)、敞开状态(shutdown 后进入)、已终止状态(所有线程都执行实现后进入)

参考内容:

  • 《Java 并发编程实战》
  • 《实战 Java 高并发》
  • newFixedThreadPool 的弊病:https://my.oschina.net/langwa…
  • 银行办业务的场景参考:https://b23.tv/ygGjTH

后记

愿你的意中人亦是中意你之人

正文完
 0