摘要:Fork/Join 框架位于 J.U.C(java.util.concurrent) 中,是 Java7 中提供的用于执行并行任务的框架,其能够将大工作宰割成若干个小工作,最终汇总每个小工作的后果后失去最终后果。
本文分享自华为云社区《【高并发】如何应用 Java7 提供的 Fork/Join 框架实现高并发程序?》,作者:冰 河。
Fork/Join 框架
位于 J.U.C(java.util.concurrent) 中,是 Java7 中提供的用于执行并行任务的框架,其能够将大工作宰割成若干个小工作,最终汇总每个小工作的后果后失去最终后果。根本思维和 Hadoop 的 MapReduce 思维相似。
次要采纳的是工作窃取算法(某个线程从其余队列里窃取工作来执行),并行分治计算中的一种 Work-stealing 策略
为什么须要应用工作窃取算法呢?
如果咱们须要做一个比拟大的工作,咱们能够把这个工作宰割为若干互不依赖的子工作,为了缩小线程间的竞争,于是把这些子工作别离放到不同的队列里,并为每个队列创立一个独自的线程来执行队列里的工作,线程和队列一一对应,比方 A 线程负责解决 A 队列里的工作。然而有的线程会先把本人队列里的工作干完,而其余线程对应的队列里还有工作期待解决。干完活的线程与其等着,不如去帮其余线程干活,于是它就去其余线程的队列里窃取一个工作来执行。而在这时它们会拜访同一个队列,所以为了缩小窃取工作线程和被窃取工作线程之间的竞争,通常会应用双端队列,被窃取工作线程永远从双端队列的头部拿工作执行,而窃取工作的线程永远从双端队列的尾部拿工作执行。
工作窃取算法的长处
充分利用线程进行并行计算,并缩小了线程间的竞争
工作窃取算法的毛病
在某些状况下还是存在竞争,比方双端队列里只有一个工作时。并且该算法会耗费更多的系统资源,比方创立多个线程和多个双端队列。
Fork/Join 框架局限性
对于 Fork/Join 框架而言,当一个工作正在期待它应用 Join 操作创立的子工作完结时,执行这个工作的工作线程查找其余未被执行的工作,并开始执行这些未被执行的工作,通过这种形式,线程充分利用它们的运行工夫来进步应用程序的性能。为了实现这个指标,Fork/Join 框架执行的工作有一些局限性,如下所示。
• 工作只能应用 Fork 和 Join 操作来进行同步机制,如果应用了其余同步机制,则在同步操作时,工作线程就不能执行其余工作了。比方,在 Fork/Join 框架中,使工作进行了睡眠,那么,在睡眠期间内,正在执行这个工作的工作线程将不会执行其余工作了。
• 在 Fork/Join 框架中,所拆分的工作不应该去执行 IO 操作,比方:读写数据文件
• 工作不能抛出查看异样,必须通过必要的代码来进去这些异样
Fork/Join 框架的外围类
Fork/Join 框架的外围是两个类:ForkJoinPool 和 ForkJoinTask。ForkJoinPool 负责实现工作窃取算法、管理工作线程、提供对于工作的状态以及执行信息。ForkJoinTask 次要提供在工作中执行 Fork 和 Join 操作的机制。
示例代码
示例代码如下:
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
public static final int threshold = 2;
private int start;
private int end;
public ForkJoinTaskExample(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
// 如果工作足够小就计算工作
boolean canCompute = (end - start) <= threshold;
if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}
} else {
// 如果工作大于阈值,就决裂成两个子工作计算
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
// 执行子工作
leftTask.fork();
rightTask.fork();
// 期待工作执行完结合并其后果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子工作
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {ForkJoinPool forkjoinPool = new ForkJoinPool();
// 生成一个计算工作,计算 1 +2+3+4
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
// 执行一个工作
Future<Integer> result = forkjoinPool.submit(task);
try {log.info("result:{}", result.get());
} catch (Exception e) {log.error("exception", e);
}
}
}
点击关注,第一工夫理解华为云陈腐技术~