关于高并发:并发编程中一种经典的分而治之的思想

33次阅读

共计 5946 个字符,预计需要花费 15 分钟才能阅读完成。

写在后面

在 JDK 中,提供了这样一种性能:它可能将简单的逻辑拆分成一个个简略的逻辑来并行执行,待每个并行执行的逻辑执行实现后,再将各个后果进行汇总,得出最终的后果数据。有点像 Hadoop 中的 MapReduce。

ForkJoin 是由 JDK1.7 之后提供的多线程并发解决框架。ForkJoin 框架的根本思维是分而治之。什么是分而治之?分而治之就是将一个简单的计算,依照设定的阈值分解成多个计算,而后将各个计算结果进行汇总。相应的,ForkJoin 将简单的计算当做一个工作,而合成的多个计算则是当做一个个子工作来并行执行。

注:文章已同步收录到:https://github.com/sunshinely… 和 https://gitee.com/binghe001/t…。如果文件对你有点帮忙,别忘记给个 Star 哦!如果小伙伴们有任何疑难,都能够加我微信【sun_shine_lyz】进行交换哦!

Java 并发编程的倒退

对于 Java 语言来说,生来就反对多线程并发编程,在并发编程畛域也是在一直倒退的。Java 在其倒退过程中对并发编程的反对越来越欠缺也正好印证了这一点。

  • Java 1 反对 thread,synchronized。
  • Java 5 引入了 thread pools,blocking queues, concurrent collections,locks, condition queues。
  • Java 7 退出了 fork-join 库。
  • Java 8 退出了 parallel streams。

并发与并行

并发和并行在实质上还是有所区别的。

并发

并发指的是在同一时刻,只有一个线程可能获取到 CPU 执行工作,而多个线程被疾速的轮换执行,这就使得在宏观上具备多个线程同时执行的成果,并发不是真正的同时执行,并发能够应用下图示意。

并行

并行指的是无论何时,多个线程都是在多个 CPU 外围上同时执行的,是真正的同时执行。

分治法

根本思维

把一个规模大的问题划分为规模较小的子问题,而后分而治之,最初合并子问题的解失去原问题的解。

步骤

①宰割原问题;

②求解子问题;

③合并子问题的解为原问题的解。

咱们能够应用如下伪代码来示意这个步骤。

if(工作很小){间接计算失去后果}else{
    分拆成 N 个子工作
    调用子工作的 fork()进行计算
    调用子工作的 join()合并计算结果}

在分治法中,子问题个别是互相独立的,因而,常常通过递归调用算法来求解子问题。

典型利用

  • 二分搜寻
  • 大整数乘法
  • Strassen 矩阵乘法
  • 棋盘笼罩
  • 合并排序
  • 疾速排序
  • 线性工夫抉择
  • 汉诺塔

ForkJoin 并行处理框架

ForkJoin 框架概述

Java 1.7 引入了一种新的并发框架—— Fork/Join Framework,次要用于实现“分而治之”的算法,特地是分治之后递归调用的函数。

ForkJoin 框架的实质是一个用于并行执行工作的框架,可能把一个大工作宰割成若干个小工作,最终汇总每个小工作后果后失去大工作的计算结果。在 Java 中,ForkJoin 框架与 ThreadPool 共存,并不是要替换 ThreadPool

其实,在 Java 8 中引入的并行流计算,外部就是采纳的 ForkJoinPool 来实现的。例如,上面应用并行流实现打印数组元组的程序。

public class SumArray {public static void main(String[] args){List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9);
        numberList.parallelStream().forEach(System.out::println);
    }
}

这段代码的背地就应用到了 ForkJoinPool。

说到这里,可能有读者会问:能够应用线程池的 ThreadPoolExecutor 来实现啊?为什么要应用 ForkJoinPool 啊?ForkJoinPool 是个什么鬼啊?! 接下来,咱们就来答复这个问题。

ForkJoin 框架原理

