乐趣区

关于java:CompletableFuture详解

CompletableFuture

前言

CompletableFuture 继承于 java.util.concurrent.Future,它自身具备 Future 的所有个性,并且基于 JDK1.8 的流式编程以及 Lambda 表达式等实现一元操作符、异步性以及事件驱动编程模型,能够用来实现多线程的串行关系,并行关系,聚合关系。它的灵活性和更弱小的性能是 Future 无法比拟的。

一、创立形式

1. 用默认线程池

CompletableFuture<String> future = new CompletableFuture<>();

默认应用 ForkJoinPool.commonPool(),commonPool 是一个会被很多工作 共享 的线程池,比方同一 JVM 上的所有 CompletableFuture、并行 Stream 都将共享 commonPool,commonPool 设计时的指标场景是运行 非阻塞的 CPU 密集型工作,为最大化利用 CPU,其线程数默认为 CPU 数量 – 1。

2. 用自定义线程池

ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 3,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
                new ThreadPoolExecutor.DiscardOldestPolicy());
CompletableFuture.runAsync(() -> System.out.println("Hello World!"), pool);
复制代码

二、应用示例

1. 构建异步工作

办法

有无返回值

形容

runAsync

进行数据处理,接管前一步骤传递的数据,无返回值。

supplyAsync

进行数据处理,接管前一步骤传递的数据,解决加工后返回。返回数据类型能够和前一步骤返回的数据类型不同。

(1)runAsync

源码

 public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);
    }
复制代码

示例

public static void runAsync() {
    // 应用默认线程池
    CompletableFuture cf = CompletableFuture.runAsync(() -> System.out.println("Hello World!"));
    assertFalse(cf.isDone());    
    // 应用自定义线程池
    CompletableFuture.runAsync(() -> System.out.println("Hello World!"),                                                 Executors.newSingleThreadExecutor());
    }
复制代码

(2)supplyAsync

源码

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);
}
复制代码

示例

public static void supplyAsync() throws ExecutionException, InterruptedException {CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
            try {
                //ForkJoinPool.commonPool-worker- 1 线程
                System.out.println(Thread.currentThread().getName());
                Thread.sleep(3000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            return "hello";
        });
        // 阻塞期待 3 秒 
        String result = f.get();
        //main 线程
        System.out.println(Thread.currentThread().getName());
        System.out.println(result);
}
复制代码

2. 单任务后果生产

办法

有无返回值

形容

thenApply

在前一个阶段上利用 thenApply 函数,将上一阶段实现的后果作为以后阶段的入参

thenAccept

无返回值

生产前一阶段的后果

thenRun

无返回值,并且无入参

当上一阶段实现后,执行本阶段的工作

(1)thenApply

源码

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {return uniApplyStage(asyncPool, fn);
}
复制代码

示例

public static void thenApply() throws ExecutionException, InterruptedException {CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {System.out.println(s);
        return s.toUpperCase();}).thenApply(s->{System.out.println(s);
        return s + ":body";
    });
    System.out.println(cf.get());
}
复制代码

then意味着这个阶段的动作产生以后的阶段失常实现之后。本例中,以后节点实现,返回字符串message

Apply意味着返回的阶段将会对后果前一阶段的后果利用一个函数。

函数的执行会被 阻塞

(2)thenAccept

源码

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);
}
复制代码

示例

public static void thenAccept() throws InterruptedException {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {return "message";}).thenAccept((consumer) -> {System.out.println(consumer);
    });
}
复制代码

(3)thenRun

源码

public CompletableFuture<Void> thenRun(Runnable action) {return uniRunStage(null, action);
}
复制代码

示例

public static void thenRun() throws InterruptedException {CompletableFuture.supplyAsync(() -> {
        // 执行异步工作
        System.out.println("执行工作");
        try {Thread.sleep(1000);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        return "success";
    }).thenRun(() -> {
        // Computation Finished.
        System.out.println("上一阶段工作执行实现");
    });
    Thread.sleep(2000);
}
复制代码

3. 合并后果生产

办法

有无返回值

形容

thenCombine

合并另外一个工作,两个工作都实现后,执行 BiFunction,入参为两个工作后果,返回新后果

thenAcceptBoth

合并另外一个工作,两个工作都实现后,执行这个办法期待第一个阶段的实现(大写转换),它的后果传给一个指定的返回 CompletableFuture 函数,它的后果就是返回的 CompletableFuture 的后果,入参为两个工作后果,不返回新后果

runAfterBoth

无返回值无入参

合并另外一个工作,两个工作都实现后,执行 Runnable,留神,这里的两个工作是同时执行

(1)thenCombine

如果 CompletableFuture 依赖两个后面阶段的后果,它复合两个阶段的后果再返回一个后果,咱们就能够应用 thenCombine() 函数。整个流水线是同步的。

源码

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,
                             BiFunction<? super T,? super U,? extends V> fn) {return biApplyStage(null, other, fn);
}
复制代码

