About Java: Exploring the Thread Model in the Netty Source Code

Note: this article is based on Netty 3.9.4.Final.

Let's start with NioServerBossPool, the so-called MainReactor. It has two fairly important fields:

private final Boss[] bosses;
private final Executor bossExecutor;

The Boss array has a default size of 1, i.e. it holds a single Boss instance. So what exactly is a Boss? It is essentially a wrapper around a Selector.

What actually sits inside Boss[] is NioServerBoss, which extends AbstractNioSelector. AbstractNioSelector has these key fields:

private final Executor executor;

protected volatile Thread thread;

protected volatile Selector selector;

private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();

This Executor is the thread pool we created at the very beginning for the boss threads, and the Selector is the native Java Selector, which on Linux is EpollSelectorImpl.
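For context, those executors come straight from our own bootstrap code. Below is a minimal sketch of the usual Netty 3.x server setup (class name and port are illustrative, not from this article): the first cached thread pool is the bossExecutor that, under the hood, NioServerSocketChannelFactory hands to NioServerBossPool, and the second one backs the workers.

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class Server {
    public static void main(String[] args) {
        // First executor -> boss (MainReactor) threads, second -> worker (SubReactor) threads.
        ServerBootstrap bootstrap = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),    // bossExecutor
                        Executors.newCachedThreadPool()));  // workerExecutor
        bootstrap.bind(new InetSocketAddress(8080));        // illustrative port
    }
}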

That is roughly the structure. Now let's see how this MainReactor gets initialized and started.

Back in NioServerBossPool, its constructor calls the init method:

protected void init() {
    if (initialized) {
        throw new IllegalStateException("initialized already");
    }
    initialized = true;

    for (int i = 0; i < bosses.length; i++) {
        bosses[i] = newBoss(bossExecutor);
    }

    waitForBossThreads();
}

Quite clearly, it just news up a NioServerBoss, and the NioServerBoss constructor in turn calls the openSelector method:

private void openSelector(ThreadNameDeterminer determiner) {
    try {
        selector = SelectorUtil.open();
    } catch (Throwable t) {
        throw new ChannelException("Failed to create a selector.", t);
    }

    // Start the worker thread with the new Selector.
    boolean success = false;
    try {
        DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));
        success = true;
    } finally {
        if (!success) {
            // Release the Selector if the execution fails.
            try {
                selector.close();
            } catch (Throwable t) {
                logger.warn("Failed to close a selector.", t);
            }
            selector = null;
            // The method will return to the caller at this point.
        }
    }
    assert selector != null && selector.isOpen();
}

This in turn goes through a rather odd-looking method. I don't really know why it is done this way either; the code is below:

public final class NioServerBoss extends AbstractNioSelector implements Boss {
    ...
    @Override
    protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
        return new ThreadRenamingRunnable(this,
                "New I/O server boss #" + id, determiner);
    }
    ...
}

public final class DeadLockProofWorker {
    ...
    public static void start(final Executor parent, final Runnable runnable) {
        if (parent == null) {
            throw new NullPointerException("parent");
        }
        if (runnable == null) {
            throw new NullPointerException("runnable");
        }

        parent.execute(new Runnable() {
            public void run() {
                PARENT.set(parent);
                try {
                    runnable.run();
                } finally {
                    PARENT.remove();
                }
            }
        });
    }
    ...
}
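My own reading of why this detour through DeadLockProofWorker exists (the article doesn't say, so treat this as an interpretation): setting the PARENT ThreadLocal before running the boss/worker loop marks the current thread as an I/O executor thread, so that blocking calls such as ChannelFuture.await() can detect they are running on such a thread and fail fast instead of dead-locking. A rough sketch of that check, modelled loosely on DefaultChannelFuture.checkDeadLock() in the same code base:

// Illustrative sketch only; the exact logic lives in DefaultChannelFuture.
private static void checkDeadLock() {
    // PARENT is set in DeadLockProofWorker.start() before runnable.run(),
    // so a non-null value means "we are on an I/O executor thread right now".
    if (DeadLockProofWorker.PARENT.get() != null) {
        throw new IllegalStateException(
                "await*() in I/O thread causes a dead lock or sudden performance drop.");
    }
}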

Ignoring the rest of the code, DeadLockProofWorker.start effectively hands the NioServerBoss over to the Executors.newCachedThreadPool() we created at the start and runs it there. So let's see what NioServerBoss's run method actually does. As you can probably guess, it just keeps calling select in a loop; the real method also contains the workaround for the 100% CPU bug, which makes it fairly complex, so I've simplified it here — you can follow along against the source:

@Override
public void run() {
    Thread.currentThread().setName(this.threadName);
    // loop forever
    while (true) {
        try {
            // no connection yet
            wakenUp.set(false);
            select(selector);
            processTaskQueue();
            process(selector);
        } catch (Exception e) {
            // ignore
        }
    }
}
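The part I stripped out of run() deals with the notorious epoll spin bug, where select() keeps returning 0 immediately and burns 100% CPU. The countermeasure is, roughly, to count premature empty returns and rebuild the Selector once a threshold is hit. Here is a hedged sketch of that idea; the threshold and the rebuildSelector helper are illustrative names of mine, not Netty's actual code:

// Sketch of the spin detection inside the selector loop (not Netty's exact code).
int prematureReturns = 0;
while (true) {
    long start = System.nanoTime();
    int selected = selector.select(500);                   // wait up to 500 ms
    long elapsedMs = (System.nanoTime() - start) / 1000000L;

    if (selected == 0 && !wakenUp.get() && elapsedMs < 500) {
        // Returned early with nothing ready and no explicit wakeup: possible epoll spin.
        if (++prematureReturns >= 1024) {                  // illustrative threshold
            selector = rebuildSelector(selector);          // hypothetical helper: opens a new
                                                           // Selector and re-registers every key
            prematureReturns = 0;
        }
        continue;
    }

    prematureReturns = 0;
    processTaskQueue();
    process(selector);
}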

You can also take a look at the super-simplified version of Netty I wrote: https://gitee.com/qimeila/tin…

Next, let's look at how NioWorker receives messages and passes them into the Handler chain for processing. I won't start from scratch here; let's jump straight to NioWorker's read method:

@Override
protected boolean read(SelectionKey k) {
    final SocketChannel ch = (SocketChannel) k.channel();
    final NioSocketChannel channel = (NioSocketChannel) k.attachment();

    final ReceiveBufferSizePredictor predictor =
        channel.getConfig().getReceiveBufferSizePredictor();
    final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
    final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();

    int ret = 0;
    int readBytes = 0;
    boolean failure = true;

    ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
    try {
        while ((ret = ch.read(bb)) > 0) {
            readBytes += ret;
            if (!bb.hasRemaining()) {
                break;
            }
        }
        failure = false;
    } catch (ClosedChannelException e) {
        // Can happen, and does not need a user attention.
    } catch (Throwable t) {
        fireExceptionCaught(channel, t);
    }

    if (readBytes > 0) {
        bb.flip();

        final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
        buffer.setBytes(0, bb);
        buffer.writerIndex(readBytes);

        // Update the predictor.
        predictor.previousReceiveBufferSize(readBytes);

        // Fire the event.
        fireMessageReceived(channel, buffer);
    }

    if (ret < 0 || failure) {
        k.cancel(); // Some JDK implementations run into an infinite loop without this.
        close(channel, succeededFuture(channel));
        return false;
    }

    return true;
}
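A quick aside on the ReceiveBufferSizePredictor used at the top of read(): it guesses how large the next read will be, so the pooled ByteBuffer is neither too small (extra read calls) nor wastefully large. Netty 3's adaptive implementation grows the guess when a read fills the buffer and shrinks it only after reads stay small for a while. A simplified sketch of that feedback loop (the class name, step sizes and bounds are mine, not those of Netty's table-based AdaptiveReceiveBufferSizePredictor):

// Illustrative predictor: just the grow-fast / shrink-slowly feedback idea.
final class SimpleReceiveBufferSizePredictor {
    private int next = 1024;        // current guess
    private int smallReads;         // consecutive reads far below the guess

    int nextReceiveBufferSize() {
        return next;
    }

    void previousReceiveBufferSize(int actual) {
        if (actual >= next) {
            next = Math.min(next << 1, 65536);   // filled the buffer: double the guess
            smallReads = 0;
        } else if (actual < next >>> 1 && ++smallReads >= 2) {
            next = Math.max(next >>> 1, 64);     // repeatedly small: halve the guess
            smallReads = 0;
        }
    }
}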

The key line in the read method above is:

// Fire the event.
fireMessageReceived(channel, buffer);

Following it further down:

public static void fireMessageReceived(Channel channel, Object message, SocketAddress remoteAddress) {
    channel.getPipeline().sendUpstream(
            new UpstreamMessageEvent(channel, message, remoteAddress));
}

As you can see, the data that was read gets wrapped in an UpstreamMessageEvent and handed to the Pipeline.

The Pipeline is, as the name suggests, the chain of Handlers that events flow through like a pipeline; in Netty each Handler is wrapped in a ChannelHandlerContext.

The sendUpstream method simply fetches the first ChannelHandlerContext and passes it the UpstreamMessageEvent we just built:

void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
    try {
        ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
    } catch (Throwable t) {
        notifyHandlerException(e, t);
    }
}
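As an aside, the Pipeline that sendUpstream walks here is something we assemble ourselves on the server side, usually through a ChannelPipelineFactory along the lines of the sketch below (the entry name is a placeholder; EchoServerHandler is the handler shown further below):

// Typical Netty 3.x pipeline assembly; the entry name is illustrative.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addLast("handler", new EchoServerHandler());  // the handler shown further below
        return pipeline;
    }
});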

Next comes the handleUpstream method, which is implemented in SimpleChannelUpstreamHandler:

public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
    if (e instanceof MessageEvent) {
        messageReceived(ctx, (MessageEvent) e);
    } else if (e instanceof WriteCompletionEvent) {
        WriteCompletionEvent evt = (WriteCompletionEvent) e;
        writeComplete(ctx, evt);
    } else if (e instanceof ChildChannelStateEvent) {
        ChildChannelStateEvent evt = (ChildChannelStateEvent) e;
        if (evt.getChildChannel().isOpen()) {
            childChannelOpen(ctx, evt);
        } else {
            childChannelClosed(ctx, evt);
        }
    } else if (e instanceof ChannelStateEvent) {
        ChannelStateEvent evt = (ChannelStateEvent) e;
        switch (evt.getState()) {
        case OPEN:
            if (Boolean.TRUE.equals(evt.getValue())) {
                channelOpen(ctx, evt);
            } else {
                channelClosed(ctx, evt);
            }
            break;
        case BOUND:
            if (evt.getValue() != null) {
                channelBound(ctx, evt);
            } else {
                channelUnbound(ctx, evt);
            }
            break;
        case CONNECTED:
            if (evt.getValue() != null) {
                channelConnected(ctx, evt);
            } else {
                channelDisconnected(ctx, evt);
            }
            break;
        case INTEREST_OPS:
            channelInterestChanged(ctx, evt);
            break;
        default:
            ctx.sendUpstream(e);
        }
    } else if (e instanceof ExceptionEvent) {
        exceptionCaught(ctx, (ExceptionEvent) e);
    } else {
        ctx.sendUpstream(e);
    }
}

Obviously this ends up calling our messageReceived method, and that is precisely the method we can override in our own Handler:

public class EchoServerHandler extends SimpleChannelUpstreamHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        e.getChannel().write(e.getMessage());
    }

    @Override
    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelOpen(ctx, e);
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelConnected(ctx, e);
    }
}

And so, data coming into the server is ultimately processed by this chain of Handlers.
