共计 2793 个字符,预计需要花费 7 分钟才能阅读完成。
本文源码:GitHub·点这里 || GitEE·点这里
一、Fork/Join 框架
Java 提供 Fork/Join 框架用于并行执行工作,外围的思维就是将一个大工作切分成多个小工作,而后汇总每个小工作的执行后果失去这个大工作的最终后果。
这种机制策略在分布式数据库中十分常见,数据分布在不同的数据库的正本中,在执行查问时,每个服务都要跑查问工作,最初在一个服务上做数据合并,或者提供一个两头引擎层,用来汇总数据:
外围流程:切分工作,模块工作异步执行,单任务后果合并;在编程外面,通用的代码不多,然而通用的思维却随处可见。
二、外围 API 和办法
1、编码案例
基于 1 +2..+100 的计算案例演示 Fork/Join 框架根底用法。
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class ForkJoin01 {public static void main (String[] args) {int[] numArr = new int[100];
for (int i = 0; i < 100; i++) {numArr[i] = i + 1;
}
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask =
pool.submit(new SumTask(numArr, 0, numArr.length));
System.out.println("合并计算结果:" + forkJoinTask.invoke());
pool.shutdown();}
}
/**
* 线程工作
*/
class SumTask extends RecursiveTask<Integer> {
/*
* 切分工作块的阈值
* 如果 THRESHOLD=100
* 输入:main【求和:(0...100)=5050】合并计算结果: 5050
*/
private static final int THRESHOLD = 100;
private int arr[];
private int start;
private int over;
public SumTask(int[] arr, int start, int over) {
this.arr = arr;
this.start = start;
this.over = over;
}
// 求和计算
private Integer sumCalculate () {
Integer sum = 0;
for (int i = start; i < over; i++) {sum += arr[i];
}
String task = "【求和:(" + start + "..." + over + ")=" + sum +"】";
System.out.println(Thread.currentThread().getName() + task);
return sum ;
}
@Override
protected Integer compute() {if ((over - start) <= THRESHOLD) {return sumCalculate();
}else {int middle = (start + over) / 2;
SumTask left = new SumTask(arr, start, middle);
SumTask right = new SumTask(arr, middle, over);
left.fork();
right.fork();
return left.join() + right.join();
}
}
}
2、外围 API 阐明
ForkJoinPool:线程池最大的特点就是分叉 (fork) 合并 (join) 模式,将一个大工作拆分成多个小工作,并行执行,再联合工作窃取算法进步整体的执行效率,充分利用 CPU 资源。
ForkJoinTask:运行在 ForkJoinPool 的一个工作形象, 能够了解为类线程然而比线程轻量的实体, 在 ForkJoinPool 中运行的大量 ForkJoinWorkerThread 能够持有大量的 ForkJoinTask 和它的子工作,同时也是一个轻量的 Future, 应用时应防止较长阻塞或 IO。
继承子类:
- RecursiveAction:递归无返回值的 ForkJoinTask 子类;
- RecursiveTask:递归有返回值的 ForkJoinTask 子类;
外围办法:
- fork():在以后线程运行的线程池中创立一个子工作;
- join():模块子工作实现的时候返回工作后果;
- invoke():执行工作,也能够实时期待最终执行后果;
3、外围策略阐明
工作拆分
ForkJoinPool 基于分治算法,将大工作一直拆分上来,每个子工作再拆分一半,直到达到最阈值设定的工作粒度为止,并且把工作放到不同的队列外面,而后从最底层的工作开始执行计算,并且往上一层合并后果,这样用绝对少的线程解决大量的工作。
工作窃取算法
大工作被宰割为独立的子工作,并且子工作别离放到不同的队列里,并为每个队列创立一个线程来执行队列里的工作,假如线程 A 优先把调配到本人队列里的工作执行结束,此时如果线程 E 对应的队列里还有工作期待执行,闲暇的线程 A 会窃取线程 E 队列里工作执行,并且为了缩小窃取工作时线程 A 和被窃取工作线程 E 之间的产生竞争,窃取工作的线程 A 会从队列的尾部获取工作执行,被窃取工作线程 E 会从队列的头部获取工作执行。
工作窃取算法的长处:线程间的竞争很少,充分利用线程进行并行计算,然而在工作队列里只有一个工作时,也可能会存在竞争状况。
三、利用案例剖析
在后端系统的业务开发中,可用做权限校验,批量定时工作状态刷新等各种性能场景:
如上图,假如数据的主键 id 分段如下,数据场景可能是数据源的连贯信息,或者产品有效期相似业务,都能够基于线程池工作解决:
权限校验
基于数据源的连贯信息,判断数据源是否可用,例如:判断连贯是否可用,用户是否有库表的读写权限,在数据源多的状况下,基于线程池疾速校验。
状态刷新
在定时工作中,常常见到状态类的刷新操作,例如判断产品是否在有效期范畴内,在有效期范畴之外,把数据置为生效状态,都能够利用线程池疾速解决。
四、源代码地址
GitHub·地址
https://github.com/cicadasmile/java-base-parent
GitEE·地址
https://gitee.com/cicadasmile/java-base-parent
举荐浏览:Java 并发系列
序号 | 文章题目 |
---|---|
01 | Java 并发:线程的创立形式,状态周期治理 |
02 | Java 并发:线程外围机制,根底概念扩大 |
03 | Java 并发:多线程并发拜访,同步控制 |
04 | Java 并发:线程间通信,期待 / 告诉机制 |
05 | Java 并发:乐观锁和乐观锁机制 |
06 | Java 并发:Lock 机制下 API 用法详解 |