示例

public static void thenCombine() {CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> {System.out.println("processing a...");
        return "hello";
    });
    CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> {System.out.println("processing b...");
        return "world";
    });
    CompletableFuture<String> cfC = CompletableFuture.supplyAsync(() -> {System.out.println("processing c...");
        return ", I'm CodingTao!";
    });
    cfA.thenCombine(cfB, (resultA, resultB) -> {System.out.println(resultA + resultB);  // hello world
        return resultA + resultB;
    }).thenCombine(cfC, (resultAB, resultC) -> {System.out.println(resultAB + resultC); // hello world, I'm CodingTao!
        return resultAB + resultC;
    });
}
复制代码

(2)thenAcceptBoth

源码

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,
                          BiConsumer<? super T, ? super U> action) {return biAcceptStage(null, other, action);
}
复制代码

示例

private static void thenAcceptBoth() {CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
    CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");
    cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {
        //resultA,resultB
        System.out.println(resultA+","+resultB);
    });
}
复制代码

(3)runAfterBoth

源码

public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                            Runnable action) {return biRunStage(null, other, action);
}
复制代码

示例

private static void runAfterBoth() {CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> {
        try {Thread.sleep(5000);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        System.out.println("process a");
        return "resultA";
    });
    CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> {
        try {Thread.sleep(5000);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        System.out.println("process b");
        return "resultB";
    });
    cfB.runAfterBoth(cfA, () -> {
        //resultA,resultB
        System.out.println("工作 A 和工作 B 同时实现");
    });
    try {Thread.sleep(6000);
    } catch (InterruptedException e) {e.printStackTrace();
    }
}
复制代码

4. 任一后果生产

办法

有无返回值

形容

applyToEither

其中任一工作实现后,执行 Function,后果转换,入参为已实现的工作后果。返回新后果,要求两个工作后果为同一类型

acceptEither

其中任一工作实现后,执行 Consumer,生产后果,入参为已实现的工作后果。不返回新后果,要求两个工作后果为同一类型

runAfterEither

无返回值无入参

其中任一工作实现后,执行 Runnable,生产后果,无入参。不返回新后果,不要求两个工作后果为同一类型

场景

假如查问商品 a,有两种形式,A 和 B,然而 A 和 B 的执行速度不一样,心愿哪个先返回就用那个的返回值。

(1)applyToEither

源码

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other,                                                  Function<? super T, U> fn) {return orApplyStage(null, other, fn);
}
复制代码

示例

private static void applyToEither() throws ExecutionException, InterruptedException {CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
        try {Thread.sleep(1000);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        return "通过形式 A 获取商品 a";
    });
    CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
        try {Thread.sleep(2000);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        return "通过形式 B 获取商品 a";
    });
    CompletableFuture<String> futureC = futureA.applyToEither(futureB, product -> "后果:" + product);
    // 后果: 通过形式 A 获取商品 a
    System.out.println(futureC.get());
}
复制代码

(2)acceptEither

(3)runAfterEither

5. 级联工作

办法

有无返回值

形容

thenCompose

当原工作实现后,以其后果为参数,返回一个新的工作(而不是新后果,相似 flatMap)

(1)thenCompose

这个办法期待第一个阶段的实现(大写转换),它的后果传给一个指定的返回 CompletableFuture 函数,它的后果就是返回的 CompletableFuture 的后果。

源码

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {return uniComposeStage(null, fn);
}
复制代码

示例

private static void thenCompose() {
    String original = "Message";
    CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
            .thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s))
                    .thenApply(s -> upper + s));
    // MESSAGEmessage
    System.out.println(cf.join());
}
复制代码

6. 单任务后果或异样生产

办法

有无返回值

形容

handle

工作实现后执行 BiFunction,后果转换,入参为后果或者异样,返回新后果

whenComplete

工作实现后执行 BiConsumer,后果生产,入参为后果或者异样,不返回新后果

exceptionally

工作异样,则执行 Function,异样转换,入参为原工作的异样信息,若原工作无异样,则返回原工作后果,即不执行转换

异样流程

CompletableFuture.supplyAsync(() -> "resultA")
              .thenApply(resultA -> resultA + "resultB")
              .thenApply(resultB -> resultB + "resultC")
              .thenApply(resultC -> resultC + "resultD");
复制代码

下面的代码中,工作 A、B、C、D 顺次执行,如果工作 A 抛出异样(当然下面的代码不会抛出异样),那么前面的工作都得不到执行。如果工作 C 抛出异样,那么工作 D 得不到执行。

