关于netty:一文聊透-Netty-IO-事件的编排利器-pipeline-详解所有-IO-事件的触发时机以及传播路径

90次阅读

共计 75622 个字符,预计需要花费 190 分钟才能阅读完成。

本系列 Netty 源码解析文章基于 4.1.56.Final版本

1. 前文回顾

在前边的系列文章中,笔者为大家具体分析了 Reactor 模型在 netty 中的创立,启动,运行,接管连贯,接收数据,发送数据的残缺流程,在具体分析整个 Reactor 模型如何在 netty 中实现的过程里,咱们或多或少的见到了 pipeline 的身影。

比方在 Reactor 启动的过程中首先须要创立 NioServerSocketChannel,在创立的过程中会为 NioServerSocketChannel 创立调配一个 pipeline,用于对 OP_ACCEPT 事件的编排。

当 NioServerSocketChannel 向 main reactor 注册胜利后,会在 pipeline 中触发 ChannelRegistered 事件的流传。

当 NioServerSocketChannel 绑定端口胜利后,会在 pipeline 中触发 ChannelActive 事件的流传。

又比方在 Reactor 接管连贯的过程中,当客户端发动一个连贯并实现三次握手之后,连贯对应的 Socket 会寄存在内核中的全连贯队列中,随后 JDK Selector 会告诉 main reactor 此时 NioServerSocketChannel 上有 OP_ACCEPT 事件沉闷,最初 main reactor 开始执行 NioServerSocketChannel 的底层操作类 NioMessageUnsafe#read 办法在 NioServerSocketChannel 中的 pipeline 中流传 ChannelRead 事件。

最终会在 NioServerSocketChannel 的 pipeline 中的 ServerBootstrapAcceptor 中响应 ChannelRead 事件并创立初始化 NioSocketChannel,随后会为每一个新创建的 NioSocetChannel 创立调配一个独立的 pipeline,用于各自 NioSocketChannel 上的 IO 事件的编排。并向 sub reactor 注册 NioSocketChannel,随后在 NioSocketChannel 的 pipeline 中流传 ChannelRegistered 事件,最初流传 ChannelActive 事件。

还有在《Netty 如何高效接管网络数据》一文中,咱们也提过当 sub reactor 读取 NioSocketChannel 中来自客户端的申请数据时,会在 NioSocketChannel 的 pipeline 中流传 ChannelRead 事件,在一个残缺的 read loop 读取结束后会流传 ChannelReadComplete 事件。

在《一文搞懂 Netty 发送数据全流程》一文中,咱们讲到了在用户通过业务解决后,通过 write 办法和 flush 办法别离在 NioSocketChannel 的 pipeline 中流传 write 事件和 flush 事件的过程。

笔者带大家又回顾了一下在前边系列文章中对于 pipeline 的应用场景,然而在这些系列文章中并未对 pipeline 相干的细节进行残缺全面地形容,那么本文笔者将为大家具体的分析下 pipeline 在 IO 事件的编排和流传场景下的残缺实现原理。

2. pipeline 的创立

Netty 会为每一个 Channel 调配一个独立的 pipeline,pipeline 随同着 channel 的创立而创立。

前边介绍到 NioServerSocketChannel 是在 netty 服务端启动的过程中创立的。而 NioSocketChannel 的创立是在当 NioServerSocketChannel 上的 OP_ACCEPT 事件沉闷时,由 main reactor 线程在 NioServerSocketChannel 中创立,并在 NioServerSocketChannel 的 pipeline 中对 OP_ACCEPT 事件进行编排时(图中的 ServerBootstrapAcceptor 中)初始化的。

无论是创立 NioServerSocketChannel 里的 pipeline 还是创立 NioSocketChannel 里的 pipeline , 最终都会委托给它们的父类 AbstractChannel。

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {protected AbstractChannel(Channel parent) {
this.parent = parent;
//channel 全局惟一 ID machineId+processId+sequence+timestamp+random
id = newId();
//unsafe 用于底层 socket 的相干操作
unsafe = newUnsafe();
// 为 channel 调配独立的 pipeline 用于 IO 事件编排
pipeline = newChannelPipeline();}
protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);
}
}
public class DefaultChannelPipeline implements ChannelPipeline {
....................
//pipeline 中的头结点
final AbstractChannelHandlerContext head;
//pipeline 中的尾结点
final AbstractChannelHandlerContext tail;
//pipeline 中持有对应 channel 的援用
private final Channel channel;
....................
protected DefaultChannelPipeline(Channel channel) {
//pipeline 中持有对应 channel 的援用
this.channel = ObjectUtil.checkNotNull(channel, "channel");
............ 省略.......
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
....................
}

在前边的系列文章中笔者屡次提到过,pipeline 的构造是由 ChannelHandlerContext 类型的节点形成的双向链表。其中头结点为 HeadContext,尾结点为 TailContext。其初始构造如下:

2.1 HeadContext

private static final String HEAD_NAME = generateName0(HeadContext.class);
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
//headContext 中持有对 channel unsafe 操作类的援用 用于执行 channel 底层操作
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {super(pipeline, null, HEAD_NAME, HeadContext.class);
// 持有 channel unsafe 操作类的援用,后续用于执行 channel 底层操作
unsafe = pipeline.channel().unsafe();
// 设置 channelHandler 的状态为 ADD_COMPLETE
setAddComplete();}
@Override
public ChannelHandler handler() {return this;}
.......................
}

咱们晓得双向链表构造的 pipeline 中的节点元素为 ChannelHandlerContext,既然 HeadContext 作为 pipeline 的头结点,那么它肯定是 ChannelHandlerContext 类型的,所以它须要继承实现 AbstractChannelHandlerContext,相当于一个哨兵的作用,因为用户能够以任意程序向 pipeline 中增加 ChannelHandler,须要用 HeadContext 来固定指向第一个 ChannelHandlerContext。

在《一文搞懂 Netty 发送数据全流程》一文中的《1. ChannelHandlerContext》大节中,笔者曾为大家具体介绍过 ChannelHandlerContext 在 pipeline 中的作用,遗记的同学能够在回看下。

于此同时 HeadContext 又实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口,阐明 HeadContext 即是一个 ChannelHandlerContext 又是一个 ChannelHandler,它能够同时解决 Inbound 事件和 Outbound 事件。

咱们也留神到 HeadContext 中持有了对应 channel 的底层操作类 unsafe,这也阐明 IO 事件在 pipeline 中的流传最终会落在 HeadContext 中进行最初的 IO 解决。它是 Inbound 事件的解决终点,也是 Outbound 事件的解决起点。这里也能够看出 HeadContext 除了起到哨兵的作用,它还承当了对 channel 底层相干的操作。

比方咱们在《Reactor 在 Netty 中的实现(启动篇)》中介绍的 NioServerSocketChannel 在向 main reactor 注册实现后会触发 ChannelRegistered 事件从 HeadContext 开始顺次在 pipeline 中向后流传。

@Override
public void channelRegistered(ChannelHandlerContext ctx) {
// 此时 firstRegistration 曾经变为 false, 在 pipeline.invokeHandlerAddedIfNeeded 中已被调用过
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();}

以及 NioServerSocketChannel 在与端口绑定胜利后会触发 ChannelActive 事件从 HeadContext 开始顺次在 pipeline 中向后流传,并在 HeadContext 中通过 unsafe.beginRead() 注册 OP_ACCEPT 事件到 main reactor 中。

@Override
public void read(ChannelHandlerContext ctx) {
// 触发注册 OP_ACCEPT 或者 OP_READ 事件
unsafe.beginRead();}

同理在 NioSocketChannel 在向 sub reactor 注册胜利后。会先后触发 ChannelRegistered 事件和 ChannelActive 事件从 HeadContext 开始在 pipeline 中向后流传。并在 HeadContext 中通过 unsafe.beginRead() 注册 OP_READ 事件到 sub reactor 中。

@Override
public void channelActive(ChannelHandlerContext ctx) {
//pipeline 中持续向后流传 channelActive 事件
ctx.fireChannelActive();
// 如果是 autoRead 则主动触发 read 事件流传
// 在 read 回调函数中 触发 OP_ACCEPT 或者 OP_READ 事件注册
readIfIsAutoRead();}

在《一文搞懂 Netty 发送数据全流程》中介绍的 write 事件和 flush 事件最终会在 pipeline 中从后向前始终流传到 HeadContext,并在 HeadContext 中相应事件回调函数中调用 unsafe 类操作底层 channel 发送数据。

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// 到 headContext 这里 msg 的类型必须是 ByteBuffer,也就是说必须通过编码器将业务层写入的实体编码为 ByteBuffer
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) {unsafe.flush();
}

从本大节的内容介绍中,咱们能够看出在 Netty 中对于 Channel 的相干底层操作调用均是在 HeadContext 中触发的。

2.2 TailContext

private static final String TAIL_NAME = generateName0(TailContext.class);
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {TailContext(DefaultChannelPipeline pipeline) {super(pipeline, null, TAIL_NAME, TailContext.class);
// 设置 channelHandler 的状态为 ADD_COMPLETE
setAddComplete();}
@Override
public ChannelHandler handler() {return this;}
......................
}

同样 TailContext 作为双向链表构造的 pipeline 中的尾结点,也须要继承实现 AbstractChannelHandlerContext。但它同时又实现了 ChannelInboundHandler。

这阐明 TailContext 除了是一个 ChannelHandlerContext 同时也是一个 ChannelInboundHandler。

2.2.1 TailContext 作为一个 ChannelHandlerContext 的作用

TailContext 作为一个 ChannelHandlerContext 的作用是负责将 outbound 事件从 pipeline 的开端始终向前流传直到 HeadContext。当然前提是用户须要调用 channel 的相干 outbound 办法。

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {ChannelFuture write(Object msg);
ChannelFuture write(Object msg, ChannelPromise promise);
ChannelOutboundInvoker flush();
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public ChannelFuture write(Object msg) {return pipeline.write(msg);
}
@Override
public Channel flush() {pipeline.flush();
return this;
}
@Override
public ChannelFuture writeAndFlush(Object msg) {return pipeline.writeAndFlush(msg);
}
}
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelFuture write(Object msg) {return tail.write(msg);
}
@Override
public final ChannelPipeline flush() {tail.flush();
return this;
}
@Override
public final ChannelFuture writeAndFlush(Object msg) {return tail.writeAndFlush(msg);
}
}

这里咱们能够看到,当咱们在自定义 ChannelHandler 中调用 ctx.channel().write(msg) 时,会在 AbstractChannel 中触发 pipeline.write(msg),最终在 DefaultChannelPipeline 中调用 tail.write(msg)。使得 write 事件能够从 pipeline 的开端开始向前流传,其余 outbound 事件的流传也是一样的情理。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {ctx.channel().write(msg);
}
}

而咱们自定义的 ChannelHandler 会被封装在一个 ChannelHandlerContext 中从而退出到 pipeline 中,而这个用于装载自定义 ChannelHandler 的 ChannelHandlerContext 与 TailContext 一样实质也都是 ChannelHandlerContext,只不过在 pipeline 中的地位不同罢了。

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {ChannelFuture write(Object msg);
ChannelFuture write(Object msg, ChannelPromise promise);
ChannelOutboundInvoker flush();
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);
}

咱们看到 ChannelHandlerContext 接口自身也会继承 ChannelInboundInvoker
和 ChannelOutboundInvoker 接口,所以说 ContextHandlerContext 也能够触发 inbound 事件和 outbound 事件,只不过表白的语义是在 pipeline 中从以后 ChannelHandler 开始向前或者向后流传 outbound 事件或者 inbound 事件。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {ctx.write(msg);
}
}

这里示意 write 事件从以后 EchoServerHandler 开始在 pipeline 中向前流传直到 HeadContext。

2.2.2 TailContext 作为一个 ChannelInboundHandler 的作用

最初 TailContext 作为一个 ChannelInboundHandler 的作用就是为 inbound 事件在 pipeline 中的流传做一个兜底的解决。

这里提到的兜底解决是什么意思呢?

比方咱们前边介绍到的,在 NioSocketChannel 向 sub reactor 注册胜利后之后触发的 ChannelRegistered 事件和 ChannelActive 事件。或者在 reactor 线程读取 NioSocketChannel 中的申请数据时所触发的 channelRead 事件和 ChannelReadComplete 事件。

这些 inbound 事件都会首先从 HeadContext 开始在 pipeline 中一个一个的向后传递。

极其的状况是如果 pipeline 中所有 ChannelInboundHandler 中相应的 inbound 事件回调办法均不对事件作出解决,并持续向后流传。如下示例代码所示:

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {ctx.fireChannelReadComplete();
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelRegistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelActive();
}
}

最终这些 inbound 事件在 pipeline 中得不到解决,最初会流传到 TailContext 中。

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {onUnhandledInboundMessage(ctx, msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {onUnhandledInboundChannelReadComplete();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {onUnhandledInboundChannelActive();
}
}

而在 TailContext 中须要对这些得不到任何解决的 inbound 事件做出最终解决。比方抛弃该 msg,并开释所占用的 directByteBuffer,免得产生内存泄露。

protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {onUnhandledInboundMessage(msg);
if (logger.isDebugEnabled()) {logger.debug("Discarded message pipeline : {}. Channel : {}.",
ctx.pipeline().names(), ctx.channel());
}
}
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug("Discarded inbound message {} that reached at the tail of the pipeline." +
"Please check your pipeline configuration.", msg);
} finally {ReferenceCountUtil.release(msg);
}
}

3. pipeline 中的事件分类

在前边的系列文章中,笔者屡次介绍过,Netty 中的 IO 事件一共分为两大类:inbound 类事件和 outbound 类事件。其实如果严格来分的话应该分为三类。第三种事件类型为 exceptionCaught 异样事件类型。

而 exceptionCaught 事件在事件流传角度上来说和 inbound 类事件一样,都是从 pipeline 的 HeadContext 开始始终向后传递或者从以后 ChannelHandler 开始始终向后传递直到 TailContext。所以个别也会将 exceptionCaught 事件对立归为 inbound 类事件。

而依据事件类型的分类,相应负责处理事件回调的 ChannelHandler 也会被分为两类:

  • ChannelInboundHandler:次要负责响应解决 inbound 类事件回调和 exceptionCaught 事件回调。
  • ChannelOutboundHandler:次要负责响应解决 outbound 类事件回调。

那么咱们常说的 inbound 类事件和 outbound 类事件具体都蕴含哪些事件呢?

3.1 inbound 类事件

final class ChannelHandlerMask {
// inbound 事件汇合
static final int MASK_ONLY_INBOUND = MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
// inbound 类事件相干掩码
static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_INACTIVE = 1 << 4;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
}

netty 会将其反对的所有异步事件用掩码来示意,定义在 ChannelHandlerMask 类中,netty 框架通过这些事件掩码能够很不便的晓得用户自定义的 ChannelHandler 是属于什么类型的(ChannelInboundHandler or ChannelOutboundHandler)。

除此之外,inbound 类事件如此之多,用户也并不是对所有的 inbound 类事件感兴趣,用户能够在自定义的 ChannelInboundHandler 中笼罩本人感兴趣的 inbound 事件回调,从而达到针对特定 inbound 事件的监听。

这些用户感兴趣的 inbound 事件汇合同样也会用掩码的模式保留在自定义 ChannelHandler 对应的 ChannelHandlerContext 中,这样当特定 inbound 事件在 pipeline 中开始流传的时候,netty 能够依据对应 ChannelHandlerContext 中保留的 inbound 事件汇合掩码来判断,用户自定义的 ChannelHandler 是否对该 inbound 事件感兴趣,从而决定是否执行用户自定义 ChannelHandler 中的相应回调办法或者跳过对该 inbound 事件不感兴趣的 ChannelHandler 持续向后流传。

从以上形容中,咱们也能够窥探出,Netty 引入 ChannelHandlerContext 来封装 ChannelHandler 的起因,在代码设计上还是遵循繁多职责的准则,ChannelHandler 是用户接触最频繁的一个 netty 组件,netty 心愿用户可能把全副注意力放在最外围的 IO 解决上,用户只须要关怀本人对哪些异步事件感兴趣并思考相应的解决逻辑即可,而并不需要关怀异步事件在 pipeline 中如何传递,如何抉择具备执行条件的 ChannelHandler 去执行或者跳过。这些切面性质的逻辑,netty 将它们作为上下文信息全副封装在 ChannelHandlerContext 中由 netty 框架自身负责解决。

以上这些内容,笔者还会在事件流传相干大节做具体的介绍,之所以这里引出,还是为了让大家感触下利用掩码进行汇合操作的便利性,netty 中相似这样的设计还有很多,比方前边系列文章中屡次提到过的,channel 再向 reactor 注册 IO 事件时,netty 也是将 channel 感兴趣的 IO 事件用掩码的模式存储于 SelectionKey 中的 int interestOps 中。

接下来笔者就为大家介绍下这些 inbound 事件,并梳理出这些 inbound 事件的触发机会。不便大家依据各自业务需要灵便地进行监听。

3.1.1 ExceptionCaught 事件

在本大节介绍的这些 inbound 类事件在 pipeline 中流传的过程中,如果在相应事件回调函数执行的过程中产生异样,那么就会触发对应 ChannelHandler 中的 exceptionCaught 事件回调。

private void invokeExceptionCaught(final Throwable cause) {if (invokeHandler()) {
try {handler().exceptionCaught(this, cause);
} catch (Throwable error) {if (logger.isDebugEnabled()) {
logger.debug("An exception {}" +
"was thrown by a user handler's exceptionCaught() "+"method while handling the following exception:",
ThrowableUtil.stackTraceToString(error), cause);
} else if (logger.isWarnEnabled()) {
logger.warn("An exception'{}'[enable DEBUG level for full stacktrace]" +
"was thrown by a user handler's exceptionCaught() "+"method while handling the following exception:", error, cause);
}
}
} else {fireExceptionCaught(cause);
}
}

当然用户能够抉择在 exceptionCaught 事件回调中是否执行 ctx.fireExceptionCaught(cause) 从而决定是否将 exceptionCaught 事件持续向后流传。

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
..........
ctx.fireExceptionCaught(cause);
}

当 netty 内核解决连贯的接管,以及数据的读取过程中如果产生异样,会在整个 pipeline 中触发 exceptionCaught 事件的流传。

这里笔者为什么要独自强调在 inbound 事件流传的过程中产生异样,才会回调 exceptionCaught 呢 ?

因为 inbound 事件个别都是由 netty 内核触发流传的,而 outbound 事件个别都是由用户抉择触发的,比方用户在解决完业务逻辑触发的 write 事件或者 flush 事件。

而在用户触发 outbound 事件后,个别都会失去一个 ChannelPromise。用户能够向 ChannelPromise 增加各种 listener。当 outbound 事件在流传的过程中产生异样时,netty 会告诉用户持有的这个 ChannelPromise,但不会触发 exceptionCaught 的回调

比方咱们在《一文搞懂 Netty 发送数据全流程》一文中介绍到的在 write 事件流传的过程中就不会触发 exceptionCaught 事件回调。只是去告诉用户的 ChannelPromise。

private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
// 调用以后 ChannelHandler 中的 write 办法
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {notifyOutboundHandlerException(t, promise);
}
}
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
}

而 outbound 事件中只有 flush 事件的流传是个例外,当 flush 事件在 pipeline 流传的过程中产生异样时,会触发对应异样 ChannelHandler 的 exceptionCaught 事件回调。因为 flush 办法的签名中不会给用户返回 ChannelPromise。

@Override
ChannelHandlerContext flush();
private void invokeFlush0() {
try {((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {invokeExceptionCaught(t);
}
}

3.1.2 ChannelRegistered 事件

当 main reactor 在启动的时候,NioServerSocketChannel 会被创立并初始化,随后就会向 main reactor 注册,当注册胜利后就会在 NioServerSocketChannel 中的 pipeline 中流传 ChannelRegistered 事件。

当 main reactor 接管客户端发动的连贯后,NioSocketChannel 会被创立并初始化,随后会向 sub reactor 注册,当注册胜利后会在 NioSocketChannel 中的 pipeline 流传 ChannelRegistered 事件。

private void register0(ChannelPromise promise) {
................
// 执行真正的注册操作
doRegister();
...........
// 触发 channelRegister 事件
pipeline.fireChannelRegistered();
.......
}

留神:此时对应的 channel 还没有注册 IO 事件到相应的 reactor 中。

3.1.3 ChannelActive 事件

当 NioServerSocketChannel 再向 main reactor 注册胜利并触发 ChannelRegistered 事件流传之后,随后就会在 pipeline 中触发 bind 事件,而 bind 事件是一个 outbound 事件,会从 pipeline 中的尾结点 TailContext 始终向前流传最终在 HeadContext 中执行真正的绑定操作。

@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
// 触发 AbstractChannel->bind 办法 执行 JDK NIO SelectableChannel 执行底层绑定操作
unsafe.bind(localAddress, promise);
}
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
..............
doBind(localAddress);
...............
// 绑定胜利后 channel 激活 触发 channelActive 事件流传
if (!wasActive && isActive()) {invokeLater(new Runnable() {
@Override
public void run() {
//HeadContext->channelActive 回调办法 执行注册 OP_ACCEPT 事件
pipeline.fireChannelActive();}
});
}
...............
}

当 netty 服务端 NioServerSocketChannel 绑定端口胜利之后,才算是真正的 Active,随后触发 ChannelActive 事件在 pipeline 中的流传。

之前咱们也提到过判断 NioServerSocketChannel 是否 Active 的规范就是 : 底层 JDK Nio ServerSocketChannel 是否 open 并且 ServerSocket 是否曾经实现绑定。

@Override
public boolean isActive() {return isOpen() && javaChannel().socket().isBound();}

而客户端 NioSocketChannel 中触发 ChannelActive 事件就会比较简单,当 NioSocketChannel 再向 sub reactor 注册胜利并触发 ChannelRegistered 之后,紧接着就会触发 ChannelActive 事件在 pipeline 中流传。

private void register0(ChannelPromise promise) {
................
// 执行真正的注册操作
doRegister();
...........
// 触发 channelRegister 事件
pipeline.fireChannelRegistered();
.......
if (isActive()) {if (firstRegistration) {
// 触发 channelActive 事件
pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();
}
}
}

而客户端 NioSocketChannel 是否 Active 的标识是:底层 JDK NIO
SocketChannel 是否 open 并且底层 socket 是否连贯。毫无疑问,这里的 socket 肯定是 connected。所以间接触发 ChannelActive 事件。

@Override
public boolean isActive() {SocketChannel ch = javaChannel();
return ch.isOpen() && ch.isConnected();
}

留神:此时 channel 才会到相应的 reactor 中去注册感兴趣的 IO 事件。当用户自定义的 ChannelHandler 接管到 ChannelActive 事件时,表明 IO 事件曾经注册到 reactor 中了。

3.1.4 ChannelRead 和 ChannelReadComplete 事件

当客户端有新连贯申请的时候,服务端的 NioServerSocketChannel 上的 OP_ACCEPT 事件会沉闷,随后 main reactor 会在一个 read loop 中一直的调用 serverSocketChannel.accept() 接管新的连贯直到全副接管结束或者达到 read loop 最大次数 16 次。

在 NioServerSocketChannel 中,每 accept 一个新的连贯,就会在 pipeline 中触发 ChannelRead 事件。一个残缺的 read loop 完结之后,会触发 ChannelReadComplete 事件。

private final class NioMessageUnsafe extends AbstractNioUnsafe {
@Override
public void read() {
......................
try {
do {
// 底层调用 NioServerSocketChannel->doReadMessages 创立客户端 SocketChannel
int localRead = doReadMessages(readBuf);
.................
} while (allocHandle.continueReading());
} catch (Throwable t) {exception = t;}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {pipeline.fireChannelRead(readBuf.get(i));
}
pipeline.fireChannelReadComplete();
.................
}
}

当客户端 NioSocketChannel 上有申请数据到来时,NioSocketChannel 上的 OP_READ 事件沉闷,随后 sub reactor 也会在一个 read loop 中对 NioSocketChannel 中的申请数据进行读取直到读取结束或者达到 read loop 的最大次数 16 次。

在 read loop 的读取过程中,每读取一次就会在 pipeline 中触发 ChannelRead 事件。当一个残缺的 read loop 完结之后,会在 pipeline 中触发 ChannelReadComplete 事件。

这里须要留神的是当 ChannelReadComplete 事件触发时,此时并不代表 NioSocketChannel 中的申请数据曾经读取结束,可能的状况是发送的申请数据太多,在一个 read loop 中读取不完达到了最大限度次数 16 次,还没全副读取结束就退出了 read loop。一旦退出 read loop 就会触发 ChannelReadComplete 事件。具体内容能够查看笔者的这篇文章《Netty 如何高效接管网络数据》。

3.1.5 ChannelWritabilityChanged 事件

当咱们解决完业务逻辑失去业务处理结果后,会调用 ctx.write(msg) 触发 write 事件在 pipeline 中的流传。

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {ctx.write(msg);
}

最终 netty 会将发送数据 msg 写入 NioSocketChannel 中的待发送缓冲队列 ChannelOutboundBuffer 中。并期待用户调用 flush 操作从 ChannelOutboundBuffer 中将待发送数据 msg,写入到底层 Socket 的发送缓冲区中。

当对端的接管处理速度十分慢或者网络情况极度拥塞时,使得 TCP 滑动窗口一直的放大,这就导致发送端的发送速度也变得越来越小,而此时用户还在一直的调用 ctx.write(msg),这就会导致 ChannelOutboundBuffer 会急剧增大,从而可能导致 OOM。netty 引入了高下水位线来管制 ChannelOutboundBuffer 的内存占用。

public final class WriteBufferWaterMark {
private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;
}

当 ChanneOutboundBuffer 中的内存占用量超过高水位线时,netty 就会将对应的 channel 置为不可写状态,并在 pipeline 中触发 ChannelWritabilityChanged 事件。

private void setUnwritable(boolean invokeLater) {for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {if (oldValue == 0) {
// 触发 fireChannelWritabilityChanged 事件 示意以后 channel 变为不可写
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}

当 ChannelOutboundBuffer 中的内存占用量低于低水位线时,netty 又会将对应的 NioSocketChannel 设置为可写状态,并再次触发 ChannelWritabilityChanged 事件。

private void setWritable(boolean invokeLater) {for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & ~1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {if (oldValue != 0 && newValue == 0) {fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}

用户可在自定义 ChannelHandler 中通过 ctx.channel().isWritable() 判断以后 channel 是否可写。

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isWritable()) {........... 以后 channel 可写.........} else {........... 以后 channel 不可写.........}
}

3.1.6 UserEventTriggered 事件

netty 提供了一种事件扩大机制能够容许用户自定义异步事件,这样能够使得用户可能灵便的定义各种简单场景的解决机制。

上面咱们来看下如何在 Netty 中自定义异步事件。

  1. 定义异步事件。
public final class OurOwnDefinedEvent {public static final OurOwnDefinedEvent INSTANCE = new OurOwnDefinedEvent();
private OurOwnDefinedEvent() {}
}
  1. 触发自定义事件的流传
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
...... 省略.......
// 事件在 pipeline 中从以后 ChannelHandlerContext 开始向后流传
ctx.fireUserEventTriggered(OurOwnDefinedEvent.INSTANCE);
// 事件从 pipeline 的头结点 headContext 开始向后流传
ctx.channel().pipeline().fireUserEventTriggered(OurOwnDefinedEvent.INSTANCE);
}
}
  1. 自定义事件的响应和解决。
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (OurOwnDefinedEvent.INSTANCE == evt) {..... 自定义事件处理......}
}
}

