关于java:Java多线程学习笔记六-长乐未央篇

24次阅读

共计 7325 个字符,预计需要花费 19 分钟才能阅读完成。

忽然发现我多线程系列的题目快用光了: 初遇、相识、甚欢、久处不厌、长乐无极、长乐未央。算算本人多线程相干的文章有:

  • 《当咱们说起多线程与高并发时》
  • 《Java 多线程学习笔记(一) 初遇篇》
  • 《Java 多线程学习笔记(二) 相识篇》
  • 《Java 多线程学习笔记(三) 甚欢篇》
  • 《Java 多线程学习笔记(四) 久处不厌》
  • 《Java 多线程学习笔记(五) 长乐无极》
  • 《ThreadLocal 学习笔记》

明天应该是多线程学习笔记的收官篇,到本篇 JDK 内多线程的基本概念,应用应该大抵都过了一遍,其实认真算算还有并发汇合、并行流还没介绍。

并发汇合我着重会放在其实现上,也就是下面文章介绍的一些根本类的原理,因为并发汇合和一般汇合应用起来没有多大区别,这也是下个系列的文章了,也就是源码系列的文章,去年十月份开了个头《当咱们说起看源码时,咱们是在看什么》,是时候该去填这个坑了,并行流还是放在 Stream 系列的文章中。这些文章目前大略都在掘金和思否,不大对立,有工夫会将三个平台文章对立以下。如果你在公众号曾经发现下面这些文章,那曾经大抵迁徙实现了。

Fork Join 模式简介

本篇的配角是 ForkJoin,作者是 Doug Lea 的作品,在看 ForkJoinPool 的时候灵机一动想看一下其余并发类的作者,而后发现以下这些不包含未发现的

  • CountDownLatch
  • CyclicBarrier
  • Semaphore
  • ThreadPoolExecutor
  • Future
  • Callable
  • ConcurrentHashMap
  • CopyOnWriteArrayList

仿佛 JDK 外面并发的类库就由这个老爷子一手打造,在我翻阅这位老爷子的作品的时候,还发现自己又漏掉的类:Phaser, 所以还会有一个拾遗篇,下一篇多线程的系列会将 Phaser 的补充回来。话说回来咱们接着来介绍 ForkJoin。在 IDEA 中用 ForkJoin 来全局搜寻, 后果如下:

咱们首先来看 ForkJoinPool 的继承类图:

咱们这里能够看到 ForkJoinPool 和 ThreadPoolExecutor 在同一个级别,ThreadPoolExecutor 是线程池,这个咱们熟,所以咱们能够揣测一下 ForkJoinPool 是另一种类型的线程池。那这种 ForkJoinPool 和 ThreadPoolExecutor 有什么区别呢? 带着这个疑难,咱们接着来看 ForkJoinPool 的正文, 留神因为 ForkJoinPool 和 ThreadPoolExecutor 都属于 ExecutorService 的子类,所以 ForkJoinPool 的正文不会说 ThreadPoolExecutor 是其余类型的线程池(Thread Pool, 而是说另一种模式的 ExecutorService)。

An ExecutorService for running ForkJoinTasks. A ForkJoinPool provides the entry point for submissions from non-ForkJoinTask clients, as well as management and monitoring operations.
A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined. All worker threads are initialized with Thread.isDaemon set true.

ForkJoinPool 这个异步执行服务 (或译为这个 ExecutorService 执行一些 ForkJoinTasks) 执行的是 ForkJoinTask(Fork: 分叉、岔开两条分支, Join 是合并),所以 ForkJoinTask 能够了解为宰割合并工作,也能执行一些不属于这种类型的工作、治理监控操作。

ForkJoinPool 次要不同于其余类型的 ExecutorService 的在于采取了工作 - 窃取算法: 该线程池中的所有线程都会尝试去寻找和执行被提交给该线程的工作和其余还未实现的工作(如果没有工作,所有的线程将会处于阻塞期待状态)。这种解决形式对于一些能够将大工作切割成子工作解决和和其余客户端向该线程池提交小工作的工作类型来说效率会很高(也就是 ForkJoin 工作),ForkJoinPool 也可能比拟适宜于事件驱动型工作,这些工作永远不须要合并后果。所有的工作线程在初始化的时候会被设置为守护线程。

工作窃取算法简介

