关于多线程:Java多线程编程范式一-协作范式

51次阅读

共计 11815 个字符,预计需要花费 30 分钟才能阅读完成。

前言

原本本篇有个前置文章,然而有点卡文,所以本篇放大了须要的前置内容,浏览本篇须要晓得线程、线程池的概念。

Java 中任意一段代码在执行的时候都在一个线程当中。

CountDownLatch 示例

假如你须要在某个办法中,前面的操作你委托给了线程池进行解决,然而你心愿提交给线程池的工作处理完毕,办法才接着执行,这也就是线程相互期待:

public static void main(String[] args) {
    // 执行 main 办法的是 main 线程 
    doSomeThing();
    // 心愿将 requestThread 办法委托给线程池解决
    // doThing 办法在 requestThread 办法执行之后执行
    requestThread();
    doThing();}

咱们就能够这么写:

public class CountDownLatchDemo {

    /**
     * 举荐用 ThreadPoolExecutor 来创立线程池
     */
  private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(4);

  public static void main(String[] args) {doSomeThing();
        // 心愿将 requestThread 办法委托给线程池解决
        // doThing 办法在 requestThread 办法执行之后执行
          // 构造函数代表只有一个工作须要委托给线程池来实现
        CountDownLatch countDownLatch = new CountDownLatch(1);
        EXECUTOR_SERVICE.submit(()->{
            try {requestThread();
            }catch (Exception e){// 做日志输入}finally {      
                //countDown 办法代表实现一个工作
                countDownLatch.countDown();}
        });
        try {
            // 如果 requestThread 办法没被执行完,count 的调用次数不等于咱们在构造函数中给定的次数
            // 调用此行代码的线程会陷入阻塞
            countDownLatch.await();} catch (InterruptedException e) {
            // 做日志输入
            e.printStackTrace();}
        doThing();}
}    

下面这种是工作实现了接着执行上面的代码,其实咱们也能够反着用,假如在某场景下,多个线程须要期待一个线程的后果,相似于赛跑,主线程是给信号的人,其余线程是运动员:

public class CountDownLatchDemo {

    /**
     * 举荐用 ThreadPoolExecutor 来创立线程池
     */
  private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(4);

    public static void main(String[] args) {
        // 一共四个运动员
        CountDownLatch countDownLatch = new CountDownLatch(1);
        playerWait(countDownLatch);
        System.out.println(Thread.currentThread().getName()+"裁判开枪:");
        countDownLatch.countDown();
        EXECUTOR_SERVICE.shutdown();}

    private static void playerWait(CountDownLatch countDownLatch) {
        // 一共四个运动员
        for (int i = 0; i < 4 ; i++) {EXECUTOR_SERVICE.submit(()->{
                try {countDownLatch.await();
                } catch (InterruptedException e) {e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"听到了枪声开始跑");
            });
        }
    }

下面两种其实就是 CountDownLatch 的经典用法:

