强大的CompletableFuture

38次阅读

共计 5959 个字符,预计需要花费 15 分钟才能阅读完成。

引子

为了让程序更加高效,让 CPU 最大效率的工作,我们会采用异步编程。首先想到的是开启一个新的线程去做某项工作。再进一步,为了让新线程可以返回一个值,告诉主线程事情做完了,于是乎 Future 粉墨登场。然而 Future 提供的方式是主线程主动问询新线程,要是有个回调函数就爽了。所以,为了满足 Future 的某些遗憾,强大的 CompletableFuture 随着 Java8 一起来了。

Future

传统多线程的却让程序更加高效,毕竟是异步,可以让 CPU 充分工作,但这仅限于新开的线程无需你的主线程再费心了。比如你开启的新线程仅仅是为了计算 1 +…+ n 再打印结果。有时候你需要子线程返回计算结果,在主线程中进行进一步计算,就需要 Future 了。

看下面这个例子,主线程计算 2 +4+6+8+10;子线程计算 1 +3+5+7+9;最后需要在主线程中将两部分结果再相加。

public class OddNumber implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {Thread.sleep(3000);
        int result = 1 + 3 + 5 + 7 + 9;
        return result;
    }
}
public class FutureTest {public static void main(String[] args) {ExecutorService executor = Executors.newCachedThreadPool();
        OddNumber oddNumber = new OddNumber();
        Future<Integer> future = executor.submit(oddNumber);
        long startTime = System.currentTimeMillis();
        int evenNumber = 2 + 4 + 6 + 8 + 10;
        try {Thread.sleep(1000);
            System.out.println("0. 开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
            int oddNumberResult = future.get();// 这时间会被阻塞
            System.out.println("1+2+...+9+10="+(evenNumber+oddNumberResult));
            System.out.println("1. 开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
        } catch (Exception e) {System.out.println(e);
        }
    }
}
输出结果:0. 开始了:1001 秒
1+2+...+9+10=55
1. 开始了:3002 秒

看一下 Future 接口,只有五个方法比较简单

// 取消任务,如果已经完成或者已经取消,就返回失败
boolean cancel(boolean mayInterruptIfRunning);
// 查看任务是否取消
boolean isCancelled();
// 查看任务是否完成
boolean isDone();
// 刚才用到了,查看结果,任务未完成就一直阻塞
V get() throws InterruptedException, ExecutionException;
// 同上,但是加了一个过期时间,防止长时间阻塞,主线程也做不了事情
V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

CompletableFuture

上面的看到 Future 的五个方法,不是很丰富,既然我们的主线程叫做 main,就应该以我为主,我更希望子线程做完了事情主动通知我。为此,Java8 带来了 CompletableFuture,一个 Future 的实现类。其实 CompletableFuture 最迷人的地方并不是极大丰富了 Future 的功能,而是完美结合了 Java8 流的新特性。

实现回调,自动后续操作

提前说一下 CompletableFuture 实现回调的方法(之一):thenAccept()

    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {return uniAcceptStage(null, action);
    }

参数有个 Consumer,用到了 Java8 新特性,行为参数化,就是参数不一定是基本类型或者类,也可使是函数(行为),或者说一个方法(接口)。

public class OddNumberPlus implements Supplier<Integer> {
    @Override
    public Integer get() {
        try {Thread.sleep(3000);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        return 1+3+5+7+9;
    }
}
public class CompletableFutureTest {public static void main(String[] args) {long startTime = System.currentTimeMillis();
        final int evenNumber = 2 + 4 + 6 + 8 + 10;
        CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddNumberPlus());
        try {Thread.sleep(1000);
            System.out.println("0. 开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
            // 看这里,实现回调
            oddNumber.thenAccept(oddNumberResult->
                        {System.out.println("1. 开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
                            System.out.println("此时计算结果为:"+(evenNumber+oddNumberResult));
                        });
            oddNumber.get();} catch (Exception e) {System.out.println(e);
        }
    }
}
输出结果:0. 开始了:1006 秒
1. 开始了:3006 秒
此时计算结果为:55

值得一提的是, 本例中并没有显示的创建任务连接池,程序会默认选择一个任务连接池 ForkJoinPool.commonPool()

    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

ForkJoinPool 始自 JDK7,叫做分支 / 合并框架。可以通过将一个任务递归分成很多分子任务,形成不同的流,进行 并行 执行,同时还伴随着强大的 工作窃取 算法。极大的提高效率。当然,你也可以自己指定连接池。

CompletableFuture 合并

Java8 的确丰富了 Future 实现,CompletableFuture 有很多方法可供大家使用,但是但从上面的例子来看,其实 CompletableFuture 能做的功能,貌似 Future。毕竟你 CompletableFuture 用 get()这个方法的时候还不是阻塞了,我 Future 蛮可以自己拿到返回值,再手动执行一些操作嘛 (虽说这样 main 方法一定很不爽)。那么接下来的事情,Future 做起来就十分麻烦了。假设我们 main 方法只做奇数合集加上偶数合集这一个操作,提前算这两个合集的操作异步交给两个子线程,我们需要怎么做呢?没错,开启两个线程,等到两个线程都计算结束的时候,我们进行最后的相加,问题在于,你怎么知道那个子线程最后结束的呢?(貌似可以做个轮询,不定的调用 isDone() 这个方法 …)丰富的 CompletableFuture 功能为我们提供了一个方法,用于等待两个子线程都结束了,再进行相加操作:

