乐趣区

关于java:一文详解-Netty-组件

作者:京东物流 张弓言

一、背景

Netty 是一款优良的高性能网络框架,外部通过 NIO 的形式来解决网络申请,在高负载下也能牢靠和高效地解决 I/O 操作

作为较底层的网络通信框架,其被广泛应用在各种中间件的开发中,比方 RPC 框架、MQ、Elasticsearch 等,这些中间件框架的底层网络通信模块大都利用到了 Netty 弱小的网络形象

上面这篇文章将次要对 Netty 中的各个组件进行剖析,并在介绍完了各个组件之后,通过 JSF 这个 RPC 框架为例来剖析 Netty 的应用,心愿让大家对 Netty 能有一个清晰的理解

二、Netty Server

通过 Netty 来构建一个繁难服务端是比较简单的,代码如下:

public class NettyServer {public static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);

    public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ChannelFuture channelFuture = serverBootstrap
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelHandlerAdapter() {
                    @Override
                    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {LOGGER.info("Handler Added");
                    }
                })
                .childHandler(new ServerChannelInitializer())
                .bind(8100);

        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {LOGGER.info("Netty Server Start !");
                }
            }
        });

        try {channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {e.printStackTrace();
        } finally {workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();}
    }
}

下面代码的次要逻辑如下:

  1. 新建服务端疏导启动类 ServerBootstrap,外部封装了各个组件,用来进行服务端的启动
  2. 新建了两个 EventLoopGroup 用来进行连贯解决,此时能够简略的将 EventLoopGroup 了解为多个线程的汇合。bossGroup 中的线程用来解决新连贯的建设,当新连贯建设后,workerGroup 中的每个线程则都会和惟一的客户端 Channel 连贯进行绑定,用来解决该 Channel 上的读、写事件
  3. 指定服务端创立的 Channel 类型为 NioServerSocketChannel
  4. childOption 用来配置客户端连贯的 NioSocketChannel 底层网络参数
  5. handler 用来指定针对服务端 Channel 的处理器,外部定义了一系列的回调办法,会在服务端 Channel 产生指定事件时进行回调
  6. childHandler 用来指定客户端 Channel 的处理器,当客户端 Channel 中产生指定事件时,会进行回调
  7. bind 指定服务端监听端口号

三、Netty Client

public class HelloClient {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            // 1. 启动类
            ChannelFuture channelFuture = new Bootstrap()
                    // 2. 增加 EventLoop
                    .group(workGroup)
                    // 3. 抉择客户端 channel 实现
                    .channel(NioSocketChannel.class)
                    // 4. 增加处理器
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override // 在连贯建设后被调用
                        protected void initChannel(NioSocketChannel ch) throws Exception {ZAS                       ch.pipeline().addLast(new LoggingHandler());
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    })
                    // 5. 连贯到服务器
                    .connect(new InetSocketAddress("localhost", 8100));
            channelFuture.addListener(future -> {if (future.isSuccess()) {((ChannelFuture) future).channel().writeAndFlush("hello");
                }
            });
            channelFuture.channel().closeFuture().sync();} finally {workGroup.shutdownGracefully();
        }
    }
}

下面代码的次要逻辑如下:

  1. 新建 Bootstrap 用来进行客户端启动
  2. group() 指定一个 NioEventLoopGroup 实例,用来解决客户端连贯的建设和后续事件处理
  3. handler() 指定 Channel 处理器,
  4. 当将客户端启动类中的各个属性都设置结束后,调用 connect() 办法进行服务端连贯

从下面的的两个例子能够看出,如果想通过 Netty 实现一个繁难的服务器其实是非常简单的,只须要在启动疏导类中设置好对应属性,而后实现端口绑定就能够实现。但也正是因为这种繁难的实现形式,导致很多人在学习 Netty 的过程中,发现代码是写的进去,然而对外部的组件有什么作用以及为什么这么写可能就不是很分明了,因而心愿通过这一系列文章来加深大家对 Netty 的了解

四、Netty 根本组件

Channel

Netty 中的 Channel 能够看成网络编程中的 Socket,其提供了一系列 IO 操作的 API,比方 read、write、bind、connect 等,大大降低了间接应用 Socket 类的复杂性

整体类继承关系如下:

从下面的继承关系能够看出,NioSocketChannel 和 NioServerSocketChannel 别离对应客户端和服务端的 Channel,两者的间接父类不统一,因而对外提供的性能也是不雷同的。比方当产生 read 事件时,NioServerSocketChannel 的次要逻辑就是建设新的连贯,而 NioSocketChannel 则是读取传输的字节进行业务解决

上面就以 NioServerSocketChannel 为例,带大家理解下该类的初始化过程,整体流程如下:

  1. 启动疏导类中通过 channel() 指定底层创立的 Channel 类型
  2. 依据指定的 Channel 类型创立出 ChannelFactory,后续通过该工厂类进行 Channel 的实例化
  3. 实例化 Channel

channel() 指定 ChannelFactory 类型

在下面的服务端启动过程中,ServerBootstrap 调用 channel() 办法并传入 NioServerSocketChannel,其底层代码逻辑为:

public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
}

// ReflectiveChannelFactory 构造方法
public ReflectiveChannelFactory(Class<? extends T> clazz) {ObjectUtil.checkNotNull(clazz, "clazz");
  try {this.constructor = clazz.getConstructor();
  } catch (NoSuchMethodException e) {throw new IllegalArgumentException("Class" + StringUtil.simpleClassName(clazz) +
                                       "does not have a public non-arg constructor", e);
  }
}

整体逻辑很简略,通过传入的 Class 对象指定一个 Channel 反射工厂,后续调用工厂办法获取指定类型的 Channel 对象

channel 实例化

当服务端启动疏导类 ServerBootstrap 调用 bind() 办法之后,外部会走到 Channel 的实例化过程,代码精简如下:

// channel 初始化流程,外部通过 channelFactory 结构
final ChannelFuture initAndRegister() {channel = channelFactory.newChannel();
}

// channelFactory 的 newChannel 办法逻辑
public T newChannel() {
  try {return constructor.newInstance();
  } catch (Throwable t) {throw new ChannelException("Unable to create Channel from class" + constructor.getDeclaringClass(), t);
  }
}

ChannelFactory 的整体逻辑就是通过反射的形式新建 Channel 对象,而 Channel 对象的类型则是在启动疏导类中通过 channel() 办法进行指定的

在实例化 Channel 的过程中,会对其外部的一些属性进行初始化,而对这些属性的理解,能够使咱们对 Netty 中各个组件的作用范畴有一个更加清晰的了解,上面看下 NioServerSocketChannel 的构造函数源码

public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT);
  config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent, ch, readInterestOp);
}

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);
  this.ch = ch;
  this.readInterestOp = readInterestOp;
  try {ch.configureBlocking(false);
  } catch (IOException e) {
    try {ch.close();
    } catch (IOException e2) {if (logger.isWarnEnabled()) {
        logger.warn("Failed to close a partially initialized socket.", e2);
      }
    }

    throw new ChannelException("Failed to enter non-blocking mode.", e);
  }
}

protected AbstractChannel(Channel parent) {
  this.parent = parent;
  id = newId();
  unsafe = newUnsafe();
  pipeline = newChannelPipeline();}

上述源码就是一层一层的父类结构,能够对照后面的类关系图进行浏览

NioServerSocketChannel 实例化过程中次要实现了以下外部属性的初始化:

  1. unsafe 属性进行赋值为 NioMessageUnsafe,后续 Channel 上事件处理的次要逻辑都是由该类实现
  2. pipeline 属性进行初始化赋值,pipeline 是 Channel 中特地重要的一个属性,后续的所有业务处理器都是通过该 pipeline 组织的
  3. 指定以后 Channel 的 readInterestOp 属性为 SelectionKey.OP_ACCEPT,用于后续绑定到 Selector 时指定以后 Channel 监听的事件类型
  4. 指定以后 Channel 非阻塞,ch.configureBlocking(false)

总结

对于 Channel 的实例化流程能够总结如下:

  1. 启动疏导类中通过 channel() 办法指定生成的 ChannelFactory 类型
  2. 通过 ChannelFactory 来结构对应 Channel,并在实例化的过程中初始化了一些重要属性,比方 pipeline

ChannelPipeline

ChannelPipeline 也是 Netty 中的一个比拟重要的组件,从下面的 Channel 实例化过程能够看出,每一个 Channel 实例中都会蕴含一个对应的 ChannelPipeline 属性

ChannelPipeline 初始化

ChannelPipeline 底层初始化源码:

protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");
  succeededFuture = new SucceededChannelFuture(channel, null);
  voidPromise =  new VoidChannelPromise(channel, true);

  tail = new TailContext(this);
  head = new HeadContext(this);

  head.next = tail;
  tail.prev = head;
}