  • 一个线程期待线程池的工作处理完毕再接着往下执行
  • 线程池的工作期待主线程执行结束再接着往下执行。

总结一下 CountDownLatch,CountDownLatch 的构造方法用于指定工作数,调用 countDown 办法一次代表实现一个工作,如果没有实现所有工作数,调用 await 办法的线程会陷入阻塞,实现了所有工作数之后,调用 await 办法陷入阻塞的线程会被唤醒。

一个线程池期待线程池的工作处理完毕再接着往下执行,其实还有另一种简洁的写法,即用 CompletabeFuture 来写。

CompletableFuture 示例

计算模型

public static void main(String[] args) {
    // doSomeThing  requestThread doThing 都是空办法
    // 不给出具体实现
    doSomeThing(); 
    CompletableFuture.runAsync(()->{requestThread();
    },EXECUTOR_SERVICE).exceptionally(e->{System.out.println(e);
        return null;
    }).join();
    doThing();
    EXECUTOR_SERVICE.shutdown();}

CompletableFuture 办法中的 runAsync 中第一个参数是 Runnable,第一个参数是 Executor。代表咱们批示 CompletableFuture 用咱们提供的线程池执行 Runable 工作。在 runAsync 办法中产生异样进入 exceptionally 中。join 办法用于获取 runAsnc 的计算结果,如果计算未完会陷入期待。get 办法和 join 办法相似都是用于获取最初的计算结果,然而 get 办法强制要求解决计算过程中的异样,也就是 get 办法上有查看异样,join 上是未查看性异样。

CompletableFuture 提供的线程计算模型是丰盛的,粗略的能够这么说,咱们有一个工作咱们将其宰割为若干个阶段,不同的阶段委托给不同的线程进行解决,有点流水线的感觉。假如咱们有两个运算,咱们将这两个运算给线程池,然而咱们心愿这两个运算实现触发某个办法或者是做个某操作,或者是回调。计算模型如下图所示:

这正是 CompletableFuture 的典型利用场景,咱们用代码模仿如下:

public class CompletableFutureDemo {
    // 理论我的项目中举荐用 ThreadPoolExecutor 创立线程池
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(4);
    
    public static void main(String[] args) {CompletableFuture<String> operationTwoTask = CompletableFuture.supplyAsync(() -> operationTwo(),EXECUTOR_SERVICE);
        CompletableFuture<String> operationOneTask = CompletableFuture.supplyAsync(() -> operationOne(),EXECUTOR_SERVICE);
        String finalResult = operationOneTask.thenCombineAsync(operationTwoTask, (o1, o2) -> {System.out.println(o2);
            System.out.println(o1);
            StringBuilder stringBuilder = new StringBuilder();
            stringBuilder.append(o1).append(o2);
            System.out.println("合并运算一和运算二的后果:" + stringBuilder.toString());
            return stringBuilder.toString();},EXECUTOR_SERVICE).join();
        System.out.println(finalResult);
        EXECUTOR_SERVICE.shutdown();}

    private static String operationTwo() {System.out.println("运算二实现");
        return "运算二实现";
    }

    private static String  operationOne() {
        try {TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        System.out.println("运算一实现");
        return "运算一实现";
    }
}

有同学看到这里可能会问,那如果我只须要解决这两个运算值,只须要在回调外面解决不须要返回,那 thenCombineAsync 不是满足不了我的需要了吗?对啊,这个不满足你需要,你能够用 thenAcceptBoth 就行了,CompletableFuture 合并后果的办法如下:

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,
 BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor) 
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action)    
public <U> CompletableFuture<Void> thenAcceptBothAsync(
 CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action)    
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor)    

如果不相熟函数式编程,可能看这些办法签名会有些头疼,然而本篇只要求你懂一点 Java 中的 Lambda 函数的基本知识即可。第二个参数用于接两个运算的值,第一个参数给 CompletableFuture 的 supplyAsync 的返回就行。BiFunction 和 BiConsumer 这两个函数式接口的区别在于,BitFunction 要求给的 Lambda 函数有返回值,而 BitConsumer 是无返回值的。第三个参数如果不给的话,如果用的办法是带 Async 的,则会启用 CompletableFuture 内置的线程池,不举荐应用 CompletableFuture 内置的线程池来执行合并行为。不带 Async 的是用前两个运算中率先实现工作的线程来执行合并运算。

下面咱们讲的是计算模型是 CompletableFuture 反对的一种模型: 二元依赖。事实上 CompletableFuture 能够反对零元依赖、一元依赖、二元依赖、多元依赖。依赖的意思是某个回调须要满足某些条件能力触发,零元依赖是不须要任何依赖,间接异步执行一个办法,像上面这样:

 public static void main(String[] args) {
        // 线程池用的还是上文的线程池, 这里不再贴出
        // 创立零元依赖的第一种形式
        // 第一个参数用的是 Lambda 表达式 传入的办法体就是打印并返回 hello world
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println("hello world");
            return "hello world";
        }, EXECUTOR_SERVICE);
        // 形式二 通过静态方法创立
        CompletableFuture<String> cf2 = CompletableFuture.completedFuture("complete");
        System.out.println(cf2);
        // 形式三 经常被用来将一个一般的回调办法转换为 CompletableFuture, 纳入到 CompletableFuture 的编排中
        CompletableFuture cf3 = new CompletableFuture();
        cf3.complete("complete");
        System.out.println(cf3);
}

