共计 5970 个字符,预计需要花费 15 分钟才能阅读完成。
注:本文基于 Netty Final.3.9.4
咱们先来看看 NioServerBossPool,也就是传说中的 MainReactor,他有两个比拟重要的 Fields,别离是
private final Boss[] bosses;
private final Executor bossExecutor;
这个 Boss 的数组默认大小是 1,即贮存一个 Boss 实例,这个 Boss 到底是啥货色,其实就是封装了 Selector
Boss[] 外面的是 NioServerBoss,继承了 AbstractNioSelector,这个 AbstractNioSelector 有要害的 Fields
private final Executor executor;
protected volatile Thread thread;
protected volatile Selector selector;
private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
这个 Executor 就是咱们一开始的新建的用于 boss 线程的线程池,这个 Selector 就是 java 原生的 Selector,在 Linux 平台就是 EpollSelectorIpml。
构造大略就是这样,咱们来看一看这个 mainReactor 是怎么初始化运行的,
咱们回到 NioServerBossPool,他的构造函数会执行 init 办法
protected void init() {if (initialized) {throw new IllegalStateException("initialized already");
}
initialized = true;
for (int i = 0; i < bosses.length; i++) {bosses[i] = newBoss(bossExecutor);
}
waitForBossThreads();}
很显著,就是 new 了一个 NioServerBoss,NioServerBoss 的构造函数又会执行 openSelector 办法,
private void openSelector(ThreadNameDeterminer determiner) {
try {selector = SelectorUtil.open();
} catch (Throwable t) {throw new ChannelException("Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
boolean success = false;
try {DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));
success = true;
} finally {if (!success) {
// Release the Selector if the execution fails.
try {selector.close();
} catch (Throwable t) {logger.warn("Failed to close a selector.", t);
}
selector = null;
// The method will return to the caller at this point.
}
}
assert selector != null && selector.isOpen();}
这个会执行一个很奇怪的办法,我也不晓得为什么要这样,具体还是看代码
public final class NioServerBoss extends AbstractNioSelector implements Boss {
...
@Override
protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
return new ThreadRenamingRunnable(this,
"New I/O server boss #" + id, determiner);
}
...
}
public final class DeadLockProofWorker {。。。public static void start(final Executor parent, final Runnable runnable) {if (parent == null) {throw new NullPointerException("parent");
}
if (runnable == null) {throw new NullPointerException("runnable");
}
parent.execute(new Runnable() {public void run() {PARENT.set(parent);
try {runnable.run();
} finally {PARENT.remove();
}
}
});
}。。。。}
疏忽其余代码,这个理论就是把 NioServerBoss 丢给咱们一开始新建的 Executors.newCachedThreadPool() 外面运行,这样咱们来看看 NioServerBoss 的 run 办法到底执行了什么,预计大家也能猜到,就是不停的执行 select 办法,外面还有解决 100%cpu 占用的 bug 代码,比较复杂,我简化了一下,大家能够对着源码看
@Override
public void run() {Thread.currentThread().setName(this.threadName);
// 始终循环
while (true) {
try {
// 还没有链接
wakenUp.set(false);
select(selector);
processTaskQueue();
process(selector);
} catch (Exception e) {// ignore}
}
}
能够看看我写成超级简化版 netty,https://gitee.com/qimeila/tin…
持续看一下,NioWorker 是如何承受音讯并传入到 Handler 链进行解决的,这里我就不从头开始了,字节来到 NioWorker 的 read 办法
@Override
protected boolean read(SelectionKey k) {final SocketChannel ch = (SocketChannel) k.channel();
final NioSocketChannel channel = (NioSocketChannel) k.attachment();
final ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
int ret = 0;
int readBytes = 0;
boolean failure = true;
ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
try {while ((ret = ch.read(bb)) > 0) {
readBytes += ret;
if (!bb.hasRemaining()) {break;}
}
failure = false;
} catch (ClosedChannelException e) {// Can happen, and does not need a user attention.} catch (Throwable t) {fireExceptionCaught(channel, t);
}
if (readBytes > 0) {bb.flip();
final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
buffer.setBytes(0, bb);
buffer.writerIndex(readBytes);
// Update the predictor.
predictor.previousReceiveBufferSize(readBytes);
// Fire the event.
fireMessageReceived(channel, buffer);
}
if (ret < 0 || failure) {k.cancel(); // Some JDK implementations run into an infinite loop without this.
close(channel, succeededFuture(channel));
return false;
}
return true;
}
要害是在
// Fire the event.
fireMessageReceived(channel, buffer);
持续往下看
public static void fireMessageReceived(Channel channel, Object message, SocketAddress remoteAddress) {channel.getPipeline().sendUpstream(new UpstreamMessageEvent(channel, message, remoteAddress));
}
能够发现,这是将读取到的数据封装成了一个 UpstreamMessageEvent 并传入到 Pipeline 中去了,这个
Pipeline 顾名思义就是由 Handler 组成的 Handler 链,像管道一样流转,当然在 netty 中,Handler 封装成了 ChannelHandlerContext
这个 sendUpstream 办法就是区第一个 ChannelHandlerContext,并传入刚刚的 UpstreamMessageEvent
void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
try {((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
} catch (Throwable t) {notifyHandlerException(e, t);
}
}
接着往下看 handleUpstream 办法,这个办法在 SimpleChannelUpstreamHandler 中实现
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {if (e instanceof MessageEvent) {messageReceived(ctx, (MessageEvent) e);
} else if (e instanceof WriteCompletionEvent) {WriteCompletionEvent evt = (WriteCompletionEvent) e;
writeComplete(ctx, evt);
} else if (e instanceof ChildChannelStateEvent) {ChildChannelStateEvent evt = (ChildChannelStateEvent) e;
if (evt.getChildChannel().isOpen()) {childChannelOpen(ctx, evt);
} else {childChannelClosed(ctx, evt);
}
} else if (e instanceof ChannelStateEvent) {ChannelStateEvent evt = (ChannelStateEvent) e;
switch (evt.getState()) {
case OPEN:
if (Boolean.TRUE.equals(evt.getValue())) {channelOpen(ctx, evt);
} else {channelClosed(ctx, evt);
}
break;
case BOUND:
if (evt.getValue() != null) {channelBound(ctx, evt);
} else {channelUnbound(ctx, evt);
}
break;
case CONNECTED:
if (evt.getValue() != null) {channelConnected(ctx, evt);
} else {channelDisconnected(ctx, evt);
}
break;
case INTEREST_OPS:
channelInterestChanged(ctx, evt);
break;
default:
ctx.sendUpstream(e);
}
} else if (e instanceof ExceptionEvent) {exceptionCaught(ctx, (ExceptionEvent) e);
} else {ctx.sendUpstream(e);
}
}
很显然会调用到咱们的 messageReceived 办法,然而这个办法咱们能够在咱们的 Handler 外面重写
public class EchoServerHandler extends SimpleChannelUpstreamHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {e.getChannel().write(e.getMessage());
}
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {super.channelOpen(ctx, e);
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {super.channelConnected(ctx, e);
}
}
于是,由服务器传入的数据最终会通过一系列的 Handler 解决