序
本文主要研究一下dubbo的TPSLimiter
TPSLimiter
dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/TPSLimiter.java
public interface TPSLimiter { /** * judge if the current invocation is allowed by TPS rule * * @param url url * @param invocation invocation * @return true allow the current invocation, otherwise, return false */ boolean isAllowable(URL url, Invocation invocation);}
- TPSLimiter定义了isAllowable方法
DefaultTPSLimiter
dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiter.java
public class DefaultTPSLimiter implements TPSLimiter { private final ConcurrentMap<String, StatItem> stats = new ConcurrentHashMap<String, StatItem>(); @Override public boolean isAllowable(URL url, Invocation invocation) { int rate = url.getParameter(TPS_LIMIT_RATE_KEY, -1); long interval = url.getParameter(TPS_LIMIT_INTERVAL_KEY, DEFAULT_TPS_LIMIT_INTERVAL); String serviceKey = url.getServiceKey(); if (rate > 0) { StatItem statItem = stats.get(serviceKey); if (statItem == null) { stats.putIfAbsent(serviceKey, new StatItem(serviceKey, rate, interval)); statItem = stats.get(serviceKey); } else { //rate or interval has changed, rebuild if (statItem.getRate() != rate || statItem.getInterval() != interval) { stats.put(serviceKey, new StatItem(serviceKey, rate, interval)); statItem = stats.get(serviceKey); } } return statItem.isAllowable(); } else { StatItem statItem = stats.get(serviceKey); if (statItem != null) { stats.remove(serviceKey); } } return true; }}
- DefaultTPSLimiter实现了TPSLimiter,它使用ConcurrentHashMap来存储StatItem,其key为URL中的serviceKey;isAllowable方法从URL中读取tps参数,默认为-1,小于0则从ConcurrentHashMap中移除,大于0则创建或者获取StatItem,调用StatItem的isAllowable(
重置或递减token并返回结果
)
StatItem
dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java
class StatItem { private String name; private long lastResetTime; private long interval; private LongAdder token; private int rate; StatItem(String name, int rate, long interval) { this.name = name; this.rate = rate; this.interval = interval; this.lastResetTime = System.currentTimeMillis(); this.token = buildLongAdder(rate); } public boolean isAllowable() { long now = System.currentTimeMillis(); if (now > lastResetTime + interval) { token = buildLongAdder(rate); lastResetTime = now; } if (token.sum() < 0) { return false; } token.decrement(); return true; } public long getInterval() { return interval; } public int getRate() { return rate; } long getLastResetTime() { return lastResetTime; } long getToken() { return token.sum(); } @Override public String toString() { return new StringBuilder(32).append("StatItem ") .append("[name=").append(name).append(", ") .append("rate = ").append(rate).append(", ") .append("interval = ").append(interval).append("]") .toString(); } private LongAdder buildLongAdder(int rate) { LongAdder adder = new LongAdder(); adder.add(rate); return adder; }}
- StatItem定义了LongAdder类型的token,其isAllowable方法会判断是否需要重置token,如果需要则使用buildLongAdder重置token,不需要的话则在
token.sum() < 0
时返回false,如果大于等于0则递减token
小结
- TPSLimiter定义了isAllowable方法
- DefaultTPSLimiter实现了TPSLimiter,它使用ConcurrentHashMap来存储StatItem,其key为URL中的serviceKey;isAllowable方法从URL中读取tps参数,默认为-1,小于0则从ConcurrentHashMap中移除,大于0则创建或者获取StatItem,调用StatItem的isAllowable(
重置或递减token并返回结果
) - StatItem定义了LongAdder类型的token,其isAllowable方法会判断是否需要重置token,如果需要则使用buildLongAdder重置token,不需要的话则在
token.sum() < 0
时返回false,如果大于等于0则递减token
doc
- TPSLimiter
- DefaultTPSLimiter
- StatItem