聊聊dubbo的Invoker-select

23次阅读

共计 7192 个字符,预计需要花费 18 分钟才能阅读完成。

本文主要研究一下 dubbo 的 Invoker select

InvokerInvocationHandler.invoke

dubbo-2.7.1-sources.jar!/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java

public class InvokerInvocationHandler implements InvocationHandler {

    //......

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {return invoker.equals(args[0]);
        }

        return invoker.invoke(createInvocation(method, args)).recreate();}

    //......
}
  • 这里 invoker 为 MockClusterInvoker

MockClusterInvoker

dubbo-2.7.1-sources.jar!/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java

public class MockClusterInvoker<T> implements Invoker<T> {

    //......

    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;

        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            //no mock
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {if (logger.isWarnEnabled()) {logger.warn("force-mock:" + invocation.getMethodName() + "force-mock enabled , url :" + directory.getUrl());
            }
            //force:direct mock
            result = doMockInvoke(invocation, null);
        } else {
            //fail-mock
            try {result = this.invoker.invoke(invocation);
            } catch (RpcException e) {if (e.isBiz()) {throw e;}
                
                if (logger.isWarnEnabled()) {logger.warn("fail-mock:" + invocation.getMethodName() + "fail-mock enabled , url :" + directory.getUrl(), e);
                }
                result = doMockInvoke(invocation, e);
            }
        }
        return result;
    }

    //......
}
  • this.invoker.invoke(invocation),这里 invoker 为 FailoverClusterInvoker

AbstractClusterInvoker.invoke

dubbo-2.7.1-sources.jar!/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java

public abstract class AbstractClusterInvoker<T> implements Invoker<T> {

    //......

    public Result invoke(final Invocation invocation) throws RpcException {checkWhetherDestroyed();

        // binding attachments into invocation.
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {((RpcInvocation) invocation).addAttachments(contextAttachments);
        }

        List<Invoker<T>> invokers = list(invocation);
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    }

    //......
}
  • invoke 方法会回调子类 FailoverClusterInvoker 的 doInvoke 方法,这里创建的 loadbalance 默认为 RandomLoadBalance

FailoverClusterInvoker.doInvoke

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    //......

    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {len = 1;}
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {checkWhetherDestroyed();
                copyInvokers = list(invocation);
                // check again
                checkInvokers(copyInvokers, invocation);
            }
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method" + methodName
                            + "in the service" + getInterface().getName()
                            + "was successful by the provider" + invoker.getUrl().getAddress()
                            + ", but there have been failed providers" + providers
                            + "(" + providers.size() + "/" + copyInvokers.size()
                            + ") from the registry" + directory.getUrl().getAddress()
                            + "on the consumer" + NetUtils.getLocalHost()
                            + "using the dubbo version" + Version.getVersion() + ". Last error is:"
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {le = new RpcException(e.getMessage(), e);
            } finally {providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method"
                + methodName + "in the service" + getInterface().getName()
                + ". Tried" + len + "times of the providers" + providers
                + "(" + providers.size() + "/" + copyInvokers.size()
                + ") from the registry" + directory.getUrl().getAddress()
                + "on the consumer" + NetUtils.getLocalHost() + "using the dubbo version"
                + Version.getVersion() + ". Last error is:"
                + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }

    //......
}
  • 这里调用Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked)

AbstractClusterInvoker.select

dubbo-2.7.1-sources.jar!/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java

public abstract class AbstractClusterInvoker<T> implements Invoker<T> {

    //......

    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {if (CollectionUtils.isEmpty(invokers)) {return null;}
        String methodName = invocation == null ? StringUtils.EMPTY : invocation.getMethodName();

        boolean sticky = invokers.get(0).getUrl()
                .getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);

        //ignore overloaded method
        if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {stickyInvoker = null;}
        //ignore concurrency problem
        if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {if (availablecheck && stickyInvoker.isAvailable()) {return stickyInvoker;}
        }

        Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

        if (sticky) {stickyInvoker = invoker;}
        return invoker;
    }

    private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {if (CollectionUtils.isEmpty(invokers)) {return null;}
        if (invokers.size() == 1) {return invokers.get(0);
        }
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

        //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
        if ((selected != null && selected.contains(invoker))
                || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
            try {Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                if (rinvoker != null) {invoker = rinvoker;} else {
                    //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                    int index = invokers.indexOf(invoker);
                    try {
                        //Avoid collision
                        invoker = invokers.get((index + 1) % invokers.size());
                    } catch (Exception e) {logger.warn(e.getMessage() + "may because invokers list dynamic change, ignore.", e);
                    }
                }
            } catch (Throwable t) {logger.error("cluster reselect fail reason is :" + t.getMessage() + "if can not solve, you can set cluster.availablecheck=false in url", t);
            }
        }
        return invoker;
    }

    //......
}
  • doSelect 方法判断如果 invokers.size() == 1 则直接返回,否则再利用 loadbalance.select 选取 invoker

小结

AbstractClusterInvoker 提供了 select 方法用于从 invokers 中选择一个 invoker,其内部是调用 doSelect 方法,而 doSelect 方法判断如果 invokers.size() == 1 则直接返回,否则再利用 loadbalance.select 选取 invoker

doc

  • AbstractClusterInvoker

正文完
 0