关于java:学不懂Netty看不懂源码不存在的这篇文章手把手带你阅读Netty源码

3次阅读

共计 24772 个字符,预计需要花费 62 分钟才能阅读完成。

浏览这篇文章之前,倡议先浏览和这篇文章关联的内容。

1. 具体分析散布式微服务架构下网络通信的底层实现原理(图解)

2. (年薪 60W 的技巧)工作了 5 年,你真的了解 Netty 以及为什么要用吗?(深度干货)

3. 深度解析 Netty 中的外围组件(图解 + 实例)

4. BAT 面试必问细节:对于 Netty 中的 ByteBuf 详解

5. 通过大量实战案例合成 Netty 中是如何解决拆包黏包问题的?

6. 基于 Netty 实现自定义音讯通信协议(协定设计及解析利用实战)

7. 全网最具体最齐全的序列化技术及深度解析与利用实战

8. 手把手教你基于 Netty 实现一个根底的 RPC 框架(通俗易懂)

9. (年薪 60W 分水岭)基于 Netty 手写实现 RPC 框架进阶篇(带注册核心和注解)

提前准备好如下代码,从服务端构建着手,深入分析 Netty 服务端的启动过程。

public class NettyBasicServerExample {public void bind(int port){
        //netty 的服务端编程要从 EventLoopGroup 开始,// 咱们要创立两个 EventLoopGroup,// 一个是 boss 专门用来接管连贯,能够了解为解决 accept 事件,// 另一个是 worker,能够关注除了 accept 之外的其它事件,解决子工作。// 下面留神,boss 线程个别设置一个线程,设置多个也只会用到一个,而且多个目前没有利用场景,// worker 线程通常要依据服务器调优,如果不写默认就是 cpu 的两倍。EventLoopGroup bossGroup=new NioEventLoopGroup();

        EventLoopGroup workerGroup=new NioEventLoopGroup();
        try {
            // 服务端要启动,须要创立 ServerBootStrap,// 在这外面 netty 把 nio 的模板式的代码都给封装好了
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                // 配置 Server 的通道,相当于 NIO 中的 ServerSocketChannel
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO)) // 设置 ServerSocketChannel 对应的 Handler
                //childHandler 示意给 worker 那些线程配置了一个处理器,// 这个就是下面 NIO 中说的,把解决业务的具体逻辑形象进去,放到 Handler 外面
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline()
                            .addLast(new NormalInBoundHandler("NormalInBoundA",false))
                            .addLast(new NormalInBoundHandler("NormalInBoundB",false))
                            .addLast(new NormalInBoundHandler("NormalInBoundC",true));
                        socketChannel.pipeline()
                            .addLast(new NormalOutBoundHandler("NormalOutBoundA"))
                            .addLast(new NormalOutBoundHandler("NormalOutBoundB"))
                            .addLast(new NormalOutBoundHandler("NormalOutBoundC"))
                            .addLast(new ExceptionHandler());
                    }
                });
            // 绑定端口并同步期待客户端连贯
            ChannelFuture channelFuture=bootstrap.bind(port).sync();
            System.out.println("Netty Server Started,Listening on :"+port);
            // 期待服务端监听端口敞开
            channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();
        } finally {
            // 开释线程资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();}
    }

    public static void main(String[] args) {new NettyBasicServerExample().bind(8080);
    }
}
public class NormalInBoundHandler extends ChannelInboundHandlerAdapter {
    private final String name;
    private final boolean flush;

    public NormalInBoundHandler(String name, boolean flush) {
        this.name = name;
        this.flush = flush;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("InboundHandler:"+name);
        if(flush){ctx.channel().writeAndFlush(msg);
        }else {throw new RuntimeException("InBoundHandler:"+name);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("InboundHandlerException:"+name);
        super.exceptionCaught(ctx, cause);
    }
}
public class NormalOutBoundHandler extends ChannelOutboundHandlerAdapter {
    private final String name;

    public NormalOutBoundHandler(String name) {this.name = name;}

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("OutBoundHandler:"+name);
        super.write(ctx, msg, promise);
    }
}

