本文主要研究一下dubbo的ExecuteLimitFilter

ExecuteLimitFilter

dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ExecuteLimitFilter.java

public class ExecuteLimitFilter extends ListenableFilter {    private static final String EXECUTELIMIT_FILTER_START_TIME = "execugtelimit_filter_start_time";    public ExecuteLimitFilter() {        super.listener = new ExecuteLimitListener();    }    @Override    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {        URL url = invoker.getUrl();        String methodName = invocation.getMethodName();        int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);        if (!RpcStatus.beginCount(url, methodName, max)) {            throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " +                    url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +                    "\" /> limited.");        }        invocation.setAttachment(EXECUTELIMIT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));        try {            return invoker.invoke(invocation);        } catch (Throwable t) {            if (t instanceof RuntimeException) {                throw (RuntimeException) t;            } else {                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);            }        }    }    static class ExecuteLimitListener implements Listener {        @Override        public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {            RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), true);        }        @Override        public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {            RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), false);        }        private long getElapsed(Invocation invocation) {            String beginTime = invocation.getAttachment(EXECUTELIMIT_FILTER_START_TIME);            return StringUtils.isNotEmpty(beginTime) ? System.currentTimeMillis() - Long.parseLong(beginTime) : 0;        }    }}
  • ExecuteLimitFilter继承了ListenableFilter,其构造器初始化的listener为ExecuteLimitListener
  • invoke方法先调用RpcStatus.beginCount方法来判断是否可以通过,不通过则抛出RpcException,通过则记录开始执行的时间,然后执行invoker.invoke方法,执行结束时会回调Listener的onResponse或onError方法
  • ExecuteLimitListener的onResponse及onError方法均会调用RpcStatus.endCount;而该方法会通过getElapsed方法取出execugtelimit_filter_start_time值,计算执行耗时

RpcStatus

dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java

public class RpcStatus {    private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>();    private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();    private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>();    private final AtomicInteger active = new AtomicInteger();    private final AtomicLong total = new AtomicLong();    private final AtomicInteger failed = new AtomicInteger();    private final AtomicLong totalElapsed = new AtomicLong();    private final AtomicLong failedElapsed = new AtomicLong();    private final AtomicLong maxElapsed = new AtomicLong();    private final AtomicLong failedMaxElapsed = new AtomicLong();    private final AtomicLong succeededMaxElapsed = new AtomicLong();    //......    public static void beginCount(URL url, String methodName) {        beginCount(url, methodName, Integer.MAX_VALUE);    }    /**     * @param url     */    public static boolean beginCount(URL url, String methodName, int max) {        max = (max <= 0) ? Integer.MAX_VALUE : max;        RpcStatus appStatus = getStatus(url);        RpcStatus methodStatus = getStatus(url, methodName);        if (methodStatus.active.incrementAndGet() > max) {            methodStatus.active.decrementAndGet();            return false;        } else {            appStatus.active.incrementAndGet();            return true;        }    }    /**     * @param url     * @param elapsed     * @param succeeded     */    public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {        endCount(getStatus(url), elapsed, succeeded);        endCount(getStatus(url, methodName), elapsed, succeeded);    }    private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {        status.active.decrementAndGet();        status.total.incrementAndGet();        status.totalElapsed.addAndGet(elapsed);        if (status.maxElapsed.get() < elapsed) {            status.maxElapsed.set(elapsed);        }        if (succeeded) {            if (status.succeededMaxElapsed.get() < elapsed) {                status.succeededMaxElapsed.set(elapsed);            }        } else {            status.failed.incrementAndGet();            status.failedElapsed.addAndGet(elapsed);            if (status.failedMaxElapsed.get() < elapsed) {                status.failedMaxElapsed.set(elapsed);            }        }    }    //......}
  • RpcStatus的beginCount方法会递增methodStatus.active,然后判断是否大于max值,超出则返回false并递减methodStatus.active;小于等于则递增appStatus.active;endCount方法会递减status.active,递增status.total,然后根据成功与否更新status.succeededMaxElapsed或status.failed、status.failedElapsed、status.failedMaxElapsed

小结

  • ExecuteLimitFilter继承了ListenableFilter,其构造器初始化的listener为ExecuteLimitListener
  • ExecuteLimitFilter的invoke方法先调用RpcStatus.beginCount方法来判断是否可以通过,不通过则抛出RpcException,通过则记录开始执行的时间,然后执行invoker.invoke方法,执行结束时会回调Listener的onResponse或onError方法
  • ExecuteLimitListener的onResponse及onError方法均会调用RpcStatus.endCount;而该方法会通过getElapsed方法取出execugtelimit_filter_start_time值,计算执行耗时

doc

  • ExecuteLimitFilter