后续随着咱们源码解读的深刻,咱们还会看到 Netty 本人自身也定义了许多 UserEvent 事件,咱们前面还会在介绍,大家这里只是略微理解一下相干的用法即可。

3.1.7 ChannelInactive 和 ChannelUnregistered 事件

当 Channel 被敞开之后会在 pipeline 中先触发 ChannelInactive 事件的流传而后在触发 ChannelUnregistered 事件的流传。

咱们能够在 Inbound 类型的 ChannelHandler 中响应 ChannelInactive 和 ChannelUnregistered 事件。

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
...... 响应 inActive 事件...
// 持续向后流传 inActive 事件
super.channelInactive(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
...... 响应 Unregistered 事件...
// 持续向后流传 Unregistered 事件
super.channelUnregistered(ctx);
}

这里和连贯建设之后的事件触发程序正好相同,连贯建设之后是先触发 ChannelRegistered 事件而后在触发 ChannelActive 事件。

3.2 Outbound 类事件

final class ChannelHandlerMask {
// outbound 事件的汇合
static final int MASK_ONLY_OUTBOUND = MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;
// outbound 事件掩码
static final int MASK_BIND = 1 << 9;
static final int MASK_CONNECT = 1 << 10;
static final int MASK_DISCONNECT = 1 << 11;
static final int MASK_CLOSE = 1 << 12;
static final int MASK_DEREGISTER = 1 << 13;
static final int MASK_READ = 1 << 14;
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;
}

和 Inbound 类事件一样,Outbound 类事件也有对应的掩码示意。上面咱们来看下 Outbound 类事件的触发机会:

3.2.1 read 事件

大家这里须要留神辨别 read 事件和 ChannelRead 事件的不同

ChannelRead 事件前边咱们曾经介绍了,当 NioServerSocketChannel 接管到新连贯时,会触发 ChannelRead 事件在其 pipeline 上流传。

当 NioSocketChannel 上有申请数据时,在 read loop 中读取申请数据时会触发 ChannelRead 事件在其 pipeline 上流传。

而 read 事件则和 ChannelRead 事件齐全不同,read 事件特指使 Channel 具备感知 IO 事件的能力。NioServerSocketChannel 对应的 OP_ACCEPT 事件的感知能力,NioSocketChannel 对应的是 OP_READ 事件的感知能力。

read 事件的触发是在当 channel 须要向其对应的 reactor 注册读类型事件时(比方 OP_ACCEPT 事件 和 OP_READ 事件)才会触发。read 事件的响应就是将 channel 感兴趣的 IO 事件注册到对应的 reactor 上。

比方 NioServerSocketChannel 感兴趣的是 OP_ACCEPT 事件,NioSocketChannel 感兴趣的是 OP_READ 事件。

在前边介绍 ChannelActive 事件时咱们提到,当 channel 处于 active 状态后会在 pipeline 中流传 ChannelActive 事件。而在 HeadContext 中的 ChannelActive 事件回调中会触发 Read 事件的流传。

final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();
readIfIsAutoRead();}
private void readIfIsAutoRead() {if (channel.config().isAutoRead()) {
// 如果是 autoRead 则触发 read 事件流传
channel.read();}
}
@Override
public void read(ChannelHandlerContext ctx) {
// 触发注册 OP_ACCEPT 或者 OP_READ 事件
unsafe.beginRead();}
}

而在 HeadContext 中的 read 事件回调中会调用 Channel 的底层操作类 unsafe 的 beginRead 办法,在该办法中会向 reactor 注册 channel 感兴趣的 IO 事件。对于 NioServerSocketChannel 来说这里注册的就是 OP_ACCEPT 事件,对于 NioSocketChannel 来说这里注册的则是 OP_READ 事件。

@Override
protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {return;}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// 注册监听 OP_ACCEPT 或者 OP_READ 事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}

仔细的同学可能留神到了 channel 对应的配置类中蕴含了一个 autoRead 属性,那么这个 autoRead 到底是干什么的呢?

其实这是 netty 为大家提供的一种背压机制,用来避免 OOM,设想一下当对端发送数据十分多并且发送速度十分快,而服务端处理速度十分慢,一时间生产不过去。而对端又在不停的大量发送数据,服务端的 reactor 线程不得不在 read loop 中不停的读取,并且为读取到的数据调配 ByteBuffer。而服务端业务线程又解决不过去,这就导致了大量来不及解决的数据占用了大量的内存空间,从而导致 OOM。

面对这种状况,咱们能够通过 channelHandlerContext.channel().config().setAutoRead(false) 将 autoRead 属性设置为 false。随后 netty 就会将 channel 中感兴趣的读类型事件从 reactor 中登记,从此 reactor 不会再对相应事件进行监听。这样 channel 就不会在读取数据了。

这里 NioServerSocketChannel 对应的是 OP_ACCEPT 事件,NioSocketChannel 对应的是 OP_READ 事件。

protected final void removeReadOp() {SelectionKey key = selectionKey();
if (!key.isValid()) {return;}
int interestOps = key.interestOps();
if ((interestOps & readInterestOp) != 0) {key.interestOps(interestOps & ~readInterestOp);
}
}

而当服务端的处理速度恢复正常,咱们又能够通过 channelHandlerContext.channel().config().setAutoRead(true) 将 autoRead 属性设置为 true。这样 netty 会在 pipeline 中触发 read 事件,最终在 HeadContext 中的 read 事件回调办法中通过调用 unsafe#beginRead 办法将 channel 感兴趣的读类型事件从新注册到对应的 reactor 中。

@Override
public ChannelConfig setAutoRead(boolean autoRead) {boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
if (autoRead && !oldAutoRead) {
//autoRead 从 false 变为 true
channel.read();} else if (!autoRead && oldAutoRead) {
//autoRead 从 true 变为 false
autoReadCleared();}
return this;
}

read 事件能够了解为使 channel 领有读的能力,当有了读的能力后,channelRead 就能够读取具体的数据了。

3.2.2 write 和 flush 事件

write 事件和 flush 事件咱们在《一文搞懂 Netty 发送数据全流程》一文中曾经十分详尽的介绍过了,这里笔者在带大家简略回顾一下。

write 事件和 flush 事件均由用户在解决完业务申请失去业务后果后在业务线程中被动触发。

用户既能够通过 ChannelHandlerContext 触发也能够通过 Channel 来触发。

不同之处在于如果通过 ChannelHandlerContext 触发,那么 write 事件或者 flush 事件就会在 pipeline 中从以后 ChannelHandler 开始始终向前流传直到 HeadContext。

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();
}

如果通过 Channel 触发,那么 write 事件和 flush 事件就会从 pipeline 的尾部节点 TailContext 开始始终向前流传直到 HeadContext。

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {ctx.channel().write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {ctx.channel().flush();}

当然还有一个 writeAndFlush 办法,也会分为 ChannelHandlerContext 触发和 Channel 的触发。触发 writeAndFlush 后,write 事件首先会在 pipeline 中流传,最初 flush 事件在 pipeline 中流传。

netty 对 write 事件的解决最终会将发送数据写入 Channel 对应的写缓冲队列 ChannelOutboundBuffer 中。此时数据并没有发送进来而是在写缓冲队列中缓存,这也是 netty 实现异步写的外围设计。

最终通过 flush 操作从 Channel 中的写缓冲队列 ChannelOutboundBuffer 中获取到待发送数据,并写入到 Socket 的发送缓冲区中。

3.2.3 close 事件

当用户在 ChannelHandler 中调用如下办法对 Channel 进行敞开时,会触发 Close 事件在 pipeline 中从后向前流传。

//close 事件从以后 ChannelHandlerContext 开始在 pipeline 中向前流传
ctx.close();
//close 事件从 pipeline 的尾结点 tailContext 开始向前流传
ctx.channel().close();

咱们能够在 Outbound 类型的 ChannelHandler 中响应 close 事件。

public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
..... 客户端 channel 敞开之前的解决回调.....
// 持续向前流传 close 事件
super.close(ctx, promise);
}
}

最终 close 事件会在 pipeline 中始终向前流传直到头结点 HeadConnect 中,并在 HeadContext 中实现连贯敞开的操作,当连贯实现敞开之后,会在 pipeline 中先后触发 ChannelInactive 事件和 ChannelUnregistered 事件。

3.2.4 deRegister 事件

用户可调用如下代码将以后 Channel 从 Reactor 中登记掉。

//deregister 事件从以后 ChannelHandlerContext 开始在 pipeline 中向前流传
ctx.deregister();
//deregister 事件从 pipeline 的尾结点 tailContext 开始向前流传
ctx.channel().deregister();

咱们能够在 Outbound 类型的 ChannelHandler 中响应 deregister 事件。

public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
..... 客户端 channel 勾销注册之前的解决回调.....
// 持续向前流传 connect 事件
super.deregister(ctx, promise);
}
}

最终 deRegister 事件会流传至 pipeline 中的头结点 HeadContext 中,并在 HeadContext 中实现底层 channel 勾销注册的操作。当 Channel 从 Reactor 上登记之后,从此 Reactor 将不会在监听 Channel 上的 IO 事件,并触发 ChannelUnregistered 事件在 pipeline 中流传。

3.2.5 connect 事件

在 Netty 的客户端中咱们能够利用 NioSocketChannel 的 connect 办法触发 connect 事件在 pipeline 中流传。

