关于java:教你用Java7的ForkJoin框架开发高并发程序

55次阅读

共计 2469 个字符,预计需要花费 7 分钟才能阅读完成。

摘要:Fork/Join 框架位于 J.U.C(java.util.concurrent) 中,是 Java7 中提供的用于执行并行任务的框架,其能够将大工作宰割成若干个小工作,最终汇总每个小工作的后果后失去最终后果。

本文分享自华为云社区《【高并发】如何应用 Java7 提供的 Fork/Join 框架实现高并发程序?》,作者:冰 河。

Fork/Join 框架

位于 J.U.C(java.util.concurrent) 中,是 Java7 中提供的用于执行并行任务的框架,其能够将大工作宰割成若干个小工作,最终汇总每个小工作的后果后失去最终后果。根本思维和 Hadoop 的 MapReduce 思维相似。
次要采纳的是工作窃取算法(某个线程从其余队列里窃取工作来执行),并行分治计算中的一种 Work-stealing 策略

为什么须要应用工作窃取算法呢?

如果咱们须要做一个比拟大的工作,咱们能够把这个工作宰割为若干互不依赖的子工作,为了缩小线程间的竞争,于是把这些子工作别离放到不同的队列里,并为每个队列创立一个独自的线程来执行队列里的工作,线程和队列一一对应,比方 A 线程负责解决 A 队列里的工作。然而有的线程会先把本人队列里的工作干完,而其余线程对应的队列里还有工作期待解决。干完活的线程与其等着,不如去帮其余线程干活,于是它就去其余线程的队列里窃取一个工作来执行。而在这时它们会拜访同一个队列,所以为了缩小窃取工作线程和被窃取工作线程之间的竞争,通常会应用双端队列,被窃取工作线程永远从双端队列的头部拿工作执行,而窃取工作的线程永远从双端队列的尾部拿工作执行。

工作窃取算法的长处

充分利用线程进行并行计算,并缩小了线程间的竞争

工作窃取算法的毛病

在某些状况下还是存在竞争,比方双端队列里只有一个工作时。并且该算法会耗费更多的系统资源,比方创立多个线程和多个双端队列。

Fork/Join 框架局限性

对于 Fork/Join 框架而言,当一个工作正在期待它应用 Join 操作创立的子工作完结时,执行这个工作的工作线程查找其余未被执行的工作,并开始执行这些未被执行的工作,通过这种形式,线程充分利用它们的运行工夫来进步应用程序的性能。为了实现这个指标,Fork/Join 框架执行的工作有一些局限性,如下所示。

• 工作只能应用 Fork 和 Join 操作来进行同步机制,如果应用了其余同步机制,则在同步操作时,工作线程就不能执行其余工作了。比方,在 Fork/Join 框架中,使工作进行了睡眠,那么,在睡眠期间内,正在执行这个工作的工作线程将不会执行其余工作了。
• 在 Fork/Join 框架中,所拆分的工作不应该去执行 IO 操作,比方:读写数据文件
• 工作不能抛出查看异样,必须通过必要的代码来进去这些异样

Fork/Join 框架的外围类

Fork/Join 框架的外围是两个类:ForkJoinPool 和 ForkJoinTask。ForkJoinPool 负责实现工作窃取算法、管理工作线程、提供对于工作的状态以及执行信息。ForkJoinTask 次要提供在工作中执行 Fork 和 Join 操作的机制。

示例代码

示例代码如下:

package io.binghe.concurrency.example.aqs;
 
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
    public static final int threshold = 2;
    private int start;
    private int end;
    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        int sum = 0;
        // 如果工作足够小就计算工作
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}
        } else {
            // 如果工作大于阈值,就决裂成两个子工作计算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
 
            // 执行子工作
            leftTask.fork();
            rightTask.fork();
 
            // 期待工作执行完结合并其后果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
 
            // 合并子工作
            sum = leftResult + rightResult;
        }
        return sum;
    }
    public static void main(String[] args) {ForkJoinPool forkjoinPool = new ForkJoinPool();
 
        // 生成一个计算工作,计算 1 +2+3+4
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
 
        // 执行一个工作
        Future<Integer> result = forkjoinPool.submit(task);
 
        try {log.info("result:{}", result.get());
        } catch (Exception e) {log.error("exception", e);
        }
    }
}

点击关注,第一工夫理解华为云陈腐技术~

正文完
 0