关于程序员:NioServerSocketChannel的注册源码解析

32次阅读

共计 9616 个字符,预计需要花费 25 分钟才能阅读完成。

咱们上一章剖析了 Netty 中 NioServerSocketChaennl 的创立于初始化,本章节将持续剖析 NioServerSocketChannel 的剖析,NioServerSocketChannel 是 Netty 官网封装的一个通道对象,旨用来代替或者包装 JDK 原生的 SocketChannel 对象,那么他是如何讲 NioServerSocketChannel 于 JDK 的 NIO 相干代码关联起来的呢?

一、源码入口寻找

咱们上一节课次要剖析的源码办法是 initAndRegister 办法,其实从名字能够看进去,这里是做通道的初始化于注册的,咱们持续回到这个办法, 该办法的寻找,参照上一章节:

AbstractBootstrap#initAndRegister

咱们跳过上节课曾经剖析的代码,间接来到注册相干的逻辑:

ChannelFuture regFuture = config().group().register(channel);

咱们一一办法进行剖析:

config()

当初咱们创立的 ServerBootstrap,所以为什么选这个我就不多说了:

private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);

咱们能够看到,他返回的是这个对象,该对象是再创立 ServerBootstrap 的时候主动创立的,咱们看,他构造方法外面穿了一个 this, 证实 他持有一个 ServerBootstrap 的援用,这代表着他能够通过这个对象,获取 ServerBootstrap 内所有的属性和办法!获取到这个类之后干嘛了呢?

config().group()

预计大家很多都曾经猜出来了,咱们间接点进 group 外面去验证一下:

@SuppressWarnings("deprecation")
public final EventLoopGroup group() {return bootstrap.group();
}

该代码是获取到了咱们再构建 ServerBootstrap 的时候设置的 bossGroup 对象,有趣味的能够追一下,这里比较简单就不做太多的论述了,咱们持续回到主线,

config().group().register(channel);

咱们通过上述代码的剖析,晓得了 group 办法返回的是 NioEventLoopGroup,咱们进入到 register 办法:

咱们发现这里并没有 NioEventLoopGroup,然而通过前几章咱们的学习,咱们晓得 NioEventLoopGroup 是MultithreadEventLoopGroup 的子类,所以咱们子类没有往父类找,咱们进入到 MultithreadEventLoopGroup 源码外面:

@Override
public ChannelFuture register(Channel channel) {
    // 一般来说这里获取的 NioEventLoop 他有继承与  SingleThreadEventLoop
    return next().register(channel);
}

在这里,咱们看到了一个咱们后面剖析过得代码,next(),他调用的是 chooser.next();, chooser 是咱们在构建NioEventLoopGroup 的时候创立的一个执行器的选择器,next办法的性能是轮训的返回一个线程执行器:NioEventLoop!记不太清的同学能够回头看 NioEventLoopGroup 初始化源码解析的那一章代码!

当初咱们依据前几章的根底,咱们晓得了 next()办法返回的是一个 NioEventLoop 类,咱们进入到 register()办法查看:

然而,咱们发现 NioEventLoop 相干的实现,然而咱们依据后面所学,咱们能够晓得,NioEventLoop 的父类是 SingleThreadEventLoop,所以咱们进入到 SingleThreadEventLoop#register(io.netty.channel.Channel):

@Override
public ChannelFuture register(Channel channel) {
    // 调用自身的注册办法
    return register(new DefaultChannelPromise(channel, this));
}

// 没什么可说的持续往下追
@Override
public ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

咱们肯定可能猜到,这里的次要代码是:promise.channel().unsafe().register(this, promise);

咱们上一章剖析过 unsafe 是 NioMessageUnsafe, 然而 register 却没有他的实现:

咱们还是须要往父类追,进入到 io.netty.channel.AbstractChannel.AbstractUnsafe#register(this, promise):

咱们这里先关注一下参数 :

this: 传入的是他自身,他自身是个什么 NioEventLoop,也就是说,他传入了一个执行器

promise:NioServerSocketChannel 的包装对象

咱们进入到 register 办法中,剖析次要代码:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...................... 临时疏忽不必要代码.............................
    AbstractChannel.this.eventLoop = eventLoop;
    // 留神此时的 thread = null 所以返回 false
    if (eventLoop.inEventLoop()) {
        // 理论的注册 注册 selector 触发 handlerAdded 事件和 channelRegistered 事件
        register0(promise);
    } else {....................... 临时疏忽不必要代码......................}
}
AbstractChannel.this.eventLoop = eventLoop;

首先咱们将上一步获取的执行器保留在 NioServerSocketChannel 中!这行代码无力的证实了,每一个 Channel 绑定一个 NioEventLoop 对象!