//connect 事件从以后 ChannelHandlerContext 开始在 pipeline 中向前流传
ctx.connect(remoteAddress);
//connect 事件从 pipeline 的尾结点 tailContext 开始向前流传
ctx.channel().connect(remoteAddress);

咱们能够在 Outbound 类型的 ChannelHandler 中响应 connect 事件。

public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
..... 客户端 channel 连贯胜利之前的解决回调.....
// 持续向前流传 connect 事件
super.connect(ctx, remoteAddress, localAddress, promise);
}
}

最终 connect 事件会在 pipeline 中的头结点 headContext 中触发底层的连贯建设申请。当客户端胜利连贯到服务端之后,会在客户端 NioSocketChannel 的 pipeline 中流传 channelActive 事件。

3.2.6 disConnect 事件

在 Netty 的客户端中咱们也能够调用 NioSocketChannel 的 disconnect 办法在 pipeline 中触发 disconnect 事件,这会导致 NioSocketChannel 的敞开。

//disconnect 事件从以后 ChannelHandlerContext 开始在 pipeline 中向前流传
ctx.disconnect();
//disconnect 事件从 pipeline 的尾结点 tailContext 开始向前流传
ctx.channel().disconnect();

咱们能够在 Outbound 类型的 ChannelHandler 中响应 disconnect 事件。

public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
..... 客户端 channel 行将敞开前的解决回调.....
// 持续向前流传 disconnect 事件
super.disconnect(ctx, promise);
}
}

最终 disconnect 事件会流传到 HeadContext 中,并在 HeadContext 中实现底层的断开连接操作,当客户端断开连接胜利敞开之后,会在 pipeline 中先后触发 ChannelInactive 事件和 ChannelUnregistered 事件。

4. 向 pipeline 增加 channelHandler

在咱们具体介绍了全副的 inbound 类事件和 outbound 类事件的掩码示意以及事件的触发和流传门路后,置信大家当初能够通过 ChannelInboundHandler 和 ChannelOutboundHandler 来依据具体的业务场景抉择适合的 ChannelHandler 类型以及监听适合的事件来实现业务需要了。

本大节就该介绍一下自定义的 ChannelHandler 是如何增加到 pipeline 中的,netty 在这个过程中帮咱们作了哪些工作?

final EchoServerHandler serverHandler = new EchoServerHandler();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.............
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();
p.addLast(serverHandler);
...... 可增加多个 channelHandler......
}
});

以上是笔者简化的一个 netty 服务端配置 ServerBootstrap 启动类的一段示例代码。咱们能够看到再向 channel 对应的 pipeline 中增加 ChannelHandler 是通过 ChannelPipeline#addLast 办法将指定 ChannelHandler 增加到 pipeline 的开端处。

public interface ChannelPipeline
extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
// 向 pipeline 的开端处批量增加多个 channelHandler
ChannelPipeline addLast(ChannelHandler... handlers);
// 指定 channelHandler 的 executor,由指定的 executor 执行 channelHandler 中的回调办法
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
// 为 channelHandler 指定名称
ChannelPipeline addLast(String name, ChannelHandler handler);
// 为 channelHandler 指定 executor 和 name
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
}
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {ObjectUtil.checkNotNull(handlers, "handlers");
for (ChannelHandler h: handlers) {if (h == null) {break;}
addLast(executor, null, h);
}
return this;
}
@Override
public final ChannelPipeline addLast(String name, ChannelHandler handler) {return addLast(null, name, handler);
}
}

最终 addLast 的这些重载办法都会调用到 DefaultChannelPipeline#addLast(EventExecutorGroup, String, ChannelHandler) 这个办法从而实现 ChannelHandler 的增加。

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 查看同一个 channelHandler 实例是否容许被反复增加
checkMultiplicity(handler);
// 创立 channelHandlerContext 包裹 channelHandler 并封装执行流传事件相干的上下文信息
newCtx = newContext(group, filterName(name, handler), handler);
// 将 channelHandelrContext 插入到 pipeline 中的开端处。双向链表操作
// 此时 channelHandler 的状态还是 ADD_PENDING,只有当 channelHandler 的 handlerAdded 办法被回调后,状态才会为 ADD_COMPLETE
addLast0(newCtx);
// 如果以后 channel 还没有向 reactor 注册,则将 handlerAdded 办法的回调增加进 pipeline 的工作队列中
if (!registered) {
// 这里次要是用来解决 ChannelInitializer 的状况
// 设置 channelHandler 的状态为 ADD_PENDING 即期待增加, 当状态变为 ADD_COMPLETE 时 channelHandler 中的 handlerAdded 会被回调
newCtx.setAddPending();
// 向 pipeline 中增加 PendingHandlerAddedTask 工作,在工作中回调 handlerAdded
// 当 channel 注册到 reactor 后,pipeline 中的 pendingHandlerCallbackHead 工作链表会被挨个执行
callHandlerCallbackLater(newCtx, true);
return this;
}
// 如果以后 channel 曾经向 reactor 注册胜利,那么就间接回调 channelHandler 中的 handlerAddded 办法
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
// 这里须要确保 channelHandler 中 handlerAdded 办法的回调是在 channel 指定的 executor 中
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// 回调 channelHandler 中的 handlerAddded 办法
callHandlerAdded0(newCtx);
return this;
}

这个办法的逻辑还是比较复杂的,波及到很多细节,为了清晰地为大家讲述,笔者这里还是采纳总分总的构造,先形容该办法的总体逻辑,而后在针对外围细节要点开展细节剖析。

因为向 pipeline 中增加 channelHandler 的操作可能会在多个线程中进行,所以为了确保增加操作的线程安全性,这里采纳一个 synchronized 语句块将整个增加逻辑包裹起来。

  1. 通过 checkMultiplicity 查看被增加的 ChannelHandler 是否是共享的(标注 @Sharable 注解),如果不是共享的那么则不会容许该 ChannelHandler 的 同一实例 被增加进多个 pipeline 中。如果是共享的,则容许该 ChannelHandler 的 同一个实例 被屡次增加进多个 pipeline 中。
private static void checkMultiplicity(ChannelHandler handler) {if (handler instanceof ChannelHandlerAdapter) {ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
// 只有标注 @Sharable 注解的 channelHandler,才被容许同一个实例被增加进多个 pipeline 中
// 留神:标注 @Sharable 之后,一个 channelHandler 的实例能够被增加到多个 channel 对应的 pipeline 中
// 可能被多线程执行,须要确保线程平安
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(h.getClass().getName() +
"is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}

这里大家须要留神的是,如果一个 ChannelHandler 被标注了 @Sharable 注解, 这就意味着它的一个实例能够被屡次增加进多个 pipeline 中(每个 channel 对应一个 pipeline 实例),而这多个不同的 pipeline 可能会被不同的 reactor 线程执行,所以在应用共享 ChannelHandler 的时候须要确保其线程安全性。

比方上面的实例代码:

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {............. 须要确保线程平安.......}
final EchoServerHandler serverHandler = new EchoServerHandler();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
..................
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();
p.addLast(serverHandler);
}
});

EchoServerHandler 为咱们自定义的 ChannelHandler,它被 @Sharable 注解标注,全局只有一个实例,被增加进多个 Channel 的 pipeline 中。从而会被多个 reactor 线程执行到。

  1. 为 ChannelHandler 创立其 ChannelHandlerContext,用于封装 ChannelHandler 的名称,状态信息,执行上下文信息,以及用于感知 ChannelHandler 在 pipeline 中的地位信息。newContext 办法波及的细节较多,前面咱们独自介绍。
  2. 通过 addLast0 将新创建进去的 ChannelHandlerContext 插入到 pipeline 中开端处。办法的逻辑很简略其实就是一个一般的双向链表插入操作。
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}

然而这里大家须要留神的点是:尽管此时 ChannelHandlerContext 被物理的插入到了 pipeline 中,然而此时 channelHandler 的状态仍然为 INIT 状态,从逻辑上来说并未算是真正的插入到 pipeline 中,须要等到 ChannelHandler 的 handlerAdded 办法被回调时,状态才变为 ADD_COMPLETE,而只有 ADD_COMPLETE 状态的 ChannelHandler 能力响应 pipeline 中流传的事件。

在上篇文章《一文搞懂 Netty 发送数据全流程》中的《3.1.5 触发 nextChannelHandler 的 write 办法回调》大节中咱们也提过,在每次 write 事件或者 flush 事件流传的时候,都须要通过 invokeHandler 办法来判断 channelHandler 的状态是否为 ADD_COMPLETE,否则以后 channelHandler 则不能响应正在 pipeline 中流传的事件。必须要等到对应的 handlerAdded 办法被回调才能够,因为 handlerAdded 办法中可能蕴含一些 ChannelHandler 初始化的重要逻辑。

private boolean invokeHandler() {
// 这里是一个优化点,netty 用一个局部变量保留 handlerState
// 目标是缩小 volatile 变量 handlerState 的读取次数
int handlerState = this.handlerState;
return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}
void invokeWrite(Object msg, ChannelPromise promise) {if (invokeHandler()) {invokeWrite0(msg, promise);
} else {
// 以后 channelHandler 尽管增加到 pipeline 中,然而并没有调用 handlerAdded
// 所以不能调用以后 channelHandler 中的回调办法,只能持续向前传递 write 事件
write(msg, promise);
}
}
private void invokeFlush() {if (invokeHandler()) {invokeFlush0();
} else {
// 如果该 ChannelHandler 尽管退出到 pipeline 中但 handlerAdded 办法并未被回调,则持续向前传递 flush 事件
flush();}
}

事实上不仅仅是 write 事件和 flush 事件在流传的时候须要判断 ChannelHandler 的状态,所有的 inbound 类事件和 outbound 类事件在流传的时候都须要通过 invokeHandler 办法来判断以后 ChannelHandler 的状态是否为 ADD_COMPLETE,须要确保在 ChannelHandler 响应事件之前,它的 handlerAdded 办法被回调。

  1. 如果向 pipeline 中增加 ChannelHandler 的时候,channel 还没来得及注册到 reactor 中,那么须要将以后 ChannelHandler 的状态先设置为 ADD_PENDING,并将回调该 ChannelHandler 的 handlerAdded 办法封装成 PendingHandlerAddedTask 工作增加进 pipeline 中的工作列表中,等到 channel 向 reactor 注册之后,reactor 线程会挨个执行 pipeline 中工作列表中的工作。

这段逻辑次要用来解决 ChannelInitializer 的增加场景,因为目前只有 ChannelInitializer 这个非凡的 channelHandler 会在 channel 没有注册之前被增加进 pipeline 中

if (!registered) {newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

向 pipeline 的工作列表 pendingHandlerCallbackHead 中增加 PendingHandlerAddedTask 工作:

public class DefaultChannelPipeline implements ChannelPipeline {
// pipeline 中的工作列表
private PendingHandlerCallback pendingHandlerCallbackHead;
// 向工作列表尾部增加 PendingHandlerAddedTask
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {pendingHandlerCallbackHead = task;} else {
// Find the tail of the linked-list.
while (pending.next != null) {pending = pending.next;}
pending.next = task;
}
}
}

PendingHandlerAddedTask 工作负责回调 ChannelHandler 中的 handlerAdded 办法。

private final class PendingHandlerAddedTask extends PendingHandlerCallback {
...............
@Override
public void run() {callHandlerAdded0(ctx);
}
...............
}
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {ctx.callHandlerAdded();
} catch (Throwable t) {...............}
}

  1. 除了 ChannelInitializer 这个非凡的 ChannelHandler 的增加是在 channel 向 reactor 注册之前外,剩下的这些用户自定义的 ChannelHandler 的增加,均是在 channel 向 reactor 注册之后被增加进 pipeline 的。这种场景下的解决就会变得比较简单,在 ChannelHandler 被插入到 pipeline 中之后,就会立刻回调该 ChannelHandler 的 handlerAdded 办法。然而须要确保 handlerAdded 办法的回调在 channel 指定的 executor 中进行。
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
callHandlerAdded0(newCtx);

如果以后执行线程并不是 ChannelHandler 指定的 executor (!executor.inEventLoop() ), 那么就须要确保 handlerAdded 办法的回调在 channel 指定的 executor 中进行。

private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {callHandlerAdded0(newCtx);
}
});
}

这里须要留神的是须要在回调 handlerAdded 办法之前将 ChannelHandler 的状态提前设置为 ADD_COMPLETE。 因为用户可能在 ChannelHandler 中的 handerAdded 回调中触发一些事件,而如果此时 ChannelHandler 的状态不是 ADD_COMPLETE 的话,就会进行对事件的响应,从而错过事件的解决。

