• 之前咱们介绍了线程池的四种回绝策略,理解了线程池参数的含意,那么明天咱们来聊聊Java 中常见的几种线程池,以及在jdk7 退出的 ForkJoin 新型线程池
  • 首先咱们列出Java 中的六种线程池如下
线程池名称形容
FixedThreadPool外围线程数与最大线程数雷同
SingleThreadExecutor一个线程的线程池
CachedThreadPool外围线程为0,最大线程数为Integer. MAX_VALUE
ScheduledThreadPool指定外围线程数的定时线程池
SingleThreadScheduledExecutor单例的定时线程池
ForkJoinPoolJDK 7 新退出的一种线程池
  • 在理解集中线程池时咱们先来相熟一下次要几个类的关系,ThreadPoolExecutor 的类图,以及 Executors 的次要办法:

  • 下面看到的类图,不便帮忙上面的了解和查看,咱们能够看到一个外围类 ExecutorService , 这是咱们线程池都实现的基类,咱们接下来说的都是它的实现类。

FixedThreadPool

  • FixedThreadPool 线程池的特点是它的外围线程数和最大线程数一样,咱们能够看它的实现代码在 Executors#newFixedThreadPool(int) 中,如下:
    public static ExecutorService newFixedThreadPool(int nThreads) {        return new ThreadPoolExecutor(nThreads, nThreads,                                      0L, TimeUnit.MILLISECONDS,                                      new LinkedBlockingQueue<Runnable>());    }
咱们能够看到办法内创立线程调用的理论是 ThreadPoolExecutor 类,这是线程池的外围执行器,传入的 nThread 参数作为外围线程数和最大线程数传入,队列采纳了一个链表构造的有界队列。
  • 这种线程池咱们能够看作是固定线程数的线程池,它只有在开始初始化的时候线程数会从0开始创立,然而创立好后就不再销毁,而是全副作为常驻线程池,这里如果对线程池参数不了解的能够看之前文章 《解释线程池各个参数的含意》。
  • 对于这种线程池他的第三个和第四个参数是没意义,它们是闲暇线程存活工夫,这里都是常驻不存在销毁,当线程解决不了时会退出到阻塞队列,这是一个链表构造的有界阻塞队列,最大长度是Integer. MAX_VALUE

SingleThreadExecutor

  • SingleThreadExecutor 线程的特点是它的外围线程数和最大线程数均为1,咱们也能够将其工作是一个单例线程池,它的实现代码是Executors#newSingleThreadExcutor() , 如下:
    public static ExecutorService newSingleThreadExecutor() {        return new FinalizableDelegatedExecutorService            (new ThreadPoolExecutor(1, 1,                                    0L, TimeUnit.MILLISECONDS,                                    new LinkedBlockingQueue<Runnable>()));    }    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {        return new FinalizableDelegatedExecutorService            (new ThreadPoolExecutor(1, 1,                                    0L, TimeUnit.MILLISECONDS,                                    new LinkedBlockingQueue<Runnable>(),                                    threadFactory));    }
  • 上述代码中咱们发现它有一个重载函数,传入了一个ThreadFactory 的参数,个别在咱们开发中会传入咱们自定义的线程创立工厂,如果不传入则会调用默认的线程工厂
  • 咱们能够看到它与 FixedThreadPool 线程池的区别仅仅是外围线程数和最大线程数改为了1,也就是说不论工作多少,它只会有惟一的一个线程去执行
  • 如果在执行过程中产生异样等导致线程销毁,线程池也会从新创立一个线程来执行后续的工作
  • 这种线程池非常适合所有工作都须要按被提交的程序来执行的场景,是个单线程的串行。

