在咱们平时开发中或多或少都会遇到须要调用接口来实现一个性能的需要,这个接口能够是外部零碎也能够是内部的,而后等到接口返回数据了能力持续其余的业务流程,这就是传统的同步模式。
同步模式尽管简略但毛病也很显著,如果对方服务解决迟缓迟迟未能返回数据,或网络问题导致响应变长,就会阻塞咱们调用方的线程,导致咱们主流程的耗时latency缩短,传统的解决形式是减少接口的超时timeout设置,避免无限期期待。但即便这样还是会占用CPU资源。
在咱们做rpc近程调用,redis,数据库拜访等比拟耗时的网络申请时常常要面对这样的问题,这种业务场景咱们能够引入异步的编程思维,即主流程不须要阻塞期待接口返回数据,而是持续往下执行,当真正须要这个接口返回后果时再通过回调或阻塞的形式获取,此时咱们的主流程和异步工作是并行执行的。
Java中实现异步次要是通过Future
,CompletableFuture
,Guava ListenableFuture
以及一些异步响应式框架如RxJava实现。
上面咱们次要看下这几种组件实用的业务场景和须要留神的中央,防止踩坑。
一. Future
java.util.concurrent.Future
是JDK5引入的,用来获取一个异步计算的后果。你能够应用isDone
办法查看计算是否实现,也能够应用get阻塞住调用线程,直到计算实现返回后果,你也能够应用cancel
办法进行工作的执行。
Future的api阐明
理论开发中咱们个别会联合线程池的submit配合应用,代码如下:
package com.javakk;import java.util.concurrent.*;public class FutureTest { public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); // 线程池 Future<String> future = executor.submit(() ->{ Thread.sleep(200); // 模仿接口调用,耗时200ms return "hello world"; }); // 在输入上面异步后果时主线程能够不阻塞的做其余事件 // TODO 其余业务逻辑 System.out.println("异步后果:"+future.get()); //主线程获取异步后果 // 或者通过上面轮询的形式 // while(!future.isDone()); }}// 输入后果:异步后果:hello world
简略的说我有一个工作,提交给了Future,Future替我实现这个工作,这期间我能够去做别的事件。一段时间之后,我再从Future取出后果。
下面的代码有2个中央须要留神,在15行不倡议应用future.get()
形式,而应该应用future.get(long timeout, TimeUnit unit)
, 尤其是在生产环境肯定要设置正当的超时工夫,避免程序无限期期待上来。另外就是要思考异步工作执行过程中报错抛出异样的状况,须要捕捉future的异样信息。
通过代码能够看出一些简略的异步场景能够应用Future解决,然而对于后果的获取却不是很不便,只能通过阻塞或者轮询的形式失去工作的后果。阻塞的形式相当于把异步变成了同步,显然和异步编程的初衷相违反,轮询的形式又会节约CPU资源。
Future没有提供告诉的机制,就是回调,咱们无奈晓得它什么工夫实现工作。
而且在简单一点的状况下,比方多个异步工作的场景,一个异步工作依赖上一个异步工作的执行后果,异步工作合并等,Future无奈满足需要。
二.ListenableFuture
Google并发包下的listenableFuture
对Java原生的future
做了扩大,顾名思义就是应用监听器模式实现的回调,所以叫可监听的future。
在咱们公司晚期的我的项目里(jdk8之前的版本)都是应用listenableFuture
来实现异步编程。
要应用listenableFuture
还要联合MoreExecutor
线程池,MoreExecutor
是对Java原生线程池的封装,比方罕用的MoreExecutors.listeningDecorator(threadPool);
批改Java原生线程池的submit
办法,封装了future返回listenableFuture
。
代码示例如下:
// ListeningExecutorService继承jdk的ExecutorService接口,重写了submit办法,批改返回值类型为ListenableFutureListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());[ListenableFuture](http://javakk.com/tag/listenablefuture "查看更多对于 ListenableFuture 的文章")<String> listenableFuture = executor.submit(() -> { Thread.sleep(200); // 模仿接口调用,耗时200ms return "hello world";});
下面的代码是结构了一个ListenableFuture的异步工作,调用它的后果个别有两种形式:
基于addListener
:
listenableFuture.addListener(() -> { try { System.out.println("异步后果:" + listenableFuture.get()); } catch (Exception e) { e.printStackTrace(); }}, executor);// 输入后果:异步后果:hello world
基于addCallback:
Futures.addCallback(listenableFuture, new FutureCallback<String>() { @Override public void onSuccess(String result) { System.out.println("异步后果:" + result); } @Override public void onFailure(Throwable t) { t.printStackTrace(); }}, executor);// 输入后果:异步后果:hello world
其实两种形式都是基于回调,具体应用哪种看业务场景。
addListener
须要本人代码里捕捉解决异常情况,最好设置超时工夫addCallback
把失常返回和异常情况做了拆散,不便咱们针对不同状况做解决
另外Futures里还有很多其余的api,能够满足咱们负责场景,比方transform()
能够解决异步工作之间的依赖状况,allAsList()
将多个ListenableFuture合并成一个。
三. CompletableFuture
如果你们公司的jdk是8或以上的版本,那能够间接应用CompletableFuture
类来实现异步编程。
Java8新增的CompletableFuture
类借鉴了Google Guava的ListenableFuture
,它蕴含50多个办法,默认应用forkJoinPool
线程池,提供了十分弱小的Future扩大性能,能够帮忙咱们简化异步编程的复杂性,联合函数式编程,通过回调的形式解决计算结果,并且提供了转换和组合CompletableFuture
的多种办法,能够满足大部分异步回调场景。
CompletableFuture的api
尽管办法很多但有个特色:
- 以Async结尾的办法签名示意是在异步线程里执行,没有以Async结尾的办法则是由主线程调用
- 如果参数里有Runnable类型,则没有返回后果,即纯生产的办法
- 如果参数里没有指定executor则默认应用forkJoinPool线程池,指定了则以指定的线程池来执行工作
上面就来看下罕用的几种api代码示例:
转换 : thenApplyAsync
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello");// f2依赖f1的后果做转换CompletableFuture<String> f2 = f1.thenApplyAsync(t -> t + " world");System.out.println("异步后果:" + f2.get());// 输入后果:异步后果:hello world
这里先阐明一下,示例代码只关注外围性能,如果要理论应用须要思考超时和异常情况,大家须要留神。
在下面的代码中异步工作f2
须要异步工作f1
的后果能力执行,但对于咱们的主线程来说,毋庸等到f1
返回后果后再调用函数f2
,即不会阻塞主流程,而是通知CompletableFuture当执行完了f1
的办法再去执行f2
,只有当须要最初的后果时再获取。
组合 : thenComposeAsync
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello");// f2尽管依赖f1的后果,但不会期待f1后果返回,而是再包装成一个future返回CompletableFuture<String> f2 = f1.thenComposeAsync(t -> CompletableFuture.supplyAsync(() -> t + " world" ));// 等到真正调用的时候再执行f2里的逻辑System.out.println("异步后果:" + f2.get());// 输入后果:异步后果:hello world
通过代码正文能看出thenCompose
相当于flatMap
,防止CompletableFuture<CompletableFuture<String>>
这种写法。
这也是thenCompose
和thenApply
的区别,通过查看api也能看出:
thenApply:
public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) { return uniApplyStage(null, fn);}
thenCompose:
public <U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn) { return uniComposeStage(screenExecutor(executor), fn);}
合并 : thenCombineAsync
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); // 模仿接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return "hello";});CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); // 模仿接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return " world";});CompletableFuture<String> f3 = f1.thenCombineAsync(f2, (t1, t2) -> t1 + t2);long time = System.currentTimeMillis();System.out.println("异步后果:" + f3.get());System.out.println("耗时:" + (System.currentTimeMillis() - time));// 输入后果:异步后果:hello world耗时:1002
从代码输入后果能够看到两个异步工作f1、f2是并行执行,彼此无先后依赖程序,thenCombineAsync
适宜将两个并行执行的异步工作的后果合并返回成一个新的future。
还有一个相似的办法thenAcceptBoth
也是合并两个future的后果,然而不会返回新的值,外部生产掉了。
二选一 : applyToEitherAsync
Random rand = new Random();CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 + rand.nextInt(1000)); // 模仿接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return "hello";});CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 + rand.nextInt(1000)); // 模仿接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return "world";});CompletableFuture<String> f3 = f1.applyToEitherAsync(f2, t -> t);long time = System.currentTimeMillis();System.out.println("异步后果:" + f3.get());System.out.println("耗时:" + (System.currentTimeMillis() - time));
输入的后果有时候是hello 有时候是world,哪个future先执行完就依据它的后果计算,取两个future最先返回的。
这里要阐明一点,如果两个future是同时返回后果,那么applyToEitherAsync永远以第一个future的后果为准,大家能够把下面代码的Thread.sleep
正文掉测试下。
另外acceptEither
办法和这个相似,然而没有返回值。
allOf / anyOf
后面讲的compose
,combine
,either
都是解决两个future的办法,如果是超过2个的能够应用allOf
或anyOf
allOf:
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); // 模仿接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return "hello";});CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); // 模仿接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return "world";});CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); // 模仿接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return "java老k";});List<CompletableFuture<String>> list = new ArrayList<>();list.add(f1);list.add(f2);list.add(f3);CompletableFuture<Void> f4 = CompletableFuture.allOf(list.toArray(new CompletableFuture[]{}));long time = System.currentTimeMillis();f4.thenRunAsync(() -> list.forEach(f -> { try { System.out.println("异步后果:" + f.get()); } catch (Exception e) { e.printStackTrace(); } }));f4.get();System.out.println("耗时:" + (System.currentTimeMillis() - time));// 输入后果:耗时:1004异步后果:hello异步后果:world异步后果:java老k
allOf
办法是当所有的CompletableFuture都执行完后执行计算,无返回值。
anyOf:
Random rand = new Random(); // 随机数CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 + rand.nextInt(1000)); // 模仿接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return "hello";});CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 + rand.nextInt(1000)); // 模仿接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return "world";});CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 + rand.nextInt(1000)); // 模仿接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return "java老k";});CompletableFuture<Object> f4 = CompletableFuture.anyOf(f1, f2, f3);long time = System.currentTimeMillis();System.out.println("异步后果:" + f4.get());System.out.println("耗时:" + (System.currentTimeMillis() - time));// 输入后果:异步后果:java老k耗时:1075
屡次执行输入的后果不一样,anyOf
办法当任意一个CompletableFuture执行完后就会执行计算。
尽管说CompletableFuture更适宜I/O场景,但应用时肯定要联合具体业务,比如说有些公共办法解决异步工作时须要思考异常情况,这时候应用CompletableFuture.handle(BiFunction<? super T, Throwable, ? extends U> fn)更适合,handle办法会解决失常计算值和异样,因而它能够屏蔽异样,防止异样持续抛出。
CompletableFuture还有一个坑须要留神:如果线上流量比拟大的状况下会呈现响应迟缓的问题。
因为CompletableFuture默认应用的线程池是forkJoinPool,过后对一台应用了CompletableFuture实现异步回调性能的接口做压测,通过监控零碎发现有大量的ForkJoinPool.commonPool-worker-*
线程处于期待状态,进一步剖析dump信息发现是forkJoinPool的makeCommonPool问题,如下图:
看到这大家应该分明了,如果在我的项目里没有设置java.util.concurrent.ForkJoinPool.common.parallelism
的值,那么forkJoinPool线程池的线程数就是(cpu-1),咱们测试环境的机器是2核,这样理论执行工作的线程数只有1个,当有大量申请过去时,如果有耗时高的io操作,势必会造成更多的线程期待,进而连累服务响应工夫。
解决方案一个是设置java.util.concurrent.ForkJoinPool.common.parallelism
这个值(要在我的项目启动时指定),或者指定线程池不应用默认的forkJoinPool。
forkJoinPoll线程池不理解的能够看下这篇文章:线程池ForkJoinPool简介
线程数如何设置能够参考《Java并发编程实战》这本书给出的倡议,如下图:
线程池设置线程数公式:
threads = N CPU U CPU (1 + W/C)
其中:
- N CPU 是处理器的核数
- U CPU 是冀望的CPU利用率(介于0和1之间)
- W/C是等待时间与计算工夫的比率
网上也有这么辨别的:
如果服务是cpu密集型的,设置为电脑的核数
如果服务是io密集型的,设置为电脑的核数*2
其实我感觉并不谨严,尤其是io密集型的还要参考QPS和web服务器的配置。
线程池使用不当造成的结果和剖析能够在举荐浏览里理解。
明天次要讲了java实现异步编程的几种形式,大家能够联合本人的理论状况参考,下次有工夫会跟大家分享下咱们另外一个我的项目如何应用RxJava实现的全异步化服务。