在服务端启动之前,须要配置 ServerBootstrap 的相干参数,这一步大略分为以下几个步骤

  • 配置 EventLoopGroup 线程组
  • 配置 Channel 类型
  • 设置 ServerSocketChannel 对应的 Handler
  • 设置网络监听的端口
  • 设置 SocketChannel 对应的 Handler
  • 配置 Channel 参数

Netty 会把咱们配置的这些信息组装,公布服务监听。

ServerBootstrap 参数配置过程

上面这段代码是咱们配置 ServerBootStrap 相干参数,这个过程比较简单,就是把配置的参数值保留到 ServerBootstrap 定义的成员变量中就能够了。

bootstrap.group(bossGroup, workerGroup)
    // 配置 Server 的通道,相当于 NIO 中的 ServerSocketChannel
    .channel(NioServerSocketChannel.class)
    .handler(new LoggingHandler(LogLevel.INFO)) // 设置 ServerSocketChannel 对应的 Handler
    //childHandler 示意给 worker 那些线程配置了一个处理器,// 这个就是下面 NIO 中说的,把解决业务的具体逻辑形象进去,放到 Handler 外面
    .childHandler(new ChannelInitializer<SocketChannel>() {});

咱们来看一下 ServerBootstrap 的类关系图以及属性定义

ServerBootstrap 类关系图

如图 8 - 1 所示,示意 ServerBootstrap 的类关系图。

  • AbstractBootstrap,定义了一个抽象类,作为抽象类,肯定是抽离了 Bootstrap 相干的形象逻辑,所以很显然能够推断出 Bootstrap 应该也继承了 AbstractBootstrap
  • ServerBootstrap,服务端的启动类,
  • ServerBootstrapAcceptor,继承了 ChannelInboundHandlerAdapter,所以自身就是一个 Handler,当服务端启动后,客户端连贯上来时,会先进入到 ServerBootstrapAccepter。

<img src=”https://mic-blob-bucket.oss-cn-beijing.aliyuncs.com/202111182305905.png” alt=”image-20210910154646643″ style=”zoom:80%;” />

<center> 图 8 -1 ServerBootstrap 类关系图 </center>

AbstractBootstrap 属性定义

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {@SuppressWarnings("unchecked")
    private static final Map.Entry<ChannelOption<?>, Object>[] EMPTY_OPTION_ARRAY = new Map.Entry[0];
    @SuppressWarnings("unchecked")
    private static final Map.Entry<AttributeKey<?>, Object>[] EMPTY_ATTRIBUTE_ARRAY = new Map.Entry[0];
    /**
     * 这里的 EventLoopGroup 作为服务端 Acceptor 线程,负责解决客户端的申请接入
     * 作为客户端 Connector 线程,负责注册监听连贯操作位,用于判断异步连贯后果。*/
    volatile EventLoopGroup group; //
    @SuppressWarnings("deprecation")
    private volatile ChannelFactory<? extends C> channelFactory;  //channel 工厂,很显著应该是用来制作对应 Channel 的
    private volatile SocketAddress localAddress;  //SocketAddress 用来绑定一个服务端地址

    // The order in which ChannelOptions are applied is important they may depend on each other for validation
    // purposes.
    /**
     * ChannelOption 能够增加 Channer 增加一些配置信息
     */
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
    /**
     *  ChannelHandler 是具体怎么解决 Channer 的 IO 事件。*/
    private volatile ChannelHandler handler;
}

对于上述属性定义,整体总结如下:

  1. 提供了一个 ChannelFactory 对象用来创立 Channel, 一个 Channel 会对应一个 EventLoop 用于 IO 的事件处理,在一个 Channel 的整个生命周期中 只会绑定一个 EventLoop, 这里可了解给 Channel 调配一个线程进行 IO 事件处理,完结后回收该线程。
  2. AbstractBootstrap 没有提供 EventLoop 而是提供了一个 EventLoopGroup,其实我认为这里只用一个 EventLoop 就行了。
  3. 不论是服务器还是客户端的 Channel 都须要绑定一个本地端口这就有了 SocketAddress 类的对象 localAddress。
  4. Channel 有很多选项所有有了 options 对象 LinkedHashMap<channeloption<?>, Object>
  5. 怎么解决 Channel 的 IO 事件呢,咱们增加一个事件处理器 ChannelHandler 对象。

