前言
大家好,明天开始给大家分享 — Dubbo 专题之 Dubbo 并发管制。在前一个章节中咱们介绍了 Dubbo 负载平衡,Dubbo 为咱们提供四种负载平衡算法别离是:加权随机算法、加权轮询算法、起码沉闷调用数算法、一致性 Hash 算法。同时咱们也例举了常见的应用场景并且进行了源码解析来剖析其实现原理。有的小伙伴学习了负载平衡算法后可能会想:当咱们有很多的生产线程时,如果服务提供端只有多数的实例,那么会不会把咱们的服务提供端线程生产殆尽呢?或者超出了咱们的业务解决线程池最大接管申请数又会产生什么呢?带着这些疑难咱们开始本章节学习,咱们会通过介绍什么是并发?怎么管制并发?Dubbo 中是怎么来解决这些问题。上面就让咱们疾速开始吧!
1. 并发管制简介
首先咱们得了解什么是 并发
,这里有另外一个概念 并行
。上面是来自百科的解释:并发和并行是即类似又有区别的两个概念,并行是指两个或者多个事件在同一时刻产生;而并发是指两个或多个事件在同一时间距离内产生。在多道程序环境下,并发性是指在一段时间内宏观上有多个程序在同时运行,但在单处理器零碎中,每一时刻却仅能有一道程序执行,故宏观上这些程序只能是分时地交替执行。假使在计算机系统中有多个处理机,则这些能够并发执行的程序便可被调配到多个处理机上,实现并行执行即利用每个处理机来解决一个可并发执行的程序,这样多个程序便能够同时执行。上面通过示例图进行阐明:
在上图中咱们能够看到单核解决在多线程并发执行工作时,同一时刻只有一个线程在执行,在 CPU 工夫片切换的时候会调度到其余线程进行执行这就叫做并发。同理当在多核处理器上多个线程同时执行且在不同 CPU 上的时,这就叫做并行执行,每一个线程都在一个 CPU 上执行且线程间互不影响。
2. 并发管制形式
在 Dubbo 中提供了两大类配置别离是:生产端管制配置、服务提供端管制配置。
2.1 服务端管制配置
- 限定服务的每个办法
<dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade" executes="10" />
- 限定服务的某个办法
<dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade">
<dubbo:method name="queryAll" executes="10" />
</dubbo:service>
2.2 生产端配置
1. 限定服务的所有办法
<dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade" actives="10" />
或者
<dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade">
<dubbo:method name="queryAll" actives="10" />
</dubbo:service>
-
限定服务的某个办法
<dubbo:reference interface="com.muke.dubbocourse.common.api.BookFacade" actives="10" />
或者
<dubbo:reference interface="com.muke.dubbocourse.common.api.BookFacade">
<dubbo:method name="queryAll" actives="10" />
</dubbo:service>
如果 <dubbo:service>
和 <dubbo:reference>
都配了actives
,<dubbo:reference>
优先,参见:笼罩策略参考。
咱们从下面的配置能够看出服务提供者端通过 executes
配置而生产端通过 actives
配置。
3. 应用场景
并发管制在咱们日常的工作中个别状况咱们很少会去间接配置,咱们个别是自定义业务解决线程池大小。在 Dubbo 中当接管到服务申请后会把申请转发到业务解决线程池去解决,所以接管申请的线程 IO 瓶颈不大。我能想到的应用场景如下:
- 机器性能差距大:假如咱们在一个服务集群中,其中一台服务器性能较差那么咱们能够手动配置这台服务的所能承受的最大申请并发数,防止申请失败。
- 压测服务器性能:比方咱们有几台机器须要进行性能评估,那么咱们可能须要跑压力测试,这时咱们能够把所有服务提供者的最大并发数设置为统一。这样咱们就能够在排除程序自身的因素来评估机器的性能。
4. 示例演示
咱们以获取图书列表为例来进行演示。我的项目构造如下:
这里咱们次要更改了服务提供者的 XML 配置文件dubbo-provider-xml.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<dubbo:application name="demo-provider" metadata-type="remote"/>
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<bean id="bookFacade" class="com.muke.dubbocourse.concurrent.provider.BookFacadeImpl"/>
<!-- 裸露本地服务为 Dubbo 服务,executes="10" 示意限度每个办法的并发数为 10-->
<dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade" executes="10" ref="bookFacade" />
</beans>
下面次要减少了 executes="10"
配置,对服务 BookFacade
所有的办法进行最大并发数限度。
上面咱们看看生产端代码:
public static void main(String[] args) throws IOException {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("concurrent/consumer/spring/dubbo-consumer-xml.xml");
context.start();
BookFacade bookFacade = context.getBean(BookFacade.class);
// 循环启动 30 个线程进行并发拜访
for (int i = 0; i < 30; i++) {
final int index = i;
// 开启线程
new Thread(()->{List<Book> books = bookFacade.queryAll();
System.out.println("The invoker"+index+"result is"+ books);
}).start();}
System.in.read();
//context.close();}
同时咱们须要设置下 queryAll
办法的执行工夫略微长一些这样能力看到演示成果,失常状况下咱们会看到如下谬误:
cause: The service using threads greater than <dubbo:service executes="10" /> limited.
提醒得非常明显,就是说咱们的服务最大并发为设置为 10。
5. 实现原理
在解说原理之前假如这个让咱们本人来实现的话咱们该怎么实现呢?我想小伙伴可能都会想到这里管制并发数量无非就是对调用办法或服务进行一个全局的计数统计,如果达到了阀值就开始执行限度。那咱们就来看看 Dubbo 中是怎么实现。在 Dubbo 中执行这个逻辑的类是 ActiveLimitFilter
其外围代码如下:
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {URL url = invoker.getUrl();
// 执行的办法名称
String methodName = invocation.getMethodName();
// 获取配置的并发参数配置
int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);
final RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
// 开始计数
if (!RpcStatus.beginCount(url, methodName, max)) {
// 计数失败,获取超时工夫
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), TIMEOUT_KEY, 0);
// 记录开始工夫
long start = System.currentTimeMillis();
long remain = timeout;
synchronized (rpcStatus) {
// 再次尝试计数
while (!RpcStatus.beginCount(url, methodName, max)) {
try {
// 计数失败 阻塞期待 期待接管到 onMessage、onError 办法回调开释 rpcStatus 的阻塞
rpcStatus.wait(remain);
} catch (InterruptedException e) {// ignore}
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
// 期待超时
if (remain <= 0) {
throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,
"Waiting concurrent invoke timeout in client-side for service:" +
invoker.getInterface().getName() + ", method:" + invocation.getMethodName() +
", elapsed:" + elapsed + ", timeout:" + timeout + ". concurrent invokes:" +
rpcStatus.getActive() + ". max concurrent invoke limit:" + max);
}
}
}
}
invocation.put(ACTIVELIMIT_FILTER_START_TIME, System.currentTimeMillis());
// 调用服务
return invoker.invoke(invocation);
}
在下面的代码中咱们能够看到在调用服务前对以后调用的办法进行计数,如果计数失败会阻塞期待指定的超时工夫,计数胜利则调用近程服务。
6. 小结
在本大节中咱们次要学习了 Dubbo 并发管制以及并发和并行的区别,同时也剖析了并发管制实现的原理,其本质上是通过对调用的办法或服务进行利用级别的计数统计,当达到阀值就限度拜访。
本节课程的重点如下:
- 了解 Dubbo 并发管制
- 理解了并发管制的应用形式
- 理解延并发管制应用场景
- 理解并发管制实现原理
作者
集体从事金融行业,就任过易极付、思建科技、某网约车平台等重庆一流技术团队,目前就任于某银行负责对立领取零碎建设。本身对金融行业有强烈的喜好。同时也实际大数据、数据存储、自动化集成和部署、散布式微服务、响应式编程、人工智能等畛域。同时也热衷于技术分享创建公众号和博客站点对常识体系进行分享。关注公众号:青年 IT 男 获取最新技术文章推送!
博客地址: http://youngitman.tech
微信公众号: