关于java:netty系列之NIO和netty详解

22次阅读

共计 12773 个字符,预计需要花费 32 分钟才能阅读完成。

简介

netty 为什么快呢?这是因为 netty 底层应用了 JAVA 的 NIO 技术,并在其根底上进行了性能的优化,尽管 netty 不是单纯的 JAVA nio,然而 netty 的底层还是基于的是 nio 技术。

nio 是 JDK1.4 中引入的,用于区别于传统的 IO,所以 nio 也能够称之为 new io。

nio 的三大外围是 Selector,channel 和 Buffer,本文咱们将会深刻探索 NIO 和 netty 之间的关系。

NIO 罕用用法

在解说 netty 中的 NIO 实现之前,咱们先来回顾一下 JDK 中 NIO 的 selector,channel 是怎么工作的。对于 NIO 来说 selector 次要用来承受客户端的连贯,所以个别用在 server 端。咱们以一个 NIO 的服务器端和客户端聊天室为例来解说 NIO 在 JDK 中是怎么应用的。

因为是一个简略的聊天室,咱们抉择 Socket 协定为根底的 ServerSocketChannel, 首先就是 open 这个 Server channel:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("localhost", 9527));
serverSocketChannel.configureBlocking(false);

而后向 server channel 中注册 selector:

Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

尽管是 NIO,然而对于 Selector 来说,它的 select 办法是阻塞办法,只有找到匹配的 channel 之后才会返回,为了屡次进行 select 操作,咱们须要在一个 while 循环外面进行 selector 的 select 操作:

while (true) {selector.select();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> iter = selectedKeys.iterator();
            while (iter.hasNext()) {SelectionKey selectionKey = iter.next();
                if (selectionKey.isAcceptable()) {register(selector, serverSocketChannel);
                }
                if (selectionKey.isReadable()) {serverResponse(byteBuffer, selectionKey);
                }
                iter.remove();}
            Thread.sleep(1000);
        }

selector 中会有一些 SelectionKey,SelectionKey 中有一些示意操作状态的 OP Status, 依据这个 OP Status 的不同,selectionKey 能够有四种状态,别离是 isReadable,isWritable,isConnectable 和 isAcceptable。

当 SelectionKey 处于 isAcceptable 状态的时候,示意 ServerSocketChannel 能够承受连贯了,咱们须要调用 register 办法将 serverSocketChannel accept 生成的 socketChannel 注册到 selector 中,以监听它的 OP READ 状态,后续能够从中读取数据:

    private static void register(Selector selector, ServerSocketChannel serverSocketChannel)
            throws IOException {SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

当 selectionKey 处于 isReadable 状态的时候,示意能够从 socketChannel 中读取数据而后进行解决:

    private static void serverResponse(ByteBuffer byteBuffer, SelectionKey selectionKey)
            throws IOException {SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        socketChannel.read(byteBuffer);
        byteBuffer.flip();
        byte[] bytes= new byte[byteBuffer.limit()];
        byteBuffer.get(bytes);
        log.info(new String(bytes).trim());
        if(new String(bytes).trim().equals(BYE_BYE)){log.info("说再见不如不见!");
            socketChannel.write(ByteBuffer.wrap("再见".getBytes()));
            socketChannel.close();}else {socketChannel.write(ByteBuffer.wrap("你是个坏蛋".getBytes()));
        }
        byteBuffer.clear();}

下面的 serverResponse 办法中,从 selectionKey 中拿到对应的 SocketChannel,而后调用 SocketChannel 的 read 办法,将 channel 中的数据读取到 byteBuffer 中,要想回复音讯到 channel 中,还是应用同一个 socketChannel,而后调用 write 办法回写音讯给 client 端,到这里一个简略的回写客户端音讯的 server 端就实现了。

接下来就是对应的 NIO 客户端,在 NIO 客户端须要应用 SocketChannel, 首先建设和服务器的连贯:

socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9527));

而后就能够应用这个 channel 来发送和承受音讯了:

    public String sendMessage(String msg) throws IOException {byteBuffer = ByteBuffer.wrap(msg.getBytes());
        String response = null;
        socketChannel.write(byteBuffer);
        byteBuffer.clear();
        socketChannel.read(byteBuffer);
        byteBuffer.flip();
        byte[] bytes= new byte[byteBuffer.limit()];
        byteBuffer.get(bytes);
        response =new String(bytes).trim();
        byteBuffer.clear();
        return response;
    }

