前言

在 java 中你不理解异步编程,crud 齐全没有问题,然而有的需要你无奈优雅的实现。

js 也存在异步编程,当你了解了用同步的思维编写异步的代码时,置信你在编程上的造诣又更进一步。

大多人都在追捧微服务,可能他们只会用 RibbonFeign。微服务是一个架构上的抉择,当你没有达到架构档次时,我认为你应该更加重视业务上的代码编写,即微服务中单体服务代码的编写。

单体服务性能极差,你的微服务整体性能也好不到哪里去,只能通过限流、熔断外加多部署机器来解决并发低的问题。在你想玩微服务之前,并发玩好了再思考高并发。先把 java 中 juc 包下的并发相干的常识整的明明白白再进行下一步,这花不了几个工夫。微服务是你进阶之后再学的。

原本打算持续写 Mysql,但切实提不起来我的兴致(还须要看书钻研,毕竟是个黑盒钻研),只好拿这篇实现工作了。

本文内容

  • js 中 Promise 和 async await 的一个列子
  • SpringBoot 中异步编程
  • Future
  • CompletableFuture

js 异步编程

要习惯应用 Promise ,防止把 fn 当成参数传递,防止回调天堂。这不仅仅是 api 调用的问题,这是你编程思维转变。

const awaitFunc = function _awaitFunc() {    return Promise.resolve('awaitFunc').then(data => {        console.log(data);        return 'awaitFunc-then-return-data';    });};const async = async function _async() {    setTimeout(() => {        console.log('验证退出了宏工作队列---1');    }, 0);    // 加不加 await 有什么区别?    await awaitFunc().then(data => {        console.log(data);        setTimeout(() => {            console.log('验证退出了宏工作队列---2');        }, 0);    });    console.log('awaitFunc 执行完在打印');};async();

SpringBoot 中异步编程

在 SpringBoot @EnableAsync@Async 就能够助你异步编程。底层原理就是 ThreadPoolExecutorFuture 的封装。

<img src="http://oss.mflyyou.cn/blog/20201108120510.png?author=zhangpanqin" alt="1583165-20200710095540896-1284477865" style="zoom: 33%;" />

咱们拿这个烧水举例子,当你同步串行执行,须要耗费 20 分钟。同步编程思维模型较简略,容易实现。

当你多线程异步执行,只须要耗费 16 分钟。异步编程思维模型略微简单一点,多线程之间通信异步转同步是一个挑战。

@GetMapping("/tea/async")public RetUtil makeTeaAsync() throws InterruptedException, ExecutionException {    // Stopwatch 用于计算代码执行工夫    final Stopwatch started = Stopwatch.createStarted();    final Future asyncResult = makeTeaService.boilWater();    final Future asyncResult1 = makeTeaService.washTeaCup();    asyncResult.get();    asyncResult1.get();    final long elapsed = started.elapsed(TimeUnit.SECONDS);    String str = StrUtil.format("工作执行了 {} 秒", elapsed);    final MakeTeaVO makeTeaVO = new MakeTeaVO();    makeTeaVO.setMessage(str);    return RetUtil.success(makeTeaVO);}@Servicepublic class IMakeTeaServiceImpl implements IMakeTeaService {    @Override    @Async    public AsyncResult<String> boilWater() throws InterruptedException {        System.out.println("洗水壶");        TimeUnit.SECONDS.sleep(1);        System.out.println("烧开水");        TimeUnit.SECONDS.sleep(15);        return new AsyncResult("洗水壶->烧开水");    }    @Override    @Async    public AsyncResult<String> washTeaCup() throws InterruptedException {        System.out.println("洗茶杯");        System.out.println("洗茶壶");        System.out.println("拿茶叶");        TimeUnit.SECONDS.sleep(4);        return new AsyncResult("洗茶杯,洗茶壶,拿茶叶");    }}

AsyncResultFuture 的实现类,当调用 Future.get 会阻塞期待后果的返回。@Async 也能够指定在那个线程池中执行工作。

final Future asyncResult = makeTeaService.boilWater();final Future asyncResult1 = makeTeaService.washTeaCup();asyncResult.get();asyncResult1.get();

这个 Demo 的实现,须要调用两次 Furute.get() 算是个不优雅的实现。

@Overridepublic String makeTea() throws InterruptedException {    final CountDownLatch count = new CountDownLatch(2);    THREAD_POOL_EXECUTOR.execute(() -> {        System.out.println("洗水壶");        System.out.println("烧开水");        try {            TimeUnit.SECONDS.sleep(16);        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            count.countDown();        }    });    THREAD_POOL_EXECUTOR.execute(() -> {        System.out.println("洗茶杯");        System.out.println("洗茶壶");        System.out.println("拿茶叶");        try {            TimeUnit.SECONDS.sleep(4);        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            count.countDown();        }    });    count.await();    System.out.println("泡茶");    return "";}@GetMapping("/tea/async2")public RetUtil makeTeaAsync2() throws InterruptedException, ExecutionException {    final Stopwatch started = Stopwatch.createStarted();    makeTeaService.makeTea();    final long elapsed = started.elapsed(TimeUnit.SECONDS);    String str = StrUtil.format("工作执行了 {} 秒", elapsed);    final MakeTeaVO makeTeaVO = new MakeTeaVO();    makeTeaVO.setMessage(str);    return RetUtil.success(makeTeaVO);}

应用 CountDownLatch 将异步代码转换为同步返回,这只是另一个实现

Future

public interface Future<V> {    /**     * 尝试勾销这个工作的执行.     * 如果工作执行实现之后,调用 cancel 返回 false.     * 如果工作曾经被勾销了,调用 cancel 也会返回 false     *     * 如果工作曾经执行了, mayInterruptIfRunning 标记是否中断执行工作的线程.     * mayInterruptIfRunning 为 true 会触发线程的中断(当线程睡眠,会抛出异样 InterruptedException),     * 为 false 时不中断工作执行,只扭转 Future 的状态     *      * 调用了 cancel 办法,调用 get 办法会抛出异样     */    boolean cancel(boolean mayInterruptIfRunning);    /**     * 工作实现之前调用 cancel ,此办法返回 true     */    boolean isCancelled();    /**     * 工作实现返回 true     */    boolean isDone();    /**     * 期待工作实现,而后返回其后果     * @return the computed result     * @throws CancellationException if the computation was cancelled     * @throws ExecutionException    if the computation threw an    exception     * @throws InterruptedException  if the current thread was interrupted while waiting     */    V get() throws InterruptedException, ExecutionException;    /**     * 期待工作实现,而后返回其后果.超时没有返回,抛出异样 TimeoutException     */    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}

Future.cancel(true) 会触发线程休眠的中断,即 TimeUnit.SECONDS.sleep(10); 会抛出异样。

Future.cancel(true) 或者 Future.cancel(false) 都会触发 Future.get() 异样。

public static void main(String[] args) throws ExecutionException, InterruptedException {    final Future<String> submit = THREAD_POOL_EXECUTOR.submit(() -> {        System.out.println("工作开始执行");        try {            TimeUnit.SECONDS.sleep(10);        } catch (InterruptedException e) {            e.printStackTrace();        }        System.out.println("工作执行结束");        return "ok";    });    THREAD_POOL_EXECUTOR.execute(() -> {        System.out.println("执行 submit.cancel");        try {            TimeUnit.SECONDS.sleep(2);        } catch (InterruptedException e) {            e.printStackTrace();        }        submit.cancel(false);    });    // submit.get();    System.out.println("整个流程执行完结");}

JDK 提供 Future 的实现 FutureTask 源码绝对较简略,不再开展。

CompletableFuture

因为 Future 应用的局限性:不能链式调用、多个异步计算的后果不能传递下一个异步工作(能够做到,然而编程略微简单),异步执行异样的捕捉解决

JDK 1.8 开始,大佬 Doug Lea 带来了更加容易的异步编程模型,CompletableFuture。

CompletableFuture 能够做到

1、获取异步执行的后果链式传递下一个异步去执行

2、异步执行时,你有机会解决异步执行时产生的异样

总之,CompletableFuture 很想。

CompletableFuture 实现比较复杂,有的中央不是那么容易了解,当你了解其实现思维,你也算是一只脚迈入了响应式编程中去了。

开胃小菜

public class CompletableFutureBlog1 {    public static void main(String[] args) throws ExecutionException, InterruptedException {        final Stopwatch started = Stopwatch.createStarted();        // 洗水壶,烧水        CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {            System.out.println("洗水壶");            System.out.println("烧水");            try {                TimeUnit.SECONDS.sleep(16);            } catch (InterruptedException e) {                e.printStackTrace();            }            return "洗水壶 -> 烧水";        });                // 洗茶壶,洗茶杯 -> 拿茶叶        CompletableFuture<String> completableFuture2 =                 CompletableFuture.supplyAsync(() -> {                System.out.println("洗茶壶");                System.out.println("洗茶杯");                System.out.println("拿茶叶");                try {                    TimeUnit.SECONDS.sleep(4);                } catch (InterruptedException e) {                    e.printStackTrace();                }                return "洗茶壶,洗茶杯 -> 拿茶叶";            });                // 组合二者异步运算的后果,传递给办法计算        final CompletableFuture<String> completableFuture = completableFuture2.thenCombine(completableFuture1, (result2, result1) -> {            System.out.println(StrUtil.format("result2 是 洗茶壶,洗茶杯 -> 拿茶叶: {}", result2));            System.out.println(StrUtil.format("result1 是 洗水壶 -> 烧水: {}", result1));            System.out.println("泡茶");            return "泡茶";        });        completableFuture.get();        System.out.println("执行工夫: " + started.elapsed(TimeUnit.SECONDS));    }}

runAsync 和 supplyAsync 的区别

runAsyncsupplyAsync 区别就是你是否须要获取异步计算的后果。当你须要异步解决的后果,你须要 supplyAsync

public class CompletableFutureBlog2 {    public static void main(String[] args) throws ExecutionException, InterruptedException {        final Stopwatch started = Stopwatch.createStarted();        final CompletableFuture<Integer> ret = CompletableFuture.supplyAsync(() -> {            System.out.println("开始进行耗时的异步计算,耗费 3 秒");            try {                TimeUnit.SECONDS.sleep(3);            } catch (InterruptedException e) {                e.printStackTrace();            }            return 10;        });        final Integer integer = ret.get();        System.out.println(StrUtil.format("异步执行的后果: {}", integer));        System.out.println("执行工夫: " + started.elapsed(TimeUnit.SECONDS));    }}

thenApplyAsyncthenAcceptAsyncthenRunAsync

thenXX 都是为了在上一个异步计算的完结之后执行。

咱们对异步计算的后果分为以下几个状况:

  • 须要依赖异步计算的后果,并且依赖异步计算的后果计算返回另个一个后果 thenApplyAsync
  • 依赖异步计算的后果,然而不会产生新的后果,thenAcceptAsync
  • 不依赖计算计算的后果,并且没有返回值 thenRunAsync
public class CompletableFutureBlog3 {    public static void main(String[] args) throws ExecutionException, InterruptedException {        final Stopwatch started = Stopwatch.createStarted();        final CompletableFuture<Integer> ret = CompletableFuture.supplyAsync(() -> {            System.out.println("开始进行耗时的异步计算,耗费 3 秒");            try {                TimeUnit.SECONDS.sleep(3);            } catch (InterruptedException e) {                e.printStackTrace();            }            return 10;        });        final Integer result = ret.thenApplyAsync(data -> {            System.out.println("依赖上一个异步计算,耗费 5 秒");            try {                TimeUnit.SECONDS.sleep(5);            } catch (InterruptedException e) {                e.printStackTrace();            }            return data + 12;        }).get();        System.out.println(StrUtil.format("异步执行的后果: {}", result));        System.out.println("执行工夫: " + started.elapsed(TimeUnit.SECONDS));    }}

thenCombineAsync

联合另一个 CompletableFuture 异步计算,当两个异步计算执行完了,执行回调。

计算一个耗时的计算。将这个耗时计算拆成两个耗时的异步计算,当两个异步计算完结,在合并最终的后果

public class CompletableFutureBlog4 {    public static void main(String[] args) throws ExecutionException, InterruptedException {        final Stopwatch started = Stopwatch.createStarted();        final CompletableFuture<Integer> ret1 = CompletableFuture.supplyAsync(() -> {            System.out.println("开始进行耗时的异步计算,耗费 3 秒");            try {                TimeUnit.SECONDS.sleep(3);            } catch (InterruptedException e) {                e.printStackTrace();            }            return 10;        });        final CompletableFuture<Integer> ret2 = CompletableFuture.supplyAsync(() -> {            System.out.println("开始进行耗时的异步计算,耗费 5 秒");            try {                TimeUnit.SECONDS.sleep(5);            } catch (InterruptedException e) {                e.printStackTrace();            }            return 10;        });        final CompletableFuture<Integer> integerCompletableFuture = ret2.thenCombineAsync(ret1, (result1, result2) -> result1 + result2);        final Integer result = integerCompletableFuture.get();                System.out.println(StrUtil.format("异步执行的后果: {}", result));        System.out.println("执行工夫: " + started.elapsed(TimeUnit.SECONDS));    }}

allOfanyOf

能够组合多个 CompletableFuture ,当每个 CompletableFuture 都执行完,执行后续逻辑。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {   return andTree(cfs, 0, cfs.length - 1);}

能够组合多个 CompletableFuture ,当任何一个 CompletableFuture 都执行完,执行后续逻辑。

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {    return orTree(cfs, 0, cfs.length - 1);}

future,future2,future3 执行完之后,再执行后续逻辑。

public class CompletableFutureBlog5 {    public static void main(String[] args) throws ExecutionException, InterruptedException {        final Stopwatch started = Stopwatch.createStarted();        final CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {            try {                TimeUnit.SECONDS.sleep(2);            } catch (InterruptedException e) {                e.printStackTrace();            }            System.out.println(1);        });        final CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {            try {                TimeUnit.SECONDS.sleep(3);            } catch (InterruptedException e) {                e.printStackTrace();            }            System.out.println(1);        });        final CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {            try {                TimeUnit.SECONDS.sleep(4);            } catch (InterruptedException e) {                e.printStackTrace();            }            System.out.println(1);        });        final CompletableFuture<Void> future1 = CompletableFuture.allOf(future3, future2, future);        future1.get();        System.out.println("执行工夫: " + started.elapsed(TimeUnit.SECONDS));    }}

将上述 demo 中 allOf 替换为 anyOf,当任一 CompletableFuture 执行结束,future1.get(); 就会返回后果。

别的办法看参数和正文就学会了。就不再一一列举了。

当应用的时候,先思考要不要依赖异步计算的后果,要不要解决异样,要不要返回新的异步计算结果,从这几个方面就能够晓得抉择哪个 api 了。


本文由 张攀钦的博客 http://www.mflyyou.cn/ 创作。 可自在转载、援用,但需署名作者且注明文章出处。

如转载至微信公众号,请在文末增加作者公众号二维码。微信公众号名称:Mflyyou