共计 4967 个字符,预计需要花费 13 分钟才能阅读完成。
ExecuteLimitFilter
ExecuteLimitFilter,在服务提供者,通过 <dubbo:service /> 的 “executes” 对立配置项开启:
示意每服务的每办法最大可并行执行申请数。
ExecuteLimitFilter 是通过信号量来实现的对服务端的并发数的管制。
ExecuteLimitFilter 执行流程:
- 首先会去取得服务提供者每服务每办法最大可并行执行申请数
- 如果每服务每办法最大可并行执行申请数大于零,那么就基于基于服务 URL + 办法维度获取一个 RpcStatus 实例
- 通过 RpcStatus 实例获取一个信号量,若果获取的这个信号量调用 tryAcquire 返回 false,则抛出异样
- 如果没有抛异样,那么久调用 RpcStatus 静态方法 beginCount,给这个 URL + 办法维度开始计数
- 调用服务
- 调用完结后计数调用 RpcStatus 静态方法 endCount,计数完结
- 开释信号量
ExecuteLimitFilter
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
Semaphore executesLimit = null;
boolean acquireResult = false;
int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
if (max > 0) {RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
// if (count.getActive() >= max) {
/**
* http://manzhizhen.iteye.com/blog/2386408
* use semaphore for concurrency control (to limit thread number)
*/
executesLimit = count.getSemaphore(max);
if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {throw new RpcException("Failed to invoke method" + invocation.getMethodName() + "in provider" + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
}
}
long begin = System.currentTimeMillis();
boolean isSuccess = true;
RpcStatus.beginCount(url, methodName);
try {Result result = invoker.invoke(invocation);
return result;
} catch (Throwable t) {
isSuccess = false;
if (t instanceof RuntimeException) {throw (RuntimeException) t;
} else {throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
} finally {RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
if(acquireResult) {executesLimit.release();
}
}
}
咱们接下来看看 RpcStatus 这个类
private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();
public static RpcStatus getStatus(URL url, String methodName) {String uri = url.toIdentityString();
ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);
if (map == null) {METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());
map = METHOD_STATISTICS.get(uri);
}
RpcStatus status = map.get(methodName);
if (status == null) {map.putIfAbsent(methodName, new RpcStatus());
status = map.get(methodName);
}
return status;
}
这个办法很简略,大略就是给 RpcStatus 这个类外面的动态属性 METHOD_STATISTICS 外面设值。外层的 map 是以 url 为 key,里层的 map 是以办法名为 key。
private volatile int executesPermits;
public Semaphore getSemaphore(int maxThreadNum) {if(maxThreadNum <= 0) {return null;}
if (executesLimit == null || executesPermits != maxThreadNum) {synchronized (this) {if (executesLimit == null || executesPermits != maxThreadNum) {executesLimit = new Semaphore(maxThreadNum);
executesPermits = maxThreadNum;
}
}
}
return executesLimit;
}
这个办法是获取信号量,如果这个实例外面的信号量是空的,那么就增加一个,如果不是空的就返回。
TPSLimiter
TpsLimitFilter 过滤器,用于服务提供者中,提供限流的性能。
配置形式:
- 通过 <dubbo:parameter key=”tps” value=”” /> 配置项,增加到 <dubbo:service /> 或 <dubbo:provider /> 或 <dubbo:protocol /> 中开启,例如:
dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoServiceImpl" protocol="injvm" >
<dubbo:parameter key="tps" value="100" />
</dubbo:service>
- 通过 <dubbo:parameter key=”tps.interval” value=”” /> 配置项,设置 TPS 周期。
源码剖析
TpsLimitFilter
private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
throw new RpcException(
"Failed to invoke service" +
invoker.getInterface().getName() +
"." +
invocation.getMethodName() +
"because exceed max service tps.");
}
return invoker.invoke(invocation);
}
invoke 办法调用了 DefaultTPSLimiter 的 isAllowable,咱们进入到 isAllowable 办法看一下
DefaultTPSLimiter
private final ConcurrentMap<String, StatItem> stats
= new ConcurrentHashMap<String, StatItem>();
@Override
public boolean isAllowable(URL url, Invocation invocation) {
// 获取 tps 这个参数设置的大小
int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
// 获取 tps.interval 这个参数设置的大小,默认 60 秒
long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
Constants.DEFAULT_TPS_LIMIT_INTERVAL);
String serviceKey = url.getServiceKey();
if (rate > 0) {StatItem statItem = stats.get(serviceKey);
if (statItem == null) {
stats.putIfAbsent(serviceKey,
new StatItem(serviceKey, rate, interval));
statItem = stats.get(serviceKey);
}
return statItem.isAllowable();} else {StatItem statItem = stats.get(serviceKey);
if (statItem != null) {stats.remove(serviceKey);
}
}
return true;
}
若要限流,调用 StatItem#isAllowable(url, invocation) 办法,依据 TPS 限流规定判断是否限度此次调用。
StatItem
private long lastResetTime;
private long interval;
private AtomicInteger token;
private int rate;
public boolean isAllowable() {long now = System.currentTimeMillis();
// 若达到下一个周期,复原可用种子数,设置最初重置工夫。if (now > lastResetTime + interval) {token.set(rate);// 回复可用种子数
lastResetTime = now;// 最初重置工夫
}å
// CAS,直到或失去一个种子,或者没有足够种子
int value = token.get();
boolean flag = false;
while (value > 0 && !flag) {flag = token.compareAndSet(value, value - 1);
value = token.get();}
return flag;
}
关注公众号 Java 技术栈,在后盾回复:面试,能够获取我整顿的 Dubbo 系列面试题和答案。
作者:luozhiyun\
出处:https://www.cnblogs.com/luozh…
近期热文举荐:
1.600+ 道 Java 面试题及答案整顿 (2021 最新版)
2. 终于靠开源我的项目弄到 IntelliJ IDEA 激活码了,真香!
3. 阿里 Mock 工具正式开源,干掉市面上所有 Mock 工具!
4.Spring Cloud 2020.0.0 正式公布,全新颠覆性版本!
5.《Java 开发手册(嵩山版)》最新公布,速速下载!
感觉不错,别忘了顺手点赞 + 转发哦!