ForkJoin 框架是从 jdk1.7 中引入的新个性, 它同 ThreadPoolExecutor 一样,也实现了 Executor 和 ExecutorService 接口。它应用了一个有限队列来保留须要执行的工作,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入指定的线程数量,那么以后计算机可用的 CPU 数量会被设置为线程数量作为默认值。

ForkJoinPool 次要应用 分治法 (Divide-and-Conquer Algorithm) 来解决问题。典型的利用比方疾速排序算法。这里的要点在于,ForkJoinPool 可能应用绝对较少的线程来解决大量的工作。比方要对 1000 万个数据进行排序,那么会将这个工作宰割成两个 500 万的排序工作和一个针对这两组 500 万数据的合并工作。以此类推,对于 500 万的数据也会做出同样的宰割解决,到最初会设置一个阈值来规定当数据规模到多少时,进行这样的宰割解决。比方,当元素的数量小于 10 时,会进行宰割,转而应用插入排序对它们进行排序。那么到最初,所有的工作加起来会有大略 200 万 + 个。问题的关键在于,对于一个工作而言,只有当它所有的子工作实现之后,它才可能被执行。

所以当应用 ThreadPoolExecutor 时,应用分治法会存在问题,因为 ThreadPoolExecutor 中的线程无奈向工作队列中再增加一个工作并在期待该工作实现之后再继续执行。而应用 ForkJoinPool 就可能解决这个问题,它就可能让其中的线程创立新的工作,并挂起以后的工作,此时线程就可能从队列中抉择子工作执行。

那么应用 ThreadPoolExecutor 或者 ForkJoinPool,性能上会有什么差别呢?

首先,应用 ForkJoinPool 可能应用数量无限的线程来实现十分多的具备父子关系的工作,比方应用 4 个线程来实现超过 200 万个工作。然而,应用 ThreadPoolExecutor 时,是不可能实现的,因为 ThreadPoolExecutor 中的 Thread 无奈抉择优先执行子工作,须要实现 200 万个具备父子关系的工作时,也须要 200 万个线程,很显然这是不可行的,也是很不合理的!!

工作窃取算法

如果咱们须要做一个比拟大的工作,咱们能够把这个工作宰割为若干互不依赖的子工作,为了缩小线程间的竞争,于是把这些子工作别离放到不同的队列里,并为每个队列创立一个独自的线程来执行队列里的工作,线程和队列一一对应,比方 A 线程负责解决 A 队列里的工作。然而有的线程会先把本人队列里的工作干完,而其余线程对应的队列里还有工作期待解决。干完活的线程与其等着,不如去帮其余线程干活,于是它就去其余线程的队列里窃取一个工作来执行。而在这时它们会拜访同一个队列,所以为了缩小窃取工作线程和被窃取工作线程之间的竞争,通常会应用双端队列,被窃取工作线程永远从双端队列的头部拿工作执行,而窃取工作的线程永远从双端队列的尾部拿工作执行。

工作窃取算法的长处:
充分利用线程进行并行计算,并缩小了线程间的竞争。

工作窃取算法的毛病:
在某些状况下还是存在竞争,比方双端队列里只有一个工作时。并且该算法会耗费更多的系统资源,比方创立多个线程和多个双端队列。

Fork/Join 框架局限性:

对于 Fork/Join 框架而言,当一个工作正在期待它应用 Join 操作创立的子工作完结时,执行这个工作的工作线程查找其余未被执行的工作,并开始执行这些未被执行的工作,通过这种形式,线程充分利用它们的运行工夫来进步应用程序的性能。为了实现这个指标,Fork/Join 框架执行的工作有一些局限性。

(1)工作只能应用 Fork 和 Join 操作来进行同步机制,如果应用了其余同步机制,则在同步操作时,工作线程就不能执行其余工作了。比方,在 Fork/Join 框架中,使工作进行了睡眠,那么,在睡眠期间内,正在执行这个工作的工作线程将不会执行其余工作了。
(2)在 Fork/Join 框架中,所拆分的工作不应该去执行 IO 操作,比方:读写数据文件。
(3)工作不能抛出查看异样,必须通过必要的代码来进去这些异样。

