CompletableFuture异步编程

29次阅读

共计 4604 个字符,预计需要花费 12 分钟才能阅读完成。

CompletableFuture 有什么用

  • CompletableFuture 是用来描述多线程任务的时序关系的:串行关系,并行关系,聚合关系。
  • CompletableFuture 是 Java 8 新增加的 Api, 该类实现,Future 和 CompletionStage 两个接口,提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。
创建 CompletableFuture 对象

方式一:使用默认线程池

/**
 * 创建一个不带返回值得任务。*/
CompletableFuture<Void> f1 = CompletableFuture.runAsync(new Runnable() {
     @Override
     public void run() {// 业务逻辑}
 });
 /**
  * 创建一个带返回值的任务。*/
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        // 业务逻辑
        return null;
    }
});

方式二:使用自定义线程池(建议使用)

 // 创建线程池
ExecutorService    executor =    Executors.newFixedThreadPool(10); 
        
// 创建一个不带返回值得任务。CompletableFuture<Void> f1 = CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {}
},executor);
// 创建一个带返回值的任务。CompletableFuture<String> f2 = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        // 业务逻辑
        return null;
    }
},executor);    
  1. 默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的 核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I / O 操 作,就会导致线程池中所有线程都阻塞在 I / O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要 根据不同的业务类型创建不同的线程池,以避免互相干扰。
  2. 创建完 CompletableFuture 对象之后,会自动地异步执行 runnable.run()方法或者 supplier.get()方法。因为 CompletableFuture 类实现了 Future 接口,所以这两个问题你都可以通过 Future 接口来解决。另外,CompletableFuture 类还实现了 CompletionStage 接口。

常用 API

  • public T get():获取计算结果,该方法为阻塞方法会等待计算结果完成。
  • public T get(long timeout,TimeUnit unit):有时间限制的阻塞方法
  • public T getNow(T valueIfAbsent)立即获取方法结果,如果没有计算结束则返回传的值
  • public T join()和 get() 方法类似也是主动阻塞线程,等待计算结果。和 get() 方法有细微的差别。
  • public boolean complete(T value):立即完成计算,并把结果设置为传的值,返回是否设置成功
  • public boolean completeExceptionally(Throwable ex):立即完成计算,并抛出异常

理解 CompletionStage 接口

CompletionStage 接口可以清晰地描述任务之间的这种时序关系,时序关系:串行,并行,汇聚。

串行

线程与线程之间的执行顺序是串行的。

//Async 代表的是异步执行 fn
CompletionStage<R>    thenApply(fn); 
CompletionStage<R>    thenApplyAsync(fn); 
CompletionStage<Void>    thenAccept(consumer); 
CompletionStage<Void>    thenAcceptAsync(consumer); 
CompletionStage<Void>    thenRun(action); 
CompletionStage<Void>    thenRunAsync(action); 
CompletionStage<R>    thenCompose(fn); 
CompletionStage<R>    thenComposeAsync(fn);
  • CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四 个系列的接口。

    • thenApply 系列函数里参数 fn 的类型是接口 Function<T, R>,这个接口里与 CompletionStage 相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值,所以 thenApply 系列方法返回的 是 CompletionStage<R>。
  • 而 thenAccept 系列方法里参数 consumer 的类型是接口 Consumer<T>,这个接口里与 CompletionStage 相关 的方法是 void accept(T t),这个方法虽然支持参数,但却不支持回值,所以 thenAccept 系列方法返回 的是 CompletionStage<Void>。

    • thenRun 系列方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持返回值,所以 thenRun 系列方法返回的也是 CompletionStage<Void>。
    • 这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。其中,需要你注意的是 thenCompose 系列 方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的。

演示串行

//supplyAsync()启动一个异步 流程
CompletableFuture<String> f0 = CompletableFuture.supplyAsync(() -> "Hello World")                        //①        
    .thenApply(s ->    s + "girl")        //②
    .thenApply(String::toUpperCase);//③
System.out.println(f0.join());
// 输出结果 HELLO WORLD    girl

虽然这是一个异步流程,但任务①②③却是 串行执行的,②依赖①的执行结果,③依赖②的执行结果。

AND 汇聚

CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。

CompletionStage<R>    thenCombine(other,    fn); 
CompletionStage<R>    thenCombineAsync(other,    fn); 
CompletionStage<Void>    thenAcceptBoth(other,    consumer); CompletionStage<Void>    thenAcceptBothAsync(other,    consumer); CompletionStage<Void>    runAfterBoth(other,    action); 
CompletionStage<Void>    runAfterBothAsync(other,    action);

演示:

         // 启动一个异步流程 f1
        CompletableFuture<Void> f1 = CompletableFuture.runAsync(()->{System.out.println("异步任务 -1");

        });
        // 启动一个异步流程 f2
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
            String s = "异步任务 -2";
            System.out.println(s);
            return    s;

        });
        // 启动一个汇聚流程 f3
        CompletableFuture<String> f3 = f1.thenCombine(f2,(a,tf)->{return tf;//tf 是 f2 的值});

        // 等待任务 3 执行结果
        System.out.println(f3.join());
OR 汇聚

CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的 接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。

CompletionStage    applyToEither(other,    fn);
CompletionStage    applyToEitherAsync(other,    fn);
CompletionStage    acceptEither(other,    consumer);
CompletionStage    acceptEitherAsync(other,    consumer);
CompletionStage    runAfterEither(other,    action); 
CompletionStage    runAfterEitherAsync(other,    action);

功能演示:

         // 启动一个异步流程 f1
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()->{int    t    =     new Random().nextInt(10);
            System.out.println("f1-t ="+t);
            sleep(t,    TimeUnit.SECONDS);
            return    String.valueOf(t);
        });
     // 启动一个异步流程 f2
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{int    t    =     new Random().nextInt(10);
            System.out.println("f2-t ="+t);
            sleep(t,    TimeUnit.SECONDS);
            return    String.valueOf(t);
        });
        // 将 f1 和 f2 的值汇总到 f3
        CompletableFuture<String> f3 = f1.applyToEither(f2,s    ->
                    Integer.parseInt(f2.join())+Integer.parseInt(s)+""
        );
        System.out.println(f3.join());

码字不易如果对你有帮助请给个关注

爱技术爱生活 QQ 群: 894109590

正文完
 0