本文主要研究一下dubbo的Filter

Filter

dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java

@SPI
public interface Filter {
    /**
     * Handles one invocation on behalf of this filter.
     *
     * <p>This method is abstract, so every implementation must provide it.
     * NOTE(review): implementations presumably pass the call on via
     * {@code invoker.invoke(invocation)} - confirm against concrete filters.
     *
     * @param invoker    the invoker this filter is applied to
     * @param invocation the invocation being processed
     * @return the result of the invocation
     * @throws RpcException declared for failures while processing the invocation
     */
    Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;

    /**
     * Legacy response hook; implementations do not need to override it.
     *
     * <p>The default implementation returns {@code appResponse} unchanged.
     * A filter itself should only be responsible for passing the invocation on;
     * all callbacks have been moved into {@link Listener}, which is why this
     * method is deprecated.
     *
     * @param appResponse the response produced for the invocation
     * @param invoker     the invoker the response belongs to
     * @param invocation  the invocation that produced the response
     * @return the response to hand back; {@code appResponse} itself by default
     */
    @Deprecated
    default Result onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
        return appResponse;
    }

    /**
     * Callback interface that carries the response and error hooks of a filter.
     */
    interface Listener {

        /**
         * Response callback.
         *
         * @param appResponse the response produced for the invocation
         * @param invoker     the invoker the response belongs to
         * @param invocation  the invocation that produced the response
         */
        void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);

        /**
         * Error callback.
         *
         * @param t          the error that was raised
         * @param invoker    the invoker the failed call belongs to
         * @param invocation the invocation that failed
         */
        void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
    }
}
  • Filter定义了invoke方法以及标记为@Deprecated的default方法onResponse(默认直接返回appResponse),另外还定义了Listener接口,该接口定义了onResponse、onError方法

ProtocolFilterWrapper

dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java

public class ProtocolFilterWrapper implements Protocol {    private final Protocol protocol;    public ProtocolFilterWrapper(Protocol protocol) {        if (protocol == null) {            throw new IllegalArgumentException("protocol == null");        }        this.protocol = protocol;    }    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {        Invoker<T> last = invoker;        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);        if (!filters.isEmpty()) {            for (int i = filters.size() - 1; i >= 0; i--) {                final Filter filter = filters.get(i);                final Invoker<T> next = last;                last = new Invoker<T>() {                    @Override                    public Class<T> getInterface() {                        return invoker.getInterface();                    }                    @Override                    public URL getUrl() {                        return invoker.getUrl();                    }                    @Override                    public boolean isAvailable() {                        return invoker.isAvailable();                    }                    @Override                    public Result invoke(Invocation invocation) throws RpcException {                        Result asyncResult;                        try {                            asyncResult = filter.invoke(next, invocation);                        } catch (Exception e) {                            // onError callback                            if (filter instanceof ListenableFilter) {                                Filter.Listener listener = ((ListenableFilter) filter).listener();                                if (listener != null) {                                    listener.onError(e, invoker, invocation);                                }                            }                            throw e;          
              }                        return asyncResult;                    }                    @Override                    public void destroy() {                        invoker.destroy();                    }                    @Override                    public String toString() {                        return invoker.toString();                    }                };            }        }        return new CallbackRegistrationInvoker<>(last, filters);    }    @Override    public int getDefaultPort() {        return protocol.getDefaultPort();    }    @Override    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {        if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {            return protocol.export(invoker);        }        return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));    }    @Override    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {        if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {            return protocol.refer(type, url);        }        return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);    }    @Override    public void destroy() {        protocol.destroy();    }    static class CallbackRegistrationInvoker<T> implements Invoker<T> {        private final Invoker<T> filterInvoker;        private final List<Filter> filters;        public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<Filter> filters) {            this.filterInvoker = filterInvoker;            this.filters = filters;        }        @Override        public Result invoke(Invocation invocation) throws RpcException {            Result asyncResult = filterInvoker.invoke(invocation);            asyncResult.thenApplyWithContext(r -> {                for (int i = filters.size() - 1; i >= 0; i--) {                    Filter filter = filters.get(i);                    // onResponse callback                    if 
(filter instanceof ListenableFilter) {                        Filter.Listener listener = ((ListenableFilter) filter).listener();                        if (listener != null) {                            listener.onResponse(r, filterInvoker, invocation);                        }                    } else {                        filter.onResponse(r, filterInvoker, invocation);                    }                }                return r;            });            return asyncResult;        }        @Override        public Class<T> getInterface() {            return filterInvoker.getInterface();        }        @Override        public URL getUrl() {            return filterInvoker.getUrl();        }        @Override        public boolean isAvailable() {            return filterInvoker.isAvailable();        }        @Override        public void destroy() {            filterInvoker.destroy();        }    }}
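  • ProtocolFilterWrapper实现了Protocol接口,其export及refer方法对于非registry协议会通过buildInvokerChain方法构建filter链:该方法倒序遍历激活的filters,把每个filter包装成一个Invoker并串联起来,当filter.invoke抛出异常时,如果该filter是ListenableFilter且其listener不为null,则回调listener的onError方法,然后重新抛出该异常;它还定义了一个静态类CallbackRegistrationInvoker,该类实现了Invoker接口,其invoke方法首先会调用filterInvoker的invoke方法获取asyncResult,之后通过thenApplyWithContext注册rpc调用完成时的回调,这里会倒序挨个遍历filters,如果filter是ListenableFilter则回调其listener(不为null时)的onResponse方法,否则回调filter自身的onResponse方法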

小结

  • Filter定义了invoke方法以及标记为@Deprecated的default方法onResponse(默认直接返回appResponse),另外还定义了Listener接口,该接口定义了onResponse、onError方法
  • Filter定义的invoke方法返回的Result有个抽象类为AbstractResult,而AbstractResult有几个子类,分别为AppResponse、AsyncRpcResult(替代原来的RpcResult)
  • ProtocolFilterWrapper定义了一个静态类CallbackRegistrationInvoker,该类实现了Invoker接口,其invoke方法首先会调用filterInvoker的invoke方法获取asyncResult,之后通过thenApplyWithContext注册rpc调用完成时的回调,这里会倒序挨个遍历filters,如果filter是ListenableFilter则回调其listener(不为null时)的onResponse方法,否则回调filter自身的onResponse方法

doc

  • Filter
  • AsyncRpcResult
  • ProtocolFilterWrapper