注:本文基于Netty Final.3.9.4
咱们先来看看NioServerBossPool,也就是传说中的MainReactor,他有两个比拟重要的Fields,别离是
private final Boss[] bosses;private final Executor bossExecutor;
这个Boss的数组默认大小是1,即贮存一个Boss实例,这个Boss到底是啥货色,其实就是封装了Selector
Boss[]外面的是NioServerBoss,继承了AbstractNioSelector,这个AbstractNioSelector有要害的Fields
private final Executor executor;protected volatile Thread thread;protected volatile Selector selector;private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
这个Executor就是咱们一开始的新建的用于boss线程的线程池,这个Selector就是java原生的Selector,在Linux平台就是EpollSelectorIpml。
构造大略就是这样,咱们来看一看这个mainReactor是怎么初始化运行的,
咱们回到NioServerBossPool,他的构造函数会执行init办法
protected void init() { if (initialized) { throw new IllegalStateException("initialized already"); } initialized = true; for (int i = 0; i < bosses.length; i++) { bosses[i] = newBoss(bossExecutor); } waitForBossThreads();}
很显著,就是new了一个NioServerBoss,NioServerBoss的构造函数又会执行openSelector办法,
private void openSelector(ThreadNameDeterminer determiner) { try { selector = SelectorUtil.open(); } catch (Throwable t) { throw new ChannelException("Failed to create a selector.", t); } // Start the worker thread with the new Selector. boolean success = false; try { DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner)); success = true; } finally { if (!success) { // Release the Selector if the execution fails. try { selector.close(); } catch (Throwable t) { logger.warn("Failed to close a selector.", t); } selector = null; // The method will return to the caller at this point. } } assert selector != null && selector.isOpen();}
这个会执行一个很奇怪的办法,我也不晓得为什么要这样,具体还是看代码
public final class NioServerBoss extends AbstractNioSelector implements Boss {...@Overrideprotected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) { return new ThreadRenamingRunnable(this, "New I/O server boss #" + id, determiner);}...}
public final class DeadLockProofWorker {。。。 public static void start(final Executor parent, final Runnable runnable) { if (parent == null) { throw new NullPointerException("parent"); } if (runnable == null) { throw new NullPointerException("runnable"); } parent.execute(new Runnable() { public void run() { PARENT.set(parent); try { runnable.run(); } finally { PARENT.remove(); } } }); }。。。。}
疏忽其余代码,这个理论就是把NioServerBoss丢给咱们一开始新建的Executors.newCachedThreadPool()外面运行,这样咱们来看看NioServerBoss的run办法到底执行了什么,预计大家也能猜到,就是不停的执行select办法,外面还有解决100%cpu占用的bug代码,比较复杂,我简化了一下,大家能够对着源码看
@Overridepublic void run() { Thread.currentThread().setName(this.threadName); //始终循环 while (true) { try { //还没有链接 wakenUp.set(false); select(selector); processTaskQueue(); process(selector); } catch (Exception e) { // ignore } }}
能够看看我写成超级简化版netty,https://gitee.com/qimeila/tin...
持续看一下,NioWorker是如何承受音讯并传入到Handler链进行解决的,这里我就不从头开始了,字节来到NioWorker的read办法
@Overrideprotected boolean read(SelectionKey k) { final SocketChannel ch = (SocketChannel) k.channel(); final NioSocketChannel channel = (NioSocketChannel) k.attachment(); final ReceiveBufferSizePredictor predictor = channel.getConfig().getReceiveBufferSizePredictor(); final int predictedRecvBufSize = predictor.nextReceiveBufferSize(); final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory(); int ret = 0; int readBytes = 0; boolean failure = true; ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder()); try { while ((ret = ch.read(bb)) > 0) { readBytes += ret; if (!bb.hasRemaining()) { break; } } failure = false; } catch (ClosedChannelException e) { // Can happen, and does not need a user attention. } catch (Throwable t) { fireExceptionCaught(channel, t); } if (readBytes > 0) { bb.flip(); final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes); buffer.setBytes(0, bb); buffer.writerIndex(readBytes); // Update the predictor. predictor.previousReceiveBufferSize(readBytes); // Fire the event. fireMessageReceived(channel, buffer); } if (ret < 0 || failure) { k.cancel(); // Some JDK implementations run into an infinite loop without this. close(channel, succeededFuture(channel)); return false; } return true;}
要害是在
// Fire the event.fireMessageReceived(channel, buffer);
持续往下看
public static void fireMessageReceived(Channel channel, Object message, SocketAddress remoteAddress) { channel.getPipeline().sendUpstream( new UpstreamMessageEvent(channel, message, remoteAddress));}
能够发现,这是将读取到的数据封装成了一个UpstreamMessageEvent并传入到Pipeline中去了,这个
Pipeline顾名思义就是由Handler组成的Handler链,像管道一样流转,当然在netty中,Handler封装成了ChannelHandlerContext
这个sendUpstream办法就是区第一个ChannelHandlerContext,并传入刚刚的UpstreamMessageEvent
void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) { try { ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e); } catch (Throwable t) { notifyHandlerException(e, t); }}
接着往下看handleUpstream办法,这个办法在SimpleChannelUpstreamHandler中实现
public void handleUpstream( ChannelHandlerContext ctx, ChannelEvent e) throws Exception { if (e instanceof MessageEvent) { messageReceived(ctx, (MessageEvent) e); } else if (e instanceof WriteCompletionEvent) { WriteCompletionEvent evt = (WriteCompletionEvent) e; writeComplete(ctx, evt); } else if (e instanceof ChildChannelStateEvent) { ChildChannelStateEvent evt = (ChildChannelStateEvent) e; if (evt.getChildChannel().isOpen()) { childChannelOpen(ctx, evt); } else { childChannelClosed(ctx, evt); } } else if (e instanceof ChannelStateEvent) { ChannelStateEvent evt = (ChannelStateEvent) e; switch (evt.getState()) { case OPEN: if (Boolean.TRUE.equals(evt.getValue())) { channelOpen(ctx, evt); } else { channelClosed(ctx, evt); } break; case BOUND: if (evt.getValue() != null) { channelBound(ctx, evt); } else { channelUnbound(ctx, evt); } break; case CONNECTED: if (evt.getValue() != null) { channelConnected(ctx, evt); } else { channelDisconnected(ctx, evt); } break; case INTEREST_OPS: channelInterestChanged(ctx, evt); break; default: ctx.sendUpstream(e); } } else if (e instanceof ExceptionEvent) { exceptionCaught(ctx, (ExceptionEvent) e); } else { ctx.sendUpstream(e); }}
很显然会调用到咱们的messageReceived办法,然而这个办法咱们能够在咱们的Handler外面重写
public class EchoServerHandler extends SimpleChannelUpstreamHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { e.getChannel().write(e.getMessage()); } @Override public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { super.channelOpen(ctx, e); } @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { super.channelConnected(ctx, e); }}
于是,由服务器传入的数据最终会通过一系列的Handler解决