乐趣区

关于java:Java并发工具合集JUC大爆发

并发工具类

通常咱们所说的并发包也就是 java.util.concurrent(JUC),集中了 Java 并发的各种工具类,正当地应用它们能帮忙咱们疾速地实现性能。

  • 作者: 博学谷狂野架构师
  • GitHub:GitHub 地址(有我精心筹备的 130 本电子书 PDF)

    只分享干货、不吹水,让咱们一起加油!😄

1. CountDownLatch

CountDownLatch 是一个同步计数器,初始化的时候 传入须要计数的线程期待数,能够是须要期待执行实现的线程数,或者大于,个别称为 发令枪。\

​ countdownlatch 是一个同步类工具,不波及锁定,当 count 的值为零时以后线程持续运行,不波及同步,只波及线程通信的时候,应用它较为适合

1.1 作用

用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用),是一组线程期待其余的线程实现工作当前在执行,相当于加强版 join。

留神:这是一个一次性操作 – 计数无奈重置。如果你须要一个重置的版本计数,思考应用 CyclicBarrier。

1.2 举例

​ 咱们去组团玩耍一样,总共 30 集体,来的人要期待还没有到的人,始终等到第 30 集体到了,咱们才开始登程,在期待过程中,其他人(线程)是期待状态不做任何事件的,始终等所有人(线程)到齐了(筹备实现)才开始执行。

1.3 概念

  • countDownLatch 这个类使一个线程期待其余线程各自执行结束后再执行。
  • 是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行结束后,计数器的值就 -1,当计数器的值为 0 时,示意所有线程都执行结束,而后在闭锁上期待的线程就能够复原工作了。

咱们关上 CountDownLatch 的源代码剖析,咱们发现最重要的办法就是一下这两个办法:

// 阻塞以后线程,期待其余线程执行实现,直到计数器计数值减到 0。public void await() throws InterruptedException;
// 阻塞以后线程指定的工夫,如果达到工夫就放行,期待其余线程执行实现,直到计数器计数值减到 0。public boolean await(long timeout, TimeUnit unit) throws InterruptedException
// 负责计数器的减一。public void countDown():

1.4 利用场景

1.4.1 多线程压测

有时咱们想同时启动多个线程,实现最大水平的并行性。

​ 例如,咱们想测试一个单例类。如果咱们创立一个初始计数为 1 的 CountDownLatch,并让所有线程都在这个锁上期待,那么咱们能够很轻松地实现测试。咱们只需调用 一次 countDown()办法就能够让所有的期待线程同时复原执行。

1.4.2 期待其余线程

​ 例如应用程序启动类要确保在解决用户申请前,所有 N 个内部零碎曾经启动和运行了,例如解决 excel 中多个表单,如果一个一个进去很耗 IO 和性能,咱们能够等 100 或者 1000 个线程都实现了表单的操作后一下子写进 excel 表单中。

留神:一个线程不肯定只能做 countDown 一次,也能够 countDown 屡次

1.5 示例

1.5.1 筹备实现后执行

在理论我的项目中可能有些线程须要资源筹备实现后能力进行执行,这个时候就能够应用 countDownLatch

package chapter02.countdownlatch;

import java.util.Random;
import java.util.concurrent.*;

/**
 * countdownlatch 示例
 */
public class CountDownLatchTest {private static ExecutorService executorService = Executors.newFixedThreadPool(10);
    private static Random random = new Random();


    public static void execute(CountDownLatch countDownLatch) {
        // 获取一个随机数
        long sleepTime = random.nextInt(10);
        // 获取线程 ID
        long threadId = Thread.currentThread().getId();
        System.out.println("线程 ID" + threadId + ", 开始执行 --countDown");

        try {
            // 睡眠随机秒
            Thread.sleep(sleepTime * 1000);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        // 计数器减 1
        countDownLatch.countDown();
        System.out.println("线程 ID" + threadId + ", 筹备工作实现耗时:" + sleepTime + "以后工夫" + System.currentTimeMillis());
        try {
            // 线程期待其余工作实现后唤醒
            countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();
        }
        System.out.println("线程 ID" + threadId + ", 开始执行工作,以后工夫:" + System.currentTimeMillis());
    }

    public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {executorService.submit(() -> {execute(countDownLatch);
            });
        }
        // 线程期待其余工作实现后唤醒
        countDownLatch.await();
        Thread.sleep(1000);
        executorService.shutdown();
        System.out.println("全副工作执行实现");
    }
}
1.5.2 多线程压测

