本文主要研究一下dubbo的CacheFilter

CacheFilter

dubbo-2.7.2/dubbo-filter/dubbo-filter-cache/src/main/java/org/apache/dubbo/cache/filter/CacheFilter.java

@Activate(group = {CONSUMER, PROVIDER}, value = CACHE_KEY)public class CacheFilter implements Filter {    private CacheFactory cacheFactory;    /**     * Dubbo will populate and set the cache factory instance based on service/method/consumer/provider configured     * cache attribute value. Dubbo will search for the class name implementing configured <b>cache</b> in file org.apache.dubbo.cache.CacheFactory     * under META-INF sub folders.     *     * @param cacheFactory instance of CacheFactory based on <b>cache</b> type     */    public void setCacheFactory(CacheFactory cacheFactory) {        this.cacheFactory = cacheFactory;    }    /**     * If cache is configured, dubbo will invoke method on each method call. If cache value is returned by cache store     * then it will return otherwise call the remote method and return value. If remote method's return valeu has error     * then it will not cache the value.     * @param invoker    service     * @param invocation invocation.     * @return Cache returned value if found by the underlying cache store. If cache miss it will call target method.     
* @throws RpcException     */    @Override    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {        if (cacheFactory != null && ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), CACHE_KEY))) {            Cache cache = cacheFactory.getCache(invoker.getUrl(), invocation);            if (cache != null) {                String key = StringUtils.toArgumentString(invocation.getArguments());                Object value = cache.get(key);                if (value != null) {                    if (value instanceof ValueWrapper) {                        return AsyncRpcResult.newDefaultAsyncResult(((ValueWrapper) value).get(), invocation);                    } else {                        return AsyncRpcResult.newDefaultAsyncResult(value, invocation);                    }                }                Result result = invoker.invoke(invocation);                if (!result.hasException()) {                    cache.put(key, new ValueWrapper(result.getValue()));                }                return result;            }        }        return invoker.invoke(invocation);    }    /**     * Cache value wrapper.     */    static class ValueWrapper implements Serializable {        private static final long serialVersionUID = -1777337318019193256L;        private final Object value;        public ValueWrapper (Object value) {            this.value = value;        }        public Object get() {            return this.value;        }    }}
  • CacheFilter实现了Filter接口,它的invoke方法会先判断cacheFactory是否不为null且invoker的URL中针对当前方法配置了cache参数;如果该条件成立则从cacheFactory获取cache,然后再以方法参数生成的key从cache中获取value;如果value不为null则返回AsyncRpcResult.newDefaultAsyncResult(value为ValueWrapper时先取出其中包装的值);如果value为null则执行invoker.invoke,并且仅在返回结果没有异常时,将返回值用ValueWrapper包装后缓存到cache中

实例

dubbo-2.7.2/dubbo-filter/dubbo-filter-cache/src/test/java/org/apache/dubbo/cache/filter/CacheFilterTest.java

/**
 * Tests for {@link CacheFilter}. Every test is parameterized and runs once per
 * cache type / factory pair supplied by {@link #cacheFactories()}.
 */
public class CacheFilterTest {
    private RpcInvocation invocation;
    private CacheFilter cacheFilter = new CacheFilter();
    private Invoker<?> invoker = mock(Invoker.class);
    private Invoker<?> invoker1 = mock(Invoker.class);
    private Invoker<?> invoker2 = mock(Invoker.class);
    private Invoker<?> invoker3 = mock(Invoker.class);
    private Invoker<?> invoker4 = mock(Invoker.class);

    /**
     * Supplies the (cache type name, factory instance) pairs the tests run against.
     */
    static Stream<Arguments> cacheFactories() {
        return Stream.of(
                Arguments.of("lru", new LruCacheFactory()),
                Arguments.of("jcache", new JCacheFactory()),
                Arguments.of("threadlocal", new ThreadLocalCacheFactory()),
                Arguments.of("expiring", new ExpiringCacheFactory())
        );
    }