ServerBootstrap 属性定义

ServerBootstrap 能够了解为服务器启动的工厂类,咱们能够通过它来实现服务器端的 Netty 初始化。主要职责:|

  • EventLoop 初始化
  • channel 的注册
  • pipeline 的初始化
  • handler 的增加过程
  • 服务端连贯解决。
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);

    // The order in which child ChannelOptions are applied is important they may depend on each other for validation
    // purposes.
    //SocketChannel 相干的属性配置
    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this); // 配置类
    private volatile EventLoopGroup childGroup;  // 工作线程组
    private volatile ChannelHandler childHandler; // 负责 SocketChannel 的 IO 解决相干的 Handler

    public ServerBootstrap() {}
}

服务端启动过程剖析

理解了 ServerBootstrap 相干属性的配置之后,咱们持续来看服务的启动过程,在开始往下剖析的时候,先无妨来思考以下这些问题

  • Netty 本人实现的 Channel 与底层 JDK 提供的 Channel 是如何分割并且构建实现的
  • ChannelInitializer 这个非凡的 Handler 处理器的作用以及实现原理
  • Pipeline 是如何初始化以的

ServerBootstrap.bind

先来看 ServerBootstrap.bind()办法的定义,这里次要用来绑定一个端口并且公布服务端监听。

依据咱们应用 NIO 相干 API 的了解,无非就是应用 JDK 底层的 API 来关上一个服务端监听并绑定一个端口。

 ChannelFuture channelFuture=bootstrap.bind(port).sync();
public ChannelFuture bind(SocketAddress localAddress) {validate();
    return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
  • validate(),验证 ServerBootstrap 核心成员属性的配置是否正确,比方 group、channelFactory、childHandler、childGroup 等,这些属性如果没配置,那么服务端启动会报错
  • localAddress,绑定一个本地端口地址

doBind

doBind 办法比拟长,从大的代码构造,能够分为三个局部

  • initAndRegister 初始化并注册 Channel,并返回一个 ChannelFuture,阐明初始化注册 Channel 是异步实现
  • regFuture.cause() 用来判断 initAndRegister() 是否产生异样,如果产生异样,则间接返回
  • regFuture.isDone(),判断 initAndRegister() 办法是否执行实现。

    • 如果执行实现,则调用 doBind0()办法。
    • 如果未执行实现,regFuture 增加一个监听回调,在监听回调中再次判断执行后果进行相干解决。
    • PendingRegistrationPromise 用来保留异步执行后果的状态

从整体代码逻辑来看,逻辑构造还是十分清晰的,initAndRegister()办法负责 Channel 的初始化和注册、doBind0()办法用来绑定端口。这个无非就是咱们应用 NIO 相干 API 公布服务所做的事件。

private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {return regFuture;}
    
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

initAndRegister

这个办法顾名思义,就是初始化和注册,基于咱们整个流程的剖析能够猜测到

  • 初始化,应该就是构建服务端的 Handler 解决链
  • register,应该就是把以后服务端的连贯注册到 selector 上

上面咱们通过源码印证咱们的猜测。

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 通过 ChannelFactory 创立一个具体的 Channel 实现
        channel = channelFactory.newChannel();
        init(channel); // 初始化
    } catch (Throwable t) {// 省略....}
    // 这个代码应该和咱们猜测是统一的,就是将以后初始化的 channel 注册到 selector 上,这个过程同样也是异步的
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) { // 获取 regFuture 的执行后果
        if (channel.isRegistered()) {channel.close();
        } else {channel.unsafe().closeForcibly();}
    }
    return regFuture;
}

channelFactory.newChannel()

这个办法在剖析之前,咱们能够持续揣测它的逻辑。