从 ChannelPipeline 的构造函数能够看出,每一个 ChannelPipeline 底层都是一个双向链表构造,默认会蕴含 head 和 tail 头尾节点,用来进行一些默认的逻辑解决,解决细节会在后续文章中展示

addLast()
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
  final AbstractChannelHandlerContext newCtx;
  synchronized (this) {checkMultiplicity(handler);

    newCtx = newContext(group, filterName(name, handler), handler);

    addLast0(newCtx);

    // If the registered is false it means that the channel was not registered on an eventLoop yet.
    // In this case we add the context to the pipeline and add a task that will call
    // ChannelHandler.handlerAdded(...) once the channel is registered.
    if (!registered) {newCtx.setAddPending();
      callHandlerCallbackLater(newCtx, true);
      return this;
    }

    EventExecutor executor = newCtx.executor();
    if (!executor.inEventLoop()) {callHandlerAddedInEventLoop(newCtx, executor);
      return this;
    }
  }
  // 回调 ChannelHandler 中的 handlerAdded() 办法
  callHandlerAdded0(newCtx);
  return this;
}

private void addLast0(AbstractChannelHandlerContext newCtx) {
  AbstractChannelHandlerContext prev = tail.prev;
  newCtx.prev = prev;
  newCtx.next = tail;
  prev.next = newCtx;
  tail.prev = newCtx;
}

addLast() 办法是向 ChannelPipeline 中增加 ChannelHandler 用来进行业务解决

整个办法的逻辑为:

  • 判断以后 ChannelHandler 是否曾经增加
  • 将以后 ChannelHandler 包装成 ChannelHandlerContext,并将其增加到 ChannelPipeline 的双向链表中
  • 回调增加的 ChannelHandler 中的 handlerAdded() 办法
Channel、ChannelPipeline、ChannelHandler 关系

Channel、ChannelPipeline 和 ChannelHandler 三者的关系如图所示:

  • 每一个 Channel 中都会蕴含一个 ChannelPipeline 属性
  • ChannelPipeline 是一个双向链表构造,默认会蕴含 HeadContext 和 TailContext 两个节点
  • 当向 ChannelPipeline 中增加 ChannelHandler 时,会包装成 ChannelContext 插入到 ChannelPipeline 链表中
  • 当 Channel 中产生指定事件时,该事件就会在 ChannelPipeline 中沿着双向链表进行流传,调用各个 ChannelHandler 中的指定办法,实现相应的业务解决

Netty 正是通过 ChannelPipeline 这一构造为用户提供了自定义业务逻辑的扩大点,用户只须要向 ChannelPipeline 中增加解决对应业务逻辑的 ChannelHandler,之后当指定事件产生时,该 ChannelHandler 中的对应办法就会进行回调,实现业务的解决

ChannelHandler

ChannelHandler 是 Netty 中业务解决的外围类,当有 IO 事件产生时,该事件会在 ChannelPipeline 中进行流传,并顺次调用到 ChannelHandler 中的指定办法

ChannelHandler 的类继承关系如下:

从下面的类继承关系能够看出,ChannelHandler 大抵能够分为 ChannelInboundHandler 和 ChannelOutboundHandler,别离用来解决读、写事件

ChannInboundHandler

public interface ChannelInboundHandler extends ChannelHandler {void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    void channelActive(ChannelHandlerContext ctx) throws Exception;

    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

在 ChannelInboundHandler 中定义了一系列的回调办法,用户能够实现该接口并重写相应的办法来自定义的业务逻辑。

重写办法逻辑是简略的,但很多人其实不分明的是这些回调办法到底在什么场景下会被调用,如何调用,只有理解了这些回调办法的调用机会,能力在更适宜的中央实现相应性能

channelRegistered

channelRegistered() 从办法名了解是当 Channel 实现注册之后会被调用,那么何为 Channel 注册?

上面就以 Netty 服务端启动过程中的局部源码为例 (具体源码剖析会在后续文章中),看下 channelRegistered() 的调用机会

在 Netty 服务端启动时,会调用到 io.netty.channel.AbstractChannel.AbstractUnsafe#register 办法,精简代码如下:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
  