这种属于一种用户极其的应用状况。

final void callHandlerAdded() throws Exception {if (setAddComplete()) {handler().handlerAdded(this);
}
}

5. ChanneHandlerContext 的创立

在介绍完 ChannelHandler 向 pipeline 增加的整个逻辑过程后,本大节咱们来看下如何为 ChannelHandler 创立对应的 ChannelHandlerContext,以及 ChannelHandlerContext 中具体蕴含了哪些上下文信息。

public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
................
// 创立 channelHandlerContext 包裹 channelHandler 并封装执行流传相干的上下文信息
newCtx = newContext(group, filterName(name, handler), handler);
................
}
}
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
}

在创立 ChannelHandlerContext 之前,须要做两个重要的前置操作:

  • 通过 filterName 办法为 ChannelHandlerContext 过滤出在 pipeline 中惟一的名称。
  • 如果用户为 ChannelHandler 指定了非凡的 EventExecutorGroup,这里就须要通过 childExecutor 办法从指定的 EventExecutorGroup 中选出一个 EventExecutor 与 ChannelHandler 绑定。

5.1 filterName

private String filterName(String name, ChannelHandler handler) {if (name == null) {
// 如果没有指定 name, 则会为 handler 默认生成一个 name,该办法可确保默认生成的 name 在 pipeline 中不会反复
return generateName(handler);
}
// 如果指定了 name,须要确保 name 在 pipeline 中是惟一的
checkDuplicateName(name);
return name;
}

如果用户再向 pipeline 增加 ChannelHandler 的时候,为其指定了具体的名称,那么这里须要确保用户指定的名称在 pipeline 中是惟一的。

private void checkDuplicateName(String name) {if (context0(name) != null) {throw new IllegalArgumentException("Duplicate handler name:" + name);
}
}
/**
* 通过指定名称在 pipeline 中查找对应的 channelHandler 没有返回 null
* */
private AbstractChannelHandlerContext context0(String name) {
AbstractChannelHandlerContext context = head.next;
while (context != tail) {if (context.name().equals(name)) {return context;}
context = context.next;
}
return null;
}

如果用户没有为 ChannelHandler 指定名称,那么就须要为 ChannelHandler 在 pipeline 中默认生成一个惟一的名称。

// pipeline 中 channelHandler 对应的 name 缓存
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
new FastThreadLocal<Map<Class<?>, String>>() {
@Override
protected Map<Class<?>, String> initialValue() {return new WeakHashMap<Class<?>, String>();
}
};
private String generateName(ChannelHandler handler) {
// 获取 pipeline 中 channelHandler 对应的 name 缓存
Map<Class<?>, String> cache = nameCaches.get();
Class<?> handlerType = handler.getClass();
String name = cache.get(handlerType);
if (name == null) {
// 以后 handler 还没对应的 name 缓存,则默认生成:simpleClassName + #0
name = generateName0(handlerType);
cache.put(handlerType, name);
}
if (context0(name) != null) {
// 一直重试名称后缀 #n + 1 直到没有反复
String baseName = name.substring(0, name.length() - 1);
for (int i = 1;; i ++) {
String newName = baseName + i;
if (context0(newName) == null) {
name = newName;
break;
}
}
}
return name;
}
private static String generateName0(Class<?> handlerType) {return StringUtil.simpleClassName(handlerType) + "#0";
}

pipeline 中应用了一个 FastThreadLocal 类型的 nameCaches 来缓存各种类型 ChannelHandler 的根底名称。前面会依据这个根底名称一直的重试生成一个没有抵触的正式名称。缓存 nameCaches 中的 key 示意特定的 ChannelHandler 类型,value 示意该特定类型的 ChannelHandler 的根底名称 simpleClassName + #0

主动为 ChannelHandler 生成默认名称的逻辑是:

  • 首先从缓存中 nameCaches 获取以后增加的 ChannelHandler 的根底名称 simpleClassName + #0
  • 如果该根底名称 simpleClassName + #0 在 pipeline 中是惟一的,那么就将根底名称作为 ChannelHandler 的名称。
  • 如果缓存的根底名称在 pipeline 中不是惟一的,则一直的减少名称后缀 simpleClassName#1 ,simpleClassName#2 ...... simpleClassName#n 直到产生一个没有反复的名称。

尽管用户不大可能将同一类型的 channelHandler 反复增加到 pipeline 中,然而 netty 为了避免这种重复增加同一类型 ChannelHandler 的行为导致的名称抵触,从而利用 nameCaches 来缓存同一类型 ChannelHandler 的根底名称 simpleClassName + #0,而后通过一直的重试递增名称后缀,来生成一个在 pipeline 中惟一的名称。

5.2 childExecutor

通过前边的介绍咱们理解到,当咱们向 pipeline 增加 ChannelHandler 的时候,netty 容许咱们为 ChannelHandler 指定特定的 executor 去执行 ChannelHandler 中的各种事件回调办法。

通常咱们会为 ChannelHandler 指定一个 EventExecutorGroup,在创立 ChannelHandlerContext 的时候,会通过 childExecutor 办法从 EventExecutorGroup 中选取一个 EventExecutor 来与该 ChannelHandler 绑定。

EventExecutorGroup 是 netty 自定义的一个线程池模型,其中蕴含多个 EventExecutor,而 EventExecutor 在 netty 中是一个线程的执行模型。相干的具体实现和用法笔者曾经在《Reactor 在 Netty 中的实现(创立篇)》一文中给出了详尽的介绍,遗记的同学能够在回顾下。

在介绍 executor 的绑定逻辑之前,这里笔者须要先为大家介绍一个相干的重要参数:SINGLE_EVENTEXECUTOR_PER_GROUP,默认为 true。

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.........
.childOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP,true

咱们晓得在 netty 中,每一个 channel 都会对应一个独立的 pipeline,如果咱们开启了 SINGLE_EVENTEXECUTOR_PER_GROUP 参数,示意在一个 channel 对应的 pipeline 中,如果咱们为多个 ChannelHandler 指定了同一个 EventExecutorGroup,那么这多个 channelHandler 只能绑定到 EventExecutorGroup 中的同一个 EventExecutor 上。

什么意思呢??比方咱们有上面一段初始化 pipeline 的代码:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
........................
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(eventExecutorGroup,channelHandler1)
pipeline.addLast(eventExecutorGroup,channelHandler2)
pipeline.addLast(eventExecutorGroup,channelHandler3)
}
});

eventExecutorGroup 中蕴含 EventExecutor1,EventExecutor2,EventExecutor3 三个执行线程。

假如此时第一个连贯进来,在创立 channel1 后初始化 pipeline1 的时候,如果在开启 SINGLE_EVENTEXECUTOR_PER_GROUP 参数的状况下,那么在 channel1 对应的 pipeline1 中 channelHandler1,channelHandler2,channelHandler3 绑定的 EventExecutor 均为 EventExecutorGroup 中的 EventExecutor1。

第二个连贯 channel2 对应的 pipeline2 中 channelHandler1,channelHandler2,channelHandler3 绑定的 EventExecutor 均为 EventExecutorGroup 中的 EventExecutor2。

第三个连贯 channel3 对应的 pipeline3 中 channelHandler1,channelHandler2,channelHandler3 绑定的 EventExecutor 均为 EventExecutorGroup 中的 EventExecutor3。

以此类推 ……..

如果在敞开 SINGLE_EVENTEXECUTOR_PER_GROUP 参数的状况下,
channel1 对应的 pipeline1 中 channelHandler1 会绑定到 EventExecutorGroup 中的 EventExecutor1,channelHandler2 会绑定到 EventExecutor2,channelHandler3 会绑定到 EventExecutor3。

同理其余 channel 对应的 pipeline 中的 channelHandler 绑定逻辑同 channel1。它们均会绑定到 EventExecutorGroup 中的不同 EventExecutor 中。

当咱们理解了 SINGLE_EVENTEXECUTOR_PER_GROUP 参数的作用之后,再来看上面这段绑定逻辑就很容易了解了。

// 在每个 pipeline 中都会保留 EventExecutorGroup 中绑定的线程
private Map<EventExecutorGroup, EventExecutor> childExecutors;
private EventExecutor childExecutor(EventExecutorGroup group) {if (group == null) {return null;}
Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
if (pinEventExecutor != null && !pinEventExecutor) {
// 如果没有开启 SINGLE_EVENTEXECUTOR_PER_GROUP,则按程序从指定的 EventExecutorGroup 中为 channelHandler 调配 EventExecutor
return group.next();}
// 获取 pipeline 绑定到 EventExecutorGroup 的线程(在一个 pipeline 中会为每个指定的 EventExecutorGroup 绑定一个固定的线程)Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
if (childExecutors == null) {childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
}
// 获取该 pipeline 绑定在指定 EventExecutorGroup 中的线程
EventExecutor childExecutor = childExecutors.get(group);
if (childExecutor == null) {childExecutor = group.next();
childExecutors.put(group, childExecutor);
}
return childExecutor;
}

如果咱们并未非凡指定 ChannelHandler 的 executor,那么默认会是对应 channel 绑定的 reactor 线程负责执行该 ChannelHandler。

如果咱们未开启 SINGLE_EVENTEXECUTOR_PER_GROUP ,netty 就会从咱们指定的 EventExecutorGroup 中依照 round-robin 的形式为 ChannelHandler 绑定其中一个 eventExecutor。

如果咱们开启了 SINGLE_EVENTEXECUTOR_PER_GROUP 雷同的 EventExecutorGroup 在同一个 pipeline 实例中的绑定关系是固定的。在 pipeline 中如果多个 channelHandler 指定了同一个 EventExecutorGroup,那么这些 channelHandler 的 executor 均会绑定到一个固定的 eventExecutor 上。

这种固定的绑定关系缓存于每个 pipeline 中的 Map<EventExecutorGroup, EventExecutor> childExecutors 字段中,key 是用户为 channelHandler 指定的 EventExecutorGroup,value 为该 EventExecutorGroup 在 pipeline 实例中的绑定 eventExecutor。

接下来就是从 childExecutors 中获取指定 EventExecutorGroup 在该 pipeline 实例中的绑定 eventExecutor,如果绑定关系还未建设,则通过 round-robin 的形式从 EventExecutorGroup 中选取一个 eventExecutor 进行绑定,并在 childExecutor 中缓存绑定关系。

如果绑定关系曾经建设,则间接为 ChannelHandler 指定绑定好的 eventExecutor。

5.3 ChanneHandlerContext

在介绍完创立 ChannelHandlerContext 的两个前置操作后,咱们回头来看下 ChannelHandlerContext 中蕴含了哪些具体的上下文信息。

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
// ChannelHandlerContext 包裹的 channelHandler
private final ChannelHandler handler;
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {super(pipeline, executor, name, handler.getClass());
// 包裹的 channelHandler
this.handler = handler;
}
@Override
public ChannelHandler handler() {return handler;}
}
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
// 对应 channelHandler 的名称
private final String name;
//ChannelHandlerContext 中持有 pipeline 的援用
private final DefaultChannelPipeline pipeline;
// channelHandler 对应的 executor 默认为 reactor
final EventExecutor executor;
//channelHandlerContext 中保留 channelHandler 的执行条件掩码(是什么类型的 ChannelHandler, 对什么事件感兴趣)private final int executionMask;
//false 示意 当 channelHandler 的状态为 ADD_PENDING 的时候,也能够响应 pipeline 中的事件
//true 示意只有在 channelHandler 的状态为 ADD_COMPLETE 的时候能力响应 pipeline 中的事件
private final boolean ordered;
//channelHandelr 的状态,初始化为 INIT
private volatile int handlerState = INIT;
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
String name, Class<? extends ChannelHandler> handlerClass) {this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
//channelHandlerContext 中保留 channelHandler 的执行条件掩码(是什么类型的 ChannelHandler, 对什么事件感兴趣)this.executionMask = mask(handlerClass);
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
}

这里笔者重点介绍 orderd 属性和 executionMask 属性,其余的属性大家很容易了解。

  ordered = executor == null || executor instanceof OrderedEventExecutor;

当咱们不指定 channelHandler 的 executor 时或者指定的 executor 类型为 OrderedEventExecutor 时,ordered = true。

那么这个 ordered 属性对于 ChannelHandler 响应 pipeline 中的事件有什么影响呢?

