共计 15305 个字符,预计需要花费 39 分钟才能阅读完成。
欢迎关注公众号:【爱编程 】
如果有需要后台回复 2019 赠送 1T 的学习资料 哦!!
前文再续,书接上一回【NioEventLoop】。
在研究 NioEventLoop 执行过程的时候,检测 IO 事件(包括新连接),处理 IO 事件,执行所有任务三个过程。其中检测 IO 事件中通过持有的 selector 去轮询事件,检测出新连接。这里复用同一段代码。
Channel 的设计
在开始分析前,先了解一下 Channel 的设计
顶层 Channel 接口定义了 socket 事件如读、写、连接、绑定等事件,并使用 AbstractChannel 作为骨架实现了这些方法。查看器成员变量,发现大多数通用的组件,都被定义在这里
第二层 AbstractNioChannel 定义了以 NIO,即 Selector 的方式进行读写事件的监听。其成员变量保存了 selector 相关的一些属性。
第三层内容比较多,定义了服务端 channel(左边继承了 AbstractNioMessageChannel 的 NioServerSocketChannel)以及客户端 channel(右边继承了 AbstractNioByteChannel 的 NioSocketChannel)。
如何接入新连接?
本文开始探索一下 Netty 是如何接入新连接?主要分为四个部分
1. 检测新连接
2. 创建 NioSocketChannel
3. 分配线程和注册 Selector
4. 向 Selector 注册读事件
1. 检测新连接
Netty 服务端在启动的时候会绑定一个 bossGroup,即 NioEventLoop, 在 bind()
绑定端口的时候注册 accept(新连接接入)事件。扫描到该事件后,便处理。因此入口从:NioEventLoop#processSelectedKeys()
开始。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 省略代码
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// 如果当前 NioEventLoop 是 workGroup 则可能是 OP_READ,bossGroup 是 OP_ACCEPT
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 新连接接入以及读事件处理入口
unsafe.read();}
}
关键的新连接接入以及读事件处理入口unsafe.read();
a). 这里的 unsafe
是在 Channel 创建过程的时候,调用了父类 AbstractChannel#AbstractChannel()
的构造方法,和 pipeline
一起初始化的。
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();}
服务端:
unsafe 为 NioServerSockeChanne
l 的父类 AbstractNioMessageChannel#newUnsafe() 创建, 可以看到对应的是 AbstractNioMessageChannel 的内部类NioMessageUnsafe
;
客户端:
unsafe 为 NioSocketChannel
的的父类 AbstractNioUnsafe#newUnsafe()创建的话,它对应的是 AbstractNioByteChannel 的内部类NioByteUnsafe
b).unsafe.read()
NioMessageUnsafe.read()
中主要的操作如下:
1. 循环调用 jdk 底层的代码创建 channel,并用 netty 的 NioSocketChannel 包装起来,代表新连接成功接入一个通道。
2. 将所有获取到的 channel 存储到一个容器当中,检测接入的连接数,默认是一次接 16 个连接
3. 遍历容器中的 channel, 依次调用方法 fireChannelRead,4.fireChannelReadComplete,fireExceptionCaught 来触发对应的传播事件。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
// 临时存储读到的连接
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
// 服务端接入速率处理器
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {//while 循环调用 doReadMessages()创建新连接对象
do {
// 获取 jdk 底层的 channel, 并加入 readBuf 容器
int localRead = doReadMessages(readBuf);
if (localRead == 0) {break;}
if (localRead < 0) {
closed = true;
break;
}
// 把读到的连接做一个累加 totalMessages,默认最多累计读取 16 个连接,结束循环
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {exception = t;}
// 触发 readBuf 容器内所有的传播事件:ChannelRead 读事件
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
// 清空容器
readBuf.clear();
allocHandle.readComplete();
// 触发传播事件:ChannelReadComplete,所有的读事件完成
pipeline.fireChannelReadComplete();
if (exception != null) {closed = closeOnReadError(exception);
// 触发传播事件:exceptionCaught,触发异常
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {removeReadOp();
}
}
}
}
而这一段关键代码逻辑中 int localRead = doReadMessages(readBuf);
它创建 jdk 底层 channel 并且用 NioSocketChannel 包装起来,将该 channel 添加到传入的容器保存起来,同时返回一个计数。
protected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = SocketUtils.accept(javaChannel());
try {if (ch != null) {
// 将 jdk 底层的 channel 封装到 netty 的 channel,并存储到传入的容器当中
//this 为服务端 channel
buf.add(new NioSocketChannel(this, ch));
// 成功和创建 客户端接入的一条通道,并返回
return 1;
}
} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);
try {ch.close();
} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
2. 创建 NioSocketChannel
通过检测 IO 事件轮询新连接,当前成功检测到连接接入事件之后,会调用 NioServerSocketChannel#doReadMessages()
方法,进行创建NioSocketChannel
, 即客户端 channel 的过程。
下面就来了解一下 NioSocketChannel 的主要工作:
. 查看原代码做了两件事,调用父类构造方法,实例化一个 NioSocketChannelConfig。
public NioSocketChannel(Channel parent, SocketChannel socket) {super(parent, socket);
// 实例化一个 NioSocketChannelConfig
config = new NioSocketChannelConfig(this, socket.socket());
}
1)、查看 NioSocketChannel 父类构造方法, 主要是 保存客户端注册的读事件、channel 为成员变量,以及设置阻塞模式为非阻塞。
public NioSocketChannel(Channel parent, SocketChannel socket) {super(parent, socket);
// 实例化一个 NioSocketChannelConfig
config = new NioSocketChannelConfig(this, socket.socket());
}
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
// 传入感兴趣的读事件:客户端 channel 的读事件
super(parent, ch, SelectionKey.OP_READ);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);
// 保存客户端 channel 为成员变量
this.ch = ch;
// 保存感兴趣的读事件为成员变量
this.readInterestOp = readInterestOp;
try {
// 配置阻塞模式为非阻塞
ch.configureBlocking(false);
} catch (IOException e) {
try {ch.close();
} catch (IOException e2) {if (logger.isWarnEnabled()) {
logger.warn("Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
最后调用父类的构造方法,是设置 该客户端 channel 对应的服务端 channel,以及 channel 的 id 和两大组件 unsafe 和 pipeline
protected AbstractChannel(Channel parent) {
//parent 为创建次客户端 channel 的服务端 channel(服务端启动过程中通过反射创建的)this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();}
2)、再看 NioSocketChannelConfig 实例化。主要是保存了 javaSocket,并且通过 setTcpNoDelay(true);
禁止了 tcp 的 Nagle 算法,目的是为了尽量让小的数据包整合成大的发送出去, 降低延时.
private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {super(channel, javaSocket);
calculateMaxBytesPerGatheringWrite();}
public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {super(channel);
if (javaSocket == null) {throw new NullPointerException("javaSocket");
}
// 保存 socket
this.javaSocket = javaSocket;
// Enable TCP_NODELAY by default if possible.
if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
try {
// 禁止 Nagle 算法, 目的是为了让小的数据包尽量集合成大的数据包发送出去
setTcpNoDelay(true);
} catch (Exception e) {// Ignore.}
}
}
3. 分配线程和注册 Selector
服务端启动初始化的时候 ServerBootstrap#init()
,主要做了一些参数的配置。其中对于childGroup,childOptions,childAttrs,childHandler
等参数被进行了单独配置。作为参数和 ServerBootstrapAcceptor
一起,被当作一个特殊的 handle,封装到 pipeline 中。ServerBootstrapAcceptor
中的 eventLoop
为workGroup
。
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
// 省略了很多代码.............
@Override
void init(Channel channel) throws Exception {
// 配置 AbstractBootstrap.option
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {setChannelOptions(channel, options, logger);
}
// 配置 AbstractBootstrap.attr
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// 配置 pipeline
ChannelPipeline p = channel.pipeline();
// 获取 ServerBootstrapAcceptor 配置参数
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();
// 配置 AbstractBootstrap.handler
ChannelHandler handler = config.handler();
if (handler != null) {pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 配置 ServerBootstrapAcceptor, 作为 Handle 紧跟 HeadContext
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
// 省略了很多代码.............
}
可见,整个服务端 pipeline 的结构如下图所示。bossGroup
控制 IO 事件的检测与处理,整个 bossGroup
对应的 pipeline 只包括头(HeadContext
)尾 (TailContext
) 以及中部的ServerBootstrap.ServerBootstrapAcceptor
。
当新连接接入的时候 AbstractNioMessageChannel.NioMessageUnsafe#read()
方法被调用,最终调用 fireChannelRead()
,方法来触发下一个 Handler 的channelRead
方法。而这个 Handler 正是ServerBootstrapAcceptor
它是 ServerBootstrap 的内部类,同时继承自ChannelInboundHandlerAdapter
。也是一个ChannelInboundHandler
。其中 channelRead 主要做了以下几件事。
1. 为客户端 channel 的 pipeline 添加 childHandler
2. 设置客户端 TCP 相关属性 childOptions 和自定义属性 childAttrs
3.workGroup 选择 NioEventLoop 并注册 Selector
1)、为客户端 channel 的 pipeline 添加 childHandler
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
private final Runnable enableAutoReadTask;
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
// 省略了一些代码。。。。。@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 该 channel 为客户端接入时创建的 channel
final Channel child = (Channel) msg;
// 添加 childHandler
child.pipeline().addLast(childHandler);
// 设置 TCP 相关属性:childOptions
setChannelOptions(child, childOptions, logger);
// 设置自定义属性:childAttrs
for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 选择 NioEventLoop 并注册 Selector
childGroup.register(child)
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {forceClose(child, t);
}
}
// 省略了一些代码。。。。。}
客户端 channel
的 pipeline 添加 childHandler
,在服务端 EchoServer 创建流程中,childHandler 的时候, 使用了ChannelInitializer
的一个自定义实例。并且覆盖了其 initChannel
方法,改方法获取到 pipeline 并添加具体的 Handler。查看 ChannelInitializer
具体的添加逻辑,handlerAdded
方法。其实在 initChannel
逻辑中,首先是 回调到用户代码执行initChannel
, 用户代码执行添加 Handler 的添加操作,之后将 ChannelInitializer 自己从 pipeline 中删除。
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
// 初始化 Channel
if (initChannel(ctx)) {
// We are done with init the Channel, removing the initializer now.
removeState(ctx);
}
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (initMap.add(ctx)) { // Guard against re-entrance.
try {
// 回调到用户代码
initChannel((C) ctx.channel());
} catch (Throwable cause) {// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
// 删除本身
pipeline.remove(this);
}
}
return true;
}
return false;
}
}
2)、设置客户端 TCP 相关属性 childOptions 和自定义属性 childAttrs
这点在 ServerBootstrapAcceptor#init()
方法中已经体现
3)、workGroup 选择 NioEventLoop 并注册 Selector
这要从 AbstractBootstrap#initAndRegister()
方法开始,然后跟踪源码会来到 AbstractUnsafe#register()
方法
protected abstract class AbstractUnsafe implements Unsafe {
// 省略了一些代码。。。。。@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {if (eventLoop == null) {throw new NullPointerException("eventLoop");
}
if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(new IllegalStateException("incompatible event loop type:" + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {register0(promise);
} else {
try {eventLoop.execute(new Runnable() {
@Override
public void run() {register0(promise);
}
});
} catch (Throwable t) {
logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
// 省略了一些代码。。。。。}
最后调用 AbstractNioUnsafe#doRegister()
方法通过 jdk 的 javaChannel().register
完成注册功能。
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
// 省略了一些代码。。。。。@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
// 省略了一些代码。。。。。}
4. 向 Selector 注册读事件
a)、入口:ServerBootstrap.ServerBootstrapAcceptor#channelRead()#childGroup.register()
;
public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {forceClose(child, t);
}
}
b)、实际上调用了AbstractChannel.AbstractUnsafe#register0()
,触发了通道激活事件;
// 触发通道激活事件,调用 HeadContent 的
pipeline.fireChannelActive();
c)、pipeline
的头部开始,即 DefaultChannelPipeline.HeadContext#channelActive()
从而触发了readIfIsAutoRead()
;
@Override
public void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();
readIfIsAutoRead();}
d)、读事件将从尾部的 TailContent#read()被触发,从而依次执行 ctx.read(),从尾部开始,每个 outboundHandler 的 read()事件都被触发。直到头部。
@Override
public final ChannelPipeline read() {tail.read();
return this;
}
@Override
public ChannelHandlerContext read() {
// 获取最近的 outboundhandler
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
// 并依次执行其 read 方法
if (executor.inEventLoop()) {next.invokeRead();
} else {
Tasks tasks = next.invokeTasks;
if (tasks == null) {next.invokeTasks = tasks = new Tasks(next);
}
executor.execute(tasks.invokeReadTask);
}
return this;
}
e)、进入头部 HeadContext#read(),并且最终更改了 selectionKey, 向 selector 注册了读事件
HeadContext#read()
@Override
public void read(ChannelHandlerContext ctx) {unsafe.beginRead();
}
AbstractChannel#beginRead()
@Override
public final void beginRead() {assertEventLoop();
if (!isActive()) {return;}
try {doBeginRead();
} catch (final Exception e) {invokeLater(new Runnable() {
@Override
public void run() {pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
AbstractNioMessageChannel#doBeginRead
@Override
protected void doBeginRead() throws Exception {if (inputShutdown) {return;}
super.doBeginRead();}
AbstractNioChannel#doBeginRead()
@Override
protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {return;}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {selectionKey.interestOps(interestOps | readInterestOp);
}
}
参考文章:
Jorgezhong
总结
Netty 如何接入新连接基本流程如上所述,如果有误,还望各位指正。建议先从前两篇看起比较好理解点。
【Netty】服务端和客户端
学习 NioEventLoop
最后
如果对 Java、大数据感兴趣请长按二维码关注一波,我会努力带给你们价值。觉得对你哪怕有一丁点帮助的请帮忙点个赞或者转发哦。
关注公众号 【爱编码】,回复2019 有相关资料哦。