关于java:Java异步编程指南

37次阅读

共计 8942 个字符,预计需要花费 23 分钟才能阅读完成。

在咱们平时开发中或多或少都会遇到须要调用接口来实现一个性能的需要,这个接口能够是外部零碎也能够是内部的,而后等到接口返回数据了能力持续其余的业务流程,这就是传统的 同步 模式。

同步模式尽管简略但毛病也很显著,如果对方服务解决迟缓迟迟未能返回数据,或网络问题导致响应变长,就会阻塞咱们调用方的线程,导致咱们主流程的耗时 latency 缩短,传统的解决形式是减少接口的超时 timeout 设置,避免无限期期待。但即便这样还是会占用 CPU 资源。

在咱们做 rpc 近程调用,redis,数据库拜访等比拟耗时的网络申请时常常要面对这样的问题,这种业务场景咱们能够引入 异步 的编程思维,即主流程不须要阻塞期待接口返回数据,而是持续往下执行,当真正须要这个接口返回后果时再通过 回调 或阻塞的形式获取,此时咱们的主流程和异步工作是并行执行的。

Java 中实现异步次要是通过 FutureCompletableFutureGuava ListenableFuture 以及一些异步响应式框架如 RxJava 实现。

上面咱们次要看下这几种组件实用的业务场景和须要留神的中央,防止踩坑。

一. Future

java.util.concurrent.Future是 JDK5 引入的,用来获取一个异步计算的后果。你能够应用 isDone 办法查看计算是否实现,也能够应用 get 阻塞住调用线程,直到计算实现返回后果,你也能够应用 cancel 办法进行工作的执行。

Future 的 api 阐明

理论开发中咱们个别会联合线程池的 submit 配合应用,代码如下:

package com.javakk;

import java.util.concurrent.*;

public class FutureTest {public static void main(String[] args) throws Exception {ExecutorService executor = Executors.newCachedThreadPool(); // 线程池
        Future<String> future = executor.submit(() ->{Thread.sleep(200); // 模仿接口调用,耗时 200ms
            return "hello world";
        });
        // 在输入上面异步后果时主线程能够不阻塞的做其余事件
        // TODO 其余业务逻辑

        System.out.println("异步后果:"+future.get()); // 主线程获取异步后果
        // 或者通过上面轮询的形式
        // while(!future.isDone());
    }
}

// 输入后果:异步后果:hello world

简略的说我有一个工作,提交给了 Future,Future 替我实现这个工作,这期间我能够去做别的事件。一段时间之后,我再从 Future 取出后果。

下面的代码有 2 个中央须要留神,在 15 行不倡议应用 future.get() 形式,而应该应用 future.get(long timeout, TimeUnit unit) 尤其是在生产环境肯定要设置正当的超时工夫,避免程序无限期期待上来。另外就是要思考异步工作执行过程中报错抛出异样的状况,须要捕捉 future 的异样信息。

通过代码能够看出一些简略的异步场景能够应用 Future 解决,然而对于后果的获取却不是很不便,只能通过阻塞或者轮询的形式失去工作的后果。阻塞的形式相当于把异步变成了同步,显然和异步编程的初衷相违反,轮询的形式又会节约 CPU 资源。

Future 没有提供告诉的机制,就是回调,咱们无奈晓得它什么工夫实现工作

而且在简单一点的状况下,比方多个异步工作的场景,一个异步工作依赖上一个异步工作的执行后果,异步工作合并等,Future 无奈满足需要。

二.ListenableFuture

Google 并发包下的 listenableFuture 对 Java 原生的 future 做了扩大,顾名思义就是应用监听器模式实现的回调,所以叫可监听的 future。

在咱们公司晚期的我的项目里 (jdk8 之前的版本) 都是应用 listenableFuture 来实现异步编程。

要应用 listenableFuture 还要联合 MoreExecutor 线程池,MoreExecutor是对 Java 原生线程池的封装,比方罕用的 MoreExecutors.listeningDecorator(threadPool); 批改 Java 原生线程池的submit 办法,封装了 future 返回listenableFuture

代码示例如下:

// ListeningExecutorService 继承 jdk 的 ExecutorService 接口,重写了 submit 办法,批改返回值类型为 ListenableFuture
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
[ListenableFuture](http://javakk.com/tag/listenablefuture "查看更多对于 ListenableFuture 的文章")<String> listenableFuture = executor.submit(() -> {Thread.sleep(200); // 模仿接口调用,耗时 200ms
    return "hello world";
});

下面的代码是结构了一个 ListenableFuture 的异步工作,调用它的后果个别有两种形式:

基于addListener

listenableFuture.addListener(() -> {
    try {System.out.println("异步后果:" + listenableFuture.get());
    } catch (Exception e) {e.printStackTrace();
    }
}, executor);

// 输入后果:异步后果:hello world

基于 addCallback:

Futures.addCallback(listenableFuture, new FutureCallback<String>() {
    @Override
    public void onSuccess(String result) {System.out.println("异步后果:" + result);
    }

    @Override
    public void onFailure(Throwable t) {t.printStackTrace();
    }
}, executor);

// 输入后果:异步后果:hello world