其实上周这篇文章没发的一个起因就是我不了解 CompletableFuture 的 completedFuture、complete 办法,我不了解这两个办法是做啥用的,因为我将其类比到了 Stream 流的收集操作,认为是最终的工作实现之后会触发这个办法,但这个办法接管的是人一个值,这里我就又想不通了。其实咱们能够将 complete 办法能够了解为工作的终点。后面的例子一个典型的例子就是咱们晓得工作的终点,所以依据工作的不同类型抉择 runAsync(无返回值给下个阶段做运算),supplyAsync(有返回值给下个阶段做运算), 然而有一种计算模型是被咱们所疏忽的,即内部告诉咱们开始计算:

private static void noticeCompute() {
   //EXECUTOR_SERVICE 是一个线程池
    CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.thenAcceptAsync(lastResult->{System.out.println(lastResult);
    },EXECUTOR_SERVICE);
  // ompletableFuture.complete("hello world");  语句一
}

其实能够间接看出下面的代码什么也不会输入,起因在于 thenAcceptAsync 并未接管到值,咱们解除语句一的正文,就能输入 hello world。我将 complete 办法了解为告诉,像是多米诺骨牌的第一张倒下的牌。下面咱们介绍的是二元依赖,这是一个比拟显明的例子,二元依赖的合作工作都能实现,那就意味着能实现一元依赖或者多元依赖运算。所谓一元依赖运算指的是咱们有两个执行办法,权且称之为办法一和办法二,办法一是工作的终点,办法二依赖办法一的后果,更为艰深一点的介绍是办法一的返回后果是办法二的入参,如上面的代码所示:

 private static void oneDemo() {String methodOneResult = methodOne();
    methodTwo(methodOneResult);
 }

如果想将办法 methodOne、methodTwo 办法都各自委托给一个线程来实现下面的工作,咱们用 CompletableFuture 能够这么写

private static void oneAsyncDemo() {CompletableFuture.supplyAsync(() -> methodOne(),EXECUTOR_SERVICE).thenAcceptAsync(lastResult-> methodTwo(lastResult),EXECUTOR_SERVICE);
    }

有同学看到这里可能会说,thenAcceptAsync 承受的 lambda 表达式没有返回值,那如果我还想用 methodTwo 办法的返回值怎么办,CompletableFuture 也早有筹备,能够用 thenSupplyAsync 接口。其实看到这里咱们曾经能够大抵总结进去 CompletableFuture 办法的法则了:

  • 带 supply 办法要求给的 lambda 表达式有返回值
  • 带 accept 办法要求给的 lambda 表达式无返回值
  • 办法名带 async 的会启用一个线程去执行传入的 lambda 表达式
  • 合并两个线程的计算结果, 带 combine 承受的 lambda 表达式的要求有返回值,带 accept 承受的 lambda 表达式不要求有返回值。
  • join 和 get 办法都用来获取计算结果,get 办法有查看异样,要求开发者强制解决。

    上面咱们来介绍多元依赖运算,所谓多元依赖运算艰深的讲就是咱们四个办法: 办法一、办法二、办法三、办法四。办法四须要在办法一、办法二、办法三执行结束之后才执行,同时依据三个办法的返回值进行运算, 同步的写法如下图所示:

 private static void multiDemo() {String resultOne = methodOne();
        String resultTwo = methodTwo("");
        String resultThree = methodThree("");
        methodFour(resultOne,resultTwo,resultThree);  
 }

