A look at dubbo's ExecuteLimitFilter


This article takes a look at dubbo's ExecuteLimitFilter.

ExecuteLimitFilter

dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ExecuteLimitFilter.java

public class ExecuteLimitFilter extends ListenableFilter {

    private static final String EXECUTELIMIT_FILTER_START_TIME = "execugtelimit_filter_start_time";

    public ExecuteLimitFilter() {
        super.listener = new ExecuteLimitListener();
    }

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);
        if (!RpcStatus.beginCount(url, methodName, max)) {
            throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " +
                    url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +
                    "\" /> limited.");
        }

        invocation.setAttachment(EXECUTELIMIT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
        try {
            return invoker.invoke(invocation);
        } catch (Throwable t) {
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
            }
        }
    }

    static class ExecuteLimitListener implements Listener {
        @Override
        public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
            RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), true);
        }

        @Override
        public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
            RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), false);
        }

        private long getElapsed(Invocation invocation) {
            String beginTime = invocation.getAttachment(EXECUTELIMIT_FILTER_START_TIME);
            return StringUtils.isNotEmpty(beginTime) ? System.currentTimeMillis() - Long.parseLong(beginTime) : 0;
        }
    }
}
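  • ExecuteLimitFilter extends ListenableFilter, and its constructor sets the listener to an ExecuteLimitListener
  • The invoke method reads the executes method parameter (default 0) as max and calls RpcStatus.beginCount to decide whether the call may proceed. If it may not, invoke throws an RpcException; if it may, invoke records the start time as an invocation attachment and then calls invoker.invoke. When the invocation finishes, the Listener's onResponse or onError method is called back
  • Both onResponse and onError of ExecuteLimitListener call RpcStatus.endCount. The elapsed time passed to it comes from getElapsed, which reads the execugtelimit_filter_start_time attachment and subtracts it from the current time (returning 0 if the attachment is empty)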
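
The flow described in the bullets above can be sketched without any Dubbo dependency. This is only a simplified model under stated assumptions: the class ExecuteLimitFlowSketch, the attachment key "start_time", and the use of a plain Map and Supplier in place of Invocation and Invoker are all hypothetical, and the callbacks are invoked directly here rather than by Dubbo's filter chain.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical stand-in for the filter + listener pair; not Dubbo code.
final class ExecuteLimitFlowSketch {
    // Assumed key, playing the role of EXECUTELIMIT_FILTER_START_TIME.
    static final String START_TIME_KEY = "start_time";

    private final int max;
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicInteger succeeded = new AtomicInteger();
    private final AtomicInteger failed = new AtomicInteger();
    private final AtomicLong totalElapsed = new AtomicLong();

    ExecuteLimitFlowSketch(int max) {
        // As in beginCount, a non-positive max means "no limit".
        this.max = max <= 0 ? Integer.MAX_VALUE : max;
    }

    <T> T invoke(Map<String, String> attachments, Supplier<T> target) {
        // Gate: increment first, roll back and reject if over the limit.
        if (active.incrementAndGet() > max) {
            active.decrementAndGet();
            throw new IllegalStateException("executes limit " + max + " exceeded");
        }
        // Record the start time so the callbacks can compute elapsed time.
        attachments.put(START_TIME_KEY, String.valueOf(System.currentTimeMillis()));
        boolean ok = false;
        try {
            T result = target.get();
            ok = true;
            return result;
        } finally {
            // Exactly one callback runs per admitted call.
            if (ok) {
                onResponse(attachments);
            } else {
                onError(attachments);
            }
        }
    }

    private void onResponse(Map<String, String> attachments) {
        active.decrementAndGet();
        succeeded.incrementAndGet();
        totalElapsed.addAndGet(elapsed(attachments));
    }

    private void onError(Map<String, String> attachments) {
        active.decrementAndGet();
        failed.incrementAndGet();
        totalElapsed.addAndGet(elapsed(attachments));
    }

    private static long elapsed(Map<String, String> attachments) {
        String begin = attachments.get(START_TIME_KEY);
        return (begin == null || begin.isEmpty()) ? 0 : System.currentTimeMillis() - Long.parseLong(begin);
    }

    int active() { return active.get(); }
    int succeeded() { return succeeded.get(); }
    int failed() { return failed.get(); }

    public static void main(String[] args) {
        ExecuteLimitFlowSketch filter = new ExecuteLimitFlowSketch(1);
        String result = filter.invoke(new HashMap<>(), () -> "ok");
        System.out.println(result + ", active=" + filter.active());
    }
}
```

Rejected calls never reach the callbacks, so the counter is decremented exactly once per admitted call, either on the success path or on the error path.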

RpcStatus

dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java

public class RpcStatus {

    private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>();

    private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();
    private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>();
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicLong total = new AtomicLong();
    private final AtomicInteger failed = new AtomicInteger();
    private final AtomicLong totalElapsed = new AtomicLong();
    private final AtomicLong failedElapsed = new AtomicLong();
    private final AtomicLong maxElapsed = new AtomicLong();
    private final AtomicLong failedMaxElapsed = new AtomicLong();
    private final AtomicLong succeededMaxElapsed = new AtomicLong();

    //......

    public static void beginCount(URL url, String methodName) {
        beginCount(url, methodName, Integer.MAX_VALUE);
    }

    /**
     * @param url
     */
    public static boolean beginCount(URL url, String methodName, int max) {
        max = (max <= 0) ? Integer.MAX_VALUE : max;
        RpcStatus appStatus = getStatus(url);
        RpcStatus methodStatus = getStatus(url, methodName);
        if (methodStatus.active.incrementAndGet() > max) {
            methodStatus.active.decrementAndGet();
            return false;
        } else {
            appStatus.active.incrementAndGet();
            return true;
        }
    }

    /**
     * @param url
     * @param elapsed
     * @param succeeded
     */
    public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {
        endCount(getStatus(url), elapsed, succeeded);
        endCount(getStatus(url, methodName), elapsed, succeeded);
    }

    private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
        status.active.decrementAndGet();
        status.total.incrementAndGet();
        status.totalElapsed.addAndGet(elapsed);
        if (status.maxElapsed.get() < elapsed) {
            status.maxElapsed.set(elapsed);
        }
        if (succeeded) {
            if (status.succeededMaxElapsed.get() < elapsed) {
                status.succeededMaxElapsed.set(elapsed);
            }
        } else {
            status.failed.incrementAndGet();
            status.failedElapsed.addAndGet(elapsed);
            if (status.failedMaxElapsed.get() < elapsed) {
                status.failedMaxElapsed.set(elapsed);
            }
        }
    }

    //......
}
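  • RpcStatus.beginCount treats max <= 0 as Integer.MAX_VALUE, i.e. no limit. It increments methodStatus.active and checks whether the result exceeds max: if it does, it decrements methodStatus.active again and returns false; otherwise it increments appStatus.active and returns true. endCount is applied to both the service-level and the method-level status: it decrements status.active, increments status.total, adds to status.totalElapsed and updates status.maxElapsed, then updates status.succeededMaxElapsed on success, or status.failed, status.failedElapsed and status.failedMaxElapsed on failure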
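
To make the counting gate concrete, here is a minimal sketch of the same increment-then-check pattern. It is not the RpcStatus implementation: the class StatusSketch, the string keys used instead of URL-derived keys, and the reduced set of counters are assumptions made for illustration. It also uses AtomicLong.accumulateAndGet with Math::max for the maximum, which is an atomic alternative to the get-then-set check in the quoted source.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, reduced model of per-service and per-method statistics.
final class StatusSketch {
    private static final ConcurrentMap<String, StatusSketch> BY_KEY = new ConcurrentHashMap<>();

    final AtomicInteger active = new AtomicInteger();
    final AtomicLong total = new AtomicLong();
    final AtomicInteger failed = new AtomicInteger();
    final AtomicLong totalElapsed = new AtomicLong();
    final AtomicLong maxElapsed = new AtomicLong();

    // Assumed lookup: one status object per key, created on first use.
    static StatusSketch of(String key) {
        return BY_KEY.computeIfAbsent(key, k -> new StatusSketch());
    }

    static boolean beginCount(String service, String method, int max) {
        int limit = (max <= 0) ? Integer.MAX_VALUE : max; // non-positive means unlimited
        StatusSketch methodStatus = of(service + "#" + method);
        // Increment first, then check; roll back if the limit is exceeded.
        if (methodStatus.active.incrementAndGet() > limit) {
            methodStatus.active.decrementAndGet();
            return false;
        }
        of(service).active.incrementAndGet();
        return true;
    }

    static void endCount(String service, String method, long elapsed, boolean succeeded) {
        record(of(service), elapsed, succeeded);
        record(of(service + "#" + method), elapsed, succeeded);
    }

    private static void record(StatusSketch status, long elapsed, boolean succeeded) {
        status.active.decrementAndGet();
        status.total.incrementAndGet();
        status.totalElapsed.addAndGet(elapsed);
        // Atomic "keep the larger value" update.
        status.maxElapsed.accumulateAndGet(elapsed, Math::max);
        if (!succeeded) {
            status.failed.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        System.out.println(beginCount("demo", "hello", 1)); // first call is admitted
        System.out.println(beginCount("demo", "hello", 1)); // second concurrent call is rejected
        endCount("demo", "hello", 5, true);
    }
}
```

Incrementing before checking means a rejected caller briefly inflates the counter, so another caller arriving in that window may also be rejected; the limit is never exceeded, but admission is slightly conservative under contention.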

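Summary

  • ExecuteLimitFilter extends ListenableFilter, and its constructor sets the listener to an ExecuteLimitListener
  • The invoke method of ExecuteLimitFilter first calls RpcStatus.beginCount to decide whether the call may proceed. If it may not, invoke throws an RpcException; if it may, invoke records the start time and then calls invoker.invoke. When the invocation finishes, the Listener's onResponse or onError method is called back
  • Both onResponse and onError of ExecuteLimitListener call RpcStatus.endCount, passing the elapsed time that getElapsed computes from the execugtelimit_filter_start_time attachment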