ForkJoin 框架的实现

ForkJoin 框架中一些重要的类如下所示。

ForkJoinPool 框架中波及的次要类如下所示。

1.ForkJoinPool 类

实现了 ForkJoin 框架中的线程池,由类图能够看出,ForkJoinPool 类实现了线程池的 Executor 接口。

咱们也能够从下图中看出 ForkJoinPool 的类图关系。

其中,能够应用 Executors.newWorkStealPool()办法创立 ForkJoinPool。

ForkJoinPool 中提供了如下提交工作的办法。

public void execute(ForkJoinTask<?> task)
public void execute(Runnable task)
public <T> T invoke(ForkJoinTask<T> task)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
public <T> ForkJoinTask<T> submit(Callable<T> task)
public <T> ForkJoinTask<T> submit(Runnable task, T result)
public ForkJoinTask<?> submit(Runnable task)

2.ForkJoinWorkerThread 类

实现 ForkJoin 框架中的线程。

3.ForkJoinTask<V> 类

ForkJoinTask 封装了数据及其相应的计算,并且反对细粒度的数据并行。ForkJoinTask 比线程要轻量,ForkJoinPool 中大量工作线程可能运行大量的 ForkJoinTask。

ForkJoinTask 类中次要包含两个办法 fork()和 join(),别离实现工作的分拆与合并。

fork()办法相似于 Thread.start(),然而它并不立刻执行工作,而是将工作放入工作队列中。跟 Thread.join()办法不同,ForkJoinTask 的 join()办法并不简略的阻塞线程,而是利用工作线程运行其余工作,当一个工作线程中调用 join(),它将解决其余工作,直到留神到指标子工作曾经实现。

咱们能够应用下图来示意这个过程。

ForkJoinTask 有 3 个子类:

  • RecursiveAction:无返回值的工作。
  • RecursiveTask:有返回值的工作。
  • CountedCompleter:实现工作后将触发其余工作。

4.RecursiveTask<V> 类

有返回后果的 ForkJoinTask 实现 Callable。

5.RecursiveAction 类

无返回后果的 ForkJoinTask 实现 Runnable。

6.CountedCompleter<T> 类

在工作实现执行后会触发执行一个自定义的钩子函数。

ForkJoin 示例程序

package io.binghe.concurrency.example.aqs;
 
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
    public static final int threshold = 2;
    private int start;
    private int end;
    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        int sum = 0;
        // 如果工作足够小就计算工作
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}
        } else {
            // 如果工作大于阈值,就决裂成两个子工作计算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
 
            // 执行子工作
            leftTask.fork();
            rightTask.fork();
 
            // 期待工作执行完结合并其后果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
 
            // 合并子工作
            sum = leftResult + rightResult;
        }
        return sum;
    }
    public static void main(String[] args) {ForkJoinPool forkjoinPool = new ForkJoinPool();
 
        // 生成一个计算工作,计算 1 +2+3+4
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
 
        // 执行一个工作
        Future<Integer> result = forkjoinPool.submit(task);
 
        try {log.info("result:{}", result.get());
        } catch (Exception e) {log.error("exception", e);
        }
    }
}

重磅福利

微信搜一搜【冰河技术】微信公众号,关注这个有深度的程序员,每天浏览超硬核技术干货,公众号内回复【PDF】有我筹备的一线大厂面试材料和我原创的超硬核 PDF 技术文档,以及我为大家精心筹备的多套简历模板(不断更新中),心愿大家都能找到心仪的工作,学习是一条时而郁郁寡欢,时而开怀大笑的路,加油。如果你通过致力胜利进入到了心仪的公司,肯定不要懈怠放松,职场成长和新技术学习一样,逆水行舟。如果有幸咱们江湖再见!

另外,我开源的各个 PDF,后续我都会继续更新和保护,感激大家长期以来对冰河的反对!!

正文完
 0