if (eventLoop.inEventLoop()) {
    // 理论的注册 注册 selector 触发 handlerAdded 事件和 channelRegistered 事件
    register0(promise);
}

留神:这里我须要廓清一点,实在的调试过程中,并不会走这个分支,而是会走 else 分支异步进行注册,这里为了更不便大家了解,我就按照 if 分支进行源码剖析,其实没有太大变动,都是调用 register0 办法进行注册,只不过一个同步一个异步,对于异步,是 Netty 中及其重要的一个知识点,我将放到前面独自开一章进行解说!

咱们进入到 register0 源码外面:

private void register0(ChannelPromise promise) {
    try {.............. 疏忽代码..................]
        // 理论的注册  调用 jdk 底层的数据注册 selector
        // 调用 JDK 底层的 register() 进行注册
        //io.netty.channel.nio.AbstractNioChannel.doRegister
        doRegister();
        neverRegistered = false;
        registered = true;

        // 告诉管道  流传 handlerAdded 事件
        // 触发 handlerAdded 事件 触发工作 add 事件
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        // 告诉管道  流传 channelRegistered 事件
        // 触发 channelRegistered 事件
        pipeline.fireChannelRegistered();
        // 如果从未注册过频道,则仅触发 channelActive。// 如果勾销注册并从新注册通道,则多个通道处于活动状态。//isActive() 返回 false
        // 此时 Channel 还未注册绑定地址,所以处于非沉闷状态
        if (isActive()) {.................... 疏忽不必要代码..................}
    } catch (Throwable t) {
        // 间接敞开通道以防止 FD 透露。closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

二、源码解析

doRegister();

doRegister();

真正的注册办法,该办法是将 Netty 自身的 NioServerSocket 与 JDK 连接起来的最重要的一个类!

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

javaChannel()办法是返回 JDK 原生的 SocketChannel,他是再 NioServerSocketChannel 初始化的时候被保留的,还记得咱们再讲述 NIO 开发 Socket 的时候的流程吗

咱们重点关注一下 javaChannel().register 的参数:

eventLoop().unwrappedSelector():NioEventLoop 再创立的时候,会保留两个选择器,一个是 JDK 的原始的选择器,一个是通过 Netty 包装的选择器,这里返回的是原生的选择器!

0:不关注任何事件

this:this 代表着以后类,他是 NioServerSocketChannel 类型的,他将一个 NioServerSocketChannel 的对象,绑定到了 JDK 原生的选择器,后续只须要通过 SelectionKey.attachment(),就能获取到 NioServerSocketChannel,而一个 NioServerSocketChannel 外面又蕴含一个 JDK 原生的 Channel 对象,就能够基于该 jdk 原生的 Channel 来进行各种读写操作!

到当初为止,咱们就实现 JDK 中的 NIO 的将通道绑定到选择器上,咱们回到上一步:

pipeline.invokeHandlerAddedIfNeeded

pipeline.invokeHandlerAddedIfNeeded();

开始回调 pipeline 通道外面增加自定义事件:

final void invokeHandlerAddedIfNeeded() {assert channel.eventLoop().inEventLoop();
    if (firstRegistration) {
        firstRegistration = false;
        // 当初,咱们已注册到 EventLoop。当初该调用 ChannelHandler 的回调了,// 在实现注册之前增加的内容。callHandlerAddedForAllHandlers();}
}

//callHandlerAddedForAllHandlers
private void callHandlerAddedForAllHandlers() {
    //task = PendingHandlerAddedTask
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {task.execute();
        task = task.next;
    }
}

须要留神的是 PendingHandlerCallback task 是 PendingHandlerAddedTask 类型的,他是什么时候加载的呢?切实咱们初始化 NioServerSocketChannel 的时候调用 addLast 办法的时候被赋的值,有趣味的小伙伴能够本人去跟一下源码,这里间接进入到:

if (executor.inEventLoop()) {callHandlerAdded0(ctx);
}

// 进入到 callHandlerAdded0 源码逻辑
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try{ctx.callHandlerAdded();
    }
    .......................
}

// 进入到 ctx.callHandlerAdded();
final void callHandlerAdded() throws Exception {if (setAddComplete()) {handler().handlerAdded(this);
    }
}

还接记得 handler()吗,我再 NioServerSocketChannel 初始化的时候说过,过后程序向 pipeline 中增加了一个 ChannelInitializer, 这里返回的就是那个 ChannelInitializer!咱们进入到 ChannelInitializer#handlerAdded 办法外面:

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isRegistered()) {if (initChannel(ctx)) {removeState(ctx);
        }
    }
}

