共计 9000 个字符,预计需要花费 23 分钟才能阅读完成。
序
本文主要研究一下 dubbo 的 NettyServer
AbstractServer
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
public abstract class AbstractServer extends AbstractEndpoint implements Server {
protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
ExecutorService executor;
private InetSocketAddress localAddress;
private InetSocketAddress bindAddress;
private int accepts;
private int idleTimeout;
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {bindIp = ANYHOST_VALUE;}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
try {doOpen();
if (logger.isInfoEnabled()) {logger.info("Start" + getClass().getSimpleName() + "bind" + getBindAddress() + ", export" + getLocalAddress());
}
} catch (Throwable t) {throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind" + getClass().getSimpleName()
+ "on" + getLocalAddress() + ", cause:" + t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
protected abstract void doOpen() throws Throwable;
protected abstract void doClose() throws Throwable;
@Override
public void reset(URL url) {if (url == null) {return;}
try {if (url.hasParameter(ACCEPTS_KEY)) {int a = url.getParameter(ACCEPTS_KEY, 0);
if (a > 0) {this.accepts = a;}
}
} catch (Throwable t) {logger.error(t.getMessage(), t);
}
try {if (url.hasParameter(IDLE_TIMEOUT_KEY)) {int t = url.getParameter(IDLE_TIMEOUT_KEY, 0);
if (t > 0) {this.idleTimeout = t;}
}
} catch (Throwable t) {logger.error(t.getMessage(), t);
}
try {if (url.hasParameter(THREADS_KEY)
&& executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
int threads = url.getParameter(THREADS_KEY, 0);
int max = threadPoolExecutor.getMaximumPoolSize();
int core = threadPoolExecutor.getCorePoolSize();
if (threads > 0 && (threads != max || threads != core)) {if (threads < core) {threadPoolExecutor.setCorePoolSize(threads);
if (core == max) {threadPoolExecutor.setMaximumPoolSize(threads);
}
} else {threadPoolExecutor.setMaximumPoolSize(threads);
if (core == max) {threadPoolExecutor.setCorePoolSize(threads);
}
}
}
}
} catch (Throwable t) {logger.error(t.getMessage(), t);
}
super.setUrl(getUrl().addParameters(url.getParameters()));
}
@Override
public void send(Object message, boolean sent) throws RemotingException {Collection<Channel> channels = getChannels();
for (Channel channel : channels) {if (channel.isConnected()) {channel.send(message, sent);
}
}
}
@Override
public void close() {if (logger.isInfoEnabled()) {logger.info("Close" + getClass().getSimpleName() + "bind" + getBindAddress() + ", export" + getLocalAddress());
}
ExecutorUtil.shutdownNow(executor, 100);
try {super.close();
} catch (Throwable e) {logger.warn(e.getMessage(), e);
}
try {doClose();
} catch (Throwable e) {logger.warn(e.getMessage(), e);
}
}
@Override
public void close(int timeout) {ExecutorUtil.gracefulShutdown(executor, timeout);
close();}
@Override
public InetSocketAddress getLocalAddress() {return localAddress;}
public InetSocketAddress getBindAddress() {return bindAddress;}
public int getAccepts() {return accepts;}
public int getIdleTimeout() {return idleTimeout;}
@Override
public void connected(Channel ch) throws RemotingException {
// If the server has entered the shutdown process, reject any new connection
if (this.isClosing() || this.isClosed()) {logger.warn("Close new channel" + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
ch.close();
return;
}
Collection<Channel> channels = getChannels();
if (accepts > 0 && channels.size() > accepts) {logger.error("Close channel" + ch + ", cause: The server" + ch.getLocalAddress() + "connections greater than max config" + accepts);
ch.close();
return;
}
super.connected(ch);
}
@Override
public void disconnected(Channel ch) throws RemotingException {Collection<Channel> channels = getChannels();
if (channels.isEmpty()) {logger.warn("All clients has disconnected from" + ch.getLocalAddress() + ". You can graceful shutdown now.");
}
super.disconnected(ch);
}
}
- AbstractServer 的构造器会从 url 读取 bindAddress、accepts、idleTimeout,然后执行 doOpen 方法;close 方法会关闭 executor,执行父类 close 方法,然后执行 doClose 方法;connected 方法会先判断 channels 是否超出 accepts 值,超过则直接 close;disconnected 则执行父类 disconnected 方法
NettyServer
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java
public class NettyServer extends AbstractServer implements Server {private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
/**
* the cache for alive worker channel.
* <ip:port, dubbo channel>
*/
private Map<String, Channel> channels;
/**
* netty server bootstrap.
*/
private ServerBootstrap bootstrap;
/**
* the boss channel that receive connections and dispatch these to worker channel.
*/
private io.netty.channel.Channel channel;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
/**
* Init and start netty server
*
* @throws Throwable
*/
@Override
protected void doOpen() throws Throwable {bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();}
@Override
protected void doClose() throws Throwable {
try {if (channel != null) {
// unbind.
channel.close();}
} catch (Throwable e) {logger.warn(e.getMessage(), e);
}
try {Collection<org.apache.dubbo.remoting.Channel> channels = getChannels();
if (channels != null && channels.size() > 0) {for (org.apache.dubbo.remoting.Channel channel : channels) {
try {channel.close();
} catch (Throwable e) {logger.warn(e.getMessage(), e);
}
}
}
} catch (Throwable e) {logger.warn(e.getMessage(), e);
}
try {if (bootstrap != null) {bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();}
} catch (Throwable e) {logger.warn(e.getMessage(), e);
}
try {if (channels != null) {channels.clear();
}
} catch (Throwable e) {logger.warn(e.getMessage(), e);
}
}
@Override
public Collection<Channel> getChannels() {Collection<Channel> chs = new HashSet<Channel>();
for (Channel channel : this.channels.values()) {if (channel.isConnected()) {chs.add(channel);
} else {channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
}
}
return chs;
}
@Override
public Channel getChannel(InetSocketAddress remoteAddress) {return channels.get(NetUtils.toAddressString(remoteAddress));
}
@Override
public boolean canHandleIdle() {return true;}
@Override
public boolean isBound() {return channel.isActive();
}
}
- NettyServer 继承了 AbstractServer,其实现了 doOpen、doClose 方法;doOpen 方法会创建 netty 的 ServerBootstrap、bossGroup、workerGroup;doClose 方法会关闭 channel,关闭 bossGroup、workerGroup
NettyTransporter
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyTransporter.java
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {return new NettyServer(url, listener);
}
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {return new NettyClient(url, listener);
}
}
- NettyTransporter 实现了 Transporter 接口,其 bind 方法创建的是 NettyServer
小结
NettyServer 继承了 AbstractServer,其实现了 doOpen、doClose 方法;doOpen 方法会创建 netty 的 ServerBootstrap、bossGroup、workerGroup;doClose 方法会关闭 channel,关闭 bossGroup、workerGroup
doc
- NettyServer
正文完