在实战我的项目中,咱们除了应用 jemter 等工具进行压测外,还能够本人入手应用 CountDownLatch 类编写压测代码。

​ 能够说 jemter 的并发压测背地也是应用的 CountDownLatch,可见把握 CountDownLatch 类的应用是有如许的重要,CountDownLatch是 Java 多线程同步器的四大金刚之一,CountDownLatch 可能使一个线程期待其余线程实现各自的工作后再执行。

package chapter02.countdownlatch;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * countDownLatch 压测
 */
public class CountDownLatchPressure {

    /**
     * 压测业务代码
     */
    public void testLoad() {System.out.println("压测:" + Thread.currentThread().getId() + ":" + System.currentTimeMillis());
    }

    /**
     * 压测启动
     * 主线程负责压测线程筹备工作
     * 压测线程筹备实现后 调用 start.countDown(); 启动线程执行
     * @throws InterruptedException
     */
    private void latchTest() throws InterruptedException {
        // 压测线程数
        int testThreads = 300;
        final CountDownLatch start = new CountDownLatch(1);
        final CountDownLatch end = new CountDownLatch(testThreads);
        // 创立线程池
        ExecutorService exce = Executors.newFixedThreadPool(testThreads);
        // 筹备线程筹备
        for (int i = 0; i < testThreads; i++) {
            // 增加到线程池
            exce.submit(() -> {
                try {
                    // 启动后期待 唤醒
                    start.await();
                    testLoad();} catch (InterruptedException e) {e.printStackTrace();
                } finally {
                    // 压测实现
                    end.countDown();}
            });

        }

        // 连接池线程初始化实现 开始压测
        start.countDown();
        // 压测实现后完结
        end.await();
        // 敞开线程池
        exce.shutdown();}

    public static void main(String[] args) throws InterruptedException {CountDownLatchPressure countDownLatchPressure = new CountDownLatchPressure();
        // 开始压测
        countDownLatchPressure.latchTest();}
}

2. CyclicBarrier

2.1 简介

CyclicBarrier,是 JDK1.5 的 java.util.concurrent(JUC)并发包中提供的一个并发工具类

C yclicBarrier 能够使肯定数量的线程重复地在栅栏地位处会集,当线程达到栅栏地位时将调用 await 办法,这个办法将阻塞直到所有线程都达到栅栏地位,如果所有线程都达到栅栏地位,那么栅栏将关上,此时所有的线程都将被开释,而栅栏将被重置以便下次应用。

2.2 举例

就像生存中咱们会约敌人们到某个餐厅一起吃饭,有些敌人可能会早到,有些敌人可能会晚到,然而这个餐厅规定必须等到所有人到齐之后才会让咱们进去。

​ 这里的敌人们就是各个线程,餐厅就是 CyclicBarrier,感觉和 CountDownLatch 是一样的,然而他们是有区别的,吃完饭之后能够抉择去玩一会,去解决工作,而后期待第二次聚餐,反复循环。

2.3 性能

CyclicBarrierCountDownLatch 是十分相似的,CyclicBarrier外围的概念是在于设置一个期待线程的数量边界,达到了此边界之后进行执行。

CyclicBarrier类是一个同步辅助类,它容许一组线程相互期待,直到达到某个公共屏障点(Common Barrier Point)。

CyclicBarrier类是一种同步机制,它可能对解决一些算法的线程实现同。换句话讲,它就是一个所有线程必须期待的一个栅栏,直到所有线程都达到这里,而后所有线程才能够持续做其余事件。

​ 通过调用 CyclicBarrier 对象的 await() 办法,两个线程能够实现相互期待,一旦 N 个线程在期待 CyclicBarrier 达成,所有线程将被开释掉去继续执行。

2.4 构造方法

咱们能够看下 CyclicBarrier 源码的构造方法

