前言

零碎的性能优化是每一个程序员的必经之路,但也可能是走过的最深的套路。它不仅须要对各种工具的深刻理解,有时还须要联合具体的业务场景得出定制化的优化计划。当然,你也能够在代码中轻轻藏上一个Thread.sleep,在须要优化的时候少睡几毫秒(手动狗头)。性能优化这个课题切实是太浩瀚了,以至于目前市面上没有一本优质的书可能全面的总结这个课题。不仅如此,即便是深刻到各个细分畛域上,性能优化的伎俩也十分丰盛,令人目迷五色。

本文也不会涵盖所有的优化套路,仅就最近我的项目开发过程中遇到的并发调用这一个场景给出本人的通用计划。大家能够间接打包或是复制粘贴到我的项目中应用。也欢送大家给出更多的意见还有优化场景。

背景

不知大家在开发过程中是否遇到这样的一个场景,咱们会先去调用服务A,而后调用服务B,组装一下数据之后再去调用一下服务C(如果你在微服务零碎的开发中没有遇到这样的场景,我想说,要么你们零碎的拆分粒度太粗,要么这一个侥幸无上游服务依赖的底层零碎~)

这条链路的耗时就是 duration(A) + duration(B) + duration(C) + 其它操作。从教训来看,大部分的耗时都来自于上游服务的解决耗时和网络IO,利用外部的CPU操作的耗时相比而言根本能够忽略不计。然而,当咱们得悉对服务A和B的调用之间是无依赖的时候,是否能够通过同时并发调用A和B来缩小同步调用的期待耗时,这样现实状况下链路的耗时就能够优化成 max(duration(A),duration(B)) + duration(C) + 其它操作

再举一个例子,有时咱们可能须要批量调用上游服务,比方批量查问用户的信息。上游查问接口出于服务爱护往往会对单次能够查问的数量进行束缚,比方一次只能查一百条用户的信息。因而咱们须要多申请拆分屡次进行查问,于是耗时变成了 n*duration(A) + 其它操作。同样,用并发申请的优化形式,现实状况下耗时能够降到 max(duration(A)) + 其它操作

这两种场景的代码实现根本相似,本文将会提供第二种场景的思路和残缺实现。

小试牛刀

并发RPC调用的整体实现类图如下:

首先咱们须要创立一个线程池用于并发执行。因为程序中通常还有别的应用线程池的场景,而咱们心愿RPC调用可能应用一个独自的线程池,因而这里用工厂办法进行了封装。

