共计 5139 个字符,预计需要花费 13 分钟才能阅读完成。
根本组件
- NioEventLoop: 监听客户端连贯和解决客户端读写
- Channel: 对一个 Socket 连贯的封装,可进行数据的读写
- Pipline: 对数据的逻辑解决链
- ChannelHandler: Pipline 里的一个解决逻辑
- ByteBuf: 字节缓冲的容器
Netty 服务端启动步骤:
1. 创立服务端 Channel
创立 JDK 定义的 Channel,将它包装成 netty 的 Channel,并创立一些根本组件绑定在 Channel 上
newChannel 办法里通过反射调用 clazz.newInstance()返回一个 Channel 对象,这个 clazz 的类型是在创立 ServerBootStrap 的时候传入的 NioServerSocketChannel。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
.handler(new ServerHandler())
.childHandler(null);
NioServerSocketChannel 的创立流程:
2. 初始化服务端 Channel
初始化一些根本属性,增加一些逻辑解决
3. 注册 Selector
将 JDK 的 Channel 注册到事件轮询器 Selector 上,并把 netty 的服务端 Channel 作为一个 attachment 绑定在 JDK 的 Channel 上
4. 端口绑定
调用 JDK API,实现对本地端口的监听
AbstractChannel#bind
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
...
// wasActive 示意端口绑定之前的状态
boolean wasActive = isActive();
try {// 调用 javaChannel().bind 将 JDK 的 channel 绑定到指定端口
doBind(localAddress);
} catch (Throwable t) {safeSetFailure(promise, t);
closeIfClosed();
return;
}
// 端口绑定之后,isActive 返回 true
if (!wasActive && isActive()) {invokeLater(new Runnable() {
@Override
public void run() {pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
fireChannelActive 办法流传事件调用到 doBeginRead 办法,增加 selectionKey 感兴趣的事件。
就是在服务端端口绑定胜利之后,开始承受连贯申请
protected void doBeginRead() throws Exception {
// selectionKey 是注册服务端 channel 到 selector 上的时候返回的
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {return;}
readPending = true;
// 获取以后感兴趣的事件,初始值是 0
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// 增加感兴趣的事件,readInterestOp 代表的是 accept 事件,承受申请
selectionKey.interestOps(interestOps | readInterestOp);
}
}
NioEventLoop
NioEventLoop 创立
NioEventLoop 在 NioEventLoopGroup 构造函数中被创立,默认创立 2*CPU 核数个 NioEventLoop
每个 NioEventLoop 持有一个线程。
newChild()办法创立 NioEventLoop,次要做了三件事:
- 保留线程执行器 ThreadPerTaskExecutor
- 为 NioEventLoop 创立一个 Selector
- 创立一个 MpscQueue 工作队列
ThreadPerTaskExecutor: 执行每个工作都去创立一个线程去执行
chooserFactory.newChooser()
chooser 的作用: 采纳循环的形式为新连贯绑定一个 NioEventLoop
NioEventLoop 启动
NioEventLoop 启动触发条件
- 服务端启动绑定端口
- 新连贯接入通过 chooser 绑定一个 NioEventLoop
NioEventLoop 执行
protected void run() {for (;;) {
try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
// 轮询 I / O 事件
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();}
} else {final long ioStartTime = System.nanoTime();
try {processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {if (isShuttingDown()) {closeAll();
if (confirmShutdown()) {return;}
}
} catch (Throwable t) {handleLoopException(t);
}
}
}
整个执行过程次要做三件事:
- select(): 查看是否有 I / O 事件
- processSelectKeys(): 解决 I / O 事件
- runAllTasks: 解决异步工作队列
- select():
select()办法也是在一个 for(;;) 循环里执行
- 判断 select 操作是否超过截止工夫
- 执行阻塞式的 select
- 为防止 jkd 空轮训的 bug,查看空轮训次数;如果超过 512 就创立一个新的 selector,并将原有 selector 的 selectionKey 赋给新的 selector
- processSelectKeys()
对 NioEventLoop 读取到的 selectionKey 对应的事件进行解决
- runAllTasks()
工作有两种,一种是一般的工作,另一种是定时工作。在解决之前,工作被增加到了不同的队列里,所以要先把两种工作合并到一个工作队列,而后顺次执行。
此外,还要查看以后工夫是否超过了容许执行工作的工夫,如果超时就间接中断,进行下一次 NioEventLoop 的执行循环。
新连贯接入
检测新连贯
总体流程
在服务端的 NioEventLoop 执行的第二个过程 processSelectKeys 中检测出 accept 事件后,通过 JDK 的 accept 办法创立 JDK 的一个 channel
创立 NioSocketChannel
逐层调用父类构造函数,将服务端 NioServerSocketChannel 和 accept()创立的 channel 作为参数;设置阻塞模式为 false,保留读事件,创立 unsafe、pipline 组件,禁止 Nagel 算法
服务端 channel 在检测到新连贯并创立完 NioSocketChannel 后,会通过 ServerBootStrapAcceptor 对 NioSocketChannel 做一些解决:
- 增加 childHandler
- 设置 options 和 attrs
- 通过 chooser 在 childGroup 中抉择 NioEventLoop 并注册 selector
创立实现之后,调用 pipeline.fireChannelActive()向 selector 注册读事件,开始筹备承受 I / O 数据。
在执行工作的时候,常常会用到 inEventLoop()办法,这个办法是用来判断以后线程是否是 EventLoop 中的线程。
如果是间接执行,不是就把工作放到 MpscQueue(Multi producer single consumer)工作队列中,这样工作交给指定的 NioEventLoop 执行,不用加锁。
pipeline
初始化
pipeline 在创立 channel 的时候被创立
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();}
pipeline 中的节点是 ChannelHandlerContext 类型的,双向链表构造;在初始化时就会创立 head、tail 两个节点
protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
tail 和 head 别离是 inbound 和 outbound 类型的。
channelHandler 的增加在用户代码配置 childHandler 时实现
childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {ch.pipeline().addLast(new AuthHandler());
}
})
增加时首先判断是否反复增加,而后创立节点 (HandlerContext) 增加至链表,最初回调增加实现的事件。
channelHandler 的删除操作与增加相似,首先找到节点将它从链表上删除,最初回调删除 handler 事件。
事件流传
事件流传有两种形式:
- pipeline 流传: 这种形式会从 pipeline 的 head 节点开始流传
- channelContext 流传: 从以后节点开始流传
inbound 事件
inBound 事件包含 register、active、active 事件
每次流传都会找到下一个 inbound 类型的节点,最初流传给 tail 节点(tail 也是 inbound 类型)
outBound 事件
outBound 事件流传方向与 inBound 相同,是从 tail 向 head 流传
异样事件流传与以上两种不同,它不关怀是 inBoundhandler 还是 outboundhandler,并且只会从产生异样的节点开始向后流传,达到 tail 节点。
所以,最好在 tail 节点前创立一个专门用于解决异样的节点,重写 exceptionCaught 办法,这样所有异样都能被捕捉。