摘要:Fork/Join框架位于J.U.C(java.util.concurrent)中,是Java7中提供的用于执行并行任务的框架,其能够将大工作宰割成若干个小工作,最终汇总每个小工作的后果后失去最终后果。
本文分享自华为云社区《【高并发】如何应用Java7提供的Fork/Join框架实现高并发程序?》,作者:冰 河。
Fork/Join框架
位于J.U.C(java.util.concurrent)中,是Java7中提供的用于执行并行任务的框架,其能够将大工作宰割成若干个小工作,最终汇总每个小工作的后果后失去最终后果。根本思维和Hadoop的MapReduce思维相似。
次要采纳的是工作窃取算法(某个线程从其余队列里窃取工作来执行),并行分治计算中的一种Work-stealing策略
为什么须要应用工作窃取算法呢?
如果咱们须要做一个比拟大的工作,咱们能够把这个工作宰割为若干互不依赖的子工作,为了缩小线程间的竞争,于是把这些子工作别离放到不同的队列里,并为每个队列创立一个独自的线程来执行队列里的工作,线程和队列一一对应,比方A线程负责解决A队列里的工作。然而有的线程会先把本人队列里的工作干完,而其余线程对应的队列里还有工作期待解决。干完活的线程与其等着,不如去帮其余线程干活,于是它就去其余线程的队列里窃取一个工作来执行。而在这时它们会拜访同一个队列,所以为了缩小窃取工作线程和被窃取工作线程之间的竞争,通常会应用双端队列,被窃取工作线程永远从双端队列的头部拿工作执行,而窃取工作的线程永远从双端队列的尾部拿工作执行。
工作窃取算法的长处
充分利用线程进行并行计算,并缩小了线程间的竞争
工作窃取算法的毛病
在某些状况下还是存在竞争,比方双端队列里只有一个工作时。并且该算法会耗费更多的系统资源,比方创立多个线程和多个双端队列。
Fork/Join框架局限性
对于Fork/Join框架而言,当一个工作正在期待它应用Join操作创立的子工作完结时,执行这个工作的工作线程查找其余未被执行的工作,并开始执行这些未被执行的工作,通过这种形式,线程充分利用它们的运行工夫来进步应用程序的性能。为了实现这个指标,Fork/Join框架执行的工作有一些局限性,如下所示。
• 工作只能应用Fork和Join操作来进行同步机制,如果应用了其余同步机制,则在同步操作时,工作线程就不能执行其余工作了。比方,在Fork/Join框架中,使工作进行了睡眠,那么,在睡眠期间内,正在执行这个工作的工作线程将不会执行其余工作了。
• 在Fork/Join框架中,所拆分的工作不应该去执行IO操作,比方:读写数据文件
• 工作不能抛出查看异样,必须通过必要的代码来进去这些异样
Fork/Join框架的外围类
Fork/Join框架的外围是两个类:ForkJoinPool和ForkJoinTask。ForkJoinPool负责实现工作窃取算法、管理工作线程、提供对于工作的状态以及执行信息。ForkJoinTask次要提供在工作中执行Fork和Join操作的机制。
示例代码
示例代码如下:
package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.Future;import java.util.concurrent.RecursiveTask;@Slf4jpublic class ForkJoinTaskExample extends RecursiveTask<Integer> { public static final int threshold = 2; private int start; private int end; public ForkJoinTaskExample(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; //如果工作足够小就计算工作 boolean canCompute = (end - start) <= threshold; if (canCompute) { for (int i = start; i <= end; i++) { sum += i; } } else { // 如果工作大于阈值,就决裂成两个子工作计算 int middle = (start + end) / 2; ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle); ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end); // 执行子工作 leftTask.fork(); rightTask.fork(); // 期待工作执行完结合并其后果 int leftResult = leftTask.join(); int rightResult = rightTask.join(); // 合并子工作 sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkjoinPool = new ForkJoinPool(); //生成一个计算工作,计算1+2+3+4 ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100); //执行一个工作 Future<Integer> result = forkjoinPool.submit(task); try { log.info("result:{}", result.get()); } catch (Exception e) { log.error("exception", e); } }}
点击关注,第一工夫理解华为云陈腐技术~