咱们之前介绍过在 ChannelHandler 响应 pipeline 中的事件之前都会调用 invokeHandler() 办法来判断是否回调 ChannelHandler 的事件回调办法还是跳过。

private boolean invokeHandler() {
int handlerState = this.handlerState;
return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}
  • ordered == false 时,channelHandler 的状态为 ADD_PENDING 的时候,也能够响应 pipeline 中的事件。
  • ordered == true 时,只有在 channelHandler 的状态为 ADD_COMPLETE 的时候能力响应 pipeline 中的事件

另一个重要的属性 executionMask 保留的是以后 ChannelHandler 的一些执行条件信息掩码,比方:

  • 以后 ChannelHandler 是什么类型的(ChannelInboundHandler or ChannelOutboundHandler ?)。
  • 以后 ChannelHandler 对哪些事件感兴趣(笼罩了哪些事件回调办法?)
private static final FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>> MASKS =
new FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>>() {
@Override
protected Map<Class<? extends ChannelHandler>, Integer> initialValue() {return new WeakHashMap<Class<? extends ChannelHandler>, Integer>(32);
}
};
static int mask(Class<? extends ChannelHandler> clazz) {
// 因为每建设一个 channel 就会初始化一个 pipeline,这里须要将 ChannelHandler 对应的 mask 缓存
Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
Integer mask = cache.get(clazz);
if (mask == null) {
// 计算 ChannelHandler 对应的 mask(什么类型的 ChannelHandler,对什么事件感兴趣)mask = mask0(clazz);
cache.put(clazz, mask);
}
return mask;
}

这里须要一个 FastThreadLocal 类型的 MASKS 字段来缓存 ChannelHandler 对应的执行掩码。因为 ChannelHandler 类一旦被定义进去它的执行掩码就固定了,而 netty 须要接管大量的连贯,创立大量的 channel,并为这些 channel 初始化对应的 pipeline,须要频繁的记录 channelHandler 的执行掩码到 context 类中,所以这里须要将掩码缓存起来。

private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
try {if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
// 如果该 ChannelHandler 是 Inbound 类型的,则先将 inbound 事件全副设置进掩码中
mask |= MASK_ALL_INBOUND;
// 最初在对不感兴趣的事件一一排除(handler 中的事件回调办法如果标注了 @Skip 注解,则认为 handler 对该事件不感兴趣)if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_REGISTERED;}
if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_UNREGISTERED;}
if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_ACTIVE;}
if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_INACTIVE;}
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {mask &= ~MASK_CHANNEL_READ;}
if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_READ_COMPLETE;}
if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;}
if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {mask &= ~MASK_USER_EVENT_TRIGGERED;}
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
// 如果 handler 为 Outbound 类型的,则先将全副 outbound 事件设置进掩码中
mask |= MASK_ALL_OUTBOUND;
// 最初对 handler 不感兴趣的事件从掩码中一一排除
if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {mask &= ~MASK_BIND;}
if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
SocketAddress.class, ChannelPromise.class)) {mask &= ~MASK_CONNECT;}
if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {mask &= ~MASK_DISCONNECT;}
if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {mask &= ~MASK_CLOSE;}
if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {mask &= ~MASK_DEREGISTER;}
if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {mask &= ~MASK_READ;}
if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
Object.class, ChannelPromise.class)) {mask &= ~MASK_WRITE;}
if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {mask &= ~MASK_FLUSH;}
}
if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {mask &= ~MASK_EXCEPTION_CAUGHT;}
} catch (Exception e) {
// Should never reach here.
PlatformDependent.throwException(e);
}
// 计算出的掩码须要缓存,因为每次向 pipeline 中增加该类型的 handler 的时候都须要获取掩码(创立一个 channel 就须要为其初始化 pipeline)return mask;
}

计算 ChannelHandler 的执行掩码 mask0 办法尽管比拟长,然而逻辑却非常简略。在本文的第三大节《3. pipeline 中的事件分类》中,笔者为大家具体介绍了各种事件类型的掩码示意,这里我来看下如何利用这些根本事件掩码来计算出 ChannelHandler 的执行掩码的。

如果 ChannelHandler 是 ChannelInboundHandler 类型的,那么首先会将所有 Inbound 事件掩码设置进执行掩码 mask 中。

最初挨个遍历所有 Inbound 事件,从掩码汇合 mask 中排除该 ChannelHandler 不感兴趣的事件。这样一轮下来,就失去了 ChannelHandler 的执行掩码。

从这个过程中咱们能够看到,ChannelHandler 的执行掩码蕴含的是该 ChannelHandler 感兴趣的事件掩码汇合。当事件在 pipeline 中流传的时候,在 ChannelHandlerContext 中能够利用这个执行掩码来判断,以后 ChannelHandler 是否合乎响应该事件的资格。

同理咱们也能够计算出 ChannelOutboundHandler 类型的 ChannelHandler 对应的执行掩码。

那么 netty 框架是如何判断出咱们自定义的 ChannelHandler 对哪些事件感兴趣,对哪些事件不感兴趣的呢?

这里咱们以 ChannelInboundHandler 类型举例说明,在本文第三大节中,笔者对所有 Inbound 类型的事件作了一个全面的介绍,然而在理论开发中,咱们可能并不需要监听所有的 Inbound 事件,可能只是须要监听其中的一到两个事件。

对于咱们不感兴趣的事件,咱们只须要在其对应的回调办法上标注 @Skip 注解即可,netty 就会认为该 ChannelHandler 对标注 @Skip 注解的事件不感兴趣,当不感兴趣的事件在 pipeline 流传的时候,该 ChannelHandler 就不须要执行响应。

private static boolean isSkippable(final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws Exception {
Method m;
try {
// 首先查看类中是否笼罩实现了对应的事件回调办法
m = handlerType.getMethod(methodName, paramTypes);
} catch (NoSuchMethodException e) {if (logger.isDebugEnabled()) {
logger.debug("Class {} missing method {}, assume we can not skip execution", handlerType, methodName, e);
}
return false;
}
return m != null && m.isAnnotationPresent(Skip.class);
}
});
}

那咱们在编写自定义 ChannelHandler 的时候是不是要在 ChannelInboundHandler 或者 ChannelOutboundHandler 接口提供的所有事件回调办法上,对咱们不感兴趣的事件繁琐地一一标注 @Skip 注解呢?

其实是不须要的,netty 为咱们提供了 ChannelInboundHandlerAdapter 类和 ChannelOutboundHandlerAdapter 类,netty 当时曾经在这些 Adapter 类中的事件回调办法上全副标注了 @Skip 注解,咱们在自定义实现 ChannelHandler 的时候只须要继承这些 Adapter 类并笼罩咱们感兴趣的事件回调办法即可。

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
@Skip
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelRegistered();
}
@Skip
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelUnregistered();
}
@Skip
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelActive();
}
@Skip
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelInactive();
}
@Skip
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ctx.fireChannelRead(msg);
}
@Skip
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelReadComplete();
}
@Skip
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {ctx.fireUserEventTriggered(evt);
}
@Skip
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelWritabilityChanged();
}
@Skip
@Override
@SuppressWarnings("deprecation")
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {ctx.fireExceptionCaught(cause);
}
}

6. 从 pipeline 删除 channelHandler

从上个大节的内容中咱们能够看到向 pipeline 中增加 ChannelHandler 的逻辑还是比较复杂的,波及到的细节比拟多。

那么在理解了向 pipeline 中增加 ChannelHandler 的过程之后,从 pipeline 中删除 ChannelHandler 的逻辑就变得很好了解了。

public interface ChannelPipeline
extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
// 从 pipeline 中删除指定的 channelHandler
ChannelPipeline remove(ChannelHandler handler);
// 从 pipeline 中删除指定名称的 channelHandler
ChannelHandler remove(String name);
// 从 pipeline 中删除特定类型的 channelHandler
<T extends ChannelHandler> T remove(Class<T> handlerType);
}

netty 提供了以上三种形式从 pipeline 中删除指定 ChannelHandler,上面咱们以第一种形式为例来介绍 ChannelHandler 的删除过程。

public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline remove(ChannelHandler handler) {remove(getContextOrDie(handler));
return this;
}
}

6.1 getContextOrDie

首先须要通过 getContextOrDie 办法在 pipeline 中查找到指定的 ChannelHandler 对应的 ChannelHandelrContext。以便确认要删除的 ChannelHandler 的确是存在于 pipeline 中。

context 办法是通过遍历 pipeline 中的双向链表来查找要删除的 ChannelHandlerContext。

private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
if (ctx == null) {throw new NoSuchElementException(handler.getClass().getName());
} else {return ctx;}
}
@Override
public final ChannelHandlerContext context(ChannelHandler handler) {ObjectUtil.checkNotNull(handler, "handler");
// 获取 pipeline 双向链表构造的头结点
AbstractChannelHandlerContext ctx = head.next;
for (;;) {if (ctx == null) {return null;}
if (ctx.handler() == handler) {return ctx;}
ctx = ctx.next;
}
}

6.2 remove

remove 办法的整体代码构造和 addLast0 办法的代码构造一样,整体逻辑也是先从 pipeline 中的双向链表构造中将指定的 ChanneHandlerContext 删除,而后在解决被删除的 ChannelHandler 中 handlerRemoved 办法的回调。

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
assert ctx != head && ctx != tail;
synchronized (this) {
// 从 pipeline 的双向列表中删除指定 channelHandler 对应的 context
atomicRemoveFromHandlerList(ctx);
if (!registered) {
// 如果此时 channel 还未向 reactor 注册,则通过向 pipeline 中增加 PendingHandlerRemovedTask 工作
// 在注册之后回调 channelHandelr 中的 handlerRemoved 办法
callHandlerCallbackLater(ctx, false);
return ctx;
}
//channelHandelr 从 pipeline 中删除后,须要回调其 handlerRemoved 办法
// 须要确保 handlerRemoved 办法在 channelHandelr 指定的 executor 中进行
EventExecutor executor = ctx.executor();
if (!executor.inEventLoop()) {executor.execute(new Runnable() {
@Override
public void run() {callHandlerRemoved0(ctx);
}
});
return ctx;
}
}
callHandlerRemoved0(ctx);
return ctx;
}
  1. 从 pipeline 中删除指定 ChannelHandler 对应的 ChannelHandlerContext。逻辑比较简单,就是一般双向链表的删除操作。
private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}
  1. 如果此时 channel 并未向对应的 reactor 进行注册,则须要向 pipeline 的工作列表中增加 PendingHandlerRemovedTask 工作,再该工作中会执行 ChannelHandler 的 handlerRemoved 回调,当 channel 向 reactor 注册胜利后,reactor 会执行 pipeline 中工作列表中的工作,从而回调被删除 ChannelHandler 的 handlerRemoved 办法。
private final class PendingHandlerRemovedTask extends PendingHandlerCallback {PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {super(ctx);
}
@Override
public void run() {callHandlerRemoved0(ctx);
}
}

在执行 ChannelHandler 中 handlerRemoved 回调的时候,须要对 ChannelHandler 的状态进行判断:只有当 handlerState 为 ADD_COMPLETE 的时候能力回调 handlerRemoved 办法。

这里表白的语义是只有当 ChannelHanler 的 handlerAdded 办法被回调之后,那么在 ChannelHanler 被从 pipeline 中删除的时候它的 handlerRemoved 办法才能够被回调。

在 ChannelHandler 的 handlerRemove 办法被回调之后,将 ChannelHandler 的状态设置为 REMOVE_COMPLETE。

private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
try {
// 在这里回调 handlerRemoved 办法
ctx.callHandlerRemoved();} catch (Throwable t) {
fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
}
}
final void callHandlerRemoved() throws Exception {
try {if (handlerState == ADD_COMPLETE) {handler().handlerRemoved(this);
}
} finally {
// Mark the handler as removed in any case.
setRemoved();}
}
final void setRemoved() {handlerState = REMOVE_COMPLETE;}
  1. 如果 channel 曾经在 reactor 中注册胜利,那么当 channelHandler 从 pipeline 中删除之后,须要立刻回调其 handlerRemoved 办法。然而须要确保 handlerRemoved 办法在 channelHandler 指定的 executor 中进行。

7. pipeline 的初始化

其实对于 pipeline 初始化的相干内容咱们在《具体图解 Netty Reactor 启动全流程》中曾经简要介绍了 NioServerSocketChannel 中的 pipeline 的初始化机会以及过程。