@Configurationpublic class ThreadPoolExecutorFactory {    @Resource    private Map<String, AsyncTaskExecutor> executorMap;    /**    * 默认的线程池    */    @Bean(name = ThreadPoolName.DEFAULT_EXECUTOR)    public AsyncTaskExecutor baseExecutorService() {        //后续反对各个服务定制化这部分参数        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();        //设置线程池参数信息        taskExecutor.setCorePoolSize(10);        taskExecutor.setMaxPoolSize(50);        taskExecutor.setQueueCapacity(200);        taskExecutor.setKeepAliveSeconds(60);        taskExecutor.setThreadNamePrefix(ThreadPoolName.DEFAULT_EXECUTOR + "--");        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);        taskExecutor.setAwaitTerminationSeconds(60);        taskExecutor.setDaemon(Boolean.TRUE);        //批改回绝策略为应用以后线程执行        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());        //初始化线程池        taskExecutor.initialize();        return taskExecutor;    }    /**    * 并发调用独自的线程池    */    @Bean(name = ThreadPoolName.RPC_EXECUTOR)    public AsyncTaskExecutor rpcExecutorService() {        //后续反对各个服务定制化这部分参数        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();        //设置线程池参数信息        taskExecutor.setCorePoolSize(20);        taskExecutor.setMaxPoolSize(100);        taskExecutor.setQueueCapacity(200);        taskExecutor.setKeepAliveSeconds(60);        taskExecutor.setThreadNamePrefix(ThreadPoolName.RPC_EXECUTOR + "--");        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);        taskExecutor.setAwaitTerminationSeconds(60);        taskExecutor.setDaemon(Boolean.TRUE);        //批改回绝策略为应用以后线程执行        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());        //初始化线程池        taskExecutor.initialize();        return taskExecutor;    }    /**     * 依据线程池名称获取线程池     * 若找不到对应线程池,则抛出异样     * @param name 线程池名称     * @return 线程池     * @throws RuntimeException 若找不到该名称的线程池     */    public AsyncTaskExecutor fetchAsyncTaskExecutor(String name) {        AsyncTaskExecutor executor = executorMap.get(name);        if (executor == null) {            throw new RuntimeException("no executor name " + name);        }        return executor;    }}public class ThreadPoolName {    /**     * 默认线程池     */    public static final String DEFAULT_EXECUTOR = "defaultExecutor";    /**     * 并发调用应用的线程池     */    public static final String RPC_EXECUTOR = "rpcExecutor";}

如代码所示,咱们申明了两个Spring的线程池AsyncTaskExecutor,别离是默认的线程池和RPC调用的线程池,并将它们装载到map中。调用方能够应用fetchAsyncTaskExecutor办法并传入线程池的名称来指定线程池执行。这里还有一个细节,Rpc线程池的线程数要显著大于另一个线程池,是因为Rpc调用不是CPU密集型逻辑,往往随同着大量的期待。因而减少线程数量能够无效进步并发效率。

@Componentpublic class TracedExecutorService {    @Resource    private ThreadPoolExecutorFactory threadPoolExecutorFactory;    /**     * 指定线程池提交异步工作,并取得工作上下文     * @param executorName 线程池名称     * @param tracedCallable 异步工作     * @param <T> 返回类型     * @return 线程上下文     */    public <T> Future<T> submit(String executorName, Callable<T> tracedCallable) {        return threadPoolExecutorFactory.fetchAsyncTaskExecutor(executorName).submit(tracedCallable);    }}

submit办法封装了获取线程池和提交异步工作的逻辑。这里采纳Callable+Future的组合来获取异步线程的执行后果。

线程池准备就绪,接着咱们就须要申明一个接口用于提交并发调用服务:

public interface BatchOperateService {    /**     * 并发批量操作     * @param function 执行的逻辑     * @param requests 申请     * @param config 配置     * @return 全副响应     */    <T, R> List<R> batchOperate(Function<T, R> function, List<T> requests, BatchOperateConfig config);}@Datapublic class BatchOperateConfig {    /**     * 超时工夫     */    private Long timeout;    /**     * 超时工夫单位     */    private TimeUnit timeoutUnit;    /**     * 是否须要全副执行胜利     */    private Boolean needAllSuccess;}

batchOperate办法中传入了function对象,这是须要并发执行的代码逻辑。requests则是所有的申请,并发调用会递归这些申请并提交到异步线程。config对象则能够对这次并发调用做一些配置,比方并发查问的超时工夫,以及如果局部调用异样时整个批量查问是否继续执行。

接下来看一看实现类:

@Service@Slf4jpublic class BatchOperateServiceImpl implements BatchOperateService{    @Resource    private TracedExecutorService tracedExecutorService;    @Override    public <T, R> List<R> batchOperate(Function<T, R> function, List<T> requests, BatchOperateConfig config) {        log.info("batchOperate start function:{} request:{} config:{}", function, JSON.toJSONString(requests), JSON.toJSONString(config));        // 以后工夫        long startTime = System.currentTimeMillis();        // 初始化        int numberOfRequests = CollectionUtils.size(requests);        // 所有异步线程执行后果        List<Future<R>> futures = Lists.newArrayListWithExpectedSize(numberOfRequests);        // 应用countDownLatch进行并发调用治理        CountDownLatch countDownLatch = new CountDownLatch(numberOfRequests);        List<BatchOperateCallable<T, R>> callables = Lists.newArrayListWithExpectedSize(numberOfRequests);        // 别离提交异步线程执行        for (T request : requests) {            BatchOperateCallable<T, R> batchOperateCallable = new BatchOperateCallable<>(countDownLatch, function, request);            callables.add(batchOperateCallable);            // 提交异步线程执行            Future<R> future = tracedExecutorService.submit(ThreadPoolName.RPC_EXECUTOR, batchOperateCallable);            futures.add(future);        }        try {            // 期待全副执行实现,如果超时且要求全副调用胜利,则抛出异样            boolean allFinish = countDownLatch.await(config.getTimeout(), config.getTimeoutUnit());            if (!allFinish && config.getNeedAllSuccess()) {                throw new RuntimeException("batchOperate timeout and need all success");            }            // 遍历执行后果,如果有的执行失败且要求全副调用胜利,则抛出异样            boolean allSuccess = callables.stream().map(BatchOperateCallable::isSuccess).allMatch(BooleanUtils::isTrue);            if (!allSuccess && config.getNeedAllSuccess()) {                throw new RuntimeException("some batchOperate have failed and need all success");            }            // 获取所有异步调用后果并返回            List<R> result = Lists.newArrayList();            for (Future<R> future : futures) {                R r = future.get();                if (Objects.nonNull(r)) {                    result.add(r);                }            }            return result;        } catch (Exception e) {            throw new RuntimeException(e.getMessage());        } finally {            double duration = (System.currentTimeMillis() - startTime) / 1000.0;            log.info("batchOperate finish duration:{}s function:{} request:{} config:{}", duration, function, JSON.toJSONString(requests), JSON.toJSONString(config));        }    }}

通常咱们提交给线程池后间接遍历Future并期待获取后果就好了。然而这里咱们用CountDownLatch来做更加对立的超时治理。能够看一下BatchOperateCallable的实现:

public class BatchOperateCallable<T, R> implements Callable<R> {    private final CountDownLatch countDownLatch;    private final Function<T, R> function;    private final T request;    /**     * 该线程解决是否胜利     */    private boolean success;    public BatchOperateCallable(CountDownLatch countDownLatch, Function<T, R> function, T request) {        this.countDownLatch = countDownLatch;        this.function = function;        this.request = request;    }    @Override    public R call() {        try {            success = false;            R result = function.apply(request);            success = true;            return result;        } finally {            countDownLatch.countDown();        }    }    public boolean isSuccess() {        return success;    }}

无论调用时胜利还是异样,咱们都会在完结后将计数器减一。当计数器被减到0时,则代表所有并发调用执行实现。否则如果在规定工夫内计数器没有归零,则代表并发调用超时,此时会抛出异样。

潜在问题

并发调用的一个问题在于咱们放大了拜访上游接口的流量,极其状况下甚至放大了成千盈百倍。如果上游服务并没有做限流等防御性措施,咱们极有可能将上游服务打挂(这种起因导致的故障不足为奇)。因而须要对整个并发调用做流量管制。流量管制的办法有两种,一种是如果微服务采纳mesh的模式,则能够在sidecar中配置RPC调用的QPS,从而做到全局的管控对上游服务的拜访(这里抉择单机限流还是集群限流取决于sidecar是否反对的模式以及服务的流量大小。通常来说均匀流量较小则倡议抉择单机限流,因为集群限流的波动性往往比单机限流要高,流量过小会造成误判)。如果没有开启mesh,则须要在代码中本人实现限流器,这里举荐Guava的RateLimiter类,然而它只反对单机限流,如果要想实现集群限流,则计划的复杂度还会进一步晋升

小结

将我的项目开发中遇到的场景进行形象并尽可能的给出通用的解决方案是咱们每一个开发者自我的重要形式,也是进步代码复用性和稳定性的利器。并发Rpc调用是一个常见解决思路,心愿本文的实现能够对你有帮忙。