关于java:Java-异步调用实践

7次阅读

共计 16246 个字符,预计需要花费 41 分钟才能阅读完成。

本文介绍了线上业务中的一些异步调用实践经验,蕴含 IO 模型介绍、CompletableFuture 的根本应用、RPC 异步调用、异步 HTTP 客户端 Spring WebClient 的应用等。RPC 应用前文介绍的手写 RPC 框架,该框架反对异步调用。

本文要点:

  • 为什么须要异步调用
  • CompletableFuture 根本应用
  • RPC 异步调用
  • HTTP 异步调用
  • 编排 CompletableFuture 进步吞吐量

为什么异步

BIO 模型

首先咱们先回顾一下 BIO 模型:

当用户过程调用了 recvfrom 这个零碎调用,kernel 就开始了 IO 的第一个阶段:筹备数据。对于 network io 来说,很多时候数据在一开始还没有达到(比方,还没有收到一个残缺的 UDP 包),这个时候 kernel 就要期待足够的数据到来。而在用户过程这边,整个过程会被阻塞。当 kernel 始终等到数据筹备好了,它就会将数据从 kernel 中拷贝到用户内存,而后 kernel 返回后果,用户过程才解除 block 的状态,从新运行起来。所以,Blocking IO 的特点就是在 IO 执行的两个阶段都被 block 了。

同步调用

在同步调用的场景下,顺次申请多个接口,耗时长、性能差,接口响应时长 T > T1+T2+T3+……+Tn。

缩小同步期待

个别这个时候为了缩小同步等待时间,会应用线程池来同时解决多个工作,接口的响应工夫就是 MAX(T1,T2,T3):

大略代码如下:

Future<String> future = executorService.submit(() -> {Thread.sleep(2000);
  return "hello world";
});
while (true) {if (future.isDone()) {System.out.println(future.get());
    break;
  }
}

同步模型中应用线程池的确能实现异步调用的成果,也能压缩同步期待的工夫,然而也有一些缺点:

  • CPU 资源大量节约在阻塞期待上 ,导致 CPU 资源利用率低。
  • 为了减少并发度,会引入更多额定的线程池 ,随着 CPU 调度线程数的减少,会导致更重大的资源争用,上下文切换占用 CPU 资源。
  • 线程池中的线程都是阻塞的,硬件资源无奈充分利用 ,零碎吞吐量容易达到瓶颈。

NIO 模型

为了解决 BIO 中的缺点,引入 NIO 模型:

当用户过程收回 read 操作时,如果 kernel 中的数据还没有筹备好,那么它并不会 block 用户过程,而是立即返回一个 error。从用户过程角度讲,它发动一个 read 操作后,并不需要期待,而是马上就失去了一个后果。用户过程判断后果是一个 error 时,它就晓得数据还没有筹备好,于是它能够再次发送 read 操作。一旦 kernel 中的数据筹备好了,并且又再次收到了用户过程的 system call,那么它马上就将数据拷贝到了用户内存,而后返回。所以,用户过程其实是须要一直的被动询问 kernel 数据好了没有。

异步优化思路

咱们晓得了 NIO 的调用形式比 BIO 好,那咱们怎么能在业务编码中应用到 NIO 呢?本人入手将 BIO 替换成 NIO 必定不事实,已有组件反对 NIO 的能够间接应用,不反对的持续应用自定义线程池。

  • 通过 RPC NIO 异步调用、HTTP 异步调用的形式升高线程数,从而升高调度(上下文切换)开销。
  • 没有原生反对 NIO 异步调用的持续应用线程池。
  • 引入 CompletableFuture 对业务流程进行编排,升高依赖之间的阻塞。

简述 CompletableFuture

CompletableFuture 是 java.util.concurrent 库在 java 8 中新增的次要工具,同传统的 Future 相比,其反对流式计算、函数式编程、实现告诉、自定义异样解决等很多新的个性。

罕用 API 举例

supplyAsync
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{    
  try{Thread.sleep(1000L);
    return "hello world";
  } catch (Exception e){return "failed";}
});
System.out.println(future.join());
// output
hello world

开启异步工作,到另一个线程执行。

