关于后端:一劳永逸的优化并发RPC调用小工具

2次阅读

共计 7184 个字符,预计需要花费 18 分钟才能阅读完成。

前言

零碎的性能优化是每一个程序员的必经之路,但也可能是走过的最深的套路。它不仅须要对各种工具的深刻理解,有时还须要联合具体的业务场景得出定制化的优化计划。当然,你也能够在代码中轻轻藏上一个 Thread.sleep,在须要优化的时候少睡几毫秒(手动狗头)。性能优化这个课题切实是太浩瀚了,以至于目前市面上没有一本优质的书可能全面的总结这个课题。不仅如此,即便是深刻到各个细分畛域上,性能优化的伎俩也十分丰盛,令人目迷五色。

本文也不会涵盖所有的优化套路,仅就最近我的项目开发过程中遇到的并发调用这一个场景给出本人的通用计划。大家能够间接打包或是复制粘贴到我的项目中应用。也欢送大家给出更多的意见还有优化场景。

背景

不知大家在开发过程中是否遇到这样的一个场景,咱们会先去调用服务 A,而后调用服务 B,组装一下数据之后再去调用一下服务 C(如果你在微服务零碎的开发中没有遇到这样的场景,我想说,要么你们零碎的拆分粒度太粗,要么这一个侥幸无上游服务依赖的底层零碎~)

这条链路的耗时就是 duration(A) + duration(B) + duration(C) + 其它操作 。从教训来看,大部分的耗时都来自于上游服务的解决耗时和网络 IO,利用外部的 CPU 操作的耗时相比而言根本能够忽略不计。然而,当咱们得悉对服务 A 和 B 的调用之间是无依赖的时候,是否能够通过同时并发调用 A 和 B 来缩小同步调用的期待耗时,这样现实状况下链路的耗时就能够优化成 max(duration(A),duration(B)) + duration(C) + 其它操作

再举一个例子,有时咱们可能须要批量调用上游服务,比方批量查问用户的信息。上游查问接口出于服务爱护往往会对单次能够查问的数量进行束缚,比方一次只能查一百条用户的信息。因而咱们须要多申请拆分屡次进行查问,于是耗时变成了 n*duration(A) + 其它操作 。同样,用并发申请的优化形式,现实状况下耗时能够降到 max(duration(A)) + 其它操作

这两种场景的代码实现根本相似,本文将会提供第二种场景的思路和残缺实现。

小试牛刀

并发 RPC 调用的整体实现类图如下:

首先咱们须要创立一个线程池用于并发执行。因为程序中通常还有别的应用线程池的场景,而咱们心愿 RPC 调用可能应用一个独自的线程池,因而这里用工厂办法进行了封装。

@Configuration
public class ThreadPoolExecutorFactory {

    @Resource
    private Map<String, AsyncTaskExecutor> executorMap;

    /**
    * 默认的线程池
    */
    @Bean(name = ThreadPoolName.DEFAULT_EXECUTOR)
    public AsyncTaskExecutor baseExecutorService() {
        // 后续反对各个服务定制化这部分参数
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // 设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix(ThreadPoolName.DEFAULT_EXECUTOR + "--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        taskExecutor.setDaemon(Boolean.TRUE);
        // 批改回绝策略为应用以后线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 初始化线程池
        taskExecutor.initialize();

        return taskExecutor;
    }

    /**
    * 并发调用独自的线程池
    */
    @Bean(name = ThreadPoolName.RPC_EXECUTOR)
    public AsyncTaskExecutor rpcExecutorService() {
        // 后续反对各个服务定制化这部分参数
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // 设置线程池参数信息
        taskExecutor.setCorePoolSize(20);
        taskExecutor.setMaxPoolSize(100);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix(ThreadPoolName.RPC_EXECUTOR + "--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        taskExecutor.setDaemon(Boolean.TRUE);
        // 批改回绝策略为应用以后线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 初始化线程池
        taskExecutor.initialize();

        return taskExecutor;
    }
    /**
     * 依据线程池名称获取线程池
     * 若找不到对应线程池,则抛出异样
     * @param name 线程池名称
     * @return 线程池
     * @throws RuntimeException 若找不到该名称的线程池
     */
    public AsyncTaskExecutor fetchAsyncTaskExecutor(String name) {AsyncTaskExecutor executor = executorMap.get(name);
        if (executor == null) {throw new RuntimeException("no executor name" + name);
        }
        return executor;
    }
}

public class ThreadPoolName {