在《Netty 如何高效接管网络连接》中笔者也简要介绍了 NioSocketChannel 中 pipeline 的初始化机会以及过程。

本大节笔者将联合这两种类型的 Channel 来残缺全面的介绍 pipeline 的整个初始化过程。

7.1 NioServerSocketChannel 中 pipeline 的初始化

从前边提到的这两篇文章以及本文前边的相干内容咱们晓得,Netty 提供了一个非凡的 ChannelInboundHandler 叫做 ChannelInitializer,用户能够利用这个非凡的 ChannelHandler 对 Channel 中的 pipeline 进行自定义的初始化逻辑。

如果用户只心愿在 pipeline 中增加一个固定的 ChannelHandler 能够通过如下代码间接增加。

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)// 配置主从 Reactor
...........
.handler(new LoggingHandler(LogLevel.INFO))

如果心愿增加多个 ChannelHandler,则能够通过 ChannelInitializer 来自定义增加逻辑。

因为应用 ChannelInitializer 初始化 NioServerSocketChannel 中 pipeline 的逻辑会略微简单一点,上面咱们均以这个简单的案例来讲述 pipeline 的初始化过程。

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)// 配置主从 Reactor
...........
.handler(new ChannelInitializer<NioServerSocketChannel>() {
@Override
protected void initChannel(NioServerSocketChannel ch) throws Exception {
.... 自定义 pipeline 初始化逻辑....
ChannelPipeline p = ch.pipeline();
p.addLast(channelHandler1);
p.addLast(channelHandler2);
p.addLast(channelHandler3);
........
}
})

以上这些由用户自定义的用于初始化 pipeline 的 ChannelInitializer,被保留至 ServerBootstrap 启动类中的 handler 字段中。用于后续的初始化调用

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable
private volatile ChannelHandler handler;
}

在服务端启动的时候,会随同着 NioServeSocketChannel 的创立以及初始化,在初始化 NioServerSokcetChannel 的时候会将一个新的 ChannelInitializer 增加进 pipeline 中,在新的 ChannelInitializer 中才会将用户自定义的 ChannelInitializer 增加进 pipeline 中,随后才执行初始化过程。

Netty 这里之所以引入一个新的 ChannelInitializer 来初始化 NioServerSocketChannel 中的 pipeline 的起因是须要兼容前边介绍的这两种初始化 pipeline 的形式。

  • 一种是间接应用一个具体的 ChannelHandler 来初始化 pipeline。
  • 另一种是应用 ChannelInitializer 来自定义初始化 pipeline 逻辑。

遗记 netty 启动过程的同学能够在回看下笔者的《具体图解 Netty Reactor 启动全流程》这篇文章。

@Override
void init(Channel channel) {
.........
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();
//ServerBootstrap 中用户指定的 channelHandler
ChannelHandler handler = config.handler();
if (handler != null) {pipeline.addLast(handler);
}
.........
}
});
}

留神此时 NioServerSocketChannel 并未开始向 Main Reactor 注册,依据本文第四大节《4. 向 pipeline 增加 channelHandler》中的介绍,此时向 pipeline 中增加这个新的 ChannelInitializer 之后,netty 会向 pipeline 的工作列表中增加 PendingHandlerAddedTask。当 NioServerSocketChannel 向 Main Reactor 注册胜利之后,紧接着 Main Reactor 线程会调用这个 PendingHandlerAddedTask,在工作中会执行这个新的 ChannelInitializer 的 handlerAdded 回调。在这个回调办法中会执行上边 initChannel 办法里的代码。

当 NioServerSocketChannel 在向 Main Reactor 注册胜利之后,就挨个执行 pipeline 中的工作列表中的工作。

private void register0(ChannelPromise promise) {
.........
boolean firstRegistration = neverRegistered;
// 执行真正的注册操作
doRegister();
// 批改注册状态
neverRegistered = false;
registered = true;
// 调用 pipeline 中的工作链表,执行 PendingHandlerAddedTask
pipeline.invokeHandlerAddedIfNeeded();
.........
final void invokeHandlerAddedIfNeeded() {assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
// 执行 pipeline 工作列表中的 PendingHandlerAddedTask 工作。callHandlerAddedForAllHandlers();}
}

执行 pipeline 工作列表中的 PendingHandlerAddedTask 工作:

private void callHandlerAddedForAllHandlers() {
// pipeline 工作列表中的头结点
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
// This Channel itself was registered.
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.
this.pendingHandlerCallbackHead = null;
}
PendingHandlerCallback task = pendingHandlerCallbackHead;
// 挨个执行工作列表中的工作
while (task != null) {
// 触发 ChannelInitializer 的 handlerAdded 回调
task.execute();
task = task.next;
}
}

最终在 PendingHandlerAddedTask 中执行 pipeline 中 ChannelInitializer 的 handlerAdded 回调。

这个 ChannelInitializer 就是在初始化 NioServerSocketChannel 的 init 办法中向 pipeline 增加的 ChannelInitializer。

@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isRegistered()) {if (initChannel(ctx)) {
// 初始化工作实现后,须要将本身从 pipeline 中移除
removeState(ctx);
}
}
}
}

在 handelrAdded 回调中执行 ChannelInitializer 匿名类中 initChannel 办法,留神此时执行的 ChannelInitializer 类为在本大节结尾 init 办法中由 Netty 框架增加的 ChannelInitializer,并不是用户自定义的 ChannelInitializer。

@Override
void init(Channel channel) {
.........
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();
//ServerBootstrap 中用户指定的 ChannelInitializer
ChannelHandler handler = config.handler();
if (handler != null) {pipeline.addLast(handler);
}
.........
}
});
}

执行完 ChannelInitializer 匿名类中 initChannel 办法后,需将 ChannelInitializer 从 pipeline 中删除。并回调 ChannelInitializer 的 handlerRemoved 办法。删除过程笔者曾经在第六大节《6. 从 pipeline 删除 channelHandler》具体介绍过了。

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (initMap.add(ctx)) { // Guard against re-entrance.
try {
// 执行 ChannelInitializer 匿名类中的 initChannel 办法
initChannel((C) ctx.channel());
} catch (Throwable cause) {exceptionCaught(ctx, cause);
} finally {ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
// 初始化结束后,从 pipeline 中移除本身
pipeline.remove(this);
}
}
return true;
}
return false;
}

当执行完 initChannel 办法后此时 pipeline 的构造如下图所示:

当用户的自定义 ChannelInitializer 被增加进 pipeline 之后,依据第四大节所讲的增加逻辑,此时 NioServerSocketChannel 曾经向 main reactor 胜利注册结束,不再须要向 pipeine 的工作列表中增加 PendingHandlerAddedTask 工作,而是间接调用自定义 ChannelInitializer 中的 handlerAdded 回调,和下面的逻辑一样。不同的是这里最终回调至用户自定义的初始化逻辑实现 initChannel 办法中。执行完用户自定义的初始化逻辑之后,从 pipeline 删除用户自定义的 ChannelInitializer。

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)// 配置主从 Reactor
...........
.handler(new ChannelInitializer<NioServerSocketChannel>() {
@Override
protected void initChannel(NioServerSocketChannel ch) throws Exception {
.... 自定义 pipeline 初始化逻辑....
ChannelPipeline p = ch.pipeline();
p.addLast(channelHandler1);
p.addLast(channelHandler2);
p.addLast(channelHandler3);
........
}
})

随后 netty 会以异步工作的模式向 pipeline 的开端增加 ServerBootstrapAcceptor,至此 NioServerSocketChannel 中 pipeline 的初始化工作就全副实现了。

7.2 NioSocketChannel 中 pipeline 的初始化

在 7.1 大节中笔者举的这个 pipeline 初始化的例子相对来说比较复杂,当咱们把这个简单例子的初始化逻辑搞清楚之后,NioSocketChannel 中 pipeline 的初始化过程就变的很简略了。

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)// 配置主从 Reactor
...........
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
.... 自定义 pipeline 初始化逻辑....
ChannelPipeline p = ch.pipeline();
p.addLast(channelHandler1);
p.addLast(channelHandler2);
p.addLast(channelHandler3);
........
}
})
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
// 保留用户自定义 ChannelInitializer
private volatile ChannelHandler childHandler;
}

在《Netty 如何高效接管网络连接》一文中咱们介绍过,当客户端发动连贯,实现三次握手之后,NioServerSocketChannel 上的 OP_ACCEPT 事件沉闷,随后会在 NioServerSocketChannel 的 pipeline 中触发 channelRead 事件。并最终在 ServerBootstrapAcceptor 中初始化客户端 NioSocketChannel。

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
...........
}
}

在这里会将用户自定义的 ChannelInitializer 增加进 NioSocketChannel 中的 pipeline 中,因为此时 NioSocketChannel 还没有向 sub reactor 开始注册。所以在向 pipeline 中增加 ChannelInitializer 的同时会随同着 PendingHandlerAddedTask 被增加进 pipeline 的工作列表中。

前面的流程大家应该很相熟了,和咱们在 7.1 大节中介绍的截然不同,当 NioSocketChannel 再向 sub reactor 注册胜利之后,会执行 pipeline 中的工作列表中的 PendingHandlerAddedTask 工作,在 PendingHandlerAddedTask 工作中会回调用户自定义 ChannelInitializer 的 handelrAdded 办法,在该办法中执行 initChannel 办法,用户自定义的初始化逻辑就封装在这外面。在初始化完 pipeline 后,将 ChannelInitializer 从 pipeline 中删除,并回调其 handlerRemoved 办法。

至此客户端 NioSocketChannel 中 pipeline 初始化工作就全副实现了。

8. 事件流传

在本文第三大节《3. pipeline 中的事件分类》中咱们介绍了 Netty 事件类型共分为三大类,别离是 Inbound 类事件,Outbound 类事件,ExceptionCaught 事件。并具体介绍了这三类事件的掩码示意,和触发机会,以及事件流传的方向。

本大节咱们就来依照 Netty 中异步事件的分类从源码角度剖析下事件是如何在 pipeline 中进行流传的。

8.1 Inbound 事件的流传

在第三大节中咱们介绍了所有的 Inbound 类事件,这些事件在 pipeline 中的流传逻辑和流传方向都是一样的,惟一的区别就是执行的回调办法不同。

本大节咱们就以 ChannelRead 事件的流传为例,来阐明 Inbound 类事件是如何在 pipeline 中进行流传的。

第三大节中咱们提到过,在 NioSocketChannel 中,ChannelRead 事件的触发机会是在每一次 read loop 读取数据之后在 pipeline 中触发的。

do {
............
allocHandle.lastBytesRead(doReadBytes(byteBuf));
............
// 在客户端 NioSocketChannel 的 pipeline 中触发 ChannelRead 事件
pipeline.fireChannelRead(byteBuf);
} while (allocHandle.continueReading());

从这里能够看到,任何 Inbound 类事件在 pipeline 中的流传终点都是从 HeadContext 头结点开始的。

public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline fireChannelRead(Object msg) {AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
.........
}

ChannelRead 事件从 HeadContext 开始在 pipeline 中流传,首先就会回调 HeadContext 中的 channelRead 办法。