在最开始构建服务端的代码中,咱们通过 channel 设置了一个 NioServerSocketChannel.class 类对象,这个对象示意以后 channel 的构建应用哪种具体的 API

bootstrap.group(bossGroup, workerGroup)
    // 配置 Server 的通道,相当于 NIO 中的 ServerSocketChannel
    .channel(NioServerSocketChannel.class)

而在 initAndRegister 办法中,又用到了 channelFactory.newChannel()来生成一个具体的 Channel 实例,因而不难想到,这两者必然有肯定的分割,咱们也能够果断的认为,这个工厂会依据咱们配置的 channel 来动静构建一个指定的 channel 实例。

channelFactory 有多个实现类,所以咱们能够从配置办法中找到 channelFactory 的具体定义,代码如下。

public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

channelFactory 对应的具体实现是:ReflectiveChannelFactory,因而咱们定位到 newChannel()办法的实现。

ReflectiveChannelFactory.newChannel

在该办法中,应用 constructor 构建了一个实例。

@Override
public T newChannel() {
    try {return constructor.newInstance();
    } catch (Throwable t) {throw new ChannelException("Unable to create Channel from class" + constructor.getDeclaringClass(), t);
    }
}

construtor 的初始化代码如下,用到了传递进来的 clazz 类,取得该类的结构器,该结构器后续能够通过 newInstance 创立一个实例对象

而此时的 clazz 其实就是:NioServerSocketChannel

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {ObjectUtil.checkNotNull(clazz, "clazz");
        try {this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {throw new IllegalArgumentException("Class" + StringUtil.simpleClassName(clazz) +
                    "does not have a public non-arg constructor", e);
        }
    }
}

NioServerSocketChannel

NioServerSocketChannel 的构造方法定义如下。

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException("Failed to open a server socket.", e);
        }
    }
    public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
}

当 NioServerSocketChannel 实例化后,调用 newSocket 办法创立了一个服务端实例。

newSocket 办法中调用了provider.openServerSocketChannel(),来实现 ServerSocketChannel 的创立,ServerSocketChannel 就是 Java 中 NIO 中的服务端 API。

public ServerSocketChannel openServerSocketChannel() throws IOException {return new ServerSocketChannelImpl(this);
}

通过层层推演,最终看到了 Netty 是如何一步步封装,实现 ServerSocketChannel 的创立。

设置非阻塞

在 NioServerSocketChannel 中的构造方法中,先通过 super 调用父类做一些配置操作

public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

