乐趣区

聊聊dubbo的WrappedChannelHandler

本文主要研究一下 dubbo 的 WrappedChannelHandler

WrappedChannelHandler

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java

public class WrappedChannelHandler implements ChannelHandlerDelegate {protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class);

    protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));

    protected final ExecutorService executor;

    protected final ChannelHandler handler;

    protected final URL url;

    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {componentKey = CONSUMER_SIDE;}
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }

    public void close() {
        try {if (executor != null) {executor.shutdown();
            }
        } catch (Throwable t) {logger.warn("fail to destroy thread pool of server:" + t.getMessage(), t);
        }
    }

    @Override
    public void connected(Channel channel) throws RemotingException {handler.connected(channel);
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {handler.disconnected(channel);
    }

    @Override
    public void sent(Channel channel, Object message) throws RemotingException {handler.sent(channel, message);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {handler.received(channel, message);
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {handler.caught(channel, exception);
    }

    public ExecutorService getExecutor() {return executor;}

    @Override
    public ChannelHandler getHandler() {if (handler instanceof ChannelHandlerDelegate) {return ((ChannelHandlerDelegate) handler).getHandler();} else {return handler;}
    }

    public URL getUrl() {return url;}

    public ExecutorService getExecutorService() {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {cexecutor = SHARED_EXECUTOR;}
        return cexecutor;
    }

}
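  • WrappedChannelHandler 的构造器通过 ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url) 获取 ExecutorService,然后以 componentKey(consumer 端为 CONSUMER_SIDE,否则为 Constants.EXECUTOR_SERVICE_COMPONENT_KEY)和 url 的端口号为 key 放到 dataStore 中;getExecutorService 方法在 executor 为 null 或已经 shutdown 时返回 SHARED_EXECUTOR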

ExecutionChannelHandler

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java

public class ExecutionChannelHandler extends WrappedChannelHandler {public ExecutionChannelHandler(ChannelHandler handler, URL url) {super(handler, url);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {ExecutorService executor = getExecutorService();
        if (message instanceof Request) {
            try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,
                // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
                // this scenario from happening, but a better solution should be considered later.
                if (t instanceof RejectedExecutionException) {Request request = (Request) message;
                    if (request.isTwoWay()) {String msg = "Server side(" + url.getIp() + "," + url.getPort()
                                + ") thread pool is exhausted, detail msg:" + t.getMessage();
                        Response response = new Response(request.getId(), request.getVersion());
                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                        response.setErrorMessage(msg);
                        channel.send(response);
                        return;
                    }
                }
                throw new ExecutionException(message, channel, getClass() + "error when process received event.", t);
            }
        } else {handler.received(channel, message);
        }
    }
}
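  • ExecutionChannelHandler 继承了 WrappedChannelHandler,其 received 方法对于 Request 类型的消息会创建 ChannelEventRunnable 并提交到 executor 去执行,其他类型的消息则直接交给 handler 处理;如果提交时抛出 RejectedExecutionException 且该 request 是 twoWay 的,则返回状态为 SERVER_THREADPOOL_EXHAUSTED_ERROR 的 Response,其余情况抛出 ExecutionException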

ChannelEventRunnable

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/ChannelEventRunnable.java

/**
 * A {@link Runnable} that replays a single channel event on a {@link ChannelHandler}.
 *
 * <p>{@link #run()} invokes the handler method that corresponds to the {@link ChannelState}.
 * An {@link Exception} thrown by the handler is logged at warn level and not rethrown.
 */
public class ChannelEventRunnable implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class);

    private final ChannelHandler handler;
    private final Channel channel;
    private final ChannelState state;
    private final Throwable exception;
    private final Object message;

    /**
     * Creates an event without a message or an exception.
     */
    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state) {
        // Delegates straight to the full constructor instead of passing a bare null to a
        // four-argument overload, which would depend on most-specific overload resolution.
        this(channel, handler, state, null, null);
    }

    /**
     * Creates an event that carries a message (used by the SENT and RECEIVED branches).
     */
    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message) {
        this(channel, handler, state, message, null);
    }

    /**
     * Creates an event that carries an exception (used by the CAUGHT branch).
     */
    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Throwable t) {
        this(channel, handler, state, null, t);
    }

    /**
     * @param channel   the channel the event belongs to
     * @param handler   the handler the event is dispatched to
     * @param state     which handler method {@link #run()} invokes
     * @param message   the message passed to {@code sent}/{@code received}; may be null
     * @param exception the throwable passed to {@code caught}; may be null
     */
    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) {
        this.channel = channel;
        this.handler = handler;
        this.state = state;
        this.message = message;
        this.exception = exception;
    }

    /**
     * Dispatches the event to the handler method selected by the state.
     */
    @Override
    public void run() {
        // RECEIVED is tested before the switch; presumably it is the most frequent state — confirm.
        if (state == ChannelState.RECEIVED) {
            try {
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
            }
        } else {
            switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case SENT:
                try {
                    handler.sent(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
                break;
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is: " + message + ", exception is " + exception, e);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
            }
        }

    }

    /**
     * The kind of channel event a {@link ChannelEventRunnable} carries; it selects the handler
     * method that {@link ChannelEventRunnable#run()} invokes.
     */
    public enum ChannelState {

        /**
         * Dispatched to {@code handler.connected(channel)}.
         */
        CONNECTED,

        /**
         * Dispatched to {@code handler.disconnected(channel)}.
         */
        DISCONNECTED,

        /**
         * Dispatched to {@code handler.sent(channel, message)}.
         */
        SENT,

        /**
         * Dispatched to {@code handler.received(channel, message)}.
         */
        RECEIVED,

        /**
         * Dispatched to {@code handler.caught(channel, exception)}.
         */
        CAUGHT
    }

}
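  • ChannelEventRunnable 实现了 Runnable 接口,其 run 方法根据 ChannelState(CONNECTED、DISCONNECTED、SENT、RECEIVED、CAUGHT)分别调用 handler 的 connected、disconnected、sent、received、caught 方法,调用抛出 Exception 时只打印 warn 日志,不再向外抛出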

小结

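  • WrappedChannelHandler 的构造器通过 ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url) 获取 ExecutorService,然后以 componentKey(consumer 端为 CONSUMER_SIDE,否则为 Constants.EXECUTOR_SERVICE_COMPONENT_KEY)和 url 的端口号为 key 放到 dataStore 中;getExecutorService 方法在 executor 为 null 或已经 shutdown 时返回 SHARED_EXECUTOR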
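  • ExecutionChannelHandler 继承了 WrappedChannelHandler,其 received 方法对于 Request 类型的消息会创建 ChannelEventRunnable 并提交到 executor 去执行,其他类型的消息则直接交给 handler 处理;如果提交时抛出 RejectedExecutionException 且该 request 是 twoWay 的,则返回状态为 SERVER_THREADPOOL_EXHAUSTED_ERROR 的 Response,其余情况抛出 ExecutionException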
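  • ChannelEventRunnable 实现了 Runnable 接口,其 run 方法根据 ChannelState(CONNECTED、DISCONNECTED、SENT、RECEIVED、CAUGHT)分别调用 handler 的 connected、disconnected、sent、received、caught 方法,调用抛出 Exception 时只打印 warn 日志,不再向外抛出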

doc

  • WrappedChannelHandler
退出移动版