CachedThreadPool

  • cachedThreadPool 线程池的特点是它的常驻外围线程数为0,正如其名字一样,它所有的县城都是长期的创立,对于它的实现在 Executors#newCachedThreadPool() 中,代码如下:
    public static ExecutorService newCachedThreadPool() {        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                      60L, TimeUnit.SECONDS,                                      new SynchronousQueue<Runnable>());    }    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                      60L, TimeUnit.SECONDS,                                      new SynchronousQueue<Runnable>(),                                      threadFactory);    }
  • 从上述代码中咱们能够看到 CachedThreadPool 线程池中,最大线程数为 Integer.MAX_VALUE , 意味着他的线程数简直能够有限减少。
  • 因为创立的线程都是长期线程,所以他们都会被销毁,这里闲暇 线程销毁工夫是60秒,也就是说当线程在60秒内没有工作执行则销毁
  • 这里咱们须要留神点,它应用了 SynchronousQueue 的一个阻塞队列来存储工作,这个队列是无奈存储的,因为他的容量为0,它只负责对工作的传递和直达,效率会更高,因为外围线程都为0,这个队列如果存储工作不存在意义。

ScheduledThreadPool

  • ScheduledThreadPool 线程池是反对定时或者周期性执行工作,他的创立代码 Executors.newSchedsuledThreadPool(int) 中,如下所示:
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {        return new ScheduledThreadPoolExecutor(corePoolSize);    }    public static ScheduledExecutorService newScheduledThreadPool(            int corePoolSize, ThreadFactory threadFactory) {        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);    }
  • 咱们发现这里调用了 ScheduledThreadPoolExecutor 这个类的构造函数,进一步查看发现 ScheduledThreadPoolExecutor 类是一个继承了 ThreadPoolExecutor 的,同时实现了 ScheduledExecutorService 接口,咱们看到它的几个构造函数都是调用父类 ThreadPoolExecutor 的构造函数
    public ScheduledThreadPoolExecutor(int corePoolSize) {        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,              new DelayedWorkQueue());    }    public ScheduledThreadPoolExecutor(int corePoolSize,                                       ThreadFactory threadFactory) {        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,              new DelayedWorkQueue(), threadFactory);    }    public ScheduledThreadPoolExecutor(int corePoolSize,                                       RejectedExecutionHandler handler) {        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,              new DelayedWorkQueue(), handler);    }    public ScheduledThreadPoolExecutor(int corePoolSize,                                       ThreadFactory threadFactory,                                       RejectedExecutionHandler handler) {        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,              new DelayedWorkQueue(), threadFactory, handler);    }
  • 从下面代码咱们能够看到和其余线程池创立并没有差别,只是这里的工作队列是 DelayedWorkQueue 对于阻塞丢列咱们下篇文章专门说,这里咱们先创立一个周期性的线程池来看一下
    public static void main(String[] args) {        ScheduledExecutorService service = Executors.newScheduledThreadPool(5);        // 1. 提早肯定工夫执行一次        service.schedule(() ->{            System.out.println("schedule ==> 云栖简码-i-code.online");        },2, TimeUnit.SECONDS);        // 2. 依照固定频率周期执行        service.scheduleAtFixedRate(() ->{            System.out.println("scheduleAtFixedRate ==> 云栖简码-i-code.online");        },2,3,TimeUnit.SECONDS);        //3. 依照固定频率周期执行        service.scheduleWithFixedDelay(() -> {            System.out.println("scheduleWithFixedDelay ==> 云栖简码-i-code.online");        },2,5,TimeUnit.SECONDS);    }
  • 下面代码是咱们简略创立了 newScheduledThreadPool ,同时演示了外面的三个外围办法,首先看执行的后果:

  • 首先咱们看第一个办法 schedule , 它有三个参数,第一个参数是线程工作,第二个delay 示意工作执行提早时长,第三个unit 示意延迟时间的单位,如下面代码所示就是提早两秒后执行工作
 public ScheduledFuture<?> schedule(Runnable command,                                       long delay, TimeUnit unit);
  • 第二个办法是 scheduleAtFixedRate 如下, 它有四个参数,command 参数示意执行的线程工作 ,initialDelay 参数示意第一次执行的延迟时间,period 参数示意第一次执行之后依照多久一次的频率来执行,最初一个参数是工夫单位。如下面案例代码所示,示意两秒后执行第一次,之后按每隔三秒执行一次
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,                                                  long initialDelay,                                                  long period,                                                  TimeUnit unit);
  • 第三个办法是 scheduleWithFixedDelay 如下,它与下面办法是十分相似的,也是周期性定时执行, 参数含意和下面办法统一。这个办法和 scheduleAtFixedRate 的区别次要在于工夫的终点计时不同
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,                                                     long initialDelay,                                                     long delay,                                                     TimeUnit unit);
  • scheduleAtFixedRate 是以工作开始的工夫为工夫终点来计时,工夫到就执行第二次工作,与工作执行所破费的工夫无关;而 scheduleWithFixedDelay 是以工作执行完结的工夫点作为计时的开始。如下所示