  AbstractChannel.this.eventLoop = eventLoop;
  if (eventLoop.inEventLoop()) {register0(promise);
  } else {
    try {eventLoop.execute(new Runnable() {
        @Override
        public void run() {register0(promise);
        }
      });
    } catch (Throwable t) {
      logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",
        AbstractChannel.this, t);
      closeForcibly();
      closeFuture.setClosed();
      safeSetFailure(promise, t);
    }
  }
}


private void register0(ChannelPromise promise) {
  try {
    // neverRegistered 初始值为 true
    boolean firstRegistration = neverRegistered;
    // 将 Channel 绑定到对应 eventLoop 中的 Selector 上
    doRegister();
    neverRegistered = false;
    registered = true;

    pipeline.invokeHandlerAddedIfNeeded();

    safeSetSuccess(promise);
    // 调用 ChannelHandler 中的 ChannelRegistered() 
    pipeline.fireChannelRegistered();}
}

protected void doRegister() throws Exception {
  boolean selected = false;
  for (;;) {
    try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
      return;
    } catch (CancelledKeyException e) {if (!selected) {eventLoop().selectNow();
        selected = true;
      } else {throw e;}
    }
  }
}

从 Netty 底层的 register() 办法能够看出,ChannelHandler 中的 ChannelRegistered() 调用机会是在调用 pipeline.fireChannelRegistered() 时触发的,此时曾经实现的逻辑为:

  • 通过传入的 EventLoopGroup 失去了该 Channel 对应的 EventLoop,并与 Channel 中的对应属性实现了绑定;AbstractChannel.this.eventLoop = eventLoop 逻辑
  • 以后 Channel 曾经绑定到了对应 EventLoop 中的 Selector 上;doRegister() 逻辑
  • ChannelHandler 中的 handlerAdded() 办法曾经实现了回调;pipeline.invokeHandlerAddedIfNeeded() 逻辑

因而当 Channel 和对应的 Selector 实现了绑定,Channel 中 pipeline 上绑定的 ChannelHandler 的 channelRegisted() 办法就会进行回调

channelActive

下面曾经剖析了 channelRegistered() 办法的调用机会,也就是当 Channel 绑定到了对应 Selector 上之后就会进行回调,上面开始剖析 channelActive() 办法的调用机会

对于服务端 Channel,后面还只是将 Channel 注册到了 Selector 上,还没有调用到 bind() 办法实现真正的底层端口绑定,那么有没有可能当服务端 Channel 实现端口监听之后,就会调用到 channelActive() 办法呢?

上面持续剖析,在下面实现了 Channel 和 Selector 的注册之后,Netty 服务端启动过程中会持续调用到 io.netty.channel.AbstractChannel.AbstractUnsafe#bind 逻辑:

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {assertEventLoop();

  if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}

  boolean wasActive = isActive();
  try {doBind(localAddress);
  } catch (Throwable t) {safeSetFailure(promise, t);
    closeIfClosed();
    return;
  }

  if (!wasActive && isActive()) {invokeLater(new Runnable() {
      @Override
      public void run() {pipeline.fireChannelActive();
      }
    });
  }

  safeSetSuccess(promise);
}


protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());
  } else {javaChannel().socket().bind(localAddress, config.getBacklog());
  }
}

在该办法中实现了以下逻辑:

  • 实现了 Channel 和本地端口的绑定
  • 绑定胜利后,isActive() 办法返回 true,此时公布 ChannelActive 事件,进行办法回调
  • safeSetSuccess() 中会回调到服务端启动过程中增加的 listener 办法,表明以后 Channel 实现了端口绑定

总结:

当 Channel 调用了 bind() 办法实现端口绑定之后,channelActive() 办法会进行回调

channelRead

该办法的调用机会,服务端和客户端是不统一的

服务端 channelRead

服务端 Channel 绑定到 Selector 上时监听的是 Accept 事件,当客户端有新连贯接入时,会回调 channelRead() 办法,实现新连贯的接入

Netty 在服务端启动过程中,会默认增加一个 ChannelHandler io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor 来解决新连贯的接入

客户端 channelRead

当服务端解决完 Accept 事件后,会生成一个和客户端通信的 Channel,该 Channel 也会注册到对应的 Selector 上,并监听 read 事件

当客户端向该 Channel 中发送数据时就会触发 read 事件,调用到 channelRead() 办法 (Netty 外部的源码解决会在后续的文章中进行剖析)

exceptionCaught

以后 ChannelHandler 中各回调办法处理过程中如果产生了异样就会回调该办法

退出移动版