complete
CompletableFuture<String> future1 = new CompletableFuture<>();
future.complete("hello world");              // 异步线程执行
future.whenComplete((res, throwable) -> {System.out.println(res);
});
System.out.println(future1.join());
CompletableFuture<String> future2 = new CompletableFuture<>();
future.completeExceptionally(new Throwable("failed")); // 异步线程执行
System.out.println(future2.join());
// output
hello world
hello world
  
Exception in thread "main" 
java.util.concurrent.CompletionException: 
java.lang.Throwable: failed

complete 失常实现该 CompletableFuture。

completeExceptionally 异样实现该 CompletableFuture。

thenApply
String original = "Message";
CompletableFuture<String> cf = 
    CompletableFuture.completedFuture(original).thenApply(String::toUpperCase);
System.out.println(cf.join());
// output
MESSAGE

工作后置解决。

图示:

thenCombine
CompletableFuture<String> cf = 
    CompletableFuture.completedFuture("Message").thenApply(String::toUpperCase);
CompletableFuture<String> cf1 = 
    CompletableFuture.completedFuture("Message").thenApply(String::toLowerCase);
CompletableFuture<String> allCf = cf.thenCombine(cf1, (s1, s2) -> s1 + s2);
System.out.println(allCf.join());
// output
MSGmsg

合并工作,两个工作同时执行,后果由合并函数 BiFunction 返回。

图示:

allOf
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Message1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Message2");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Message3");
CompletableFuture<String> future = 
    CompletableFuture.allOf(future1, future2, future3).thenApply(v -> {String join1 = future1.join();
  String join2 = future2.join();
  String join3 = future3.join();
  return join1 + join2 + join3;});
System.out.println(future.join());
// output
Msg1Msg2Msg3

allOf 会阻塞期待所有异步线程工作完结。

allOf 里的 join 并不会阻塞,传给 thenApply 的函数是在 future1, future2, future3 全副实现时,才会执行。

图示:

CF 执行线程

上面有两个小 demo,能够先试着想想输入的后果:

String original = "Message";
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {System.out.println("supplyAsync thread:" + Thread.currentThread().getName());
  return original;
}).thenApply(r -> {System.out.println("thenApply thread:" + Thread.currentThread().getName());
  return r;
});
System.out.println(cf.join());
// output
supplyAsync thread: ForkJoinPool.commonPool-worker-1
thenApply thread: main
Message
String original = "Message";
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {System.out.println("supplyAsync thread:" + Thread.currentThread().getName());
  try {Thread.sleep(100);
  } catch (InterruptedException e) {throw new RuntimeException(e);
  }
  return original;
}).thenApply(r -> {System.out.println("thenApply thread:" + Thread.currentThread().getName());
  return r;
});
System.out.println(cf.join());
// output
supplyAsync thread: ForkJoinPool.commonPool-worker-1
thenApply thread: ForkJoinPool.commonPool-worker-1
Message

先看论断:

  • 执行 complete 的线程会执行以后调用链上的所有 CF。
  • 如果 CF 提前 complete,后续 CF 由初始线程执行。

异步工作里没有 sleep 的时候,异步工作很快就会实现,意味着 JVM 执行到 thenApply 的时候,前置 CF 曾经提前完成所以后续的 CF 会被 main 线程执行。

异步工作里有 sleep 的时候,JVM 执行到 thenApply 时,前置 CF 还没有实现,前置 CF complete 的线程会执行所有后续的 CF。

CF 嵌套 join

ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {Thread.sleep(3000);
  return 1;
}, executorService);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {Thread.sleep(3000);
  return 2;
}, executorService);
Integer join1 = cf1.thenApply((cf1Val) -> {System.out.println("cf1 start value:" + cf1Val);
  Integer cf2Val = cf2.join();
  System.out.println("cf2 end value:" + cf2Val);
  return 3;
}).join();
//output
cf1 start value:1
cf2 end value:2

代码很简略,有一个线程数为 2 的线程池,cf1、cf2 都应用这个线程执行异步工作,特地的是在 cf1.thenApply 中会调用 cf2.join(),当线程数是 2 的时候能够顺利输入。