SingleThreadScheduledExecutor

  • 它理论和 ScheduledThreadPool 线程池十分类似,它只是 ScheduledThreadPool 的一个特例,外部只有一个线程,它只是将 ScheduledThreadPool 的外围线程数设置为了 1。如源码所示:
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {        return new DelegatedScheduledExecutorService            (new ScheduledThreadPoolExecutor(1));    }
  • 下面咱们介绍了五种常见的线程池,对于这些线程池咱们能够从外围线程数、最大线程数、存活工夫三个维度进行一个简略的比照,有利于咱们加深对这几种线程池的记忆。
FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolSingleThreadScheduledExecutor
corePoolSize构造函数传入10构造函数传入1
maxPoolSize同corePoolSize1Integer. MAX_VALUEInteger. MAX_VALUEInteger. MAX_VALUE
keepAliveTime006000

ForkJoinPool

  • ForkJoinPool 这是一个在 JDK7 引入的新新线程池,它的次要特点是能够充分利用多核CPU , 能够把一个工作拆分为多个子工作,这些子工作放在不同的处理器上并行执行,当这些子工作执行完结后再把这些后果合并起来,这是一种分治思维。
  • ForkJoinPool 也正如它的名字一样,第一步进行 Fork 拆分,第二步进行 Join 合并,咱们先来看一下它的类图构造

  • ForkJoinPool 的应用也是通过调用 submit(ForkJoinTask<T> task) invoke(ForkJoinTask<T> task) 办法来执行指定工作了。其中工作的类型是 ForkJoinTask 类,它代表的是一个能够合并的子工作,他自身是一个抽象类,同时还有两个罕用的形象子类 RecursiveActionRecursiveTask ,其中 RecursiveTask 示意的是有返回值类型的工作,而 RecursiveAction 则示意无返回值的工作。上面是它们的类图:

  • 上面咱们通过一个简略的代码先来看一下如何应用 ForkJoinPool 线程池