那么咱们怎么解决异样呢?看上面的代码,咱们在工作 A 中抛出异样,并对其进行解决:

(1)handle

示例

private static void handle() {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "resultA")
            .thenApply(resultA -> resultA + "resultB")
            // 工作 C 抛出异样
            .thenApply(resultB -> {throw new RuntimeException();})
            // 解决工作 C 的返回值或异样
            .handle(new BiFunction<Object, Throwable, Object>() {
                @Override
                public Object apply(Object re, Throwable throwable) {if (throwable != null) {return "errorResultC";}
                    return re;
                }
            })
            .thenApply(resultC -> {System.out.println("resultC:" + resultC);
                return resultC + "resultD";
            });
    System.out.println(future.join());
}
复制代码

(2)whenComplete

示例

private static void whenComplete() throws ExecutionException, InterruptedException {
    // 创立异步执行工作:
    CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis());
        try {Thread.sleep(2000);
        } catch (InterruptedException e) { }
        if(true){throw new RuntimeException("test");
        }else{System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis());
            return 1.2;
        }
    });
    //cf 执行实现后会将执行后果和执行过程中抛出的异样传入回调办法
    // 如果是失常执行,a=1.2,b 则传入的异样为 null
    // 如果异样执行,a=null,b 则传入异样信息
    CompletableFuture<Double> cf2=cf.whenComplete((a,b)->{System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis());
        try {Thread.sleep(2000);
        } catch (InterruptedException e) { }
        if(b!=null){System.out.println("error stack trace->");
            b.printStackTrace();}else{System.out.println("run succ,result->"+a);
        }
        System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis());
    });
    // 期待子工作执行实现
    System.out.println("main thread start wait,time->"+System.currentTimeMillis());
    // 如果 cf 是失常执行的,cf2.get 的后果就是 cf 执行的后果
    // 如果 cf 是执行异样,则 cf2.get 会抛出异样
    System.out.println("run result->"+cf2.get());
    System.out.println("main thread exit,time->"+System.currentTimeMillis());
}
复制代码

(3)exceptionally

7. 合并多个 complete 为一个

办法

形容

allOf

合并多个 complete 为一个,期待全副实现

anyOf

合并多个 complete 为一个,期待其中之一实现

(1)allOf

咱们在解决业务时,有时会有多任务异步解决,同步返回后果的状况

  • 采纳多线程执异步行某种工作,比方在不同主机查问磁盘列表信息。
  • 将执行后果收集,分组分类,解决。
  • 将解决当前的后果给予展现。

示例

 // 创立异步执行工作:
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread()+"start job1,time->"+System.currentTimeMillis());
            try {Thread.sleep(2000);
            } catch (InterruptedException e) { }
            System.out.println(Thread.currentThread()+"exit job1,time->"+System.currentTimeMillis());
            return 1.2;
        });
        CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread()+"start job2,time->"+System.currentTimeMillis());
            try {Thread.sleep(1500);
            } catch (InterruptedException e) { }
            System.out.println(Thread.currentThread()+"exit job2,time->"+System.currentTimeMillis());
            return 3.2;
        });
        CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread()+"start job3,time->"+System.currentTimeMillis());
            try {Thread.sleep(1300);
            } catch (InterruptedException e) { }
//            throw new RuntimeException("test");
            System.out.println(Thread.currentThread()+"exit job3,time->"+System.currentTimeMillis());
            return 2.2;
        });
        //allof 期待所有工作执行实现才执行 cf4,如果有一个工作异样终止,则 cf4.get 时会抛出异样,都是失常执行,cf4.get 返回 null
        //anyOf 是只有一个工作执行实现,无论是失常执行或者执行异样,都会执行 cf4,cf4.get 的后果就是已执行实现的工作的执行后果
        CompletableFuture cf4=CompletableFuture.allOf(cf,cf2,cf3).whenComplete((a,b)->{if(b!=null){System.out.println("error stack trace->");
                b.printStackTrace();}else{System.out.println("run succ,result->"+a);
            }
        });
        System.out.println("main thread start cf4.get(),time->"+System.currentTimeMillis());
        // 期待子工作执行实现
        System.out.println("cf4 run result->"+cf4.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
复制代码

获取返回值办法

public <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futuresList) {
    CompletableFuture<Void> allFuturesResult =
            CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[futuresList.size()]));
    return allFuturesResult.thenApply(v ->
            futuresList.stream().
                    map(future -> future.join()).
                    collect(Collectors.<T>toList())
    );
}
复制代码

(2)anyOf

CompletableFuture.anyOf()和其名字介绍的一样,当任何一个 CompletableFuture 实现的时候【雷同的后果类型】,返回一个新的 CompletableFuture。

示例