首先咱们重点关注一个 initChannel(ctx)

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    // 避免再次进入。if (initMap.add(ctx)) {
        try {// 调用 ChannelInitializer 实现的 initChannel() 办法
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {................................} finally {ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                // 将 ChannelInitializer 本身从 Pipeline 中移出
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}
initChannel((C) ctx.channel());

该办法会回调 ChannelInitializer 的形象办法 initChannel,该形象办法在咱们初始化的时候实现,咱们就要找到实现这个形象办法的中央,咱们回到上一节课的代码:io.netty.bootstrap.ServerBootstrap#init

void init(Channel channel) {
    ..........................;
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();
            // 将用户自定义的 handler 增加进管道  handler 是在构建 ServerBootStr 的时候传入的  handler
            ChannelHandler handler = config.handler();
            if (handler != null) {pipeline.addLast(handler);
            }

            ch.eventLoop().execute(() -> {
                pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            });
    }    
}

上一节课讲的时候,咱们将这一段逻辑略过了,只说是会向通道中增加一个 ChannelInitializer 实现,当初开始回调他的 initChannel 办法了:

ChannelHandler handler = config.handler();
if (handler != null) {pipeline.addLast(handler);
}

这段代码会将客户再构建 ServerBootstrap 的时候传入的 handler 增加进通道,咱们为了不便了解,假如用户没有设置 handler,所以这个 handler 判断不通过,跳过,咱们持续往下:

ch.eventLoop().execute(() -> {
    pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
});

这里异步的向管道流注册一个默认的 Handler, 为什么说是异步的,咱们暂且不说,咱们暂且认为是同步的进行 add,此时咱们的通道如下:

ServerBootstrapAcceptor 的作用是专门用于新连贯接入的,

ServerBootstrapAcceptor(
                final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
    this.childGroup = childGroup;
    this.childHandler = childHandler;
    this.childOptions = childOptions;
    this.childAttrs = childAttrs;

    enableAutoReadTask = new Runnable() {
        @Override
        public void run() {channel.config().setAutoRead(true);
        }
    };
}

咱们能够看到,他会保留一系列的参数,包含 WorkGroup、childHandler、childOptions、childAttrs 这些参数都是咱们再创立 serverBootstrap 的时候传入的参数,这也证实了,这些参数是作用于客户端 Socket 连贯的!

无关 ServerBootstrapAcceptor,后续会进行一个具体的剖析,咱们接着说,这里须要重点讲一下 addLast 办法,

@Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        ........................ 疏忽.........................
        // 告诉增加办法回调
        callHandlerAdded0(newCtx);
        return this;
    }

在进行增加的时候,他会回调外部产生的 handlerAdded 办法,还记得,咱们在介绍 Netty 的根本架构的业务通道章节吗?

再调用 addLast 之后,该办法会被回调!

这样就讲所有的办法注册结束了,咱们持续回到 ChannelInitializer#handlerAdded 办法,当 initChannel(ctx) 调用完了之后:

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    // 避免再次进入。if (initMap.add(ctx)) {
        try {// 调用 ChannelInitializer 实现的 initChannel() 办法
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {................................} finally {ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                // 将 ChannelInitializer 本身从 Pipeline 中移出
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}

咱们会进入到 finally 外面,咱们会看到,此时会做一个操作,删除以后的类,以后的类是谁,是 ChannelInitializer, 所以删除结束后,此时管道对象的构造如图所示:

至此 invokeHandlerAddedIfNeeded 剖析结束

pipeline.fireChannelRegistered();

@Override
public final ChannelPipeline fireChannelRegistered() {AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}

这行代码其实没什么可说的,大家能够本人跟一下调试一下代码,这个代码的意义是从 HeadContext 节点开始流传 channelRegistered 办法:

至此,NioServerSocketChannel 的注册根本就剖析完了,有的同学可能感觉少剖析了一段:

if (isActive()) {if (firstRegistration) {
        //Channel 以后状态为沉闷时,触发 channelActive 事件
        pipeline.fireChannelActive();} else if (config().isAutoRead()) {
        // 该通道已注册,并已设置 autoRead()。这意味着咱们须要开始浏览
        // 再次,以便咱们解决入站数据。//
        // See https://github.com/netty/netty/issues/4805
        // 开始读事件
        beginRead();}
}

这段代码再第一次启动的时候并不会被调用,因为此时通道还没有绑定端口正式启动起了,所以这里 isActive 会返回 false, 无关逻辑,会在新连贯接入解说的时候进行剖析!

三、总结

  1. Netty 会调用 JDK 底层的注册办法,同时将自身的 NioServerSocketChannel 作为 att 绑定到抉择事件上!
  2. 当注册实现后会回调 handlerAdded 办法
  3. Netty 会回调再初始化 NioServerSocketChannel 的时候注册的 Channelinitialization<Channel>,增加一个新连贯接入器 ServerBootstrapAcceptor,并删除自身!
  4. 当注册实现后会回调 Channelregistered 办法

正文完
 0