下面的 methodFour 办法是用三个办法的返回值作为入参,如果将下面四个办法都各自委托给一个线程进行计算,咱们能够这么做:

private static void multiAsyncDemo() {CompletableFuture<String> oneFuture = CompletableFuture.supplyAsync(() -> methodOne(),EXECUTOR_SERVICE);
        CompletableFuture<String> twoFuture = CompletableFuture.supplyAsync(() -> methodTwo(""),EXECUTOR_SERVICE);
        CompletableFuture<String> threeFuture = CompletableFuture.supplyAsync(() -> methodThree(""),EXECUTOR_SERVICE);
        CompletableFuture<Void> cf = CompletableFuture.allOf(oneFuture, twoFuture, threeFuture);
        cf.thenAcceptAsync(o->{
            // thenAcceptAsync 在 oneFuture、twoFuture、threeFuture 计算实现之后, 才会执行
            // 所以这里能够间接调用 join 办法拿后果
            String resultOne = oneFuture.join();
            String resultTwo = twoFuture.join();
            String resultThree = threeFuture.join();
            System.out.println(resultOne+resultTwo+resultThree);
        },EXECUTOR_SERVICE);
 }

那有同学看到这里就会问了,那如果我想要 methodOne、methodTwo、methodThree 任意一个返回后果,我该怎么写,CompletableFuture 提供了 anyOf,写法示例如下图所示:

 private static void multiAsyncAnyDemo() {CompletableFuture<String> oneFuture = CompletableFuture.supplyAsync(() -> methodOne(),EXECUTOR_SERVICE);
        CompletableFuture<String> twoFuture = CompletableFuture.supplyAsync(() -> methodTwo(""),EXECUTOR_SERVICE);
        CompletableFuture<String> threeFuture = CompletableFuture.supplyAsync(() -> methodThree(""),EXECUTOR_SERVICE);
        CompletableFuture<Object> cf3 = CompletableFuture.anyOf(oneFuture, twoFuture, threeFuture);
        cf3.thenAcceptAsync(result ->{System.out.println("我是最初后果"+result);
        });
 }

其实下面的二元依赖还有一种模型被咱们所疏忽,如下图所示:

运算三接管运算一、运算二的哪一个后果都行,CompletableFuture 的示例如下图所示:

 private static void anyTwoResult() {CompletableFuture<String> oneFuture = CompletableFuture.supplyAsync(() -> methodOne(),EXECUTOR_SERVICE);
        CompletableFuture<String> twoFuture = CompletableFuture.supplyAsync(() -> methodTwo(""),EXECUTOR_SERVICE);
        oneFuture.acceptEither(twoFuture,anyResult->{System.out.println(anyResult);
        });
 }

异样解决

到当初为止咱们只是在第一个例子中用 CompletableFuture 的 exceptionally 解决过合作过程中的异样,exceptionally 的特点是处理过程中有异样就解决,无异样就不执行,那如果咱们想让异样解决阶段必不可少呢,CompletableFuture 提供了 handle 办法供咱们应用, handle 办法事无论异样是否产生都会执行,示例如下:

 private static void handleDemo() {CompletableFuture.supplyAsync(()->{
            int i = 1 / 0;
            System.out.println("hello world");
            return "hello world";
        }).handle((result, exception)->{if (exception != null){System.out.println("果然呈现了异样");
                return "产生了异样";
            }else {return result;}
        }).thenAccept(o->{System.out.println(o);
        });
}

那如果我并不想返回值呢,handle 办法要求提供的 lambda 办法有返回值,如果不想给返回值能够用 whenComplete, 如下图所示:

private static void whenCompleteDemo() {CompletableFuture.supplyAsync(()->{
        int i = 1 / 0;
        System.out.println("hello world");
        return "hello world";
    }).whenComplete((result,ex)->{if (ex != null){ex.printStackTrace();
        }else {System.out.println(result);
        }
    });
}