private static void anyOf() throws ExecutionException, InterruptedException {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {throw new IllegalStateException(e);
            }
            return "Result of Future 1";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {throw new IllegalStateException(e);
            }
            return "Result of Future 2";
        });

        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            try {TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {throw new IllegalStateException(e);
            }
            return "Result of Future 3";
        });
        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);
        System.out.println(anyOfFuture.get()); // Result of Future 2
    }

复制代码

三、其余相干 api

1. future 接口

(1)isDone()

判断工作是否实现。三种实现状况:normally(失常执行结束)、exceptionally(执行异样)、via cancellation(勾销)

(2)get()

阻塞获取后果或抛出受检测异样,须要显示进行 try…catch 解决。

(3)get(long timeout,TimeUnit unit)

超时阻塞获取后果

(4)cancel(boolean mayInterruptIfRunning)

勾销工作,若一个工作未实现,则以 CancellationException 异样。其相干未实现的子工作也会以 CompletionException 完结

(5)isCancelled()

是否已勾销,在工作失常执行实现前勾销,才为 true。否则为 false。

2. CompletableFuture 接口

(1)join

​ 阻塞获取后果或抛出非受检异样。

(2)getNow(T valueIfAbsent)

​ 若当前任务无后果,则返回 valueIfAbsent,否则返回已实现工作的后果。

(3)complete(T value)

​ 设置工作后果,工作失常完结,之后的工作状态为已实现。

(4)completeExceptionally(Throwable ex)

​ 设置工作异样后果,工作异样完结,之后的工作状态为已实现。

(5)isCompletedExceptionally()

​ 判断工作是否异样完结。异样可能的起因有:勾销、显示设置工作异样后果、工作动作执行异样等。

(6)getNumberOfDependents()

​ 返回依赖当前任务的工作数量,次要用于监控。

(7)orTimeout(long timeout,TimeUnit unit) jdk9

​ 设置工作实现超时工夫,若在指定工夫内未失常实现,则工作会以异样 (TimeoutException) 完结。

(8)completeOnTimeout(T value,long timeout,TimeUnit unit) jdk9

​ 设置工作实现超时工夫,若在指定工夫内未失常实现,则以给定的 value 为工作后果

四、实战

1. API 网关做接口的聚合

// 这两个参数从内部取得
Long userId = 10006L;
String orderId = "XXXXXXXXXXXXXXXXXXXXXX";
// 从用户服务获取用户信息
UserInfo userInfo = userService.getUserInfo(userId);
// 从用订单务获取订单信息
OrderInfo orderInfo = orderService.getOrderInfo(orderId);
// 返回两者的聚合 DTO
return new OrderDetailDTO(userInfo,orderInfo);
复制代码

​ 上面三个内部接口的信息肯定是不相关联的,也就是能够并行获取,三个接口的后果都获取结束之后做一次数据聚合到 DTO 即可,也就是聚合的耗时大抵是这三个接口中耗时最长的接口的响应工夫

@Service
public class OrderDetailService {
    /**
     * 建设一个线程池专门交给 CompletableFuture 应用
     */
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 0, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(100));
    @Autowired
    private UserService userService;
    @Autowired
    private OrderService orderService;
    public OrderDetailDTO getOrderDetail(Long userId, String orderId) throws Exception {CompletableFuture<UserInfo> userInfoCompletableFuture = CompletableFuture.supplyAsync(() -> userService.getUserInfo(userId), executor);
        CompletableFuture<OrderInfo> orderInfoCompletableFuture = CompletableFuture.supplyAsync(() -> orderService.getOrderInfo(orderId), executor);
        CompletableFuture<OrderDetailDTO> result
                = userInfoCompletableFuture.thenCombineAsync(orderInfoCompletableFuture, OrderDetailDTO::new, executor);
        return result.get();}
}
复制代码

五、区别

(1)whenComplete 和 handle 区别

whenCompletehandle 办法就相似于 try..catch..finanllyfinally 代码块。无论是否产生异样,都将会执行的。这两个办法区别在于 handle 反对返回后果。

(2)thenApply 与 thenCompose 的异同

​ 对于 thenApply,fn 函数是一个对一个已实现的 stage 或者说 CompletableFuture 的的返回值进行计算、操作;

​ 对于 thenCompose,fn 函数是对另一个 CompletableFuture 进行计算、操作。

(3)有无 Async 的区别

没有 Async 的在 CompleteableFuture 调用它的线程定义的线程上运行,因而通常不晓得在哪里执行该线程。如果后果曾经可用,它可能会立刻执行。

​ 有 Async 的无论环境如何,都在环境定义的执行程序上运行。为此 CompletableFuture 通常 ForkJoinPool.commonPool()。

参考:《2020 最新 Java 根底精讲视频教程和学习路线!》
链接:https://juejin.cn/post/694387…

退出移动版