序
本文主要研究一下 dubbo 的 WrappedChannelHandler
WrappedChannelHandler
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java
public class WrappedChannelHandler implements ChannelHandlerDelegate {protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class);
protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
protected final ExecutorService executor;
protected final ChannelHandler handler;
protected final URL url;
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {componentKey = CONSUMER_SIDE;}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
public void close() {
try {if (executor != null) {executor.shutdown();
}
} catch (Throwable t) {logger.warn("fail to destroy thread pool of server:" + t.getMessage(), t);
}
}
@Override
public void connected(Channel channel) throws RemotingException {handler.connected(channel);
}
@Override
public void disconnected(Channel channel) throws RemotingException {handler.disconnected(channel);
}
@Override
public void sent(Channel channel, Object message) throws RemotingException {handler.sent(channel, message);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {handler.received(channel, message);
}
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {handler.caught(channel, exception);
}
public ExecutorService getExecutor() {return executor;}
@Override
public ChannelHandler getHandler() {if (handler instanceof ChannelHandlerDelegate) {return ((ChannelHandlerDelegate) handler).getHandler();} else {return handler;}
}
public URL getUrl() {return url;}
public ExecutorService getExecutorService() {
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {cexecutor = SHARED_EXECUTOR;}
return cexecutor;
}
}
- WrappedChannelHandler 的构造根据 ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url) 获取 ExecutorService,然后放到 dataStore 中
ExecutionChannelHandler
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java
public class ExecutionChannelHandler extends WrappedChannelHandler {public ExecutionChannelHandler(ChannelHandler handler, URL url) {super(handler, url);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {ExecutorService executor = getExecutorService();
if (message instanceof Request) {
try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
// FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,
// therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
// this scenario from happening, but a better solution should be considered later.
if (t instanceof RejectedExecutionException) {Request request = (Request) message;
if (request.isTwoWay()) {String msg = "Server side(" + url.getIp() + "," + url.getPort()
+ ") thread pool is exhausted, detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + "error when process received event.", t);
}
} else {handler.received(channel, message);
}
}
}
- ExecutionChannelHandler 继承了 WrappedChannelHandler,其 received 会创建 ChannelEventRunnable,然后放到 executor 去执行
ChannelEventRunnable
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/ChannelEventRunnable.java
public class ChannelEventRunnable implements Runnable {private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class);
private final ChannelHandler handler;
private final Channel channel;
private final ChannelState state;
private final Throwable exception;
private final Object message;
public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state) {this(channel, handler, state, null);
}
public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message) {this(channel, handler, state, message, null);
}
public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Throwable t) {this(channel, handler, state, null, t);
}
public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) {
this.channel = channel;
this.handler = handler;
this.state = state;
this.message = message;
this.exception = exception;
}
@Override
public void run() {if (state == ChannelState.RECEIVED) {
try {handler.received(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle" + state + "operation error, channel is" + channel
+ ", message is" + message, e);
}
} else {switch (state) {
case CONNECTED:
try {handler.connected(channel);
} catch (Exception e) {logger.warn("ChannelEventRunnable handle" + state + "operation error, channel is" + channel, e);
}
break;
case DISCONNECTED:
try {handler.disconnected(channel);
} catch (Exception e) {logger.warn("ChannelEventRunnable handle" + state + "operation error, channel is" + channel, e);
}
break;
case SENT:
try {handler.sent(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle" + state + "operation error, channel is" + channel
+ ", message is" + message, e);
}
break;
case CAUGHT:
try {handler.caught(channel, exception);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle" + state + "operation error, channel is" + channel
+ ", message is:" + message + ", exception is" + exception, e);
}
break;
default:
logger.warn("unknown state:" + state + ", message is" + message);
}
}
}
/**
* ChannelState
*
*
*/
public enum ChannelState {
/**
* CONNECTED
*/
CONNECTED,
/**
* DISCONNECTED
*/
DISCONNECTED,
/**
* SENT
*/
SENT,
/**
* RECEIVED
*/
RECEIVED,
/**
* CAUGHT
*/
CAUGHT
}
}
- ChannelEventRunnable 实现了 Runnable 接口,其 run 方法根据不同的 ChannelState 做不同处理
小结
- WrappedChannelHandler 的构造根据 ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url) 获取 ExecutorService,然后放到 dataStore 中
- ExecutionChannelHandler 继承了 WrappedChannelHandler,其 received 会创建 ChannelEventRunnable,然后放到 executor 去执行
- ChannelEventRunnable 实现了 Runnable 接口,其 run 方法根据不同的 ChannelState 做不同处理
doc
- WrappedChannelHandler