最终,super 会调用 AbstractNioChannel 中的构造方法,

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp; // 设置关怀事件,此时是一个连贯事件,所以是 OP_ACCEPT
    try {ch.configureBlocking(false); // 设置非阻塞
    } catch (IOException e) {
        try {ch.close();
        } catch (IOException e2) {
            logger.warn("Failed to close a partially initialized socket.", e2);
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

持续剖析 initAndRegister

剖析实现 channel 的初始化后,接下来就是要将以后 channel 注册到 Selector 上,所以持续回到 initAndRegister 办法。

final ChannelFuture initAndRegister() {
// 省略....
    // 这个代码应该和咱们猜测是统一的,就是将以后初始化的 channel 注册到 selector 上,这个过程同样也是异步的
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) { // 获取 regFuture 的执行后果
        if (channel.isRegistered()) {channel.close();
        } else {channel.unsafe().closeForcibly();}
    }
    return regFuture;
}

注册到某个 Selector 上,其实就是注册到某个 EventLoopGroup 中,如果大家能有这个猜测,阐明后面的内容是听懂了的。

config().group().register(channel)这段代码,其实就是获取在 ServerBootstrap 中配置的 bossEventLoopGroup,而后把以后的服务端 channel 注册到该 group 中。

此时,咱们通过快捷键想去看一下 register 的实现时,发现 EventLoopGroup 又有多个实现,咱们来看一下类关系图如图 8 - 2 所示。

<img src=”https://mic-blob-bucket.oss-cn-beijing.aliyuncs.com/202111182305717.png” alt=”image-20210910170502364″ style=”zoom:67%;” />

<center> 图 8 -3 EventLoopGroup 类关系图 </center>

而咱们在后面配置的 EventLoopGroup 的实现类是 NioEventLoopGroup,而 NioEventLoopGroup 继承自 MultithreadEventLoopGroup,所以在 register()办法中,咱们间接找到父类的实现办法即可。

MultithreadEventLoopGroup.register

这段代码大家都熟了,从 NioEventLoopGroup 中抉择一个 NioEventLoop,将以后 channel 注册下来

@Override
public ChannelFuture register(Channel channel) {return next().register(channel);
}

next()办法返回的是 NioEventLoop,而 NioEventLoop 又有多个实现类,咱们来看图 8 - 4 所示的类关系图。

<img src=”https://mic-blob-bucket.oss-cn-beijing.aliyuncs.com/202111182305139.png” alt=”image-20210910171415854″ style=”zoom:67%;” />

<center> 图 8 -4 NioEventLoop 类关系图 </center>

从类关系图中发现,发现 NioEventLoop 派生自 SingleThreadEventLoop,所以 next().register(channel); 办法,执行的是 SingleThreadEventLoop 中的 register

SingleThreadEventLoop.register

@Override
public ChannelFuture register(Channel channel) {return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

ChannelPromise,派生自 Future,用来实现异步工作解决回调性能。简略来说就是把注册的动作异步化,当异步执行完结后会把执行后果回填到 ChannelPromise 中

AbstractChannel.register

抽象类个别就是公共逻辑的解决,而这里的解决次要就是针对一些参数的判断,判断完了之后再调用 register0() 办法。

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (isRegistered()) { // 判断是否曾经注册过
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) { // 判断 eventLoop 类型是否是 EventLoop 对象类型,如果不是则抛出异样
        promise.setFailure(new IllegalStateException("incompatible event loop type:" + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;
    //Reactor 外部线程调用,也就是说以后 register 办法是 EventLoop 线程触发的,则执行上面流程
    if (eventLoop.inEventLoop()) {register0(promise);
    } else { // 如果是内部线程
        try {eventLoop.execute(new Runnable() {
                @Override
                public void run() {register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",
                AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

AbstractChannel.register0

Netty 从 EventLoopGroup 线程组中抉择一个 EventLoop 和以后的 Channel 绑定,之后该 Channel 生命周期中的所有 I / O 事件都由这个 EventLoop 负责。

register0 办法次要做四件事:

  • 调用 JDK 层面的 API 对以后 Channel 进行注册
  • 触发 HandlerAdded 事件
  • 触发 channelRegistered 事件
  • Channel 状态为沉闷时,触发 channelActive 事件

在以后的 ServerSocketChannel 连贯注册的逻辑中,咱们只须要关注上面的 doRegister 办法即可。

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}
        boolean firstRegistration = neverRegistered;
        doRegister();  // 调用 JDK 层面的 register()办法进行注册
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded(); // 触发 Handler,如果有必要的状况下

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) { // 此时是 ServerSocketChannel 的注册,所以连贯还处于非沉闷状态
            if (firstRegistration) {pipeline.fireChannelActive(); 
            } else if (config().isAutoRead()) {// This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();}
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

AbstractNioChannel.doRegister

进入到 AbstractNioChannel.doRegister 办法。

javaChannel().register()负责调用 JDK 层面的办法,把 channel 注册到 eventLoop().unwrappedSelector() 上,其中第三个参数传入的是 Netty 本人实现的 Channel 对象,也就是把该对象绑定到 attachment 中。

这样做的目标是,后续每次调 Selector 对象进行事件轮询时,当触发事件时,Netty 都能够获取本人的 Channe 对象。

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

服务注册总结

上述代码比拟绕,然而整体总结下来并不难理解

  • 初始化指定的 Channel 实例
  • 把该 Channel 调配给某一个 EventLoop
  • 而后把 Channel 注册到该 EventLoop 的 Selector 中

AbstractBootstrap.doBind0

剖析完了注册的逻辑后,再回到 AbstractBootstrap 类中的 doBind0 办法,这个办法不必看也能晓得,ServerSocketChannel 初始化了之后,接下来要做的就是绑定一个 ip 和端口地址。

private static void doBind0(
    final ChannelFuture regFuture, final Channel channel,
    final SocketAddress localAddress, final ChannelPromise promise) {

    // 获取以后 channel 中的 eventLoop 实例,执行一个异步工作。// 须要留神,以前咱们在课程中讲过,eventLoop 在轮询中一方面要执行 select 遍历,另一方面要执行阻塞队列中的工作,而这里就是把工作增加到队列中异步执行。channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            // 如果 ServerSocketChannel 注册胜利,则调用该 channel 的 bind 办法
            if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {promise.setFailure(regFuture.cause());
            }
        }
    });
}

channel.bind 办法,会依据 ServerSocketChannel 中的 handler 链配置,一一进行调用,因为在本次案例中,咱们给 ServerSocketChannel 配置了一个 LoggingHandler 的处理器,所以 bind 办法会先调用 LoggingHandler,而后再调用 DefaultChannelPipeline 中的 bind 办法,调用链路

-> DefaultChannelPipeline.ind

​ -> AbstractChannel.bind

​ -> NioServerSocketChannel.doBind

最终就是调用后面初始化好的 ServerSocketChannel 中的 bind 办法绑定本地地址和端口。

protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());
    } else {javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

构建 SocketChannel 的 Pipeline

在 ServerBootstrap 的配置中,咱们针对 SocketChannel,配置了入站和出站的 Handler,也就是当某个 SocketChannel 的 IO 事件就绪时,就会依照咱们配置的处理器链表进行逐个解决,那么这个链表是什么时候构建的,又是什么样的构造呢?上面咱们来剖析这部分的内容

.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline()
            .addLast(new NormalInBoundHandler("NormalInBoundA",false))
            .addLast(new NormalInBoundHandler("NormalInBoundB",false))
            .addLast(new NormalInBoundHandler("NormalInBoundC",true));
        socketChannel.pipeline()
            .addLast(new NormalOutBoundHandler("NormalOutBoundA"))
            .addLast(new NormalOutBoundHandler("NormalOutBoundB"))
            .addLast(new NormalOutBoundHandler("NormalOutBoundC"))
            .addLast(new ExceptionHandler());
    }
});

childHandler 的构建

childHandler 的构建过程,在 AbstractChannel.register0 办法中实现

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {channel = channelFactory.newChannel(); // 这是是创立 channel
            init(channel); // 这里是初始化
        } catch (Throwable t) {// 省略....}
        ChannelFuture regFuture = config().group().register(channel); // 这是是注册
        if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();
            } else {channel.unsafe().closeForcibly();}
        }

        return regFuture;
    }

ServerBootstrap.init

init 办法,调用的是 ServerBootstrap 中的init(),代码如下。

@Override
void init(Channel channel) {setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, newAttributesArray());

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;  //childHandler 就是在服务端配置时增加的 ChannelInitializer
    final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
    // 此时的 Channel 是 NioServerSocketChannel,这里是为 NioServerSocketChannel 增加处理器链。p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler(); // 如果在 ServerBootstrap 构建时,通过.handler 增加了处理器,则会把相干处理器增加到 NioServerSocketChannel 中的 pipeline 中。if (handler != null) {pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() { // 异步天剑一个 ServerBootstrapAcceptor 处理器,从名字来看,@Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                        //currentChildHandler,示意 SocketChannel 的 pipeline,当收到客户端连贯时,就会把该 handler 增加到以后 SocketChannel 的 pipeline 中
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

其中,对于上述代码的外围局部阐明如下

  • ChannelPipeline 是在 AbstractChannel 中的构造方法中初始化的一个 DefaultChannelPipeline

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();}
  • p.addLast是为 NioServerSocketChannel 增加 handler 处理器链,这里增加了一个 ChannelInitializer 回调函数,该回调是异步触发的,在回调办法中做了两件事

    • 如果 ServerBootstrap.handler 增加了处理器,则会把相干处理器增加到该 pipeline 中,在本次演示的案例中,咱们增加了 LoggerHandler
    • 异步执行增加了 ServerBootstrapAcceptor,从名字来看,它是专门用来接管新的连贯解决的。