    /**
     * 默认线程池
     */
    public static final String DEFAULT_EXECUTOR = "defaultExecutor";

    /**
     * 并发调用应用的线程池
     */
    public static final String RPC_EXECUTOR = "rpcExecutor";
}

如代码所示,咱们申明了两个 Spring 的线程池 AsyncTaskExecutor,别离是默认的线程池和 RPC 调用的线程池,并将它们装载到 map 中。调用方能够应用 fetchAsyncTaskExecutor 办法并传入线程池的名称来指定线程池执行。这里还有一个细节,Rpc 线程池的线程数要显著大于另一个线程池,是因为 Rpc 调用不是 CPU 密集型逻辑,往往随同着大量的期待。因而减少线程数量能够无效进步并发效率。

@Component
public class TracedExecutorService {

    @Resource
    private ThreadPoolExecutorFactory threadPoolExecutorFactory;


    /**
     * 指定线程池提交异步工作,并取得工作上下文
     * @param executorName 线程池名称
     * @param tracedCallable 异步工作
     * @param <T> 返回类型
     * @return 线程上下文
     */
    public <T> Future<T> submit(String executorName, Callable<T> tracedCallable) {return threadPoolExecutorFactory.fetchAsyncTaskExecutor(executorName).submit(tracedCallable);
    }
}

submit 办法封装了获取线程池和提交异步工作的逻辑。这里采纳 Callable+Future 的组合来获取异步线程的执行后果。

线程池准备就绪,接着咱们就须要申明一个接口用于提交并发调用服务:

public interface BatchOperateService {

    /**
     * 并发批量操作
     * @param function 执行的逻辑
     * @param requests 申请
     * @param config 配置
     * @return 全副响应
     */
    <T, R> List<R> batchOperate(Function<T, R> function, List<T> requests, BatchOperateConfig config);
}

@Data
public class BatchOperateConfig {

    /**
     * 超时工夫
     */
    private Long timeout;

    /**
     * 超时工夫单位
     */
    private TimeUnit timeoutUnit;

    /**
     * 是否须要全副执行胜利
     */
    private Boolean needAllSuccess;

}

batchOperate 办法中传入了 function 对象,这是须要并发执行的代码逻辑。requests 则是所有的申请,并发调用会递归这些申请并提交到异步线程。config 对象则能够对这次并发调用做一些配置,比方并发查问的超时工夫,以及如果局部调用异样时整个批量查问是否继续执行。

接下来看一看实现类:

@Service
@Slf4j
public class BatchOperateServiceImpl implements BatchOperateService{

    @Resource
    private TracedExecutorService tracedExecutorService;

