关于java:Java并发编程07ForkJoin框架机制详解

5次阅读

共计 2793 个字符,预计需要花费 7 分钟才能阅读完成。

本文源码:GitHub·点这里 || GitEE·点这里

一、Fork/Join 框架

Java 提供 Fork/Join 框架用于并行执行工作,外围的思维就是将一个大工作切分成多个小工作,而后汇总每个小工作的执行后果失去这个大工作的最终后果。

这种机制策略在分布式数据库中十分常见,数据分布在不同的数据库的正本中,在执行查问时,每个服务都要跑查问工作,最初在一个服务上做数据合并,或者提供一个两头引擎层,用来汇总数据:

外围流程:切分工作,模块工作异步执行,单任务后果合并;在编程外面,通用的代码不多,然而通用的思维却随处可见。

二、外围 API 和办法

1、编码案例

基于 1 +2..+100 的计算案例演示 Fork/Join 框架根底用法。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoin01 {public static void main (String[] args) {int[] numArr = new int[100];
        for (int i = 0; i < 100; i++) {numArr[i] = i + 1;
        }
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Integer> forkJoinTask =
                pool.submit(new SumTask(numArr, 0, numArr.length));
        System.out.println("合并计算结果:" + forkJoinTask.invoke());
        pool.shutdown();}
}
/**
 * 线程工作
 */
class SumTask extends RecursiveTask<Integer> {
    /*
     * 切分工作块的阈值
     * 如果 THRESHOLD=100
     * 输入:main【求和:(0...100)=5050】合并计算结果: 5050
     */
    private static final int THRESHOLD = 100;
    private int arr[];
    private int start;
    private int over;

    public SumTask(int[] arr, int start, int over) {
        this.arr = arr;
        this.start = start;
        this.over = over;
    }

    // 求和计算
    private Integer sumCalculate () {
        Integer sum = 0;
        for (int i = start; i < over; i++) {sum += arr[i];
        }
        String task = "【求和:(" + start + "..." + over + ")=" + sum +"】";
        System.out.println(Thread.currentThread().getName() + task);
        return sum ;
    }

    @Override
    protected Integer compute() {if ((over - start) <= THRESHOLD) {return sumCalculate();
        }else {int middle = (start + over) / 2;
            SumTask left = new SumTask(arr, start, middle);
            SumTask right = new SumTask(arr, middle, over);
            left.fork();
            right.fork();
            return left.join() + right.join();
        }
    }
}

2、外围 API 阐明

ForkJoinPool:线程池最大的特点就是分叉 (fork) 合并 (join) 模式,将一个大工作拆分成多个小工作,并行执行,再联合工作窃取算法进步整体的执行效率,充分利用 CPU 资源。

ForkJoinTask:运行在 ForkJoinPool 的一个工作形象, 能够了解为类线程然而比线程轻量的实体, 在 ForkJoinPool 中运行的大量 ForkJoinWorkerThread 能够持有大量的 ForkJoinTask 和它的子工作,同时也是一个轻量的 Future, 应用时应防止较长阻塞或 IO。

继承子类:

  • RecursiveAction:递归无返回值的 ForkJoinTask 子类;
  • RecursiveTask:递归有返回值的 ForkJoinTask 子类;

外围办法:

  • fork():在以后线程运行的线程池中创立一个子工作;
  • join():模块子工作实现的时候返回工作后果;
  • invoke():执行工作,也能够实时期待最终执行后果;

3、外围策略阐明

工作拆分

ForkJoinPool 基于分治算法,将大工作一直拆分上来,每个子工作再拆分一半,直到达到最阈值设定的工作粒度为止,并且把工作放到不同的队列外面,而后从最底层的工作开始执行计算,并且往上一层合并后果,这样用绝对少的线程解决大量的工作。

工作窃取算法

大工作被宰割为独立的子工作,并且子工作别离放到不同的队列里,并为每个队列创立一个线程来执行队列里的工作,假如线程 A 优先把调配到本人队列里的工作执行结束,此时如果线程 E 对应的队列里还有工作期待执行,闲暇的线程 A 会窃取线程 E 队列里工作执行,并且为了缩小窃取工作时线程 A 和被窃取工作线程 E 之间的产生竞争,窃取工作的线程 A 会从队列的尾部获取工作执行,被窃取工作线程 E 会从队列的头部获取工作执行。

工作窃取算法的长处:线程间的竞争很少,充分利用线程进行并行计算,然而在工作队列里只有一个工作时,也可能会存在竞争状况。

三、利用案例剖析

在后端系统的业务开发中,可用做权限校验,批量定时工作状态刷新等各种性能场景:

如上图,假如数据的主键 id 分段如下,数据场景可能是数据源的连贯信息,或者产品有效期相似业务,都能够基于线程池工作解决:

权限校验

基于数据源的连贯信息,判断数据源是否可用,例如:判断连贯是否可用,用户是否有库表的读写权限,在数据源多的状况下,基于线程池疾速校验。

状态刷新

在定时工作中,常常见到状态类的刷新操作,例如判断产品是否在有效期范畴内,在有效期范畴之外,把数据置为生效状态,都能够利用线程池疾速解决。

四、源代码地址

GitHub·地址
https://github.com/cicadasmile/java-base-parent
GitEE·地址
https://gitee.com/cicadasmile/java-base-parent

举荐浏览:Java 并发系列

序号 文章题目
01 Java 并发:线程的创立形式,状态周期治理
02 Java 并发:线程外围机制,根底概念扩大
03 Java 并发:多线程并发拜访,同步控制
04 Java 并发:线程间通信,期待 / 告诉机制
05 Java 并发:乐观锁和乐观锁机制
06 Java 并发:Lock 机制下 API 用法详解
正文完
 0