咱们在这里思考一个问题,为什么 NioServerSocketChannel 须要通过 ChannelInitializer 回调处理器呢?ServerBootstrapAcceptor 为什么通过异步工作增加到 pipeline 中呢?

起因是,NioServerSocketChannel 在初始化的时候,还没有开始将该 Channel 注册到 Selector 对象上,也就是没方法把 ACCEPT 事件注册到 Selector 上,所以当时增加了 ChannelInitializer 处理器,期待 Channel 注册实现后,再向 Pipeline 中增加 ServerBootstrapAcceptor。

ServerBootstrapAcceptor

依照上面的办法演示一下 SocketChannel 中的 Pipeline 的构建过程

  1. 启动服务端监听
  2. 在 ServerBootstrapAcceptor 的 channelRead 办法中打上断点
  3. 通过 telnet 连贯,此时会触发 debug。
public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);  // 在这里,将 handler 增加到 SocketChannel 的 pipeline 中

    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        // 把以后客户端的链接 SocketChannel 注册到某个 EventLoop 中。childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {forceClose(child, t);
    }
}

ServerBootstrapAcceptor 是服务端 NioServerSocketChannel 中的一个非凡处理器,该处理器的 channelRead 事件只会在新连贯产生时触发,所以这里通过 final Channel child = (Channel) msg; 能够间接拿到客户端的链接 SocketChannel。

