序
本文主要研究一下 dubbo 的 AllDispatcher
Dispatcher
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Dispatcher.java
@SPI(AllDispatcher.NAME)
public interface Dispatcher {
/**
* dispatch the message to threadpool.
*
* @param handler
* @param url
* @return channel handler
*/
@Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"})
// The last two parameters are reserved for compatibility with the old configuration
ChannelHandler dispatch(ChannelHandler handler, URL url);
}
- Dispatcher 接口定义了 dispatch 方法,返回 ChannelHandler
AllDispatcher
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllDispatcher.java
public class AllDispatcher implements Dispatcher {
public static final String NAME = "all";
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {return new AllChannelHandler(handler, url);
}
}
- AllDispatcher 实现了 Dispatcher 接口,其 dispatch 方法返回的是 AllChannelHandler
AllChannelHandler
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllChannelHandler.java
public class AllChannelHandler extends WrappedChannelHandler {public AllChannelHandler(ChannelHandler handler, URL url) {super(handler, url);
}
@Override
public void connected(Channel channel) throws RemotingException {ExecutorService executor = getExecutorService();
try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {throw new ExecutionException("connect event", channel, getClass() + "error when process connected event .", t);
}
}
@Override
public void disconnected(Channel channel) throws RemotingException {ExecutorService executor = getExecutorService();
try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {throw new ExecutionException("disconnect event", channel, getClass() + "error when process disconnected event .", t);
}
}
@Override
public void received(Channel channel, Object message) throws RemotingException {ExecutorService executor = getExecutorService();
try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
//fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
if(message instanceof Request && t instanceof RejectedExecutionException){Request request = (Request)message;
if(request.isTwoWay()){String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + "error when process received event .", t);
}
}
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {ExecutorService executor = getExecutorService();
try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {throw new ExecutionException("caught event", channel, getClass() + "error when process caught event .", t);
}
}
}
- AllChannelHandler 继承了 WrappedChannelHandler,其 connected、disconnected、received、caught 均是通过父类的 getExecutorService 获取线程池,然后执行创建的 ChannelEventRunnable;received 方法在捕获到异常时 RejectedExecutionException 且 message 是 Request,而且 request 是 twoWay 的时候会返回 SERVER_THREADPOOL_EXHAUSTED_ERROR
小结
- Dispatcher 接口定义了 dispatch 方法,返回 ChannelHandler
- AllChannelHandler 继承了 WrappedChannelHandler,其 connected、disconnected、received、caught 均是通过父类的 getExecutorService 获取线程池,然后执行创建的 ChannelEventRunnable
- AllChannelHandler 的 received 方法在捕获到异常时 RejectedExecutionException 且 message 是 Request,而且 request 是 twoWay 的时候会返回 SERVER_THREADPOOL_EXHAUSTED_ERROR
doc
- AllDispatcher