共计 7342 个字符,预计需要花费 19 分钟才能阅读完成。
在使用 dubbo 时,通常会遇到 timeout 这个属性,timeout 属性的作用是:给某个服务调用设置超时时间,如果服务在设置的时间内未返回结果,则会抛出调用超时异常:TimeoutException,在使用的过程中,我们有时会对 provider 和 consumer 两个配置都会设置 timeout 值,那么服务调用过程中会以哪个为准?橘子同学今天主要针对这个问题进行分析和扩展。
三种设置方式
以 provider 配置为例:
#### 方法级别
<dubbo:service interface="orangecsong.test.service.TestService" ref="testServiceImpl">
<dubbo:method name="test" timeout="10000"/>
</dubbo:service>
#### 接口级别
<dubbo:service interface="orangecsong.test.service.TestService" ref="testServiceImpl" timeout="10000"/>
#### 全局级别
<dubbo:service ="10000"/>
优先级选择
在 dubbo 中如果 provider 和 consumer 都配置了相同的一个属性,比如本文分析的 timeout,其实它们是有优先级的,consumer 方法配置 > provider 方法配置 > consumer 接口配置 > provider 接口配置 > consumer 全局配置 > provider 全局配置。所以对于小橘开始的提出的问题就有了结果,会以消费者配置的为准,接下结合源码来进行解析,其实源码很简单,在 RegistryDirectory 类中将服务列表转换为 DubboInvlker 方法中进行了处理:
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.isEmpty()) {return newUrlInvokerMap;}
Set<String> keys = new HashSet<String>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {continue;}
}
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {continue;}
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {logger.error(new IllegalStateException("Unsupported protocol" + providerUrl.getProtocol() +
"in notified url:" + providerUrl + "from registry" + getUrl().getAddress() +
"to consumer" + NetUtils.getLocalHost() + ", supported protocol:" +
ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
// 重点就是下面这个方法
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
重点就是上面 mergeUrl 方法,将 provider 和 comsumer 的 url 参数进行了整合,在 mergeUrl 方法有会调用 ClusterUtils.mergeUrl 方法进行整合,因为这个方法比较简单,就是对一些参数进行了整合了,会用 consumer 参数进行覆盖,这里就不分析了,如果感兴趣的同学可以去研究一下。
超时处理
在配置设置了超时 timeout,那么代码中是如何处理的,这里咱们在进行一下扩展,分析一下 dubbo 中是如何处理超时的,在调用服务方法,最后都会调用 DubboInvoker.doInvoke 方法,咱们就从这个方法开始分析:
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {currentClient = clients[0];
} else {currentClient = clients[index.getAndIncrement() % clients.length];
}
try {boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();} else if (isAsync) {ResponseFuture future = currentClient.request(inv, timeout);
// For compatibility
FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
RpcContext.getContext().setFuture(futureAdapter);
Result result;
// 异步处理
if (isAsyncFuture) {
// register resultCallback, sometimes we need the async result being processed by the filter chain.
result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
} else {result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
}
return result;
} else {
// 同步处理
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();}
} catch (TimeoutException e) {throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method:" + invocation.getMethodName() + ", provider:" + getUrl() + ", cause:" + e.getMessage(), e);
} catch (RemotingException e) {throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method:" + invocation.getMethodName() + ", provider:" + getUrl() + ", cause:" + e.getMessage(), e);
}
}
在这个方法中,就以同步模式进行分析,看 request 方法,request() 方法会返回一个 DefaultFuture 类,在去调用 DefaultFuture.get() 方法,这里其实涉及到一个在异步中实现同步的技巧,咱们这里不做分析,所以重点就在 get() 方法里:
@Override
public Object get() throws RemotingException {return get(timeout);
}
@Override
public Object get(int timeout) throws RemotingException {if (timeout <= 0) {timeout = Constants.DEFAULT_TIMEOUT;}
if (!isDone()) {long start = System.currentTimeMillis();
lock.lock();
try {while (!isDone()) {done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {break;}
}
} catch (InterruptedException e) {throw new RuntimeException(e);
} finally {lock.unlock();
}
if (!isDone()) {throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();}
在调用 get() 方法时,会去调用 get(timeout) 这个方法,在这个方法中会传一个 timeout 字段,在和 timeout 就是咱们配置的那个参数,在这个方法中咱们要关注下面一个代码块:
if (!isDone()) {long start = System.currentTimeMillis();
lock.lock();
try {while (!isDone()) {
// 线程阻塞
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {break;}
}
} catch (InterruptedException e) {throw new RuntimeException(e);
} finally {lock.unlock();
}
// 在超时时间里,还没有结果,则抛出超时异常
if (!isDone()) {throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
重点看 await() 方法,会进行阻塞 timeout 时间,如果阻塞时间到了,则会唤醒往下执行,超时跳出 while 循环中,判断是否有结果返回,如果没有(这个地方要注意:只有有结果返回,或超时才跳出循环中),则抛出超时异常。讲到这里,超时原理基本上其实差不多了,DefaultFuture 这个类还有个地方需要注意,在初始化 DefaultFuture 对象时,会去创建一个超时的延迟任务,延迟时间就是 timeout 值,在这个延迟任务中也会调用 signal() 方法唤醒阻塞。
分批调用
不过在调用 rpc 远程接口,如果对方的接口不能一次承载返回请求结果能力,我们一般做法是分批调用,将调用一次分成调用多次,然后对每次结果进行汇聚,当然也可以做用利用多线程的能力去执行。后面文章小橘将会介绍这种模式,敬请关注哦!
/**
* Description: 通用 分批调用工具类
* 场景:
* <pre>
* 比如 List 参数的 size 可能为 几十个甚至上百个
* 如果 invoke 接口比较慢, 传入 50 个以上会超时,那么可以每次传入 20 个,分批执行。* </pre>
* Author: OrangeCsong
*/
public class ParallelInvokeUtil {private ParallelInvokeUtil() {}
/**
* @param sourceList 源数据
* @param size 分批大小
* @param buildParam 构建函数
* @param processFunction 处理函数
* @param <R> 返回值
* @param <T> 入参 \
* @param <P> 构建参数
* @return
*/
public static <R, T, P> List<R> partitionInvokeWithRes(List<T> sourceList, Integer size,
Function<List<T>, P> buildParam,
Function<P, List<R>> processFunction) {if (CollectionUtils.isEmpty(sourceList)) {return new ArrayList<>(0);
}
Preconditions.checkArgument(size > 0, "size 大小必须大于 0");
return Lists.partition(sourceList, size).stream()
.map(buildParam)
.map(processFunction)
.filter(Objects::nonNull)
.reduce(new ArrayList<>(),
(resultList1, resultList2) -> {resultList1.addAll(resultList2);
return resultList1;
});
}
}