其实两种形式都是基于回调,具体应用哪种看业务场景。

  • addListener须要本人代码里捕捉解决异常情况,最好设置超时工夫
  • addCallback把失常返回和异常情况做了拆散,不便咱们针对不同状况做解决

另外 Futures 里还有很多其余的 api,能够满足咱们负责场景,比方 transform() 能够解决异步工作之间的依赖状况,allAsList()将多个 ListenableFuture 合并成一个。

三. CompletableFuture

如果你们公司的 jdk 是 8 或以上的版本,那能够间接应用 CompletableFuture 类来实现异步编程。

Java8 新增的 CompletableFuture 类借鉴了 Google Guava 的 ListenableFuture,它蕴含 50 多个办法,默认应用forkJoinPool 线程池,提供了十分弱小的 Future 扩大性能,能够帮忙咱们简化异步编程的复杂性,联合函数式编程,通过回调的形式解决计算结果,并且提供了转换和组合 CompletableFuture 的多种办法,能够满足大部分异步回调场景。

CompletableFuture 的 api

尽管办法很多但有个特色:

  • 以 Async 结尾的办法签名示意是在异步线程里执行,没有以 Async 结尾的办法则是由主线程调用
  • 如果参数里有 Runnable 类型,则没有返回后果,即纯生产的办法
  • 如果参数里没有指定 executor 则默认应用 forkJoinPool 线程池,指定了则以指定的线程池来执行工作

上面就来看下罕用的几种 api 代码示例:

转换 : thenApplyAsync

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() ->
    "hello"
);
// f2 依赖 f1 的后果做转换
CompletableFuture<String> f2 = f1.thenApplyAsync(t ->
    t + "world"
);
System.out.println("异步后果:" + f2.get());

// 输入后果:异步后果:hello world

这里先阐明一下,示例代码只关注外围性能,如果要理论应用须要思考超时和异常情况,大家须要留神。

在下面的代码中异步工作 f2 须要异步工作 f1 的后果能力执行,但对于咱们的主线程来说,毋庸等到 f1 返回后果后再调用函数 f2,即不会阻塞主流程,而是通知 CompletableFuture 当执行完了f1 的办法再去执行f2,只有当须要最初的后果时再获取。

组合 : thenComposeAsync

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() ->
    "hello"
);
// f2 尽管依赖 f1 的后果,但不会期待 f1 后果返回,而是再包装成一个 future 返回
CompletableFuture<String> f2 = f1.thenComposeAsync(t ->
    CompletableFuture.supplyAsync(() ->
            t + "world"
    )
);
// 等到真正调用的时候再执行 f2 里的逻辑
System.out.println("异步后果:" + f2.get());

// 输入后果:异步后果:hello world

通过代码正文能看出 thenCompose 相当于 flatMap, 防止CompletableFuture<CompletableFuture<String>> 这种写法。

这也是 thenComposethenApply的区别,通过查看 api 也能看出:

thenApply:

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {return uniApplyStage(null, fn);
}

thenCompose:

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {return uniComposeStage(screenExecutor(executor), fn);
}

合并 : thenCombineAsync

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    try {Thread.sleep(1000); // 模仿接口调用耗时 1 秒
    } catch (InterruptedException e) {e.printStackTrace();
    }
    return "hello";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
    try {Thread.sleep(1000); // 模仿接口调用耗时 1 秒
    } catch (InterruptedException e) {e.printStackTrace();
    }
    return "world";
});
CompletableFuture<String> f3 = f1.thenCombineAsync(f2, (t1, t2) ->
    t1 + t2
);
long time = System.currentTimeMillis();
System.out.println("异步后果:" + f3.get());
System.out.println("耗时:" + (System.currentTimeMillis() - time));

// 输入后果:异步后果:hello world
耗时:1002

从代码输入后果能够看到两个异步工作 f1、f2 是并行执行,彼此无先后依赖程序,thenCombineAsync适宜将两个并行执行的异步工作的后果合并返回成一个新的 future。

还有一个相似的办法 thenAcceptBoth 也是合并两个 future 的后果,然而不会返回新的值,外部生产掉了。

二选一 : applyToEitherAsync

Random rand = new Random();
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    try {Thread.sleep(1000 + rand.nextInt(1000)); // 模仿接口调用耗时 1 秒
    } catch (InterruptedException e) {e.printStackTrace();
    }
    return "hello";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
    try {Thread.sleep(1000 + rand.nextInt(1000)); // 模仿接口调用耗时 1 秒
    } catch (InterruptedException e) {e.printStackTrace();
    }
    return "world";
});
CompletableFuture<String> f3 = f1.applyToEitherAsync(f2, t -> t);
long time = System.currentTimeMillis();
System.out.println("异步后果:" + f3.get());
System.out.println("耗时:" + (System.currentTimeMillis() - time));

输入的后果有时候是 hello 有时候是 world,哪个 future 先执行完就依据它的后果计算,取两个 future 最先返回的。

这里要阐明一点,如果两个 future 是同时返回后果,那么 applyToEitherAsync 永远以第一个 future 的后果为准 ,大家能够把下面代码的Thread.sleep 正文掉测试下。