ExecutorService executorService = Executors.newFixedThreadPool(1);
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {Thread.sleep(3000);
  return 1;
}, executorService);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {Thread.sleep(3000);
  return 2;
}, executorService);
    Integer join1 = cf1.thenApply((cf1Val) -> {System.out.println("cf1 start value:" + cf1Val);
  Integer cf2Val = cf2.join();
  System.out.println("cf2 end value:" + cf2Val);
  return 3;
}).join();
//output
cf1 start value:1

这时候咱们将线程池的线程数调整为 1,这时只会输入 cf1 start value:1,而后就始终阻塞。

应用 jstack -l pid 查看线程状态,发现是 WAITING,期待的中央正是咱们在代码里调用的 cf2.join()

"pool-1-thread-1" #11 prio=5 os_prio=31 tid=0x00000001429f5000 nid=0xa903 waiting on condition
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000076ba5f7d0> (a java.util.concurrent.CompletableFuture$Signaller)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
    at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
    at com.ppphuang.demo.threadPool.ExecutorsTest.lambda$main$2(ThreadPoolExecutorsTest.java:34)

起因是咱们在惟一一个线程中调用 cf2.join(),阻塞期待 cf2 实现,然而 cf2 须要期待 cf1 实现之后才有闲暇线程去执行。这就相似于你右手正拿着一个水杯,而后期待右手拿水壶倒满水,这是不可能实现的。所以尽量不要嵌套 join,不留神隔离线程池的话很容易造成’死锁‘(线程阻塞)。

CF 罕用 API

API 形容
supplyAsync 开启异步工作,到另一个线程执行,异步工作有返回值。
complete 实现工作。
completeExceptionally 异样结束任务。
thenCombine 合并工作,两个工作同时执行,后果由合并函数 BiFunction 返回。
thenApply 工作后置解决。
applyToEither 会取两个工作最先实现的工作,上个工作和这个工作同时进行,哪个先完结,先用哪个后果。
handle 后续解决。
whenComplete 实现后的解决。
allOf 期待所有异步线程工作完结。
join 获取返回值,没有 complete 的 CF 对象调用 join 时,会期待 complete 再返回,曾经 complete 的 CF 对象调用 join 时,会立即返回后果。

优化过程

异步 RPC 客户端

咱们手写的这个 RPC 框架反对异步调用,如果你想看具体的实现,能够在文末找到源码链接。异步调用之前会设置一个 CallBack 办法,异步调用时会间接返回 null,不会期待服务端返回接果,服务端返回后果之后会通过 RPC 客户端自带的线程池执行设置的 CallBack 办法。

RPC 异步调用图示:

包装异步 RPC Client

通过 AsyncExecutor 包装 RPC 的客户端,AsyncExecutor 类中的 client 属性值为创立的某个 RPC 服务的异步客户端代理类,这个代理类在构造方法中创立并赋值给 client 属性。

类中的 async 办法承受 Function 类型的参数 function,能够通过 function.apply(client) 来通过 client 执行真正的 RPC 调用。

在 async 办法中实例化一个 CompletableFuture,并将 CompletableFuture 作为异步回调的上下文设置到 RPC 的异步回调中,之后将该 CompletableFuture 返回给调用者。

public class AsyncExecutor<C> {

    private C client;

    public AsyncExecutor(ClientProxyFactory clientProxyFactory, Class<C> clazz, String group, String version) {this.client = clientProxyFactory.getProxy(clazz, group, version, true);
    }

    public <R> CompletableFuture<R> async(Function<C, R> function) {CompletableFuture<R> future = new CompletableFuture<>();
        ClientProxyFactory.setLocalAsyncContextAndAsyncReceiveHandler(future, CompletableFutureAsyncCallBack.instance());
        try {function.apply(client);
        } catch (Exception e) {future.completeExceptionally(e);
        }
        return future;
    }
}
异步回调类
public class CompletableFutureAsyncCallBack extends AsyncReceiveHandler {
    private static volatile CompletableFutureAsyncCallBack INSTANCE;

    private CompletableFutureAsyncCallBack() {}

    @Override
    public void callBack(Object context, Object result) {if (!(context instanceof CompletableFuture)) {throw new IllegalStateException("the context must be CompletableFuture");
        }
        CompletableFuture future = (CompletableFuture) context;
        if (result instanceof Throwable) {future.completeExceptionally((Throwable) result);
            return;
        }
        log.info("result:{}", result);
        future.complete(result);
    }
}