    @Override
    public <T, R> List<R> batchOperate(Function<T, R> function, List<T> requests, BatchOperateConfig config) {log.info("batchOperate start function:{} request:{} config:{}", function, JSON.toJSONString(requests), JSON.toJSONString(config));

        // 以后工夫
        long startTime = System.currentTimeMillis();

        // 初始化
        int numberOfRequests = CollectionUtils.size(requests);

        // 所有异步线程执行后果
        List<Future<R>> futures = Lists.newArrayListWithExpectedSize(numberOfRequests);
        // 应用 countDownLatch 进行并发调用治理
        CountDownLatch countDownLatch = new CountDownLatch(numberOfRequests);
        List<BatchOperateCallable<T, R>> callables = Lists.newArrayListWithExpectedSize(numberOfRequests);

        // 别离提交异步线程执行
        for (T request : requests) {BatchOperateCallable<T, R> batchOperateCallable = new BatchOperateCallable<>(countDownLatch, function, request);
            callables.add(batchOperateCallable);

            // 提交异步线程执行
            Future<R> future = tracedExecutorService.submit(ThreadPoolName.RPC_EXECUTOR, batchOperateCallable);
            futures.add(future);
        }

        try {
            // 期待全副执行实现,如果超时且要求全副调用胜利,则抛出异样
            boolean allFinish = countDownLatch.await(config.getTimeout(), config.getTimeoutUnit());
            if (!allFinish && config.getNeedAllSuccess()) {throw new RuntimeException("batchOperate timeout and need all success");
            }
            // 遍历执行后果,如果有的执行失败且要求全副调用胜利,则抛出异样
            boolean allSuccess = callables.stream().map(BatchOperateCallable::isSuccess).allMatch(BooleanUtils::isTrue);
            if (!allSuccess && config.getNeedAllSuccess()) {throw new RuntimeException("some batchOperate have failed and need all success");
            }

            // 获取所有异步调用后果并返回
            List<R> result = Lists.newArrayList();
            for (Future<R> future : futures) {R r = future.get();
                if (Objects.nonNull(r)) {result.add(r);
                }
            }
            return result;
        } catch (Exception e) {throw new RuntimeException(e.getMessage());
        } finally {double duration = (System.currentTimeMillis() - startTime) / 1000.0;
            log.info("batchOperate finish duration:{}s function:{} request:{} config:{}", duration, function, JSON.toJSONString(requests), JSON.toJSONString(config));

        }
    }
}

通常咱们提交给线程池后间接遍历 Future 并期待获取后果就好了。然而这里咱们用 CountDownLatch 来做更加对立的超时治理。能够看一下 BatchOperateCallable 的实现:

public class BatchOperateCallable<T, R> implements Callable<R> {

    private final CountDownLatch countDownLatch;

    private final Function<T, R> function;

    private final T request;

    /**
     * 该线程解决是否胜利
     */
    private boolean success;

    public BatchOperateCallable(CountDownLatch countDownLatch, Function<T, R> function, T request) {
        this.countDownLatch = countDownLatch;
        this.function = function;
        this.request = request;
    }

    @Override
    public R call() {
        try {
            success = false;
            R result = function.apply(request);
            success = true;
            return result;
        } finally {countDownLatch.countDown();
        }
    }

    public boolean isSuccess() {return success;}
}

无论调用时胜利还是异样,咱们都会在完结后将计数器减一。当计数器被减到 0 时,则代表所有并发调用执行实现。否则如果在规定工夫内计数器没有归零,则代表并发调用超时,此时会抛出异样。

潜在问题

并发调用的一个问题在于咱们放大了拜访上游接口的流量,极其状况下甚至放大了成千盈百倍。如果上游服务并没有做限流等防御性措施,咱们极有可能将上游服务打挂(这种起因导致的故障不足为奇)。因而须要对整个并发调用做流量管制。流量管制的办法有两种,一种是如果微服务采纳 mesh 的模式,则能够在 sidecar 中配置 RPC 调用的 QPS,从而做到全局的管控对上游服务的拜访(这里抉择单机限流还是集群限流取决于 sidecar 是否反对的模式以及服务的流量大小。通常来说均匀流量较小则倡议抉择单机限流,因为集群限流的波动性往往比单机限流要高,流量过小会造成误判)。如果没有开启 mesh,则须要在代码中本人实现限流器,这里举荐 Guava 的 RateLimiter 类,然而它只反对单机限流,如果要想实现集群限流,则计划的复杂度还会进一步晋升

小结

将我的项目开发中遇到的场景进行形象并尽可能的给出通用的解决方案是咱们每一个开发者自我的重要形式,也是进步代码复用性和稳定性的利器。并发 Rpc 调用是一个常见解决思路,心愿本文的实现能够对你有帮忙。

正文完
 0