向 channel 中写入音讯能够应用 write 办法,从 channel 中读取音讯能够应用 read 办法。

这样一个 NIO 的客户端就实现了。

尽管以上是 NIO 的 server 和 client 的根本应用,然而基本上涵盖了 NIO 的所有要点。接下来咱们来具体理解一下 netty 中 NIO 到底是怎么应用的。

NIO 和 EventLoopGroup

以 netty 的 ServerBootstrap 为例,启动的时候须要指定它的 group, 先来看一下 ServerBootstrap 的 group 办法:

public ServerBootstrap group(EventLoopGroup group) {return group(group, group);
    }

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {...}

ServerBootstrap 能够承受一个 EventLoopGroup 或者两个 EventLoopGroup,EventLoopGroup 被用来解决所有的 event 和 IO,对于 ServerBootstrap 来说,能够有两个 EventLoopGroup,对于 Bootstrap 来说只有一个 EventLoopGroup。两个 EventLoopGroup 示意 acceptor group 和 worker group。

EventLoopGroup 只是一个接口,咱们罕用的一个实现就是 NioEventLoopGroup, 如下所示是一个罕用的 netty 服务器端代码:

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new FirstServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 绑定端口并开始接管连贯
            ChannelFuture f = b.bind(port).sync();
            // 期待 server socket 敞开
            f.channel().closeFuture().sync();

这里和 NIO 相干的有两个类,别离是 NioEventLoopGroup 和 NioServerSocketChannel,事实上在他们的底层还有两个相似的类别离叫做 NioEventLoop 和 NioSocketChannel,接下来咱们别离解说一些他们的底层实现和逻辑关系。

NioEventLoopGroup

NioEventLoopGroup 和 DefaultEventLoopGroup 一样都是继承自 MultithreadEventLoopGroup:

public class NioEventLoopGroup extends MultithreadEventLoopGroup 