AsyncReceiveHandler 是 RPC 的异步回调抽象类,类中的 callBack、onException 形象办法须要子类实现。

CompletableFutureAsyncCallBack 实现了这个 callBack 形象办法,第一个参数是咱们在包装异步 RPC Client 时设置的 CompletableFuture 上下文,第二个参数是 RPC 返回的后果。办法中判断 RPC 返回的后果是否异样,若异样通过 completeExceptionally 异样完结这个 CompletableFuture,若失常通过 complete 失常完结这个 CompletableFuture。

注册异步客户端 Bean
@Component
public class AsyncExecutorConfig {
    @Autowired
    ClientProxyFactory clientProxyFactory;

    @Bean
    public AsyncExecutor<DemoService> demoServiceAsyncExecutor() {return new AsyncExecutor<>(clientProxyFactory, DemoService.class, "","");
    }
}
异步 RPC 调用
@Autowired
AsyncExecutor<DemoService> demoServiceAsyncExecutor;

CompletableFuture<String> pppName = demoServiceAsyncExecutor.async(service -> service.hello("ppp"));

String name = pppName.join();

异步 HTTP WebClient

WebClient 是从 Spring WebFlux 5.0 版本开始提供的一个非阻塞的基于响应式编程的进行 HTTP 申请的客户端工具。它的响应式编程的基于 Reactor 的。

WebClient VS RestTemplate

WebClient 的劣势在于:

  • 非阻塞响应式 IO,单位工夫内无限资源下反对更高的并发量。
  • 反对应用 Java8 Lambda 表达式函数。
  • 反对同步、异步、Stream 流式传输。
WebClient 应用
public CompletableFuture<String> asyncHttp(String url) {WebClient localhostWebClient = WebClient.builder().baseUrl("http://localhost:8080").build();
        Mono<HttpResult<String>> userMono = localhostWebClient.method(HttpMethod.GET).uri(url)
                .retrieve()
                .bodyToMono(new ParameterizedTypeReference<HttpResult<String>>() {})
                // 异样解决 有 onErrorReturn 时 doOnError 不会触发,所以不须要后续在 CompletableFuture 中 handle 解决异样
                // 如果不应用 onErrorReturn,倡议在后续 CompletableFuture 中 handle 解决异样
                .onErrorReturn(new HttpResult<>(201, "default", "default hello"))
                // 超时解决
                .timeout(Duration.ofSeconds(3))
                // 返回值过滤
                .filter(httpResult -> httpResult.code == 200)
                // 默认值
                .defaultIfEmpty(new HttpResult<>(201, "defaultIfEmpty", "defaultIfEmpty hello"))
                // 失败重试
                .retryWhen(Retry.backoff(1, Duration.ofSeconds(1)));
        CompletableFuture<HttpResult<String>> stringCompletableFuture = WebClientFutureFactory.getCompletableFuture(userMono);
        return stringCompletableFuture.thenApply(HttpResult::getData);
    }
WebClient 整合 CF

WebClientFutureFactory.getCompletableFuture 办法会把 WebClient 返回的后果组装成 CompletableFuture,应用的是 Mono 类的 doOnError 和 subscribe 办法,当失常返回时通过 subscribe 来调用 completableFuture.complete,当异样时通过 doOnError 来调用 completableFuture.completeExceptionally:

public class WebClientFutureFactory {public static <T> CompletableFuture<T> getCompletableFuture(Mono<T> mono) {CompletableFuture<T> completableFuture = new CompletableFuture<>();
        mono.doOnError(throwable -> {completableFuture.completeExceptionally(throwable);
            log.error("mono.doOnError throwable:{}", throwable.getMessage());
        }).subscribe(result -> {completableFuture.complete(result);
            log.debug("mono.subscribe execute thread: {}", Thread.currentThread().getName());
        });
        return completableFuture;
    }
}

WebClient 对同一服务的屡次调用:

public Flux<User> fetchUsers(List<Integer> userIds) {return Flux.fromIterable(userIds)
        .parallel()
        .flatMap(this::getUser)
        .ordered((u1, u2) -> u2.id() - u1.id());
}