    //asyncPool 就是上面提到的默认线程池 ForkJoinPool
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {return biApplyStage(asyncPool, other, fn);
    }

看个例子:

public class OddCombine implements Supplier<Integer> {
    @Override
    public Integer get() {
        try {Thread.sleep(3000);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        return 1+3+5+7+9;
    }
}
public class EvenCombine implements Supplier<Integer> {
    @Override
    public Integer get() {
        try {Thread.sleep(1000);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        return 2+4+6+8+10;
    }
}
public class CompletableCombineTest {public static void main(String[] args) throws Exception{CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddCombine());
        CompletableFuture<Integer> evenNumber = CompletableFuture.supplyAsync(new EvenCombine());
        long startTime = System.currentTimeMillis();
        CompletableFuture<Integer> resultFuturn = oddNumber.thenCombine(evenNumber,(odd,even)->{return odd + even;});
        System.out.println(resultFuturn.get());
        System.out.println("0. 开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
    }
}
输出结果:55
0. 开始了:3000 秒

这边模拟一个睡 1 秒,一个睡 3 秒,但是真正的网络请求时间是不定的。是不是很爽,最爽的还不是现象,而是以上操作已经利用了 Java8 流的概念。

两个子线程还不够,那么还有 anyOff() 函数,可以承受多个 CompletableFuture,会等待所有任务都完成。

    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {return andTree(cfs, 0, cfs.length - 1);
    }

与它长的很像的,有个方法, 是当第一个执行结束的时候,就结束,后面任务不再等了,可以看作充分条件。

    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {return orTree(cfs, 0, cfs.length - 1);
    }

在上面那个例子的基础上,把 OddNumberPlus 类时间调长一点:

public class OddNumberPlus implements Supplier<Integer> {
    @Override
    public Integer get() {
        try {Thread.sleep(5000);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        return 1+3+5+7+9;
    }
}
public class CompletableCombineTest {public static void main(String[] args) throws Exception{CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddCombine());
        CompletableFuture<Integer> evenNumber = CompletableFuture.supplyAsync(new EvenCombine());
        CompletableFuture<Integer> testNumber = CompletableFuture.supplyAsync(new OddNumberPlus());
        long startTime = System.currentTimeMillis();
        CompletableFuture<Object> resultFuturn = CompletableFuture.anyOf(oddNumber,evenNumber,testNumber);
        System.out.println(resultFuturn.get());
        System.out.println("0. 开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
    }
}
输出结果:30
0. 开始了:1000 秒

小结

CompletableFuture 的方法其实还有很多,常用的比如说 runAsync(), 类似于 supplyAsync(), 只是没有返回值; 除了 thenApply()可以加回调函数以外,还有 thenApply();还有注入 runAfterBoth()、runAfterEither(),这些见名知意。还有很多,可以点开 CompletableFuture 这个类的源码仔细看一看。见微知著,透过 CompletableFuture,更加感觉到 Java8 的强大,强大的流概念、行为参数化、高效的并行理念等等,不仅让 Java 写起来更爽,还不断丰富 Java 整个生态。Java 一直在进步,所以没有被时代淘汰,我们 Javaer 也可以继续职业生涯,感谢 Java, 一起进步。

正文完
 0