在执行 ChannelHandler 中的相应事件回调办法时,须要确保回调办法的执行在指定的 executor 中进行。

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
// 须要保障 channelRead 事件回调在 channelHandler 指定的 executor 中进行
if (executor.inEventLoop()) {next.invokeChannelRead(m);
} else {executor.execute(new Runnable() {
@Override
public void run() {next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {if (invokeHandler()) {
try {((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {invokeExceptionCaught(t);
}
} else {fireChannelRead(msg);
}
}

在执行 HeadContext 的 channelRead 办法产生异样时,就会回调 HeadContext 的 exceptionCaught 办法。并在相应的事件回调办法中决定是否将事件持续在 pipeline 中流传。

final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {ctx.fireChannelRead(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {ctx.fireExceptionCaught(cause);
}
}

在 HeadContext 中通过 ctx.fireChannelRead(msg) 持续将 ChannelRead 事件在 pipeline 中向后流传。

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
}

这里的 findContextInbound 办法是整个 inbound 类事件在 pipeline 中流传的外围所在。

因为咱们当初须要持续将 ChannelRead 事件在 pipeline 中流传,所以咱们目前的外围问题就是通过 findContextInbound 办法在 pipeline 中找到下一个对 ChannelRead 事件感兴趣的 ChannelInboundHandler。而后执行该 ChannelInboundHandler 的 ChannelRead 事件回调。

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
// 须要保障 channelRead 事件回调在 channelHandler 指定的 executor 中进行
if (executor.inEventLoop()) {next.invokeChannelRead(m);
} else {executor.execute(new Runnable() {
@Override
public void run() {next.invokeChannelRead(m);
}
});
}
}

ChannelRead 事件就这样周而复始的始终在 pipeline 中流传,在流传的过程中只有对 ChannelRead 事件感兴趣的 ChannelInboundHandler 才能够响应。其余类型的 ChannelHandler 则间接跳过。

如果 ChannelRead 事件在 pipeline 中流传的过程中,没有失去其余 ChannelInboundHandler 的无效解决,最终会被流传到 pipeline 的开端 TailContext 中。而在本文第二大节中,咱们也提到过 TailContext 对于 inbound 事件存在的意义就是做一个兜底的解决。比方:打印日志,开释 bytebuffer。

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {onUnhandledInboundMessage(ctx, msg);
}
protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {onUnhandledInboundMessage(msg);
if (logger.isDebugEnabled()) {logger.debug("Discarded message pipeline : {}. Channel : {}.",
ctx.pipeline().names(), ctx.channel());
}
}
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug("Discarded inbound message {} that reached at the tail of the pipeline." +
"Please check your pipeline configuration.", msg);
} finally {
// 开释 DirectByteBuffer
ReferenceCountUtil.release(msg);
}
}
}

8.2 findContextInbound

本大节要介绍的 findContextInbound 办法和咱们在上篇文章《一文聊透 Netty 发送数据全流程》中介绍的 findContextOutbound 办法均是 netty 异步事件在 pipeline 中流传的外围所在。

事件流传的外围问题就是须要高效的在 pipeline 中依照事件的流传方向,找到下一个具备响应事件资格的 ChannelHandler。

比方:这里咱们在 pipeline 中流传的 ChannelRead 事件,咱们就须要在 pipeline 中找到下一个对 ChannelRead 事件感兴趣的 ChannelInboundHandler,并执行该 ChannelInboudnHandler 的 ChannelRead 事件回调,在 ChannelRead 事件回调中对事件进行业务解决,并决定是否通过 ctx.fireChannelRead(msg) 将 ChannelRead 事件持续向后流传。

private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {ctx = ctx.next;} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}

参数 mask 示意咱们正在流传的 ChannelRead 事件掩码 MASK_CHANNEL_READ。

static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_INACTIVE = 1 << 4;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;

通过 ctx = ctx.next 在 pipeline 中找到下一个 ChannelHandler,并通过 skipContext 办法判断下一个 ChannelHandler 是否具备响应事件的资格。如果没有则跳过持续向后查找。

比方:下一个 ChannelHandler 如果是一个 ChannelOutboundHandler,或者下一个 ChannelInboundHandler 对 ChannelRead 事件不感兴趣,那么就间接跳过。

8.3 skipContext

该办法次要用来判断下一个 ChannelHandler 是否具备 mask 代表的事件的响应资格。

private static boolean skipContext(AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {return (ctx.executionMask & (onlyMask | mask)) == 0 ||
(ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}
  • 参数 onlyMask 示意咱们须要查找的 ChannelHandler 类型,比方这里咱们正在流传 ChannelRead 事件,它是一个 inbound 类事件,那么必须只能由 ChannelInboundHandler 来响应解决,所以这里传入的 onlyMask 为 MASK_ONLY_INBOUND(ChannelInboundHandler 的掩码示意)
  • ctx.executionMask 咱们曾经在《5.3 ChanneHandlerContext》大节中具体介绍过了,当 ChannelHandler 被增加进 pipeline 中时,须要计算出该 ChannelHandler 感兴趣的事件汇合掩码来,保留在对应 ChannelHandlerContext 的 executionMask 字段中。
  • 首先会通过 ctx.executionMask & (onlyMask | mask)) == 0 来判断下一个 ChannelHandler 类型是否正确,比方咱们正在流传 inbound 类事件,下一个却是一个 ChannelOutboundHandler,那么必定是要跳过的,持续向后查找。
  • 如果下一个 ChannelHandler 的类型正确,那么就会通过 (ctx.executionMask & mask) == 0 来判断该 ChannelHandler 是否对正在流传的 mask 事件感兴趣。如果该 ChannelHandler 中笼罩了 ChannelRead 回调则执行,如果没有笼罩对应的事件回调办法则跳过,持续向后查找,直到 TailContext。

以上就是 skipContext 办法的外围逻辑,这里表白的外围语义是:

  • 如果 pipeline 中流传的是 inbound 类事件,则必须由 ChannelInboundHandler 来响应,并且该 ChannelHandler 必须笼罩实现对应的 inbound 事件回调。
  • 如果 pipeline 中流传的是 outbound 类事件,则必须由 ChannelOutboundHandler 来响应,并且该 ChannelHandler 必须笼罩实现对应的 outbound 事件回调。

这里大部分同学可能会对 ctx.executor() == currentExecutor 这个条件感到很纳闷。加上这个条件,其实对咱们这里的外围语义并没有多大影响。

  • 当 ctx.executor() == currentExecutor 也就是说前后两个 ChannelHandler 指定的 executor 雷同时,咱们外围语义放弃不变。
  • ctx.executor() != currentExecutor 也就是前后两个 ChannelHandler 指定的 executor 不同时,语义变为:只有前后两个 ChannelHandler 指定的 executor 不同,不论下一个 ChannelHandler 有没有笼罩实现指定事件的回调办法,均不能跳过。 在这种状况下会执行到 ChannelHandler 的默认事件回调办法,持续在 pipeline 中传递事件。咱们在《5.3 ChanneHandlerContext》大节提到过 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 会别离对 inbound 类事件回调办法和 outbound 类事件回调办法进行默认的实现。
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
@Skip
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelPromise promise) throws Exception {ctx.bind(localAddress, promise);
}
@Skip
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {ctx.connect(remoteAddress, localAddress, promise);
}
@Skip
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {ctx.disconnect(promise);
}
@Skip
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {ctx.close(promise);
}
@Skip
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {ctx.deregister(promise);
}
@Skip
@Override
public void read(ChannelHandlerContext ctx) throws Exception {ctx.read();
}
@Skip
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {ctx.write(msg, promise);
}
@Skip
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {ctx.flush();
}
}

而这里之所以须要退出 ctx.executor() == currentExecutor 条件的判断,是为了避免 HttpContentCompressor 在被指定不同的 executor 状况下无奈正确的创立压缩内容,导致的一些异样。但这个不是本文的重点,大家只须要了解这里的外围语义就好,这种非凡状况的非凡解决理解一下就好。

8.4 Outbound 事件的流传

对于 Outbound 类事件的流传,笔者在上篇文章《一文搞懂 Netty 发送数据全流程》中曾经进行了具体的介绍,本大节就不在赘述。

8.5 ExceptionCaught 事件的流传

在最初咱们来介绍下异样事件在 pipeline 中的流传,ExceptionCaught 事件和 Inbound 类事件一样都是在 pipeline 中从前往后开始流传。

ExceptionCaught 事件的触发有两种状况:一种是 netty 框架外部产生的异样,这时 netty 会间接在 pipeline 中触发 ExceptionCaught 事件的流传。异样事件会在 pipeline 中从 HeadContext 开始始终向后流传直到 TailContext。

比方 netty 在 read loop 中读取数据时产生异样:

try {
...........
do {
............
allocHandle.lastBytesRead(doReadBytes(byteBuf));
............
// 客户端 NioSocketChannel 的 pipeline 中触发 ChannelRead 事件
pipeline.fireChannelRead(byteBuf);
} while (allocHandle.continueReading());
...........
} catch (Throwable t) {handleReadException(pipeline, byteBuf, t, close, allocHandle);
}

这时会 netty 会间接从 pipeline 中触发 ExceptionCaught 事件的流传。

private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
RecvByteBufAllocator.Handle allocHandle) {
.............
pipeline.fireExceptionCaught(cause);
.............
}

和 Inbound 类事件一样,ExceptionCaught 事件会在 pipeline 中从 HeadContext 开始始终向后流传。

@Override
public final ChannelPipeline fireExceptionCaught(Throwable cause) {AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
return this;
}

第二种触发 ExceptionCaught 事件的状况是,当 Inbound 类事件或者 flush 事件在 pipeline 中流传的过程中,在某个 ChannelHandler 中的事件回调办法解决中产生异样,这时该 ChannelHandler 的 exceptionCaught 办法会被回调。用户能够在这里解决异样事件,并决定是否通过 ctx.fireExceptionCaught(cause) 持续向后流传异样事件。

比方咱们在 ChannelInboundHandler 中的 ChannelRead 回调中解决业务申请时产生异样,就会触发该 ChannelInboundHandler 的 exceptionCaught 办法。

private void invokeChannelRead(Object msg) {if (invokeHandler()) {
try {((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {invokeExceptionCaught(t);
}
} else {fireChannelRead(msg);
}
}
private void invokeExceptionCaught(final Throwable cause) {if (invokeHandler()) {
try {
// 触发 channelHandler 的 exceptionCaught 回调
handler().exceptionCaught(this, cause);
} catch (Throwable error) {........} else {........}
}

再比方:当咱们在 ChannelOutboundHandler 中的 flush 回调中解决业务后果发送的时候产生异样,也会触发该 ChannelOutboundHandler 的 exceptionCaught 办法。

private void invokeFlush0() {
try {((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {invokeExceptionCaught(t);
}
}

咱们能够在 ChannelHandler 的 exceptionCaught 回调中进行异样解决,并决定是否通过 ctx.fireExceptionCaught(cause) 持续向后流传异样事件。

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
......... 异样解决.......
ctx.fireExceptionCaught(cause);
}
@Override
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause);
return this;
}
static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {ObjectUtil.checkNotNull(cause, "cause");
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {next.invokeExceptionCaught(cause);
} else {
try {executor.execute(new Runnable() {
@Override
public void run() {next.invokeExceptionCaught(cause);
}
});
} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("Failed to submit an exceptionCaught() event.", t);
logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
}
}
}
}

8.6 ExceptionCaught 事件和 Inbound 类事件的区别

尽管 ExceptionCaught 事件和 Inbound 类事件在流传方向都是在 pipeline 中从前向后流传。然而大家这里留神辨别这两个事件的区别。

在 Inbound 类事件流传过程中是会查找下一个具备事件响应资格的 ChannelInboundHandler。遇到 ChannelOutboundHandler 会间接跳过。

而 ExceptionCaught 事件无论是在哪种类型的 channelHandler 中触发的,都会从以后异样 ChannelHandler 开始始终向后流传,ChannelInboundHandler 能够响应该异样事件,ChannelOutboundHandler 也能够响应该异样事件。

因为无论异样是在 ChannelInboundHandler 中产生的还是在 ChannelOutboundHandler 中产生的,exceptionCaught 事件都会在 pipeline 中是从前向后流传,并且不关怀 ChannelHandler 的类型。所以咱们个别将负责对立异样解决的 ChannelHandler 放在 pipeline 的最初,这样它对于 inbound 类异样和 outbound 类异样均能够捕捉失去。


总结

本文波及到的内容比拟多,通过 netty 异步事件在 pipeline 中的编排和流传这条主线,咱们相当于将之前的文章内容从新又回顾总结了一遍。

本文中咱们具体介绍了 pipeline 的组成构造,它次要是由 ChannelHandlerContext 类型节点组成的双向链表。ChannelHandlerContext 蕴含了 ChannelHandler 执行上下文的信息,从而能够使 ChannelHandler 只关注于 IO 事件的解决,遵循了繁多准则和开闭准则。

此外 pipeline 构造中还蕴含了一个工作链表,用来寄存执行 ChannelHandler 中的 handlerAdded 回调和 handlerRemoved 回调。pipeline 还持有了所属 channel 的援用。

咱们还具体介绍了 Netty 中异步事件的分类:Inbound 类事件,Outbound 类事件,ExceptionCaught 事件。并具体介绍了每种分类下的所有事件的触发机会和在 pipeline 中的流传门路。

最初介绍了 pipeline 的构造以及创立和初始化过程,以及对 pipeline 相干操作的源码实现。

两头咱们又交叉介绍了 ChannelHanderContext 的构造,介绍了 ChannelHandlerContext 具体封装了哪些对于 ChannelHandler 执行的上下文信息。

本文的内容到这里就完结了,感激大家的观看,咱们下篇文章见~~~

正文完
 0