在介绍工作窃取算法之前,咱们先来回顾 ThreadPoolExecutor 的工作模式, 客户端在向线程池提交工作的时候,线程池会首先判断以后的工作线程是否小于外围线程数,如果小于外围线程数,会持续向线程中增加工作线程,如果不小于外围线程数,则会将工作搁置于工作队列中,如果队列曾经满了,则判断以后的工作线程是否大于最大线程数,如果大于等于最大线程数,就会触发回绝策略。

// ThreadPoolExecutor 的 execute 办法 
public void execute(Runnable command) {if (command == null)
            throw new NullPointerException();
        int c = ctl.get(); //ctl 能够简略的了解为线程的状态量
        // workerCountOf 用于计算工作线程的数量
        if (workerCountOf(c) < corePoolSize) {
            // addWorker 用于的第二个参数用于指定增加的是否是外围线程
            if (addWorker(command, true))
                return;
            c = ctl.get();}
        // 如果线程处于运行状态且向工作队列增加工作失败, 则尝试增加非核心线程
        if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 增加失败则触发回绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

在 ThreadPoolExecutor 中取得工作队列中的工作通过间接调用阻塞队列的 poll 和 take 办法来获取,然而为了防止多线程生产问题,阻塞队列在获取的时候是通过加锁来解决的:

 public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);
         // 咱们的老朋友 ReentrantLock
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {while (count == 0) {if (nanos <= 0L)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();} finally {lock.unlock();
        }
    }

如果能够的话,咱们心愿是线程在从工作队列中获取工作的时候的等待时间尽可能的短的,那咱们就须要筹备多个队列,为每个线程筹备一个队列,这样一个消费者解决完本人队列的工作的时候能够从其余工作线程对应的队列“窃取”工作进行解决,这样就不会导致工作线程闲置,并能加重其余消费者线程的累赘。这也就是工作窃取算法的思维。那工作窃取算法就一点故障都没有?如果是这样的话,ThreadPoolExecutor 仿佛在 ForkJoinPool 进去之后就应该早早的被打上 @Deprecated。到当初也没有,那么 ForkJoinPool 就必然有他的适应场景。应该不只是我一个人有这个疑难,我在 StackOverFlow 找到了我想要的答案:

  • ThreadPoolExecutor vs ForkJoinPool: stealing subtasks

Assume that I have created ThreadPoolExecutor with 10 threads and 3000 Callable tasks have been submitted. How these threads share the load of execution of sub tasks?

And How ForkJoin pool behaves differently for same use case?

假如 ThreadPoolExecutor 有 10 个线程,向 ThreadPoolExecutor 提交了 3000 个工作,这些线程将怎么协同执行这些工作?

ForkJoinPool 在同样的状况下有什么不同?

答案 1:如果你向线程池提交了 3000 个工作,这些工作也不能再拆分成子工作,ForkJoinPool 和 ThreadPoolExecutor 的行为没有什么显著不同: 10 个线程一次执行 10 个工作,直到这些工作被执行实现。ForkJoinPool 的实用场景为你有一些工作,然而这些工作能够合成为一些小工作。另一个答复也是从生产工作的时候造成线程饥饿的景象登程的。

我这里的领会是切割工作的粒度,当咱们须要将一些计算工作并行化的时候,咱们就会求助于线程池,转而向线程池中提交工作:

通常状况下咱们心愿工作的粒度尽可能的小,这样咱们就能够提交给线程池的时候就能够加大并发粒度,假如一个工作能够显著是没有划分好粒度过大,而咱们的线程池的外围线程是 10 个,提交给线程池的时候就只有这一个工作,那么线程池的工作线程就只有一个,又假如这个工作又能够被拆分成五个独立的子工作, 咱们权且命名为 A、B、C、D、E、F。F 的执行工夫最长,那么该工作的最终工夫可能就是 A +B+C+D+E+ F 的执行工夫,那如果咱们将其切割成五个独立的子工作,那么该工作的最终执行工夫就是最长的那个工作的执行工夫。这种状况是开发者能够明确的晓得工作的最小粒度,向 ThreadPoolExecutor 执行。然而有的时候对于某些工作咱们可能无奈手动切割粒度,咱们心愿让给一个粒度让程序依照此粒度去宰割,而后去执行,最初合并后果,这也就是 ForkJoinPool 的劣势计算场景。

切割为工作为 Fork:

合并工作后果为 Join:

