本文主要研究一下dubbo的Invoker select

InvokerInvocationHandler.invoke

dubbo-2.7.1-sources.jar!/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java

public class InvokerInvocationHandler implements InvocationHandler {    //......    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {        String methodName = method.getName();        Class<?>[] parameterTypes = method.getParameterTypes();        if (method.getDeclaringClass() == Object.class) {            return method.invoke(invoker, args);        }        if ("toString".equals(methodName) && parameterTypes.length == 0) {            return invoker.toString();        }        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {            return invoker.hashCode();        }        if ("equals".equals(methodName) && parameterTypes.length == 1) {            return invoker.equals(args[0]);        }        return invoker.invoke(createInvocation(method, args)).recreate();    }    //......}
  • 这里invoker为MockClusterInvoker

MockClusterInvoker

dubbo-2.7.1-sources.jar!/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java

public class MockClusterInvoker<T> implements Invoker<T> {    //......    public Result invoke(Invocation invocation) throws RpcException {        Result result = null;        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();        if (value.length() == 0 || value.equalsIgnoreCase("false")) {            //no mock            result = this.invoker.invoke(invocation);        } else if (value.startsWith("force")) {            if (logger.isWarnEnabled()) {                logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());            }            //force:direct mock            result = doMockInvoke(invocation, null);        } else {            //fail-mock            try {                result = this.invoker.invoke(invocation);            } catch (RpcException e) {                if (e.isBiz()) {                    throw e;                }                                if (logger.isWarnEnabled()) {                    logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);                }                result = doMockInvoke(invocation, e);            }        }        return result;    }    //......}
  • this.invoker.invoke(invocation),这里invoker为FailoverClusterInvoker

AbstractClusterInvoker.invoke

dubbo-2.7.1-sources.jar!/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java

public abstract class AbstractClusterInvoker<T> implements Invoker<T> {    //......    public Result invoke(final Invocation invocation) throws RpcException {        checkWhetherDestroyed();        // binding attachments into invocation.        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();        if (contextAttachments != null && contextAttachments.size() != 0) {            ((RpcInvocation) invocation).addAttachments(contextAttachments);        }        List<Invoker<T>> invokers = list(invocation);        LoadBalance loadbalance = initLoadBalance(invokers, invocation);        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);        return doInvoke(invocation, invokers, loadbalance);    }    //......}
  • invoke方法会回调子类FailoverClusterInvoker的doInvoke方法,这里创建的loadbalance默认为RandomLoadBalance

FailoverClusterInvoker.doInvoke

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {    //......    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {        List<Invoker<T>> copyInvokers = invokers;        checkInvokers(copyInvokers, invocation);        String methodName = RpcUtils.getMethodName(invocation);        int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;        if (len <= 0) {            len = 1;        }        // retry loop.        RpcException le = null; // last exception.        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.        Set<String> providers = new HashSet<String>(len);        for (int i = 0; i < len; i++) {            //Reselect before retry to avoid a change of candidate `invokers`.            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.            if (i > 0) {                checkWhetherDestroyed();                copyInvokers = list(invocation);                // check again                checkInvokers(copyInvokers, invocation);            }            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);            invoked.add(invoker);            RpcContext.getContext().setInvokers((List) invoked);            try {                Result result = invoker.invoke(invocation);                if (le != null && logger.isWarnEnabled()) {                    logger.warn("Although retry the method " + methodName                            + " in the service " + getInterface().getName()                            + " was successful by the provider " + invoker.getUrl().getAddress()                            + ", but there have been failed providers " + providers                            + " (" + providers.size() + "/" + copyInvokers.size()                            + ") from the registry " + directory.getUrl().getAddress()                            + " on the consumer " + NetUtils.getLocalHost()                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "                            + le.getMessage(), le);                }                return result;            } catch (RpcException e) {                if (e.isBiz()) { // biz exception.                    throw e;                }                le = e;            } catch (Throwable e) {                le = new RpcException(e.getMessage(), e);            } finally {                providers.add(invoker.getUrl().getAddress());            }        }        throw new RpcException(le.getCode(), "Failed to invoke the method "                + methodName + " in the service " + getInterface().getName()                + ". Tried " + len + " times of the providers " + providers                + " (" + providers.size() + "/" + copyInvokers.size()                + ") from the registry " + directory.getUrl().getAddress()                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "                + Version.getVersion() + ". Last error is: "                + le.getMessage(), le.getCause() != null ? le.getCause() : le);    }    //......}
  • 这里调用Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked)

AbstractClusterInvoker.select

dubbo-2.7.1-sources.jar!/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java

public abstract class AbstractClusterInvoker<T> implements Invoker<T> {    //......    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,                                List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {        if (CollectionUtils.isEmpty(invokers)) {            return null;        }        String methodName = invocation == null ? StringUtils.EMPTY : invocation.getMethodName();        boolean sticky = invokers.get(0).getUrl()                .getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);        //ignore overloaded method        if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {            stickyInvoker = null;        }        //ignore concurrency problem        if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {            if (availablecheck && stickyInvoker.isAvailable()) {                return stickyInvoker;            }        }        Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);        if (sticky) {            stickyInvoker = invoker;        }        return invoker;    }    private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,                                List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {        if (CollectionUtils.isEmpty(invokers)) {            return null;        }        if (invokers.size() == 1) {            return invokers.get(0);        }        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);        //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.        if ((selected != null && selected.contains(invoker))                || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {            try {                Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);                if (rinvoker != null) {                    invoker = rinvoker;                } else {                    //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.                    int index = invokers.indexOf(invoker);                    try {                        //Avoid collision                        invoker = invokers.get((index + 1) % invokers.size());                    } catch (Exception e) {                        logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);                    }                }            } catch (Throwable t) {                logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);            }        }        return invoker;    }    //......}
  • doSelect方法判断如果invokers.size() == 1则直接返回,否则再利用loadbalance.select选取invoker

小结

AbstractClusterInvoker提供了select方法用于从invokers中选择一个invoker,其内部是调用doSelect方法,而doSelect方法判断如果invokers.size() == 1则直接返回,否则再利用loadbalance.select选取invoker

doc

  • AbstractClusterInvoker