    /**
     * Creates a fresh invocation, installs the factory on the filter and stubs the mock invokers.
     * All invokers share one URL whose {@code cache} parameter is {@code cacheType}.
     * Stubbed results: invoker -> "value", invoker1 -> "value1", invoker2 -> "value2",
     * invoker3 -> a result carrying a RuntimeException, invoker4 -> a default result built
     * without a value.
     *
     * @param cacheType    value of the {@code cache} URL parameter
     * @param cacheFactory factory handed to the filter under test
     */
    public void setUp(String cacheType, CacheFactory cacheFactory) {
        invocation = new RpcInvocation();
        cacheFilter.setCacheFactory(cacheFactory);

        URL url = URL.valueOf("test://test:11/test?cache=" + cacheType);

        given(invoker.invoke(invocation)).willReturn(AsyncRpcResult.newDefaultAsyncResult("value", invocation));
        given(invoker.getUrl()).willReturn(url);

        given(invoker1.invoke(invocation)).willReturn(AsyncRpcResult.newDefaultAsyncResult("value1", invocation));
        given(invoker1.getUrl()).willReturn(url);

        given(invoker2.invoke(invocation)).willReturn(AsyncRpcResult.newDefaultAsyncResult("value2", invocation));
        given(invoker2.getUrl()).willReturn(url);

        given(invoker3.invoke(invocation)).willReturn(AsyncRpcResult.newDefaultAsyncResult(new RuntimeException(), invocation));
        given(invoker3.getUrl()).willReturn(url);

        given(invoker4.invoke(invocation)).willReturn(AsyncRpcResult.newDefaultAsyncResult(invocation));
        given(invoker4.getUrl()).willReturn(url);
    }

    /**
     * No-argument method: the first call caches "value", so later calls through other
     * invokers must still yield "value" instead of their own stubbed values.
     */
    @ParameterizedTest
    @MethodSource("cacheFactories")
    public void testNonArgsMethod(String cacheType, CacheFactory cacheFactory) {
        setUp(cacheType, cacheFactory);
        invocation.setMethodName("echo");
        invocation.setParameterTypes(new Class<?>[]{});
        invocation.setArguments(new Object[]{});

        cacheFilter.invoke(invoker, invocation);
        Result rpcResult1 = cacheFilter.invoke(invoker1, invocation);
        Result rpcResult2 = cacheFilter.invoke(invoker2, invocation);

        Assertions.assertEquals(rpcResult1.getValue(), rpcResult2.getValue());
        // JUnit's contract is (expected, actual); keeping that order gives correct failure messages.
        Assertions.assertEquals("value", rpcResult1.getValue());
    }

    /**
     * Single-argument method: same expectation as the no-argument case, keyed by "arg1".
     */
    @ParameterizedTest
    @MethodSource("cacheFactories")
    public void testMethodWithArgs(String cacheType, CacheFactory cacheFactory) {
        setUp(cacheType, cacheFactory);
        invocation.setMethodName("echo1");
        invocation.setParameterTypes(new Class<?>[]{String.class});
        invocation.setArguments(new Object[]{"arg1"});

        cacheFilter.invoke(invoker, invocation);
        Result rpcResult1 = cacheFilter.invoke(invoker1, invocation);
        Result rpcResult2 = cacheFilter.invoke(invoker2, invocation);

        Assertions.assertEquals(rpcResult1.getValue(), rpcResult2.getValue());
        Assertions.assertEquals("value", rpcResult1.getValue());
    }

    /**
     * A result carrying an exception must not be cached: after the failing call through
     * invoker3, the call through invoker2 returns invoker2's own "value2".
     */
    @ParameterizedTest
    @MethodSource("cacheFactories")
    public void testException(String cacheType, CacheFactory cacheFactory) {
        setUp(cacheType, cacheFactory);
        invocation.setMethodName("echo1");
        invocation.setParameterTypes(new Class<?>[]{String.class});
        invocation.setArguments(new Object[]{"arg2"});

        cacheFilter.invoke(invoker3, invocation);
        Result rpcResult = cacheFilter.invoke(invoker2, invocation);

        Assertions.assertEquals("value2", rpcResult.getValue());
    }

    /**
     * After the first call goes through invoker4 (stubbed with a value-less default result),
     * later calls through invoker1/invoker2 are expected to yield null rather than their own
     * stubbed values.
     */
    @ParameterizedTest
    @MethodSource("cacheFactories")
    public void testNull(String cacheType, CacheFactory cacheFactory) {
        setUp(cacheType, cacheFactory);
        invocation.setMethodName("echo1");
        invocation.setParameterTypes(new Class<?>[]{String.class});
        invocation.setArguments(new Object[]{"arg3"});

        cacheFilter.invoke(invoker4, invocation);
        Result result1 = cacheFilter.invoke(invoker1, invocation);
        Result result2 = cacheFilter.invoke(invoker2, invocation);

        Assertions.assertNull(result1.getValue());
        Assertions.assertNull(result2.getValue());
    }
}
  • 这里分别验证了无参方法、带参数方法、抛异常的方法、value为null的场景

小结

CacheFilter实现了Filter接口,它的invoke方法会先判断cacheFactory是否不为null且invoker的URL中针对当前方法配置了cache参数;如果该条件成立则从cacheFactory获取cache,然后再以方法参数生成的key从cache中获取value;如果value不为null则返回AsyncRpcResult.newDefaultAsyncResult(value为ValueWrapper时先取出其中包装的值);如果value为null则执行invoker.invoke,并且仅在返回结果没有异常时,将返回值用ValueWrapper包装后缓存到cache中

doc

  • CacheFilter