总结一下 CompletableFuture

CompletableFuture 为咱们提供的计算模型如下

  • 咱们本人指定计算终点
  • 由内部抉择开始执行计算模型的机会
  • 合并多个计算结果或抉择任意一个计算结果

Semaphore 示例

Semaphore 象征信号量,我集体将其了解为通行证,咱们能够用这个设置最大拜访数,举个事实的例子就是节假日风景区会涌来很多人,然而景区的接待能力是无限的,所以景区会抉择通过门票管制人数,门票卖光就须要期待其他人从风景区进去,这也相似于去饭店吃饭,饭店的人满了,就会发号,让人在饭店外期待。示例如下图所示:

private static void demo01() {
    // 一共只有两张票
    Semaphore semaphore = new Semaphore(2);
    for (int i = 0; i < 4 ; i++) {EXECUTOR_SERVICE.submit(()->{
            try {
                 // 申请票 申请一次票少一张
                // 如果票数为 0,其余线程调用会陷入阻塞。semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+"进入饭店吃饭");
                System.out.println(Thread.currentThread().getName()+"走出饭店 腾出一张桌子");
                // 开释通行证, 开释通行证的时候会唤醒
                // 因调用 acquire 办法陷入阻塞的线程
                semaphore.release();} catch (InterruptedException e) {e.printStackTrace();
            }
        });
    }
}

CyclicBarrier 示例

CountDownLatch 中有两个示例: 一个是一个等多个,即线程在执行以后办法的时候,为了晋升整体的运行速度,抉择将一些工作提交给线程池解决,等线程池将工作处理完毕再接着往下执行。第二个则是多等一,多个线程先调用 await 办法,某个线程解决实现调用 countDown。CyclicBarrier 提供的合作模型跟 CountDownLatch 略有不同,CyclicBarrier 实现的是相似于收集龙珠,每个人负责收集一颗龙珠,收集到龙珠之后,进行期待,所有人收集龙珠结束,大家就都能够喊出号召神龙了:

public class CyclicBarrierDemo {public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(7);
        for(int i = 1;i <= 7; i++) {
            int finalI = i;
            EXECUTOR_SERVICE.submit(() -> {System.out.println(Thread.currentThread().getName() + "\t 收集到第" + finalI + "颗龙珠");
                try {
                    // 留神线程池的外围线程数要求是 7 个
                    // 此处须要七次调用。// 号召神龙能力输入
                    cyclicBarrier.await();
                    System.out.println("**** 号召神龙");
                } catch (InterruptedException e) {e.printStackTrace();
                } catch (BrokenBarrierException e) {e.printStackTrace();
                }
            });
        }
    }
}

写在最初

咱们比照一下 CountDownLatch、Semaphore、CyclicBarrier。这三个都能够用来实现线程之间进行相互期待,在下面的示例中咱们抉择用 CountDownLatch 用作工作计数器,用构造函数指定总任务数,每当一个线程实现一个工作,咱们调用 countDown 办法,对实现工作减一。如果所有工作没有实现,调用 CountDownLatch 的 await 办法的线程会陷入阻塞。而 Semaphore 则是对某资源实现管制,咱们在构造函数中指定的数字就是容许线程调用 acquire 办法的次数,调用一次许可证减一,调用次数用进,其余线程无奈执行 acquire 办法之下的代码。而 CyclicBarrier 与 CountDownLatch 相似,然而 CyclicBarrier 的 await 办法相似于 CountDownLatch 的 countDown 和 await 办法的结合体,先 countDown 再调用 await,此时工作未实现,陷入阻塞。咱们能够用 CompletableFuture 来实现 CountDownLatch 相似的成果。其实原本还打算介绍一下 JDK 7 的 Phaser,然而思考到一篇外面塞太多货色也不利于排汇,就放到下一篇了。

参考资料

  • Java CompletableFuture – Exception Handling https://www.logicbig.com/tuto…

正文完
 0