他们的不同之处在于 newChild 办法的不同,newChild 用来构建 Group 中的理论对象,NioEventLoopGroup 来说,newChild 返回的是一个 NioEventLoop 对象,先来看下 NioEventLoopGroup 的 newChild 办法:

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {SelectorProvider selectorProvider = (SelectorProvider) args[0];
        SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
        RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
        EventLoopTaskQueueFactory taskQueueFactory = null;
        EventLoopTaskQueueFactory tailTaskQueueFactory = null;

        int argsLength = args.length;
        if (argsLength > 3) {taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
        }
        if (argsLength > 4) {tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
        }
        return new NioEventLoop(this, executor, selectorProvider,
                selectStrategyFactory.newSelectStrategy(),
                rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
    }

这个 newChild 办法除了固定的 executor 参数之外,还能够依据 NioEventLoopGroup 的构造函数传入的参数来实现更多的性能。

这里参数中传入了 SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler、taskQueueFactory 和 tailTaskQueueFactory 这几个参数,其中前面的两个 EventLoopTaskQueueFactory 并不是必须的。

最初所有的参数都会传递给 NioEventLoop 的构造函数用来结构出一个新的 NioEventLoop。

在具体解说 NioEventLoop 之前,咱们来研读一下传入的这几个参数类型的理论作用。

SelectorProvider

SelectorProvider 是 JDK 中的类,它提供了一个动态的 provider() 办法能够从 Property 或者 ServiceLoader 中加载对应的 SelectorProvider 类并实例化。

另外还提供了 openDatagramChannel、openPipe、openSelector、openServerSocketChannel 和 openSocketChannel 等实用的 NIO 操作方法。

SelectStrategyFactory

SelectStrategyFactory 是一个接口,外面只定义了一个办法,用来返回 SelectStrategy:

public interface SelectStrategyFactory {SelectStrategy newSelectStrategy();
}

什么是 SelectStrategy 呢?

先看下 SelectStrategy 中定义了哪些 Strategy:

    int SELECT = -1;

    int CONTINUE = -2;

    int BUSY_WAIT = -3;

SelectStrategy 中定义了 3 个 strategy, 别离是 SELECT、CONTINUE 和 BUSY_WAIT。

咱们晓得个别状况下,在 NIO 中 select 操作自身是一个阻塞操作,也就是 block 操作,这个操作对应的 strategy 是 SELECT, 也就是 select block 状态。

如果咱们想跳过这个 block,从新进入下一个 event loop, 那么对应的 strategy 就是 CONTINUE。

BUSY_WAIT 是一个非凡的 strategy,是指 IO 循环轮询新事件而不阻塞, 这个 strategy 只有在 epoll 模式下才反对,NIO 和 Kqueue 模式并不反对这个 strategy。

RejectedExecutionHandler

RejectedExecutionHandler 是 netty 本人的类,和 java.util.concurrent.RejectedExecutionHandler 相似,然而是特地针对 SingleThreadEventExecutor 来说的。这个接口定义了一个 rejected 办法,用来示意因为 SingleThreadEventExecutor 容量限度导致的工作增加失败而被回绝的状况:

void rejected(Runnable task, SingleThreadEventExecutor executor);

EventLoopTaskQueueFactory

EventLoopTaskQueueFactory 是一个接口,用来创立存储提交给 EventLoop 的 taskQueue:

Queue<Runnable> newTaskQueue(int maxCapacity);

这个 Queue 必须是线程平安的,并且继承自 java.util.concurrent.BlockingQueue.

解说完这几个参数,接下来咱们就能够具体查看 NioEventLoop 的具体 NIO 实现了。

NioEventLoop

首先 NioEventLoop 和 DefaultEventLoop 一样,都是继承自 SingleThreadEventLoop:

public final class NioEventLoop extends SingleThreadEventLoop

示意的是应用繁多线程来执行工作的 EventLoop。

首先作为一个 NIO 的实现,必须要有 selector,在 NioEventLoop 中定义了两个 selector,别离是 selector 和 unwrappedSelector:

    private Selector selector;
    private Selector unwrappedSelector;

在 NioEventLoop 的构造函数中,他们是这样定义的:

        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;

首先调用 openSelector 办法,而后通过返回的 SelectorTuple 来获取对应的 selector 和 unwrappedSelector。

这两个 selector 有什么区别呢?

在 openSelector 办法中,首先通过调用 provider 的 openSelector 办法返回一个 Selector,这个 Selector 就是 unwrappedSelector:

final Selector unwrappedSelector;
unwrappedSelector = provider.openSelector();

而后查看 DISABLE_KEY_SET_OPTIMIZATION 是否设置,如果没有设置那么 unwrappedSelector 和 selector 实际上是同一个 Selector:

DISABLE_KEY_SET_OPTIMIZATION 示意的是是否对 select key set 进行优化:

if (DISABLE_KEY_SET_OPTIMIZATION) {return new SelectorTuple(unwrappedSelector);
   }

        SelectorTuple(Selector unwrappedSelector) {
            this.unwrappedSelector = unwrappedSelector;
            this.selector = unwrappedSelector;
        }

如果 DISABLE_KEY_SET_OPTIMIZATION 被设置为 false,那么意味着咱们须要对 select key set 进行优化,具体是怎么进行优化的呢?

先来看下最初的返回:

return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));

最初返回的 SelectorTuple 第二个参数就是 selector,这里的 selector 是一个 SelectedSelectionKeySetSelector 对象。

SelectedSelectionKeySetSelector 继承自 selector,构造函数传入的第一个参数是一个 delegate, 所有的 Selector 中定义的办法都是通过调用
delegate 来实现的,不同的是对于 select 办法来说,会首先调用 selectedKeySet 的 reset 办法, 上面是以 isOpen 和 select 办法为例察看一下代码的实现:

    public boolean isOpen() {return delegate.isOpen();
    }

    public int select(long timeout) throws IOException {selectionKeys.reset();
        return delegate.select(timeout);
    }

selectedKeySet 是一个 SelectedSelectionKeySet 对象,是一个 set 汇合,用来存储 SelectionKey,在 openSelector() 办法中,应用 new 来实例化这个对象:

final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

netty 理论是想用这个 SelectedSelectionKeySet 类来治理 Selector 中的 selectedKeys,所以接下来 netty 用了一个高技巧性的对象替换操作。

首先判断零碎中有没有 sun.nio.ch.SelectorImpl 的实现:

        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {return cause;}
            }
        });