/** * @url: i-code.online * @author: AnonyStar * @time: 2020/11/2 10:01 */public class ForkJoinApp1 {    /**        指标: 打印0-200以内的数字,进行分段每个距离为10以上,测试forkjoin    */    public static void main(String[] args) {        // 创立线程池,        ForkJoinPool joinPool = new ForkJoinPool();        // 创立根工作        SubTask subTask = new SubTask(0,200);        // 提交工作        joinPool.submit(subTask);        //让线程阻塞期待所有工作实现 在进行敞开        try {            joinPool.awaitTermination(2, TimeUnit.SECONDS);        } catch (InterruptedException e) {            e.printStackTrace();        }        joinPool.shutdown();    }}class  SubTask extends RecursiveAction {    int startNum;    int endNum;    public SubTask(int startNum,int endNum){        super();        this.startNum = startNum;        this.endNum = endNum;    }    @Override    protected void compute() {        if (endNum - startNum < 10){            // 如果决裂的两者差值小于10 则不再持续,间接打印            System.out.println(Thread.currentThread().getName()+": [startNum:"+startNum+",endNum:"+endNum+"]");        }else {            // 取两头值            int middle = (startNum + endNum) / 2;            //创立两个子工作,以递归思维,            SubTask subTask = new SubTask(startNum,middle);            SubTask subTask1 = new SubTask(middle,endNum);            //执行工作, fork() 示意异步的开始执行            subTask.fork();            subTask1.fork();        }    }}

后果:

  • 从下面的案例咱们能够看到咱们,创立了很多个线程执行,因为我测试的电脑是12线程的,所以这里理论是创立了12个线程,也侧面阐明了充沛调用了每个解决的线程解决能力
  • 下面案例其实咱们发现很相熟的滋味,那就是以前接触过的递归思维,将下面的案例图像化如下,更直观的看到,

  • 下面的例子是无返回值的案例,上面咱们来看一个典型的有返回值的案例,置信大家都听过及很相熟斐波那契数列,这个数列有个特点就是最初一项的后果等于前两项的和,如: 0,1,1,2,3,5...f(n-2)+f(n-1), 即第0项为0 ,第一项为1,则第二项为 0+1=1,以此类推。咱们最后的解决办法就是应用递归来解决,如下计算第n项的数值:
    private int num(int num){        if (num <= 1){            return num;        }        num = num(num-1) + num(num -2);        return num;    }
  • 从下面简略代码中能够看到,当 n<=1 时返回 n , 如果n>1 则计算前一项的值f1,在计算前两项的值f2, 再将两者相加失去后果,这就是典型的递归问题,也是对应咱们的ForkJoin 的工作模式,如下所示,根节点产生子工作,子工作再次衍生出子子工作,到最初在进行整合汇聚,失去后果。

  • 咱们通过 ForkJoinPool 来实现斐波那契数列的计算,如下展现:
/** * @url: i-code.online * @author: AnonyStar * @time: 2020/11/2 10:01 */public class ForkJoinApp3 {    public static void main(String[] args) throws ExecutionException, InterruptedException {        ForkJoinPool pool = new ForkJoinPool();        //计算第二是项的数值        final ForkJoinTask<Integer> submit = pool.submit(new Fibonacci(20));        // 获取后果,这里获取的就是异步工作的最终后果        System.out.println(submit.get());    }}class Fibonacci extends RecursiveTask<Integer>{    int num;    public Fibonacci(int num){        this.num = num;    }    @Override    protected Integer compute() {        if (num <= 1) return num;        //创立子工作        Fibonacci subTask1 = new Fibonacci(num - 1);        Fibonacci subTask2 = new Fibonacci(num - 2);        // 执行子工作        subTask1.fork();        subTask2.fork();        //获取前两项的后果来计算和        return subTask1.join()+subTask2.join();    }}
  • 通过 ForkJoinPool 能够极大的施展多核处理器的劣势,尤其非常适合用于递归的场景,例如树的遍历、最优门路搜寻等场景。
  • 下面说的是ForkJoinPool 的应用上的,上面咱们来说一下其外部的结构,对于咱们后面说的几种线程池来说,它们都是外面只有一个队列,所有的线程共享一个。然而在ForkJoinPool 中,其外部有一个共享的工作队列,除此之外每个线程都有一个对应的双端队列Deque , 当一个线程中工作被Fork 决裂了,那么决裂进去的子工作就会放入到对应的线程本人的Deque中,而不是放入公共队列。这样对于每个线程来说老本会升高很多,能够间接从本人线程的队列中获取工作而不须要去公共队列中抢夺,无效的缩小了线程间的资源竞争和切换。

  • 有一种状况,当线程有多个如t1,t2,t3...,在某一段时间线程 t1 的工作特地沉重,决裂了数十个子工作,然而线程 t0 此时却无事可做,它本人的 deque 队列为空,这时为了提高效率,t0 就会想方法帮忙 t1 执行工作,这就是“work-stealing”的含意。
  • 双端队列 deque 中,线程 t1 获取工作的逻辑是后进先出,也就是LIFO(Last In Frist Out),而线程 t0 在“steal”偷线程 t1deque 中的工作的逻辑是先进先出,也就是FIFO(Fast In Frist Out),如图所示,图中很好的形容了两个线程应用双端队列别离获取工作的情景。你能够看到,应用 “work-stealing” 算法和双端队列很好地均衡了各线程的负载。

本文由AnonyStar 公布,可转载但需申明原文出处。
欢送关注微信公账号 :云栖简码 获取更多优质文章
更多文章关注笔者博客 :云栖简码 i-code.online