ServerBootstrapAcceptor 接着通过 childGroup.register()办法,把以后 NioSocketChannel 注册到工作线程中。

事件触发机制的流程

在 ServerBootstrapAcceptor 中,收到客户端连贯时,会调用 childGroup.register(child) 把以后客户端连贯注册到指定 NioEventLoop 的 Selector 中。

这个注册流程和后面解说的 NioServerSocketChannel 注册流程齐全一样,最终都会进入到 AbstractChannel.register0 办法。

AbstractChannel.register0

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered(); // 执行 pipeline 中的 ChannelRegistered()事件。// Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {// This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();}
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

pipeline.fireChannelRegistered()

@Override
public final ChannelPipeline fireChannelRegistered() {AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}

上面的事件触发,分为两个逻辑

  • 如果以后的工作是在 eventLoop 中触发的,则间接调用 invokeChannelRegistered
  • 否则,异步执行 invokeChannelRegistered。
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {next.invokeChannelRegistered();
    } else {executor.execute(new Runnable() {
            @Override
            public void run() {next.invokeChannelRegistered();
            }
        });
    }
}

invokeChannelRegistered

触发下一个 handler 的 channelRegistered 办法。

private void invokeChannelRegistered() {if (invokeHandler()) {
        try {((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {invokeExceptionCaught(t);
        }
    } else {fireChannelRegistered();
    }
}

Netty 服务端启动总结

到此为止,整个服务端启动的过程,咱们就曾经剖析实现了,次要的逻辑如下

  • 创立服务端 Channel,实质上是依据用户配置的实现,调用 JDK 原生的 Channel
  • 初始化 Channel 的外围属性,unsafe、pipeline
  • 初始化 Channel 的 Pipeline,次要是增加两个非凡的处理器,ChannelInitializer 和 ServerBootstrapAcceptor
  • 注册服务端的 Channel,增加 OP_ACCEPT 事件,这里底层调用的是 JDK 层面的实现,讲 Channel 注册到 BossEventLoop 中的 Selector 上
  • 绑定端口,调用 JDK 层面的 API,绑定端口。

版权申明:本博客所有文章除特地申明外,均采纳 CC BY-NC-SA 4.0 许可协定。转载请注明来自 Mic 带你学架构
如果本篇文章对您有帮忙,还请帮忙点个关注和赞,您的保持是我一直创作的能源。欢送关注同名微信公众号获取更多技术干货!

正文完
 0