这就是所谓的 ForkJoin 是也,其实咱们也能够只用 Fork。跟这个有点相似的就是归并排序:

归并排序过程是不是有点相似于咱们跟下面讲的 ForkJoin 很是相像?咱们接下来通过例子来感受一下 ForkJoin 模式。

而后用起来

ForkJoinPool 有四个构造函数:

1. ForkJoinPool() 
2. ForkJoinPool(int parallelism) 
3.ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler, boolean asyncMode)
4.ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode,  String workerNamePrefix)
5. ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler,  boolean asyncMode,    int corePoolSize,
 int maximumPoolSize,
 int minimumRunnable,
 Predicate<? super ForkJoinPool> saturate,
 long keepAliveTime,
  TimeUnit unit) // 自 JDK9 引入

1.2.3 实质上都是调用 4. 咱们重点来讲下 4.

  • parallelism 并行粒度, 也就是设置该线程池的线程数。

应用无参结构, 为 Runtime.getRuntime().availableProcessors()的返回值,此办法返回的是逻辑外围而非物理外围。我的电脑是八核十六外围,所以失去的数字为 16

  • factory 线程工程, 在增加工作线程的时候调用此办法增加工作线程
  • handler 异样解决者, 工作线程遇到了问题该如何解决。
  • asyncMode 用于管制生产形式

if true, establishes local first-in-first-out scheduling mode for forked tasks that are never joined. This mode may be more appropriate than default locally stack-based mode in applications in which worker threads only process event-style asynchronous tasks. For default value, use false.

如果为真为 Fork 工作采取先进先出的调度形式,这些任何永远不会合并。这种调度模式更适宜一些基于本地栈的利用,这些工作线程解决的是事件驱动的异步工作。默认为 false。

  • workerNamePrefix: 用于管制工作线程的名称

接下来咱们来看怎么向 ForkJoinPool 提交工作, 咱们看到了一个新的参数类型: ForkJoinTask。咱们先一个例子来介绍 ForkJoin.

public class FibonacciForkJoinTask extends RecursiveTask<Integer> {

    private final int n;

    public FibonacciForkJoinTask(int n) {this.n = n;}
    @Override
    protected Integer compute() {if (n <= 1){return n;}
        FibonacciForkJoinTask f1 = new FibonacciForkJoinTask(n - 1);
        f1.fork(); // 合成 f1
        FibonacciForkJoinTask f2 = new FibonacciForkJoinTask(n - 2);
        f2.fork(); // 合成 f2
        return  f1.join() + f2.join(); // 合并 f1 和 f2 的计算结果
    }
}
public class ForkJoinDemo01 {public static void main(String[] args) throws Exception {ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Integer> forkJoinTaskResult = forkJoinPool.submit(new FibonacciForkJoinTask(4));
        System.out.println(forkJoinTaskResult.get());
    }
}

ForkJoinTask 及其子类概述

ForkJoinTask 是提交给 ForkJoinPool 工作的基类,这是一个抽象类。RecursiveTask 和 RecursiveAction 见名知义,用于递归切割工作粒度,区别在于 RecursiveTask 有返回值,RecursiveAction 无返回值。RecursiveTask、RecursiveAction1.7 推出,CountedCompleter 有一个钩子函数 onCompletion(CountedCompleter<?> caller),所有工作实现会触发此办法。下面的用 Fork/Join 计算斐波那契数列只是为了演示用法,单线程跑的也很快,也有更快的算法。并行流默认应用的也是 ForkJoinPool, 举荐应用 commonPool 来应用 ForkJoinPool.

总结一下

咱们对事物的认知是一个逐渐清晰的过程,本篇就当作 ForkJoin 的入门文章吧,还致力的挣扎了一下想看懂调用 fork 办法做了什么,最初还是放弃了。不是所有的场景 ForkJoin 都能胜任,如果咱们能比拟好的管制工作粒度,那么其实 ForkJoinPool 和 ThreadPoolExecutor 的执行速度没有什么区别,你也能够只 fork 不 Join。

参考资料

  • 《Java 多线程编程实战指南》第 2 版 黄文海著
  • ThreadPoolExecutor vs ForkJoinPool: stealing subtasks https://stackoverflow.com/que…
  • ForkJoin 理论中利用 https://juejin.cn/post/698395…
  • Stack-based 的虚拟机有什么罕用的优化策略?- 来自知乎

正文完
 0