乐趣区

聊聊dubbo的NettyServer

本文主要研究一下 dubbo 的 NettyServer

AbstractServer

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java

public abstract class AbstractServer extends AbstractEndpoint implements Server {

    protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
    private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
    ExecutorService executor;
    private InetSocketAddress localAddress;
    private InetSocketAddress bindAddress;
    private int accepts;
    private int idleTimeout;

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {super(url, handler);
        localAddress = getUrl().toInetSocketAddress();

        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {bindIp = ANYHOST_VALUE;}
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
        try {doOpen();
            if (logger.isInfoEnabled()) {logger.info("Start" + getClass().getSimpleName() + "bind" + getBindAddress() + ", export" + getLocalAddress());
            }
        } catch (Throwable t) {throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind" + getClass().getSimpleName()
                    + "on" + getLocalAddress() + ", cause:" + t.getMessage(), t);
        }
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }

    protected abstract void doOpen() throws Throwable;

    protected abstract void doClose() throws Throwable;

    @Override
    public void reset(URL url) {if (url == null) {return;}
        try {if (url.hasParameter(ACCEPTS_KEY)) {int a = url.getParameter(ACCEPTS_KEY, 0);
                if (a > 0) {this.accepts = a;}
            }
        } catch (Throwable t) {logger.error(t.getMessage(), t);
        }
        try {if (url.hasParameter(IDLE_TIMEOUT_KEY)) {int t = url.getParameter(IDLE_TIMEOUT_KEY, 0);
                if (t > 0) {this.idleTimeout = t;}
            }
        } catch (Throwable t) {logger.error(t.getMessage(), t);
        }
        try {if (url.hasParameter(THREADS_KEY)
                    && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
                int threads = url.getParameter(THREADS_KEY, 0);
                int max = threadPoolExecutor.getMaximumPoolSize();
                int core = threadPoolExecutor.getCorePoolSize();
                if (threads > 0 && (threads != max || threads != core)) {if (threads < core) {threadPoolExecutor.setCorePoolSize(threads);
                        if (core == max) {threadPoolExecutor.setMaximumPoolSize(threads);
                        }
                    } else {threadPoolExecutor.setMaximumPoolSize(threads);
                        if (core == max) {threadPoolExecutor.setCorePoolSize(threads);
                        }
                    }
                }
            }
        } catch (Throwable t) {logger.error(t.getMessage(), t);
        }
        super.setUrl(getUrl().addParameters(url.getParameters()));
    }

    @Override
    public void send(Object message, boolean sent) throws RemotingException {Collection<Channel> channels = getChannels();
        for (Channel channel : channels) {if (channel.isConnected()) {channel.send(message, sent);
            }
        }
    }

    @Override
    public void close() {if (logger.isInfoEnabled()) {logger.info("Close" + getClass().getSimpleName() + "bind" + getBindAddress() + ", export" + getLocalAddress());
        }
        ExecutorUtil.shutdownNow(executor, 100);
        try {super.close();
        } catch (Throwable e) {logger.warn(e.getMessage(), e);
        }
        try {doClose();
        } catch (Throwable e) {logger.warn(e.getMessage(), e);
        }
    }

    @Override
    public void close(int timeout) {ExecutorUtil.gracefulShutdown(executor, timeout);
        close();}

    @Override
    public InetSocketAddress getLocalAddress() {return localAddress;}

    public InetSocketAddress getBindAddress() {return bindAddress;}

    public int getAccepts() {return accepts;}

    public int getIdleTimeout() {return idleTimeout;}

    @Override
    public void connected(Channel ch) throws RemotingException {
        // If the server has entered the shutdown process, reject any new connection
        if (this.isClosing() || this.isClosed()) {logger.warn("Close new channel" + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
            ch.close();
            return;
        }

        Collection<Channel> channels = getChannels();
        if (accepts > 0 && channels.size() > accepts) {logger.error("Close channel" + ch + ", cause: The server" + ch.getLocalAddress() + "connections greater than max config" + accepts);
            ch.close();
            return;
        }
        super.connected(ch);
    }

    @Override
    public void disconnected(Channel ch) throws RemotingException {Collection<Channel> channels = getChannels();
        if (channels.isEmpty()) {logger.warn("All clients has disconnected from" + ch.getLocalAddress() + ". You can graceful shutdown now.");
        }
        super.disconnected(ch);
    }

}
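  • AbstractServer 的构造器会从 url 读取 localAddress、bindAddress、accepts、idleTimeout,然后执行 doOpen 方法,最后从 DataStore 中取出 executor;close 方法会关闭 executor,执行父类 close 方法,然后执行 doClose 方法;connected 方法会先判断 server 是否处于 closing 或 closed 状态,是则直接 close 该 channel,接着判断 channels 的数量是否超出 accepts 值,超过也直接 close,否则执行父类 connected 方法;disconnected 方法在 channels 为空时打印一条 warn 日志,然后执行父类 disconnected 方法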

NettyServer

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java

/**
 * Netty 4 based server: opens a {@link ServerBootstrap} with one boss thread and a configurable
 * number of worker threads, and tracks the connected Dubbo channels.
 */
public class NettyServer extends AbstractServer implements Server {

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
    /**
     * the cache for alive worker channel.
     * <ip:port, dubbo channel>
     */
    private Map<String, Channel> channels;
    /**
     * netty server bootstrap.
     */
    private ServerBootstrap bootstrap;
    /**
     * the boss channel that receive connections and dispatch these to worker channel.
     */
    private io.netty.channel.Channel channel;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    /**
     * Creates and opens the server (the parent constructor calls {@link #doOpen()}).
     *
     * @param url     endpoint URL
     * @param handler business handler; it is wrapped before being passed to the parent
     * @throws RemotingException if the server cannot be opened
     */
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
        // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

    /**
     * Init and start netty server.
     *
     * <p>If any step after creating the bootstrap fails (for example the bind), the event loop
     * groups created so far are shut down before the failure is rethrown, so their threads are
     * not leaked by a server that never finished constructing.
     *
     * @throws Throwable any failure while configuring or binding
     */
    @Override
    protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();
        try {
            bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
            workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                    new DefaultThreadFactory("NettyServerWorker", true));

            // One handler instance is shared by all accepted channels; its channel map is this server's cache.
            final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
            channels = nettyServerHandler.getChannels();

            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // SO_REUSEADDR matters when binding, so it is set on the listening socket
                    // (option) rather than on accepted sockets (childOption).
                    .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            // FIXME: should we use getTimeout()?
                            int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                            // For debugging, a LoggingHandler(LogLevel.INFO) named "logging" can be added first.
                            ch.pipeline()
                                    .addLast("decoder", adapter.getDecoder())
                                    .addLast("encoder", adapter.getEncoder())
                                    // Reader and writer idle timers are disabled (0); only the all-idle timer is used.
                                    .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                    .addLast("handler", nettyServerHandler);
                        }
                    });
            // bind
            ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
            channelFuture.syncUninterruptibly();
            channel = channelFuture.channel();
        } catch (Throwable t) {
            closeEventLoopGroups();
            throw t;
        }
    }

    /**
     * Releases everything opened by {@link #doOpen()}: the boss channel, all connected channels,
     * both event loop groups and finally the channel cache. Each step is best-effort; failures are
     * logged at warn level and do not stop the remaining steps.
     */
    @Override
    protected void doClose() throws Throwable {
        try {
            if (channel != null) {
                // unbind.
                channel.close();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            Collection<org.apache.dubbo.remoting.Channel> liveChannels = getChannels();
            if (liveChannels != null && liveChannels.size() > 0) {
                for (org.apache.dubbo.remoting.Channel liveChannel : liveChannels) {
                    try {
                        liveChannel.close();
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        closeEventLoopGroups();
        try {
            if (channels != null) {
                channels.clear();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

    /**
     * Requests a graceful shutdown of whichever event loop groups exist. Each group is checked and
     * shut down independently so that a missing or failing boss group does not skip the worker group.
     */
    private void closeEventLoopGroups() {
        try {
            if (bossGroup != null) {
                bossGroup.shutdownGracefully();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            if (workerGroup != null) {
                workerGroup.shutdownGracefully();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

    /**
     * Returns a snapshot of the currently connected channels and evicts disconnected ones from the cache.
     *
     * <p>NOTE(review): entries are removed from the map while its values are being iterated; this
     * presumably relies on the handler supplying a concurrent map - confirm in NettyServerHandler.
     *
     * @return a new set containing only channels that report themselves as connected
     */
    @Override
    public Collection<Channel> getChannels() {
        Collection<Channel> chs = new HashSet<Channel>();
        for (Channel channel : this.channels.values()) {
            if (channel.isConnected()) {
                chs.add(channel);
            } else {
                channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
            }
        }
        return chs;
    }

    /**
     * Looks up a cached channel by its remote address.
     *
     * @param remoteAddress remote peer address
     * @return the cached channel, or {@code null} when the map has no entry for that address
     */
    @Override
    public Channel getChannel(InetSocketAddress remoteAddress) {
        return channels.get(NetUtils.toAddressString(remoteAddress));
    }

    /** @return always {@code true} */
    @Override
    public boolean canHandleIdle() {
        return true;
    }

    /** @return whether the boss channel is active */
    @Override
    public boolean isBound() {
        return channel.isActive();
    }

}
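  • NettyServer 继承了 AbstractServer,其实现了 doOpen、doClose 方法;doOpen 方法会创建 netty 的 ServerBootstrap、bossGroup、workerGroup,配置 childHandler(decoder、encoder、server-idle-handler、handler)之后 bind 到 bindAddress;doClose 方法会依次关闭 boss channel、关闭所有已连接的 channel、关闭 bossGroup 和 workerGroup,最后清空 channels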

NettyTransporter

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyTransporter.java

/**
 * {@link Transporter} backed by the Netty 4 transport: servers are {@link NettyServer}
 * instances and clients are {@link NettyClient} instances.
 */
public class NettyTransporter implements Transporter {

    /** Name of this transporter (presumably its extension name - confirm against the SPI configuration). */
    public static final String NAME = "netty";

    /**
     * Creates a Netty server for the given URL.
     *
     * @param url      endpoint URL handed to the server
     * @param listener channel handler handed to the server
     * @return a newly constructed {@link NettyServer}
     * @throws RemotingException if the server constructor fails
     */
    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        final Server server = new NettyServer(url, listener);
        return server;
    }

    /**
     * Creates a Netty client for the given URL.
     *
     * @param url      endpoint URL handed to the client
     * @param listener channel handler handed to the client
     * @return a newly constructed {@link NettyClient}
     * @throws RemotingException if the client constructor fails
     */
    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        final Client client = new NettyClient(url, listener);
        return client;
    }

}
  • NettyTransporter 实现了 Transporter 接口,其 bind 方法创建的是 NettyServer

小结

NettyServer 继承了 AbstractServer,其实现了 doOpen、doClose 方法;doOpen 方法会创建 netty 的 ServerBootstrap、bossGroup、workerGroup,配置 childHandler 之后 bind 到 bindAddress;doClose 方法会依次关闭 boss channel、关闭所有已连接的 channel、关闭 bossGroup 和 workerGroup,最后清空 channels

doc

  • NettyServer
退出移动版