Note: this article is based on Netty 3.9.4.Final.

Let's start with NioServerBossPool, the legendary MainReactor. It has two fairly important fields:

```java
private final Boss[] bosses;
private final Executor bossExecutor;
```

The Boss array has a default size of 1, i.e. it holds a single Boss instance. So what exactly is a Boss? It is essentially a wrapper around a Selector.

What sits inside Boss[] is NioServerBoss, which extends AbstractNioSelector. AbstractNioSelector has these key fields:

```java
private final Executor executor;
protected volatile Thread thread;
protected volatile Selector selector;
private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
```

The executor here is the thread pool we created at the very beginning for the boss thread, and the selector is the plain Java NIO Selector; on Linux that is an EpollSelectorImpl.
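For reference, that "thread pool we created at the beginning" is the boss Executor handed to the channel factory. A minimal sketch of the setup in Netty 3.x (the class name, the port, and the omission of a pipeline factory are just to keep the sketch short):

```java
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public final class BootstrapSketch {
    public static void main(String[] args) {
        // The boss executor runs the MainReactor (NioServerBossPool) thread;
        // the worker executor runs the SubReactor (NioWorker) threads.
        Executor bossExecutor = Executors.newCachedThreadPool();
        Executor workerExecutor = Executors.newCachedThreadPool();

        ServerBootstrap bootstrap = new ServerBootstrap(
                new NioServerSocketChannelFactory(bossExecutor, workerExecutor));
        bootstrap.bind(new InetSocketAddress(8080));
    }
}
```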

So much for the structure. Now let's see how this MainReactor gets initialized and running.

Back to NioServerBossPool: its constructor calls the init method.

```java
protected void init() {
    if (initialized) {
        throw new IllegalStateException("initialized already");
    }
    initialized = true;

    for (int i = 0; i < bosses.length; i++) {
        bosses[i] = newBoss(bossExecutor);
    }

    waitForBossThreads();
}
```

Plainly, this just news up a NioServerBoss, and NioServerBoss's constructor in turn calls the openSelector method:

```java
private void openSelector(ThreadNameDeterminer determiner) {
    try {
        selector = SelectorUtil.open();
    } catch (Throwable t) {
        throw new ChannelException("Failed to create a selector.", t);
    }

    // Start the worker thread with the new Selector.
    boolean success = false;
    try {
        DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));
        success = true;
    } finally {
        if (!success) {
            // Release the Selector if the execution fails.
            try {
                selector.close();
            } catch (Throwable t) {
                logger.warn("Failed to close a selector.", t);
            }
            selector = null;
            // The method will return to the caller at this point.
        }
    }
    assert selector != null && selector.isOpen();
}
```

This goes through a rather odd-looking method; I didn't know why it does this at first either, so let's look at the code itself (the sketch after DeadLockProofWorker below hints at the reason):

```java
public final class NioServerBoss extends AbstractNioSelector implements Boss {
    ...
    @Override
    protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
        return new ThreadRenamingRunnable(this,
                "New I/O server boss #" + id, determiner);
    }
    ...
}
```
```java
public final class DeadLockProofWorker {
    ...
    public static void start(final Executor parent, final Runnable runnable) {
        if (parent == null) {
            throw new NullPointerException("parent");
        }
        if (runnable == null) {
            throw new NullPointerException("runnable");
        }
        parent.execute(new Runnable() {
            public void run() {
                PARENT.set(parent);
                try {
                    runnable.run();
                } finally {
                    PARENT.remove();
                }
            }
        });
    }
    ...
}
```
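So why record the parent Executor in that PARENT ThreadLocal at all? As far as I can tell, it lets Netty recognize that the current thread is an I/O thread: DefaultChannelFuture's dead-lock check consults it, so blocking on a future from inside a boss/worker thread fails fast instead of hanging the event loop. Roughly (a paraphrase, not the verbatim 3.9.4 source, which also has a configuration guard):

```java
private static void checkDeadLock() {
    // Non-null PARENT means this thread was started via DeadLockProofWorker,
    // i.e. it is a boss/worker thread; await()-ing here could block forever.
    if (DeadLockProofWorker.PARENT.get() != null) {
        throw new IllegalStateException(
                "await*() in I/O thread causes a dead lock or sudden performance drop. " +
                "Use addListener() instead or call await*() from a different thread.");
    }
}
```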

Ignoring the rest of the code, this effectively hands the NioServerBoss over to the Executors.newCachedThreadPool() we created at the beginning to run on. So let's see what NioServerBoss's run method actually does. As you can probably guess, it just keeps calling select in a loop. The real method also contains the workaround for the 100% CPU epoll bug, which is fairly involved; I've simplified it below (see the sketch after the code for the idea behind the workaround), so do compare against the source:

```java
@Override
public void run() {
    Thread.currentThread().setName(this.threadName);

    // loop forever
    while (true) {
        try {
            // no connection yet
            wakenUp.set(false);

            select(selector);
            processTaskQueue();
            process(selector);
        } catch (Exception e) {
            // ignore
        }
    }
}
```
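The part I cut out deals with the infamous JDK epoll bug, where select() spins, returning 0 without ever blocking, and burns 100% CPU. The idea, as a standalone sketch (the 1024 threshold, the names, and the bare-bones rebuildSelector here are my own illustration, not the verbatim 3.9.4 code, which also re-registers every key on the new Selector):

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.TimeUnit;

final class EpollBugWorkaroundSketch {
    private Selector selector;
    private int prematureReturns;

    EpollBugWorkaroundSketch() throws IOException {
        selector = Selector.open();
    }

    void selectWithWorkaround() throws IOException {
        long before = System.nanoTime();
        int selected = selector.select(500); // should block for up to 500 ms

        // Returned 0 well before the timeout elapsed: the suspected epoll spin bug.
        if (selected == 0 && System.nanoTime() - before < TimeUnit.MILLISECONDS.toNanos(500)) {
            if (++prematureReturns >= 1024) {
                rebuildSelector();
                prematureReturns = 0;
            }
        } else {
            prematureReturns = 0; // a genuine select result resets the counter
        }
    }

    private void rebuildSelector() throws IOException {
        // The real fix also transfers all registered SelectionKeys onto the new
        // Selector; omitted here to keep the sketch short.
        Selector old = selector;
        selector = Selector.open();
        old.close();
    }
}
```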

You can also take a look at the super-simplified version of Netty I wrote: https://gitee.com/qimeila/tin...

Next, let's look at how NioWorker receives a message and feeds it into the Handler chain for processing. I won't start from the top here; let's jump straight into NioWorker's read method:

```java
@Override
protected boolean read(SelectionKey k) {
    final SocketChannel ch = (SocketChannel) k.channel();
    final NioSocketChannel channel = (NioSocketChannel) k.attachment();

    final ReceiveBufferSizePredictor predictor =
        channel.getConfig().getReceiveBufferSizePredictor();
    final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
    final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();

    int ret = 0;
    int readBytes = 0;
    boolean failure = true;

    ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
    try {
        while ((ret = ch.read(bb)) > 0) {
            readBytes += ret;
            if (!bb.hasRemaining()) {
                break;
            }
        }
        failure = false;
    } catch (ClosedChannelException e) {
        // Can happen, and does not need a user attention.
    } catch (Throwable t) {
        fireExceptionCaught(channel, t);
    }

    if (readBytes > 0) {
        bb.flip();

        final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
        buffer.setBytes(0, bb);
        buffer.writerIndex(readBytes);

        // Update the predictor.
        predictor.previousReceiveBufferSize(readBytes);

        // Fire the event.
        fireMessageReceived(channel, buffer);
    }

    if (ret < 0 || failure) {
        k.cancel(); // Some JDK implementations run into an infinite loop without this.
        close(channel, succeededFuture(channel));
        return false;
    }

    return true;
}
```
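One detail worth pausing on: the ReceiveBufferSizePredictor used above guesses how large the next read will be, so the worker can borrow a sensibly sized buffer rather than a fixed one. Netty 3's default is AdaptiveReceiveBufferSizePredictor; the toy version below only illustrates the feedback idea (the doubling/halving policy and the 64/65536 bounds are made up, not Netty's actual table-based stepping):

```java
// Toy predictor: same two-method shape as Netty's ReceiveBufferSizePredictor,
// but with an invented grow/shrink policy.
final class NaiveReceiveBufferSizePredictor {
    private int next = 1024;

    int nextReceiveBufferSize() {
        return next;
    }

    void previousReceiveBufferSize(int actualReadBytes) {
        if (actualReadBytes >= next) {
            next = Math.min(next << 1, 65536); // buffer was filled: guess bigger
        } else if (actualReadBytes < next >>> 1) {
            next = Math.max(next >>> 1, 64);   // read was tiny: guess smaller
        }
    }
}
```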

The key part is:

```java
// Fire the event.
fireMessageReceived(channel, buffer);
```

Following that call down:

```java
public static void fireMessageReceived(Channel channel, Object message, SocketAddress remoteAddress) {
    channel.getPipeline().sendUpstream(
            new UpstreamMessageEvent(channel, message, remoteAddress));
}
```

As you can see, the bytes we just read are wrapped into an UpstreamMessageEvent and handed off to the Pipeline. The Pipeline, as the name suggests, is the chain of Handlers through which events flow like a pipe; note that in Netty each Handler is wrapped in a ChannelHandlerContext.
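To make the chain concrete: in Netty 3.x you describe it with a ChannelPipelineFactory, and every accepted channel gets its own pipeline built from it. A minimal sketch, reusing the ServerBootstrap from the earlier setup and the EchoServerHandler shown further down (the handler name "echo" is arbitrary):

```java
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;

final class PipelineSetupSketch {
    static void configure(ServerBootstrap bootstrap) {
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                // Upstream events (like our UpstreamMessageEvent) visit handlers
                // from the head of the pipeline towards the tail.
                pipeline.addLast("echo", new EchoServerHandler());
                return pipeline;
            }
        });
    }
}
```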

The sendUpstream method simply fetches the first ChannelHandlerContext in the chain and hands it the UpstreamMessageEvent we just built.
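For completeness, the public entry point that finds that first context reads roughly like this (a paraphrase of DefaultChannelPipeline; the real 3.9.4 code also logs a warning when the pipeline has no upstream handler at all):

```java
public void sendUpstream(ChannelEvent e) {
    // Walk from the head of the chain to the first context wrapping an
    // upstream-capable handler; if none exists, the event is discarded.
    DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
    if (head == null) {
        return;
    }
    sendUpstream(head, e);
}
```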

```java
void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
    try {
        ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
    } catch (Throwable t) {
        notifyHandlerException(e, t);
    }
}
```

Next comes the handleUpstream method, which is implemented in SimpleChannelUpstreamHandler:

```java
public void handleUpstream(
        ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
    if (e instanceof MessageEvent) {
        messageReceived(ctx, (MessageEvent) e);
    } else if (e instanceof WriteCompletionEvent) {
        WriteCompletionEvent evt = (WriteCompletionEvent) e;
        writeComplete(ctx, evt);
    } else if (e instanceof ChildChannelStateEvent) {
        ChildChannelStateEvent evt = (ChildChannelStateEvent) e;
        if (evt.getChildChannel().isOpen()) {
            childChannelOpen(ctx, evt);
        } else {
            childChannelClosed(ctx, evt);
        }
    } else if (e instanceof ChannelStateEvent) {
        ChannelStateEvent evt = (ChannelStateEvent) e;
        switch (evt.getState()) {
        case OPEN:
            if (Boolean.TRUE.equals(evt.getValue())) {
                channelOpen(ctx, evt);
            } else {
                channelClosed(ctx, evt);
            }
            break;
        case BOUND:
            if (evt.getValue() != null) {
                channelBound(ctx, evt);
            } else {
                channelUnbound(ctx, evt);
            }
            break;
        case CONNECTED:
            if (evt.getValue() != null) {
                channelConnected(ctx, evt);
            } else {
                channelDisconnected(ctx, evt);
            }
            break;
        case INTEREST_OPS:
            channelInterestChanged(ctx, evt);
            break;
        default:
            ctx.sendUpstream(e);
        }
    } else if (e instanceof ExceptionEvent) {
        exceptionCaught(ctx, (ExceptionEvent) e);
    } else {
        ctx.sendUpstream(e);
    }
}
```

Obviously this ends up dispatching to the messageReceived method, which we can override in our own Handler:

```java
public class EchoServerHandler extends SimpleChannelUpstreamHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        e.getChannel().write(e.getMessage());
    }

    @Override
    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelOpen(ctx, e);
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelConnected(ctx, e);
    }
}
```

And so the data arriving at the server is ultimately processed by the whole series of Handlers in turn; the sketch below makes the chaining concrete.
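Here is a hypothetical second handler to demonstrate that: it logs each message and then explicitly forwards the event. Registered ahead of EchoServerHandler (e.g. `pipeline.addLast("logger", new LoggingHandler())` before the "echo" entry), it shows that each handler decides whether the event travels on; without the ctx.sendUpstream(e) call, EchoServerHandler would never see the message:

```java
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

public class LoggingHandler extends SimpleChannelUpstreamHandler {
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        System.out.println("received: " + e.getMessage());
        ctx.sendUpstream(e); // hand the event to the next ChannelHandlerContext
    }
}
```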