Note: this article is based on Netty 3.9.4.Final.
Let's start with NioServerBossPool, the so-called MainReactor. It has two important fields:
private final Boss[] bosses;
private final Executor bossExecutor;
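The Boss array has a default size of 1, so it holds a single Boss instance. What is a Boss? It is essentially a wrapper around a Selector.
The elements of Boss[] are NioServerBoss instances. NioServerBoss extends AbstractNioSelector, which has these key fields: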
private final Executor executor;
protected volatile Thread thread;
protected volatile Selector selector;
private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
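The Executor is the thread pool we created at the very beginning for the boss thread. The Selector is Java's native Selector, which on Linux is EPollSelectorImpl.
That is roughly the structure. Now let's see how this main reactor is initialized and started.
Back in NioServerBossPool, its constructor calls the init method: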
protected void init() {
    if (initialized) {
        throw new IllegalStateException("initialized already");
    }
    initialized = true;
    for (int i = 0; i < bosses.length; i++) {
        bosses[i] = newBoss(bossExecutor);
    }
    waitForBossThreads();
}
Clearly, this just creates a NioServerBoss. The NioServerBoss constructor in turn calls the openSelector method:
private void openSelector(ThreadNameDeterminer determiner) {
    try {
        selector = SelectorUtil.open();
    } catch (Throwable t) {
        throw new ChannelException("Failed to create a selector.", t);
    }
    // Start the worker thread with the new Selector.
    boolean success = false;
    try {
        DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));
        success = true;
    } finally {
        if (!success) {
            // Release the Selector if the execution fails.
            try {
                selector.close();
            } catch (Throwable t) {
                logger.warn("Failed to close a selector.", t);
            }
            selector = null;
            // The method will return to the caller at this point.
        }
    }
    assert selector != null && selector.isOpen();
}
This calls a rather odd-looking method. I am not sure why it is done this way, so let's look at the code:
public final class NioServerBoss extends AbstractNioSelector implements Boss {
    ...
    @Override
    protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
        return new ThreadRenamingRunnable(this,
                "New I/O server boss #" + id, determiner);
    }
    ...
}
public final class DeadLockProofWorker {
    ...
    public static void start(final Executor parent, final Runnable runnable) {
        if (parent == null) {
            throw new NullPointerException("parent");
        }
        if (runnable == null) {
            throw new NullPointerException("runnable");
        }
        parent.execute(new Runnable() {
            public void run() {
                PARENT.set(parent);
                try {
                    runnable.run();
                } finally {
                    PARENT.remove();
                }
            }
        });
    }
    ...
}
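As far as I can tell, the point of PARENT is to record, per thread, which Executor that thread was started from, so that other code can detect it is running on an I/O thread and refuse an operation that would block on itself. The sketch below illustrates only that idea and is not Netty's code; ParentTrackingDemo, startedFrom and probe are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ParentTrackingDemo {
    // Hypothetical stand-in for DeadLockProofWorker.PARENT.
    static final ThreadLocal<Executor> PARENT = new ThreadLocal<Executor>();

    // Runs the task on the executor and tags the worker thread with its parent.
    static void start(final Executor parent, final Runnable task) {
        if (parent == null) {
            throw new NullPointerException("parent");
        }
        if (task == null) {
            throw new NullPointerException("task");
        }
        parent.execute(new Runnable() {
            public void run() {
                PARENT.set(parent);
                try {
                    task.run();
                } finally {
                    PARENT.remove(); // do not leak the tag into pooled threads
                }
            }
        });
    }

    // True when the calling thread was started from the given executor.
    static boolean startedFrom(Executor e) {
        return PARENT.get() == e;
    }

    // Starts one task and reports whether it saw the pool as its parent.
    static boolean probe() {
        final ExecutorService pool = Executors.newCachedThreadPool();
        final CountDownLatch done = new CountDownLatch(1);
        final boolean[] seen = new boolean[1];
        try {
            start(pool, new Runnable() {
                public void run() {
                    seen[0] = startedFrom(pool);
                    done.countDown();
                }
            });
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("worker sees its parent: " + probe());
        System.out.println("main thread has a parent: " + (PARENT.get() != null));
    }
}
```

A check such as startedFrom(pool) is what lets a library reject, for example, shutting down an executor from one of its own threads.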
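Ignoring the rest of the code, this simply hands the NioServerBoss to the Executors.newCachedThreadPool() we created at the beginning. So what does the run method of NioServerBoss actually do? As you may have guessed, it calls select in an endless loop. The real method also contains the workaround for the 100% CPU usage bug, which makes it fairly complex, so I have simplified it here. You can compare it against the source: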
@Override
public void run() {
    Thread.currentThread().setName(this.threadName);
    // Loop forever
    while (true) {
        try {
            // No connection yet
            wakenUp.set(false);
            select(selector);
            processTaskQueue();
            process(selector);
        } catch (Exception e) {
            // ignore
        }
    }
}
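To make the loop above concrete, here is a minimal runnable sketch of the same shape: reset the wake-up flag, select, then drain a task queue. It uses only the JDK and is not Netty's implementation. MiniSelectorLoop, register, shutdown and runTaskOnLoop are hypothetical names, and the 500 ms select timeout is an assumption that keeps a missed wake-up from stalling the loop.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class MiniSelectorLoop implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    private volatile boolean running = true;

    MiniSelectorLoop() throws IOException {
        selector = Selector.open();
    }

    // Called from any thread: queue a task and wake the loop at most once.
    void register(Runnable task) {
        taskQueue.add(task);
        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }

    void shutdown() {
        running = false;
        selector.wakeup();
    }

    public void run() {
        try {
            while (running) {
                wakenUp.set(false);
                selector.select(500); // assumed timeout, bounds the wait for late tasks
                Runnable task;
                while ((task = taskQueue.poll()) != null) {
                    task.run();
                }
                selector.selectedKeys().clear(); // a real loop would process the keys here
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        } finally {
            try {
                selector.close();
            } catch (IOException ignored) {
                // nothing useful to do in a sketch
            }
        }
    }

    // Runs one task on the loop thread and returns the name of the thread that ran it.
    static String runTaskOnLoop() {
        try {
            MiniSelectorLoop loop = new MiniSelectorLoop();
            Thread t = new Thread(loop, "mini-boss");
            t.start();
            final CountDownLatch done = new CountDownLatch(1);
            final String[] name = new String[1];
            loop.register(new Runnable() {
                public void run() {
                    name[0] = Thread.currentThread().getName();
                    done.countDown();
                }
            });
            done.await(5, TimeUnit.SECONDS);
            loop.shutdown();
            t.join(5000);
            return name[0];
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("task ran on: " + runTaskOnLoop());
    }
}
```

The task queue is what lets other threads hand work to the selector thread, so that all selector operations happen on a single thread.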
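You can also take a look at the super-simplified version of Netty that I wrote: https://gitee.com/qimeila/tin…
Next, let's see how NioWorker receives a message and passes it into the Handler chain for processing. I won't start from the very beginning here. Let's go straight to the read method of NioWorker: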
@Override
protected boolean read(SelectionKey k) {
    final SocketChannel ch = (SocketChannel) k.channel();
    final NioSocketChannel channel = (NioSocketChannel) k.attachment();
    final ReceiveBufferSizePredictor predictor =
            channel.getConfig().getReceiveBufferSizePredictor();
    final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
    final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
    int ret = 0;
    int readBytes = 0;
    boolean failure = true;
    ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
    try {
        while ((ret = ch.read(bb)) > 0) {
            readBytes += ret;
            if (!bb.hasRemaining()) {
                break;
            }
        }
        failure = false;
    } catch (ClosedChannelException e) {
        // Can happen, and does not need a user attention.
    } catch (Throwable t) {
        fireExceptionCaught(channel, t);
    }
    if (readBytes > 0) {
        bb.flip();
        final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
        buffer.setBytes(0, bb);
        buffer.writerIndex(readBytes);
        // Update the predictor.
        predictor.previousReceiveBufferSize(readBytes);
        // Fire the event.
        fireMessageReceived(channel, buffer);
    }
    if (ret < 0 || failure) {
        k.cancel(); // Some JDK implementations run into an infinite loop without this.
        close(channel, succeededFuture(channel));
        return false;
    }
    return true;
}
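The core of read is a small pattern: read into a ByteBuffer until the channel has nothing more to give or the buffer is full, then flip the buffer and copy out exactly the bytes that were read. The sketch below reproduces only that pattern with plain JDK classes. ReadLoopDemo, readAvailable and channelOf are hypothetical names, and an in-memory channel stands in for the socket, so the loop ends on -1 (end of stream) rather than on 0 as it would for a non-blocking socket with no data left.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

final class ReadLoopDemo {
    // Reads until the channel yields nothing more or the buffer is full.
    static String readAvailable(ReadableByteChannel ch, int predictedSize) {
        ByteBuffer bb = ByteBuffer.allocate(predictedSize);
        int ret;
        int readBytes = 0;
        try {
            while ((ret = ch.read(bb)) > 0) {
                readBytes += ret;
                if (!bb.hasRemaining()) {
                    break; // buffer full, leave the rest for the next round
                }
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        bb.flip(); // switch the buffer from writing to reading
        byte[] out = new byte[readBytes];
        bb.get(out);
        return new String(out, StandardCharsets.UTF_8);
    }

    // In-memory stand-in for a socket channel.
    static ReadableByteChannel channelOf(String text) {
        return Channels.newChannel(
                new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)));
    }

    public static void main(String[] args) {
        System.out.println(readAvailable(channelOf("hello"), 1024));
        System.out.println(readAvailable(channelOf("hello"), 4));
    }
}
```

With a predicted size smaller than the payload, only part of the data is consumed per call, which is why the real method feeds the number of bytes read back into the predictor.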
The key part is:
// Fire the event.
fireMessageReceived(channel, buffer);
Let's keep following it:
public static void fireMessageReceived(Channel channel, Object message, SocketAddress remoteAddress) {
    channel.getPipeline().sendUpstream(
            new UpstreamMessageEvent(channel, message, remoteAddress));
}
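As you can see, the data that was read is wrapped in an UpstreamMessageEvent and passed into the Pipeline.
A Pipeline, as the name suggests, is a chain of Handlers through which events flow like water through a pipe. In Netty, each Handler is wrapped in a ChannelHandlerContext.
The sendUpstream method takes the first ChannelHandlerContext and passes it the UpstreamMessageEvent created above: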
void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
    try {
        ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
    } catch (Throwable t) {
        notifyHandlerException(e, t);
    }
}
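The idea of a pipeline as a linked chain of contexts, each wrapping one handler, can be sketched in a few lines. This is a simplified illustration and not Netty's DefaultChannelPipeline; MiniPipeline, UpstreamHandler, Context and demo are hypothetical names, and events are plain Objects.

```java
import java.util.ArrayList;
import java.util.List;

final class MiniPipeline {
    interface UpstreamHandler {
        void handleUpstream(Context ctx, Object event);
    }

    // One link of the chain: a handler plus a pointer to the next link.
    static final class Context {
        final UpstreamHandler handler;
        Context next;

        Context(UpstreamHandler handler) {
            this.handler = handler;
        }

        // Forwards the event to the next handler, if there is one.
        void sendUpstream(Object event) {
            if (next != null) {
                next.handler.handleUpstream(next, event);
            }
        }
    }

    private Context head;
    private Context tail;

    void addLast(UpstreamHandler handler) {
        Context ctx = new Context(handler);
        if (head == null) {
            head = ctx;
        } else {
            tail.next = ctx;
        }
        tail = ctx;
    }

    // Entry point: hand the event to the first context in the chain.
    void sendUpstream(Object event) {
        if (head != null) {
            head.handler.handleUpstream(head, event);
        }
    }

    static List<String> demo() {
        final List<String> trace = new ArrayList<String>();
        MiniPipeline p = new MiniPipeline();
        p.addLast(new UpstreamHandler() {
            public void handleUpstream(Context ctx, Object event) {
                trace.add("first:" + event);
                ctx.sendUpstream(event + "!"); // transform, then pass on
            }
        });
        p.addLast(new UpstreamHandler() {
            public void handleUpstream(Context ctx, Object event) {
                trace.add("second:" + event);
            }
        });
        p.sendUpstream("ping");
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each handler decides whether to forward the event through its own context, which is what allows a handler to transform or swallow a message.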
Next, look at the handleUpstream method, which is implemented in SimpleChannelUpstreamHandler:
public void handleUpstream(
        ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
    if (e instanceof MessageEvent) {
        messageReceived(ctx, (MessageEvent) e);
    } else if (e instanceof WriteCompletionEvent) {
        WriteCompletionEvent evt = (WriteCompletionEvent) e;
        writeComplete(ctx, evt);
    } else if (e instanceof ChildChannelStateEvent) {
        ChildChannelStateEvent evt = (ChildChannelStateEvent) e;
        if (evt.getChildChannel().isOpen()) {
            childChannelOpen(ctx, evt);
        } else {
            childChannelClosed(ctx, evt);
        }
    } else if (e instanceof ChannelStateEvent) {
        ChannelStateEvent evt = (ChannelStateEvent) e;
        switch (evt.getState()) {
        case OPEN:
            if (Boolean.TRUE.equals(evt.getValue())) {
                channelOpen(ctx, evt);
            } else {
                channelClosed(ctx, evt);
            }
            break;
        case BOUND:
            if (evt.getValue() != null) {
                channelBound(ctx, evt);
            } else {
                channelUnbound(ctx, evt);
            }
            break;
        case CONNECTED:
            if (evt.getValue() != null) {
                channelConnected(ctx, evt);
            } else {
                channelDisconnected(ctx, evt);
            }
            break;
        case INTEREST_OPS:
            channelInterestChanged(ctx, evt);
            break;
        default:
            ctx.sendUpstream(e);
        }
    } else if (e instanceof ExceptionEvent) {
        exceptionCaught(ctx, (ExceptionEvent) e);
    } else {
        ctx.sendUpstream(e);
    }
}
Clearly, a message event ends up in the messageReceived method, and that is a method we can override in our own Handler:
public class EchoServerHandler extends SimpleChannelUpstreamHandler {
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        e.getChannel().write(e.getMessage());
    }
    @Override
    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelOpen(ctx, e);
    }
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelConnected(ctx, e);
    }
}
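The dispatch-then-override pattern can be tried without Netty on the classpath. The sketch below is a reduced model, not the real API: DispatchDemo and the nested MessageEvent, ExceptionEvent, SimpleHandler and EchoHandler classes are hypothetical stand-ins, and "echoing" just appends to a log instead of writing to a channel.

```java
final class DispatchDemo {
    static final class MessageEvent {
        final Object message;

        MessageEvent(Object message) {
            this.message = message;
        }
    }

    static final class ExceptionEvent {
        final Throwable cause;

        ExceptionEvent(Throwable cause) {
            this.cause = cause;
        }
    }

    // Base class: inspects the event type and calls the matching hook.
    static class SimpleHandler {
        final StringBuilder log = new StringBuilder();

        void handleUpstream(Object e) {
            if (e instanceof MessageEvent) {
                messageReceived((MessageEvent) e);
            } else if (e instanceof ExceptionEvent) {
                exceptionCaught((ExceptionEvent) e);
            } else {
                log.append("forwarded;"); // unknown events are passed along
            }
        }

        // Default hooks; subclasses override only what they care about.
        void messageReceived(MessageEvent e) {
            log.append("forwarded;");
        }

        void exceptionCaught(ExceptionEvent e) {
            log.append("exception:").append(e.cause.getMessage()).append(';');
        }
    }

    // User handler: overrides a single hook, like EchoServerHandler does.
    static final class EchoHandler extends SimpleHandler {
        @Override
        void messageReceived(MessageEvent e) {
            log.append("echo:").append(e.message).append(';');
        }
    }

    static String demo() {
        SimpleHandler h = new EchoHandler();
        h.handleUpstream(new MessageEvent("hi"));
        h.handleUpstream(new ExceptionEvent(new IllegalStateException("boom")));
        h.handleUpstream("unknown");
        return h.log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The base class owns the type checks, so a user handler stays small and never has to inspect event types itself.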
In this way, data arriving at the server is ultimately processed by a series of Handlers.