对返回雷同类型的不同服务进行屡次调用:

public Flux<User> fetchUserAndOtherUser(int id) {return Flux.merge(getUser(id), getOtherUser(id))
        .parallel()
        .runOn(Schedulers.elastic())
        .ordered((u1, u2) -> u2.id() - u1.id());
}

对不同类型的不同服务的屡次调用:

public Mono fetchUserAndItem(int userId, int itemId) {Mono<User> user = getUser(userId).subscribeOn(Schedulers.elastic());
    Mono<Item> item = getItem(itemId).subscribeOn(Schedulers.elastic());
 
    return Mono.zip(user, item, UserWithItem::new);
}

异步数据库调用

应用 CompletableFuture.supplyAsync 执行异步工作时,必须指定成本人的线程池,否则 CompletableFuture 会应用默认的线程池 ForkJoinPool,默认线程池数量为 cpus – 1:

WebClient<Boolean> dbFuture = CompletableFuture.supplyAsync(() -> getDb(id), ThreadPoolConfig.ASYNC_TASK_EXECUTOR);

编排 CF

结构了所有须要异步执行的 CompletableFuture 之后,应用 allOf 办法阻塞期待所有的 CompletableFuture 后果,allOf 响应之后能够通过 join 获取各个 CompletableFuture 的响应接口,这里的 join 是会立即返回的,不会阻塞:

//RPC 的 CompletableFuture
CompletableFuture<String> pppName = demoServiceAsyncExecutor.async(service -> service.hello("ppp"));

//RPC 的 CompletableFuture
CompletableFuture<String> huangName = demoServiceAsyncExecutor.async(service -> service.hello("huang"));

//DB 操作的 CompletableFuture
WebClient<Boolean> dbFuture = CompletableFuture.supplyAsync(() -> getDb(id), ThreadPoolConfig.ASYNC_TASK_EXECUTOR);

//allOf 办法阻塞期待所有的 CompletableFuture 后果     
return CompletableFuture.allOf(pppName, huangName, dbFuture)
              // 组装后果返回
        .thenApply(r -> pppName.join() && huangName.join() && dbFuture.join()).join();

超时解决

java9 中 CompletableFuture 才有超时解决,应用办法如下:

CompletableFuture.supplyAsync(() -> 6 / 3).orTimeout(1, TimeUnit.SECONDS);

java8 中须要配合 ScheduledExecutorService + applyToEither:

public class TimeoutUtils {private static final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);

    static {Runtime.getRuntime().addShutdownHook(new Thread(scheduledExecutor::shutdownNow));
    }

    public static <T> CompletableFuture<T> timeout(CompletableFuture<T> cf, long timeout, TimeUnit unit) {CompletableFuture<T> result = new CompletableFuture<>();
        scheduledExecutor.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
        return cf.applyToEither(result, Function.identity());
    }
}

TimeoutUtils 类与有一个动态属性,值为初始化的一个 ScheduledExecutorService,还有一个静态方法 timeout,这个办法将传入的 cf 用 applyToEither 接口与一个调度计时的 CompletableFuture 组合,哪个 CompletableFuture 先执行实现,就返回哪个的后果。

具体应用如下:

CompletableFuture<Integer> future = demoServiceAsyncExecutor.async(service -> service.getAge(18));
CompletableFuture<Integer> futureWithTimeout = TimeoutUtils.timeout(future, 3, TimeUnit.SECONDS);
futureWithTimeout.join();

异样与默认值解决

CompletableFuture 中能够解决异样有上面三个 API:

public <U> CompletableFuture<U> handle(java.util.function.BiFunction<? super T, Throwable, ? extends U> fn)

handle 接口不管 CompletableFuture 执行胜利还是异样都会被处罚,handle 承受一个 BiFunction 参数,BiFunction 中的第一个参数为 CompletableFuture 的后果,另一个参数为 CompletableFuture 执行过程中的异样,Handle 能够返回任意类型的值。能够给 handle 传入自定义函数,依据后果跟执行异样返回最终数据。

public CompletableFuture<T> whenComplete(java.util.function.BiConsumer<? super T, ? super Throwable> action)

