共计 3158 个字符,预计需要花费 8 分钟才能阅读完成。
序
本文主要研究一下 dubbo 的 TPSLimiter
TPSLimiter
dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/TPSLimiter.java
public interface TPSLimiter {
/**
* judge if the current invocation is allowed by TPS rule
*
* @param url url
* @param invocation invocation
* @return true allow the current invocation, otherwise, return false
*/
boolean isAllowable(URL url, Invocation invocation);
}
- TPSLimiter 定义了 isAllowable 方法
DefaultTPSLimiter
dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiter.java
public class DefaultTPSLimiter implements TPSLimiter {private final ConcurrentMap<String, StatItem> stats = new ConcurrentHashMap<String, StatItem>();
@Override
public boolean isAllowable(URL url, Invocation invocation) {int rate = url.getParameter(TPS_LIMIT_RATE_KEY, -1);
long interval = url.getParameter(TPS_LIMIT_INTERVAL_KEY, DEFAULT_TPS_LIMIT_INTERVAL);
String serviceKey = url.getServiceKey();
if (rate > 0) {StatItem statItem = stats.get(serviceKey);
if (statItem == null) {stats.putIfAbsent(serviceKey, new StatItem(serviceKey, rate, interval));
statItem = stats.get(serviceKey);
} else {
//rate or interval has changed, rebuild
if (statItem.getRate() != rate || statItem.getInterval() != interval) {stats.put(serviceKey, new StatItem(serviceKey, rate, interval));
statItem = stats.get(serviceKey);
}
}
return statItem.isAllowable();} else {StatItem statItem = stats.get(serviceKey);
if (statItem != null) {stats.remove(serviceKey);
}
}
return true;
}
}
- DefaultTPSLimiter 实现了 TPSLimiter,它使用 ConcurrentHashMap 来存储 StatItem,其 key 为 URL 中的 serviceKey;isAllowable 方法从 URL 中读取 tps 参数,默认为 -1,小于 0 则从 ConcurrentHashMap 中移除,大于 0 则创建或者获取 StatItem,调用 StatItem 的 isAllowable(
重置或递减 token 并返回结果
)
StatItem
dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java
class StatItem {
private String name;
private long lastResetTime;
private long interval;
private LongAdder token;
private int rate;
StatItem(String name, int rate, long interval) {
this.name = name;
this.rate = rate;
this.interval = interval;
this.lastResetTime = System.currentTimeMillis();
this.token = buildLongAdder(rate);
}
public boolean isAllowable() {long now = System.currentTimeMillis();
if (now > lastResetTime + interval) {token = buildLongAdder(rate);
lastResetTime = now;
}
if (token.sum() < 0) {return false;}
token.decrement();
return true;
}
public long getInterval() {return interval;}
public int getRate() {return rate;}
long getLastResetTime() {return lastResetTime;}
long getToken() {return token.sum();
}
@Override
public String toString() {return new StringBuilder(32).append("StatItem")
.append("[name=").append(name).append(",")
.append("rate =").append(rate).append(",")
.append("interval =").append(interval).append("]")
.toString();}
private LongAdder buildLongAdder(int rate) {LongAdder adder = new LongAdder();
adder.add(rate);
return adder;
}
}
- StatItem 定义了 LongAdder 类型的 token,其 isAllowable 方法会判断是否需要重置 token,如果需要则使用 buildLongAdder 重置 token,不需要的话则在
token.sum() < 0
时返回 false,如果大于等于 0 则递减 token
小结
- TPSLimiter 定义了 isAllowable 方法
- DefaultTPSLimiter 实现了 TPSLimiter,它使用 ConcurrentHashMap 来存储 StatItem,其 key 为 URL 中的 serviceKey;isAllowable 方法从 URL 中读取 tps 参数,默认为 -1,小于 0 则从 ConcurrentHashMap 中移除,大于 0 则创建或者获取 StatItem,调用 StatItem 的 isAllowable(
重置或递减 token 并返回结果
) - StatItem 定义了 LongAdder 类型的 token,其 isAllowable 方法会判断是否需要重置 token,如果需要则使用 buildLongAdder 重置 token,不需要的话则在
token.sum() < 0
时返回 false,如果大于等于 0 则递减 token
doc
- TPSLimiter
- DefaultTPSLimiter
- StatItem
正文完