public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)
2.4.1 参数介绍
  • parties : 是参加线程的个数 , 其参数示意屏障拦挡的线程数量,每个线程调用 await 办法通知 CyclicBarrier 我曾经达到了屏障,而后以后线程被阻塞。
  • barrierAction : 优先执行线程 , 用于在线程达到屏障时,优先执行 barrierAction,不便解决更简单的业务场景, 个别用于数据的整顿以及汇总,例如 excel 插入一样,等所有线程都插入完了,达到了屏障后,barrierAction 线程开始进行保留操作,实现后,接下来由其余线程开始进行插入,而后达到屏障接着就是保留,一直循环。

CyclicBarrier 能够用于多线程计算数据,最初合并计算结果的场景。

2.5 重要办法

咱们下面介绍了构造方法,上面咱们介绍下 CyclicBarrier 中重要的办法

// 阻塞以后线程,期待其余线程执行实现。public int await() throws InterruptedException, BrokenBarrierException
// 阻塞以后线程指定的工夫,如果达到工夫就放行,期待其余线程执行实现,public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
  • 线程调用 await() 示意本人曾经达到栅栏
  • BrokenBarrierException 示意栅栏曾经被毁坏,毁坏的起因可能是其中一个线程 await() 时被中断或者超时

2.6 根本应用

一个线程组的线程须要期待所有线程实现工作后再继续执行下一次工作