另外 acceptEither 办法和这个相似,然而没有返回值。

allOf / anyOf

后面讲的 compose,combine,either 都是解决两个 future 的办法,如果是超过 2 个的能够应用 allOfanyOf

allOf:

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    try {Thread.sleep(1000); // 模仿接口调用耗时 1 秒
    } catch (InterruptedException e) {e.printStackTrace();
    }
    return "hello";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
    try {Thread.sleep(1000); // 模仿接口调用耗时 1 秒
    } catch (InterruptedException e) {e.printStackTrace();
    }
    return "world";
});
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
    try {Thread.sleep(1000); // 模仿接口调用耗时 1 秒
    } catch (InterruptedException e) {e.printStackTrace();
    }
    return "java 老 k";
});
List<CompletableFuture<String>> list = new ArrayList<>();
list.add(f1);
list.add(f2);
list.add(f3);

CompletableFuture<Void> f4 = CompletableFuture.allOf(list.toArray(new CompletableFuture[]{}));
long time = System.currentTimeMillis();
f4.thenRunAsync(() ->
    list.forEach(f -> {
        try {System.out.println("异步后果:" + f.get());
        } catch (Exception e) {e.printStackTrace();
        }
    })
);
f4.get();
System.out.println("耗时:" + (System.currentTimeMillis() - time));
// 输入后果:耗时:1004
异步后果:hello
异步后果:world
异步后果:java 老 k 

allOf办法是当所有的 CompletableFuture 都执行完后执行计算,无返回值。

anyOf:

Random rand = new Random(); // 随机数
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    try {Thread.sleep(1000 + rand.nextInt(1000)); // 模仿接口调用耗时 1 秒
    } catch (InterruptedException e) {e.printStackTrace();
    }
    return "hello";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
    try {Thread.sleep(1000 + rand.nextInt(1000)); // 模仿接口调用耗时 1 秒
    } catch (InterruptedException e) {e.printStackTrace();
    }
    return "world";
});
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
    try {Thread.sleep(1000 + rand.nextInt(1000)); // 模仿接口调用耗时 1 秒
    } catch (InterruptedException e) {e.printStackTrace();
    }
    return "java 老 k";
});

CompletableFuture<Object> f4 = CompletableFuture.anyOf(f1, f2, f3);
long time = System.currentTimeMillis();
System.out.println("异步后果:" + f4.get());
System.out.println("耗时:" + (System.currentTimeMillis() - time));

// 输入后果:异步后果:java 老 k
耗时:1075

屡次执行输入的后果不一样,anyOf办法当任意一个 CompletableFuture 执行完后就会执行计算。

尽管说 CompletableFuture 更适宜 I / O 场景,但应用时肯定要联合具体业务,比如说有些公共办法解决异步工作时须要思考异常情况,这时候应用 CompletableFuture.handle(BiFunction<? super T, Throwable, ? extends U> fn)更适合,handle 办法会解决失常计算值和异样,因而它能够屏蔽异样,防止异样持续抛出

CompletableFuture 还有一个坑须要留神:如果线上流量比拟大的状况下会呈现响应迟缓的问题。

因为 CompletableFuture 默认应用的线程池是 forkJoinPool,过后对一台应用了 CompletableFuture 实现异步回调性能的接口做压测,通过监控零碎发现有大量的ForkJoinPool.commonPool-worker-* 线程处于期待状态,进一步剖析 dump 信息发现是 forkJoinPool 的 makeCommonPool 问题,如下图:


看到这大家应该分明了,如果在我的项目里没有设置 java.util.concurrent.ForkJoinPool.common.parallelism 的值,那么 forkJoinPool 线程池的线程数就是 (cpu-1),咱们测试环境的机器是 2 核, 这样理论执行工作的线程数只有 1 个,当有大量申请过去时,如果有耗时高的 io 操作,势必会造成更多的线程期待,进而连累服务响应工夫。

解决方案一个是设置 java.util.concurrent.ForkJoinPool.common.parallelism 这个值(要在我的项目启动时指定),或者指定线程池不应用默认的 forkJoinPool。

forkJoinPoll 线程池不理解的能够看下这篇文章:线程池 ForkJoinPool 简介

线程数如何设置能够参考《Java 并发编程实战》这本书给出的倡议,如下图:

线程池设置线程数公式:

threads = N CPU U CPU (1 + W/C)

其中:

  • N CPU 是处理器的核数
  • U CPU 是冀望的 CPU 利用率(介于 0 和 1 之间)
  • W/ C 是等待时间与计算工夫的比率

网上也有这么辨别的:

如果服务是 cpu 密集型的,设置为电脑的核数

如果服务是 io 密集型的,设置为电脑的核数 *2

其实我感觉并不谨严,尤其是 io 密集型的还要参考 QPS 和 web 服务器的配置。

线程池使用不当造成的结果和剖析能够在举荐浏览里理解。

明天次要讲了 java 实现异步编程的几种形式,大家能够联合本人的理论状况参考,下次有工夫会跟大家分享下咱们另外一个我的项目如何应用 RxJava 实现的全异步化服务。

正文完
 0