关于后端:与CompletableFuture有关的一些知识

33次阅读

共计 7270 个字符,预计需要花费 19 分钟才能阅读完成。

Java1.5 引入了 Future,在 1.8 中又引入了 CompletableFuture。他的呈现能够使咱们更好的去对工作进行编排,正当的应用会极大的缩减多任务的解决工夫,达到事倍功半的目标。上面让咱们一块来看一下与它相干的一些常识。

1 Java 中的函数式编程

在看 CompletableFuture 理解一点 Java 中函数式编程相干的常识会更有用一些。

在 Java1.8 中引入了 @FunctionalInterface,同时也在java.util.function 包中引入了很多函数式接口。咱们来看几个常见的:

1.1 Consumer

Consumer 消费者,关联的办法为void accept(T t),有参无返回值。

@FunctionalInterface
public interface Consumer<T> {void accept(T t);

1.2 Function

Function 函数,关联办法为R apply(T t),有参有返回值。

@FunctionalInterface
public interface Function<T, R> {R apply(T t);

1.3 Predicate

Predicate 谓语(断言?),关联办法为boolean test(T t),有参有返回值(bool)。

@FunctionalInterface
public interface Predicate<T> {boolean test(T t);

1.4 Supplier

Supplier 供应者,关联办法为T get(),无参有返回值。

@FunctionalInterface
public interface Supplier<T> {T get();
}

2 CompletableFuture

后续的内容次要关注点会放在其外部各种 API 下面。在后续的办法中,因为 CompletableFuture 实现了 CompletionStage 使其具备链式调用能力。大多数办法都会相似如下:

  • xxx(Function func): 会上文的 同一线程 去执行当前任务
  • xxxAsync(Function func): 会启用一个 新线程 去执行当前任务
  • xxxAsync(Function func, Executor executor): 会应用 executor 线程池中的 新线程 去执行当前任务

2.1 CompletableFuture 的创立

  • supplyAsync: 以 Supplier 为参,有返回值。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
  • runAsync: 以 Runnable 为参,返回值为Void
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
  • completedFuture: 创立一个已有值且实现的 CompletableFuture。
public static <U> CompletableFuture<U> completedFuture(U value)
  • failedFuture: 创立一个已出现异常且实现的 CompletableFuture。
public static <U> CompletableFuture<U> failedFuture(Throwable ex)

能够看到 runAsyncsupplyAsync都提供了额定参数 Executor,在未指定executor 时,默认会应用 ForkJoinPool.commonPool() 来执行异步工作,而 Parallel Stream 默认状况下也会应用该线程池,共用的话可能会导致非核心业务抢占外围业务的执行。一般来说都会应用自定义线程池来执行这些工作。

2.2 CompletableFuture 的后续操作

2.2.1 执行一个工作(1)
  • apply: 有参有返回值。会将后面的返回值作为以后函数的输出。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
  • accept: 有参无返回值。会将后面的输入作为以后函数的输出。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
  • run: 编排执行另外一个毫无相干的工作
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

示例

@Test
void testApply() throws ExecutionException, InterruptedException {
    final CompletableFuture<Integer> future = CompletableFuture
            .supplyAsync(() -> 1)
            .thenApply((val) -> val + 1);
    assertEquals(2, future.get());
}

@Test
void testAccept() {
    final CompletableFuture<Void> future = CompletableFuture
            .supplyAsync(() -> 1)
            .thenAccept((val) -> {assertEquals(1, val);
            });
    future.join();}

@Test
void testRun() {
    final CompletableFuture<Void> future = CompletableFuture
            .supplyAsync(() -> 1)
            .thenRun(() -> {});
    future.join();}
2.2.1 执行一个工作(2)
  • whenComplete: 当工作实现时执行该办法,将返回值及异样传入BiConsumer(有参无返回值)
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
  • exceptionally: 当相应的工作出现异常时会调用该办法,将异样作为参数传进来
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
public CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn)
public CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor)
  • handler: 接管解决后面工作的后果BiFunction(有参有返回值)
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
2.2.2 执行两个工作其中一个即可
  • applyToEither: 执行俩工作,返回其中一个未出现异常的,并将该输入作为参数传入后续Function(有参有返回值)
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)
  • acceptEither: 执行俩工作,返回其中一个未出现异常的,并将该输入作为参数传入后续Consumer(有参无返回值)
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) 
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
  • runAfterEither: 执行俩工作,返回其中一个未出现异常的,并将该输入作为参数传入后续Runnable(无参无返回值)
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)
2.2.3 执行两个工作全副实现
  • thenCombine: 执行两个工作,并将这俩工作作为参数输出BiFunction(有参有返回值)
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
  • thenAcceptBoth: 执行两个工作,并将这俩工作作为参数输出BiConsumer(有参无返回值)
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)
  • runAfterBoth: 执行两个工作,而后执行Runnable(无参无返回值)
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
2.2.4 执行两个有依赖的工作
  • thenCompose: 前一个工作的后果,要给第二个工作作为参数
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
2.2.5 组合更多的工作
  • allOf: 期待所有工作都实现
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
  • anyOf: 期待实现任意一工作即可
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

2.3 后果获取

2.3.1 join

工作实现时获取数据,或者异样时抛出异样。不过要留神的是,它抛出的异样是 (unchecked) exceptionCancellationExceptionCompletionException

public T join()
2.3.2 get

在获取数据时: get()期待工作实现而后读取后果; get(long timeout, TimeUnit unit)无限工夫内期待实现读取后果,或抛出 TimeoutException 异样; getNow(T valueIfAbsent)立刻获取后果,或者缺省值;

public T get() throws InterruptedException, ExecutionException
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 
public T getNow(T valueIfAbsent)

3 后记

明天次要相熟了 CompletableFuture 的绝大部分 API,只有在适合的场景去应用能力死记硬背。前面会结合实际场景来讲讲如何应用它来做工作编排。


echo '5Y6f5Yib5paH56ugOiDmjpjph5Eo5L2g5oCO5LmI5Zad5aW26Iy25ZWKWzkyMzI0NTQ5NzU1NTA4MF0pL+aAneWQpihscGUyMzQp' | base64 -d

正文完
 0