SelectorImpl 中有两个 Set 字段:

    private Set<SelectionKey> publicKeys;
    private Set<SelectionKey> publicSelectedKeys;

这两个字段就是咱们须要替换的对象。如果有 SelectorImpl 的话,首先应用 Unsafe 类,调用 PlatformDependent 中的 objectFieldOffset 办法拿到这两个字段绝对于对象示例的偏移量,而后调用 putObject 将这两个字段替换成为后面初始化的 selectedKeySet 对象:

Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
    // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
    // This allows us to also do this in Java9+ without any extra flags.
    long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
    long publicSelectedKeysFieldOffset =
            PlatformDependent.objectFieldOffset(publicSelectedKeysField);

    if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
        PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
        PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
        return null;
    }

如果零碎设置不反对 Unsafe,那么就用反射再做一次:

 Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
 if (cause != null) {return cause;}
 cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
 if (cause != null) {return cause;}
 selectedKeysField.set(unwrappedSelector, selectedKeySet);
 publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);

在 NioEventLoop 中咱们须要关注的一个十分重要的重写办法就是 run 办法,在 run 办法中实现了如何执行 task 的逻辑。

还记得后面咱们提到的 selectStrategy 吗?run 办法通过调用 selectStrategy.calculateStrategy 返回了 select 的 strategy,而后通过判断
strategy 的值来进行对应的解决。

如果 strategy 是 CONTINUE,这跳过这次循环,进入到下一个 loop 中。

BUSY_WAIT 在 NIO 中是不反对的,如果是 SELECT 状态,那么会在 curDeadlineNanos 之后再次进行 select 操作:

strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
  switch (strategy) {
  case SelectStrategy.CONTINUE:
      continue;
  case SelectStrategy.BUSY_WAIT:
      // fall-through to SELECT since the busy-wait is not supported with NIO
  case SelectStrategy.SELECT:
      long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
      if (curDeadlineNanos == -1L) {curDeadlineNanos = NONE; // nothing on the calendar}
      nextWakeupNanos.set(curDeadlineNanos);
      try {if (!hasTasks()) {strategy = select(curDeadlineNanos);
          }
      } finally {
          // This update is just to help block unnecessary selector wakeups
          // so use of lazySet is ok (no race condition)
          nextWakeupNanos.lazySet(AWAKE);
      }
      // fall through
  default:

如果 strategy > 0, 示意有拿到了 SelectedKeys, 那么须要调用 processSelectedKeys 办法对 SelectedKeys 进行解决:

    private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized();
        } else {processSelectedKeysPlain(selector.selectedKeys());
        }
    }

下面提到了 NioEventLoop 中有两个 selector,还有一个 selectedKeys 属性,这个 selectedKeys 存储的就是 Optimized SelectedKeys,如果这个值不为空,就调用 processSelectedKeysOptimized 办法,否则就调用 processSelectedKeysPlain 办法。

processSelectedKeysOptimized 和 processSelectedKeysPlain 这两个办法差异不大,只是传入的要解决的 selectedKeys 不同。

解决的逻辑是首先拿到 selectedKeys 的 key,而后调用它的 attachment 办法拿到 attach 的对象:

final SelectionKey k = selectedKeys.keys[i];
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);
            } else {NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

如果 channel 还没有建设连贯,那么这个对象可能是一个 NioTask, 用来解决 channelReady 和 channelUnregistered 的事件。

如果 channel 曾经建设好连贯了,那么这个对象可能是一个 AbstractNioChannel。

针对两种不同的对象,会去别离调用不同的 processSelectedKey 办法。

对第一种状况,会调用 task 的 channelReady 办法:

task.channelReady(k.channel(), k);

对第二种状况,会依据 SelectionKey 的 readyOps() 的各种状态调用 ch.unsafe() 中的各种办法,去进行 read 或者 close 等操作。

总结

NioEventLoop 尽管也是一个 SingleThreadEventLoop, 然而通过应用 NIO 技术,能够更好的利用现有资源实现更好的效率,这也就是为什么咱们在我的项目中应用 NioEventLoopGroup 而不是 DefaultEventLoopGroup 的起因。

本文已收录于 http://www.flydean.com/05-2-netty-nioeventloop/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」, 懂技术,更懂你!

正文完
 0