package chapter02.cyclicbarrier;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierTest {private static Random random = new Random();

    /**
     * 执行工作
     *
     * @param barrier
     */
    public static void execute(CyclicBarrier barrier) {
        // 获取一个随机数
        long sleepTime = random.nextInt(10);
        // 获取线程 id
        long threadId = Thread.currentThread().getId();
        try {
            // 睡眠随机秒
            Thread.sleep(sleepTime * 1000);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        System.out.println("线程 ID" + threadId + ", 筹备工作实现耗时:" + sleepTime + "以后工夫" + System.currentTimeMillis());

        // 线程期待其余工作实现后唤醒
        try {barrier.await();
        } catch (InterruptedException e) {e.printStackTrace();
        } catch (BrokenBarrierException e) {e.printStackTrace();
        }
        System.out.println("线程 ID" + threadId + ", 开始执行工作,以后工夫:" + System.currentTimeMillis());
    }


    public static void main(String[] args) {
        // 初始化线程数量
        int threadNum = 5;
        // 初始化个别的线程
        CyclicBarrier barrier = new CyclicBarrier(5, () -> System.out.println("整顿工作开始..."));
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        for (int i = 0; i < threadNum; i++) {executor.submit(() -> {execute(barrier);
            });
        }
    }
}

2.7 CyclicBarrier 与 CountDownLatch 区别

  • CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
  • CountDownLatch.await 个别阻塞工作线程,所有的进行预备工作的线程执行 countDown,而 CyclicBarrier 通过工作线程调用 await 从而自行阻塞,直到所有工作线程达到指定屏障,再大家一起往下走。
  • CountDownLatch 参加的线程的职责是不一样的,有的在倒计时,有的在期待倒计时完结。CyclicBarrier 参加的线程职责是一样的。
  • 在管制多个线程同时运行上,CountDownLatch 能够不限线程数量,而 CyclicBarrier 是固定线程数。
  • CyclicBarrier 还能够提供一个 barrierAction,合并多线程计算结果。

3. Semaphore

3.1 简介

Semaphore也叫信号量,在 JDK1.5 被引入,能够用来管制同时拜访特定资源的线程数量,通过协调各个线程,以保障正当的应用资源。

Semaphore 外部保护了一组虚构的许可,许可的数量能够通过构造函数的参数指定。

  • 拜访特定资源前,必须应用 acquire 办法取得许可,如果许可数量为 0,该线程则始终阻塞,直到有可用许可。
  • 拜访资源后,应用 release 开释许可。

Semaphore是一种在多线程环境下应用的设施,该设施负责协调各个线程,以保障它们可能正确、正当的应用公共资源的设施,也是操作系统中用于管制进程同步互斥的量。Semaphore 是一种计数信号量,用于治理一组资源,外部是基于 AQS 的共享模式。它相当于给线程规定一个量从而管制容许流动的线程数。

​ 能够用来管制同时拜访特定资源的线程数量,它通过协调各个线程,以保障正当的应用公共资源

3.2 举例

​ 这外面令牌就像停车位一样,来了十辆车,停车位只有三个,只有三辆车可能进行,只有等其余车开走后,其余车能力开进去,和锁的不一样的中央是,锁一次只能进入一辆车,然而 Semaphore 容许一次进入很多车,这个令牌是能够调整的,随时能够增减令牌。

3.3 利用场景

Semaphore 是 synchronized 的加强版,作用是控制线程的并发数量。就这一点而言,单纯的 synchronized 关键字是实现不了的。

​ Semaphore 能够用于做流量管制,特地是专用资源无限的利用场景,比方数据库连贯。如果有一个需要,要读取几万个文件的数据,因为都是 IO 密集型工作,咱们能够启动几十个线程并发地读取,然而如果读到内存后,还须要存储到数据库中,而数据库的连接数只有 10 个,这时咱们必须管制只有 10 个线程同时获取数据库连贯保留数据,否则会报错无奈获取数据库连贯。这个时候,就能够应用 Semaphore 来做流量管制

3.4 工作原理

以一个停车场是运作为例,为了简略起见,假如停车场只有三个车位,一开始三个车位都是空的。

​ 这时如果同时来了五辆车,看门人容许其中三辆不受妨碍的进入,而后放下车拦,剩下的车则必须在入口期待,尔后来的车也都不得不在入口处期待。

​ 这时,有一辆车来到停车场,看门人得悉后,关上车拦,放入一辆,如果又来到两辆,则又能够放入两辆,如此往返。

​ 这个停车零碎中,每辆车就好比一个线程,看门人就好比一个信号量,看门人限度了能够流动的线程,如果外面仍然是三个车位,然而看门人扭转了规定,要求每次只能停两辆车,那么一开始进入两辆车,前面得等到有车来到能力有车进入,然而得保障最多停两辆车。

​ 对于 Semaphore 类而言,就如同一个看门人,限度了可流动的线程数。

3.5 构造方法

创立具备给定许可数的计数信号量并设置为非偏心信号量

查看 Semaphore 源码发现他有这两个构造方法

public Semaphore(int permits)
public Semaphore(int permits, boolean fair)
3.5.1 参数介绍
  • permits 是设置同时容许通过的线程数
  • fair 等于 true 时,创立具备给定许可数的计数信号量并设置为偏心信号量。

3.6 其余办法

Semaphore 类外面还有一些重要的办法

// 从此信号量获取一个许可火线程将始终阻塞。相当于一辆车占了一个车位
public void acquire() throws InterruptedException
    
// 从此信号量获取给定数目许可,在提供这些许可前始终将线程阻塞。比方 n =2,就相当于一辆车占了两个车位。public void acquire(int permits) throws InterruptedException
    
// 开释一个许可,将其返回给信号量。就如同车开走返回一个车位。public void release()   
    
// 获取以后可用许可数    
public void release(int permits)   
    
 // 获取以后可用许可数
public int availablePermits()

3.7 示例代码

共有 5 个车位然而有 100 个线程进行占用,车停几秒后会来到,开释车位给其余线程。

package chapter02.semaphore;

import java.util.Random;
import java.util.concurrent.*;

public class SemaphoreTest {private static ExecutorService executorService = Executors.newCachedThreadPool();

    private static Random random = new Random();

    // 阻塞队列
    private static BlockingQueue<String> parks = new LinkedBlockingQueue<>(5);


    public static void execute(Semaphore semaphore) {
        // 获取一个随机数
        long sleepTime = random.nextInt(10);
        long threadId = Thread.currentThread().getId();
        String park = null;
        try {
            /**
             * 获取许可,首先判断 semaphore 外部的数字是否大于 0,如果大于 0,* 能力取得许可,而后将初始值 5 减去 1,线程才会接着去执行;如果没有
             * 取得许可(起因是因为曾经有 5 个线程取得到许可,semaphore 外部的数字为 0),* 线程会阻塞直到曾经取得到许可的线程,调用 release()办法,开释掉许可,* 也就是将 semaphore 外部的数字加 1,该线程才有可能取得许可。*/
            semaphore.acquire();
            /**
             *  对应的线程会到阻塞对,对应车辆去获取到车位,如果没有拿到统一阻塞,*  直到其余车辆偿还车位。*/
            park = parks.take();
            System.out.println("线程 ID" + threadId + ", 开始占用车位:" + park + ",以后残余车位" + semaphore.availablePermits());

        } catch (InterruptedException e) {e.printStackTrace();
        }
        try {
            // 睡眠随机秒
            Thread.sleep(sleepTime * 1000);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        // 偿还车位
        parks.offer(park);
        System.out.println("线程 ID" + threadId + ", 开始偿还车位:" + park + ", 共占用" + sleepTime + "秒");
        // 线程开释掉许可,艰深来将就是将 semaphore 外部的数字加 1
        semaphore.release();}

    public static void main(String[] args) {
        // 初始化线程数量
        int threadNum = 100;
        parks.offer("车位一");
        parks.offer("车位二");
        parks.offer("车位三");
        parks.offer("车位四");
        parks.offer("车位五");


        // 初始化 5 个许可证
        Semaphore semaphore = new Semaphore(5);
        // 能够提前开释然而车位就会被多个线程同时占用
        //semaphore.release(5);
        for (int i = 0; i < threadNum; i++) {executorService.submit(() -> {execute(semaphore);
            });
        }
    }
}

3.8 注意事项

即便创立信号量的时候,指定了信号量的大小,然而在通过 release()操作开释信号量依然能开释超过配置的大小,也就有可能同时执行的线程数量比最开始设置的要大,没有任何线程获取信号量的时候,仍然可能开释并且开释的无效。

​ 举荐的做法是一个线程先 acquire 而后 release,如果开释线程和获取线程不是同一个,那么最好保障这种对应关系。不要开释过多的许可证。

4. Fork/Join

4.1 简介

java 下多线程的开发能够咱们本人启用多线程,线程池,还能够应用 forkjoin,forkjoin 能够让咱们不去理解诸如 Thread,Runnable 等相干的常识,只有遵循 forkjoin 的开发模式,就能够写出很好的多线程并发程序

​ Fork/Join 框架是 Java7 提供了的一个用于并行执行工作的框架,是一个把大工作宰割成若干个小工作,最终汇总每个小工作后果后失去大工作后果的框架。

​ Fork/Join 框架是一个实现了 ExecutorService 接口的多线程处理器。它能够把一个大的工作划分为若干个小的工作并发执行,充分利用可用的资源,进而进步利用的执行效率。

Fork/Join 框架简化了并行程序的起因有:

  • 它简化了线程的创立,在框架中线程是主动被创立和治理。
  • 它主动应用多个处理器,因而程序能够扩大到应用可用处理器。

4.2 举例

​ 就像我须要解决一百万行的 excel,一般的解决是一个一个的 excel 进行解决,然而应用 Fork/Join 框架后的解决形式呢,退出咱们定义 100 条数据为一个批次,那么 Fork/Join 就会拆分这个 excel 先到两头拆分成各有 50 万的数据,而后还比 100 大就持续拆分,一直的细分,最初分到了每一个线程分失去了 100 条而后才开始执行。

4.3 分而治之

“分而治之”始终是一个无效的解决大量数据的办法。驰名的 MapReduce 也是采取了分而治之的思维。

​ 简略来说,就是如果你要解决 1000 个数据,然而你并不具备解决 1000 个数据的能力,那么你能够只解决其中的 10 个,而后,分阶段解决 100 次,将 100 次的后果进行合成,那就是最终想要的对原始的 1000 个数据的处理结果。

​ 同时 forkjoin 在解决某一类问题时十分的有用,哪一类问题?分而治之的问题。十大计算机经典算法:疾速排序、堆排序、归并排序、二分查找、线性查找、深度优先、广度优先、Dijkstra、动静布局、奢侈贝叶斯分类,有几个属于分而治之?3 个,疾速排序、归并排序、二分查找,还有大数据中 M / R 都是。

4.3.1 分治法的设计思维

​ 将一个难以间接解决的大问题,宰割成一些规模较小的雷同问题,以便各个击破,分而治之。

4.3.2 分治策略

​ 对于一个规模为 n 的问题,若该问题能够容易地解决(比如说规模 n 较小)则间接解决,否则将其合成为 k 个规模较小的子问题,这些子问题相互独立且与原问题模式雷同 ( 子问题相互之间有分割就会变为动静标准算法),递归地解这些子问题,而后将各子问题的解合并失去原问题的解。这种算法设计策略叫做分治法。

4.4 Fork-Join 原理

​ Fork/Join 实现了 ExecutorService,所以它的工作也须要放在线程池中执行。它的不同在于它应用了工作窃取算法,闲暇的线程能够从满负荷的线程中窃取工作来帮忙执行。

​ 因为线程池中的每个线程都有一个队列,而且线程间互不影响,那么线程每次都从本人的工作队列的头部获取一个工作进去执行。如果某个时候一个线程的工作队列空了,而其余的线程工作队列中还有工作,那么这个线程就会从其余线程的工作队列中取一个工作进去帮忙执行。就像偷取了其他人的工作一样

4.4.1 工作宰割和合并

Fork/Join 框架的根本思维就是将一个大工作合成(Fork)成一系列子工作,子工作能够持续往下合成,当多个不同的子工作都执行实现后,能够将它们各自的后果合并(Join)成一个大后果,最终合并成大工作的后果

咱们看上面这个图

​ 首先 main Task 先 fork 成 0,1 两个工作 接着,因为还是太大,持续 fork 成 0-0,0-1,1-0,1-1 而后进行计算计算实现后进行 join 操作,0-0,1-1 join 到 0, 1-0,1-1 join 到 1 而后 0 和 1 持续 join 到 mainTask,实现计算工作。

4.4.2 工作密取

即以后线程的 Task 曾经全被执行结束,则主动取到其余线程的 Task 池中取出 Task 继续执行即如果一个工作线程没有事件要做,它能够从其余依然繁忙的线程窃取工作。

​ ForkJoinPool 中保护着多个线程(个别为 CPU 核数)在一直地执行 Task,每个线程除了执行本人职务内的 Task 之外,还会依据本人工作线程的闲置状况去获取其余忙碌的工作线程的 Task,如此一来就能可能缩小线程阻塞或是闲置的工夫,进步 CPU 利用率。

4.5 相干子类

​ 咱们曾经很分明 Fork/Join 框架的需要了,那么咱们能够思考一下,如果让咱们来设计一个 Fork/Join 框架,该如何设计?这个思考有助于你了解 Fork/Join 框架的设计。

​ 第一步宰割工作。首先咱们须要有一个 fork 类来把大工作宰割成子工作,有可能子工作还是很大,所以还须要不停的宰割,直到宰割出的子工作足够小。

​ 第二步执行工作并合并后果。宰割的子工作别离放在双端队列里,而后几个启动线程别离从双端队列里获取工作执行。子工作执行完的后果都对立放在一个队列里,启动一个线程从队列里拿数据,而后合并这些数据。

Fork/Join 应用两个类来实现以上两件事件:

##### 4.5.1 ForkJoinTask

​ 咱们要应用 ForkJoin 框架,必须首先创立一个 ForkJoin 工作。它提供在工作中执行 fork() 和 join() 操作的机制,通常状况下咱们不须要间接继承 ForkJoinTask 类,而只须要继承它的子类,Fork/Join 框架提供了以下两个子类:

4.5.1.1 RecursiveAction

用于没有返回后果的工作

4.5.1.2 RecursiveTask

用于有返回后果的工作。

4.5.2 ForkJoinPool

​ ForkJoinTask 须要通过 ForkJoinPool 来执行,工作宰割出的子工作会增加到当前工作线程所保护的双端队列中,进入队列的头部。当一个工作线程的队列里临时没有工作时,它会随机从其余工作线程的队列的尾部获取一个工作

4.6 Fork/Join 应用

​ Task 要通过 ForkJoinPool 来执行,应用 submit 或 invoke 提交,两者的区别是:invoke 是同步执行,调用之后须要期待工作实现,能力执行前面的代码;submit 是异步执行,join()和 get 办法当工作实现的时候返回计算结果

​ 在咱们本人实现的 compute 办法里,首先须要判断工作是否足够小,如果足够小就间接执行工作。如果不足够小,就必须宰割成两个子工作,每个子工作在调用 invokeAll 办法时,又会进入 compute 办法,看看以后子工作是否须要持续宰割成孙工作,如果不须要持续宰割,则执行以后子工作并返回后果。应用 join 办法会期待子工作执行完并失去其后果。

4.6.1 工作的提交逻辑

fork/join 其实大部分逻辑解决操作都集中在提交工作和解决工作这两块,理解工作的提交基本上前面就很容易了解了,fork/join 提交工作次要分为两种:

4.6.1.1 第一次提交到 forkJoinPool
// 创立初始化工作
SubmitTask submitTask = new SubmitTask(start, end);
// 将初始工作扔进连接池中执行
forkJoinPool.invoke(submitTask);
4.6.1.2 工作切分之后的提交
// 没有达到阈值 计算一个两头值
long mid = (start + end) / 2;
// 拆分 右边的
SubmitTask left = new SubmitTask(start, mid);
// 拆分左边的
SubmitTask right = new SubmitTask(mid + 1, end);
// 增加到工作列表
invokeAll(left, right);
4.6.1.3 合并工作
// 合并后果并返回
return left.join() + right.join();
4.6.1.4 代码案例
package chapter02.forkjoin;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/**
 * 计算 0-10000 阶乘
 */
public class SubmitTask extends RecursiveTask<Long> {
    /**
     * 起始值
     */
    private long start;
    /**
     * 完结值
     */
    private long end;
    /**
     * 阈值
     */
    private long threshold = 10L;

    public SubmitTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * 计算逻辑
     * 进行工作的拆分 以及 达到阈值的计算
     *
     * @return
     */
    @Override
    protected Long compute() {
        // 校验是否达到了阈值
        if (isLessThanThreshold()) {
            // 解决并返回后果
            return handle();} else {
            // 没有达到阈值 计算一个两头值
            long mid = (start + end) / 2;
            // 拆分 右边的
            SubmitTask left = new SubmitTask(start, mid);
            // 拆分左边的
            SubmitTask right = new SubmitTask(mid + 1, end);
            // 增加到工作列表
            invokeAll(left, right);
            // 合并后果并返回
            return left.join() + right.join();
        }
    }

    /**
     * 解决的工作
     *
     * @return
     */
    public Long handle() {
        long sum = 0;
        for (long i = start; i <= end; i++) {
            sum += i;
            try {Thread.sleep(1);
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
        return sum;
    }

    /* 是否达到了阈值 */
    private boolean isLessThanThreshold() {return end - start <= threshold;}

    /**
     * forkJoin 形式调用
     *
     * @param start
     * @param end
     */
    public static void forkJoinInvok(long start, long end) {
        long sum = 0;
        long currentTime = System.currentTimeMillis();
        // 创立 ForkJoinPool 连接池
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 创立初始化工作
        SubmitTask submitTask = new SubmitTask(start, end);
        // 将初始工作扔进连接池中执行
        forkJoinPool.invoke(submitTask);

        //forkJoinPool.submit(submitTask);
        // System.out.println("异步形式,工作完结才会调用该办法, 以后耗时"+(System.currentTimeMillis() - currentTime));
        // 期待返回后果
        sum = submitTask.join();
        //forkjoin 调用形式耗时
        System.out.println("forkJoin 调用:result:" + sum);
        System.out.println("forkJoin 调用耗时:" + (System.currentTimeMillis() - currentTime));
    }

    /**
     * 一般形式调用
     *
     * @param start
     * @param end
     */
    public static void normalInvok(long start, long end) {
        long sum = 0;
        long currentTime = System.currentTimeMillis();
        for (long i = start; i <= end; i++) {
            sum += i;
            try {Thread.sleep(1);
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
        // 一般调动形式耗时
        System.out.println("一般调用:result:" + sum);
        System.out.println("一般调用耗时:" + (System.currentTimeMillis() - currentTime));
    }

    public static void main(String[] args) {
        // 起始值的大小
        long start = 0;
        // 完结值的大小
        long end = 10000;
        //forkJoin 调用
        forkJoinInvok(start, end);
        System.out.println("========================");
        // 一般调用
        normalInvok(start, end);
    }
}

本文由 传智教育博学谷狂野架构师 教研团队公布。

如果本文对您有帮忙,欢送 关注 点赞 ;如果您有任何倡议也可 留言评论 私信,您的反对是我保持创作的能源。

转载请注明出处!

退出移动版