whenComplete 接口与 handle 相似,whenComplete 承受一个 BiConsumer 参数,BiConsumer 中的第一个参数为 CompletableFuture 的后果,另一个参数为 CompletableFuture 执行过程中的异样,然而没有返回值。

public CompletableFuture<T> exceptionally(java.util.function.Function<Throwable, ? extends T> fn)

exceptionally 接口只有在执行异样的时候才会被触发,承受一个 Function 参会,Function 只有一个参数为 CompletableFuture 执行过程中的异样,能够有一个任意返回值。

下表是三个接口的比照:

handle() whenComplete() exceptionly()
拜访胜利 Yes Yes No
拜访失败 Yes Yes Yes
能从失败中复原 Yes No Yes
能转换后果从 T 到 U Yes No No
胜利时触发 Yes Yes No
失败时触发 Yes Yes Yes
有异步版本 Yes Yes Yes

咱们应用 handle 接口来解决异样与默认值,上面是封装的一个 handle 接口入参:

public class DefaultValueHandle<R> extends AbstractLogAction<R> implements BiFunction<R, Throwable, R> {public DefaultValueHandle(boolean isNullToDefault, R defaultValue, String methodName, Object... args) {super(methodName, args);
        this.defaultValue = defaultValue;
        this.isNullToDefault = isNullToDefault;
    }
      @Override
    public R apply(R result, Throwable throwable) {logResult(result, throwable);
        if (throwable != null) {return defaultValue;}
        if (result == null && isNullToDefault) {return defaultValue;}
        return result;
    }
}

这个类实现了 handle 接口须要的 BiFunction 类型,在构造方法中有四个参数 boolean isNullToDefault, R defaultValue, String methodName, Object... args 第一个参数是决定执行后果为空值时,是否将咱们传进来的第二个参数作为默认值返回。当异样时也会将第二个参数作为默认返回值。最初两个参数一个是办法名称,一个是调用参数,能够给父类用作日志记录。

与 CompletableFuture 配合应用如下:

CompletableFuture<String> pppName = demoServiceAsyncExecutor.async(service -> service.hello("ppp"))
        .handle(new DefaultValueHandle<>(true, "name", "service.hello", "ppp"));

日志

封装了一个实现 BiConsumer 的 LogErrorAction 类,父类有个抽象类 AbstractLogAction 这个类就是简略应用 logReslut 办法记录日志,能够本人随便实现:

public class LogErrorAction<R> extends AbstractLogAction<R> implements BiConsumer<R, Throwable>{
  @Override
    public void accept(R result, Throwable throwable) {logResult(result, throwable);
    }
}

与 CompletableFuture 配合应用如下:

CompletableFuture<String> pppName = demoServiceAsyncExecutor.async(service -> service.hello("ppp"))
  .whenComplete(new LogErrorAction<>("hello", "ppp")
    );

优化成果

优化前接口均匀响应耗时 350ms,优化后均匀响应耗时 180ms,降落 49% 左右。

最佳实际

  • 禁止嵌套 join,防止“死锁”(线程阻塞)。
  • 多个 CompletableFuture 聚合时倡议应用 allOf。
  • HTTP 应用无阻塞的 Spring webclient,防止自定义线程池线程阻塞。
  • 应用 RPC 或者 HTTP 异步调用生成的 CompletableFuture,后续的 thenAppply,handle 等禁止耗时操作,防止阻塞异步框架线程池。
  • 禁止应用 CompletableFuture 的默认线程池,不同工作自定义线程池,不同级别业务线程池隔离,依据测试状况设置线程数,队列长度,回绝策略。
  • 异步执行的操作都加上超时,CF 超时后不会终止线程中的超时工作,不设置超时可能导致线程长时间阻塞。
  • 倡议应用异样、默认值、空值替换、谬误日志等工具记录信息,不便排查问题。

示例代码

RPC 框架代码:https://github.com/PPPHUANG/r…

异步调用 demo:https://github.com/PPPHUANG/r…

总结

  • 为什么须要异步调用
  • CompletableFuture 根本应用
  • RPC 异步调用
  • HTTP 异步调用
  • 编排 CompletableFuture 进步吞吐量

公众号:DailyHappy 一位后端写码师,一位光明操持制造者。

正文完
 0