关于java:netty系列之kequeue传输协议详解

6次阅读

共计 7760 个字符,预计需要花费 20 分钟才能阅读完成。

简介

在后面的章节中,咱们介绍了在 netty 中能够应用 kequeue 或者 epoll 来实现更为高效的 native 传输方式。那么 kequeue 和 epoll 和 NIO 传输协定有什么不同呢?

本章将会以 kequeue 为例进行深入探讨。

在下面咱们介绍的 native 的例子中,对于 kqueue 的类有这样几个,别离是 KQueueEventLoopGroup,KQueueServerSocketChannel 和 KQueueSocketChannel, 通过简略的替换和增加对应的依赖包,咱们能够轻松的将一般的 NIO netty 服务替换成为 native 的 Kqueue 服务。

是时候揭开 Kqueue 的机密了。

KQueueEventLoopGroup

eventLoop 和 eventLoopGroup 是用来承受 event 和事件处理的。先来看下 KQueueEventLoopGroup 的定义:

public final class KQueueEventLoopGroup extends MultithreadEventLoopGroup

作为一个 MultithreadEventLoopGroup,必须实现一个 newChild 办法,用来创立 child EventLoop。在 KQueueEventLoopGroup 中,除了构造函数之外,额定须要实现的办法就是 newChild:

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {Integer maxEvents = (Integer) args[0];
        SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
        RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
        EventLoopTaskQueueFactory taskQueueFactory = null;
        EventLoopTaskQueueFactory tailTaskQueueFactory = null;

        int argsLength = args.length;
        if (argsLength > 3) {taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
        }
        if (argsLength > 4) {tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
        }
        return new KQueueEventLoop(this, executor, maxEvents,
                selectStrategyFactory.newSelectStrategy(),
                rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
    }

newChild 中的所有参数都是从 KQueueEventLoopGroup 的构造函数中传入的。除了 maxEvents,selectStrategyFactory 和 rejectedExecutionHandler 之外,还能够接管 taskQueueFactory 和 tailTaskQueueFactory 两个参数,最初把这些参数都传到 KQueueEventLoop 的构造函数中去,最终返回一个 KQueueEventLoop 对象。

另外在应用 KQueueEventLoopGroup 之前咱们还须要确保 Kqueue 在零碎中是可用的,这个判断是通过调用 KQueue.ensureAvailability(); 来实现的。

KQueue.ensureAvailability 首先判断是否定义了零碎属性 io.netty.transport.noNative,如果定了,阐明 native transport 被禁用了,后续也就没有必要再进行判断了。

如果 io.netty.transport.noNative 没有被定义,那么会调用 Native.newKQueue() 来尝试从 native 中获取一个 kqueue 的 FileDescriptor,如果上述的获取过程中没有任何异样,则阐明 kqueue 在 native 办法中存在,咱们能够持续应用了。

以下是判断 kqueue 是否可用的代码:

    static {
        Throwable cause = null;
        if (SystemPropertyUtil.getBoolean("io.netty.transport.noNative", false)) {
            cause = new UnsupportedOperationException("Native transport was explicit disabled with -Dio.netty.transport.noNative=true");
        } else {
            FileDescriptor kqueueFd = null;
            try {kqueueFd = Native.newKQueue();
            } catch (Throwable t) {cause = t;} finally {if (kqueueFd != null) {
                    try {kqueueFd.close();
                    } catch (Exception ignore) {// ignore}
                }
            }
        }
        UNAVAILABILITY_CAUSE = cause;
    }

KQueueEventLoop

KQueueEventLoop 是从 KQueueEventLoopGroup 中创立进去的,用来执行具体的 IO 工作。

先来看一下 KQueueEventLoop 的定义:

final class KQueueEventLoop extends SingleThreadEventLoop 

不论是 NIO 还是 KQueue 或者是 Epoll,因为应用了更加高级的 IO 技术,所以他们应用的 EventLoop 都是 SingleThreadEventLoop, 也就是说应用单线程就够了。

和 KQueueEventLoopGroup 一样,KQueueEventLoop 也须要判断以后的零碎环境是否反对 kqueue:

    static {KQueue.ensureAvailability();
    }

上一节讲到了,KQueueEventLoopGroup 会调用 KQueueEventLoop 的构造函数来返回一个 eventLoop 对象,咱们先来看下 KQueueEventLoop 的构造函数:

    KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
                    SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                    EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
                rejectedExecutionHandler);
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
        this.kqueueFd = Native.newKQueue();
        if (maxEvents == 0) {
            allowGrowing = true;
            maxEvents = 4096;
        } else {allowGrowing = false;}
        this.changeList = new KQueueEventArray(maxEvents);
        this.eventList = new KQueueEventArray(maxEvents);
        int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
        if (result < 0) {cleanup();
            throw new IllegalStateException("kevent failed to add user event with errno:" + (-result));
        }
    }

传入的 maxEvents 示意的是这个 KQueueEventLoop 可能承受的最大的 event 个数。如果 maxEvents=0, 则示意 KQueueEventLoop 的 event 容量能够动静扩大,并且最大值是 4096。否则的话,KQueueEventLoop 的 event 容量不能扩大。

maxEvents 是作为数组的大小用来构建 changeList 和 eventList。

KQueueEventLoop 中还定义了一个 map 叫做 channels, 用来保留注册的 channels:

private final IntObjectMap<AbstractKQueueChannel> channels = new IntObjectHashMap<AbstractKQueueChannel>(4096);

来看一下 channel 的 add 和 remote 办法:

    void add(AbstractKQueueChannel ch) {assert inEventLoop();
        AbstractKQueueChannel old = channels.put(ch.fd().intValue(), ch);
        assert old == null || !old.isOpen();}

    void remove(AbstractKQueueChannel ch) throws Exception {assert inEventLoop();
        int fd = ch.fd().intValue();
        AbstractKQueueChannel old = channels.remove(fd);
        if (old != null && old != ch) {channels.put(fd, old);
            assert !ch.isOpen();} else if (ch.isOpen()) {ch.unregisterFilters();
        }
    }

能够看到增加和删除的都是 AbstractKQueueChannel,前面的章节中咱们会具体解说 KQueueChannel,这里咱们只须要晓得 channel map 中的 key 是 kequeue 中特有的 FileDescriptor 的 int 值。

再来看一下 EventLoop 中最重要的 run 办法:

   protected void run() {for (;;) {
            try {int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
          
                    case SelectStrategy.SELECT:
                        strategy = kqueueWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
                        if (wakenUp == 1) {wakeup();
                        }
                    default:
                }

                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {if (strategy > 0) {processReady(strategy);
                        }
                    } finally {runAllTasks();
                    }
                } else {final long ioStartTime = System.nanoTime();

                    try {if (strategy > 0) {processReady(strategy);
                        }
                    } finally {final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }

它的逻辑是先应用 selectStrategy.calculateStrategy 获取以后的 select strategy, 而后依据 strategy 的值来判断是否须要执行 processReady 办法,最初执行 runAllTasks, 从 task queue 中拿到要执行的工作去执行。

selectStrategy.calculateStrategy 用来判断以后的 select 状态,默认状况下有三个状态,别离是:SELECT,CONTINUE,BUSY_WAIT。这三个状态都是正数:

    int SELECT = -1;

    int CONTINUE = -2;

    int BUSY_WAIT = -3;

别离示意以后的 IO 在 slect 的 block 状态,或者跳过以后 IO 的状态,和正在 IO loop pull 的状态。BUSY_WAIT 是一个非阻塞的 IO PULL,kqueue 并不反对,所以会 fallback 到 SELECT。

除了这三个状态之外,calculateStrategy 还会返回一个正值,示意以后要执行的工作的个数。

在 run 办法中,如果 strategy 的后果是 SELECT,那么最终会调用 Native.keventWait 办法返回以后 ready 的 events 个数, 并且将 ready 的 event 放到 KQueueEventArray 的 eventList 中去。

如果 ready 的 event 个数大于零,则会调用 processReady 办法对这些 event 进行状态回调解决。

怎么解决的呢?上面是解决的外围逻辑:

            AbstractKQueueChannel channel = channels.get(fd);

            AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) channel.unsafe();

            if (filter == Native.EVFILT_WRITE) {unsafe.writeReady();
            } else if (filter == Native.EVFILT_READ) {unsafe.readReady(eventList.data(i));
            } else if (filter == Native.EVFILT_SOCK && (eventList.fflags(i) & Native.NOTE_RDHUP) != 0) {unsafe.readEOF();
            }

这里的 fd 是从 eventList 中读取到的:

final int fd = eventList.fd(i);

依据 eventList 的 fd,咱们能够从 channels 中拿到对应的 KQueueChannel, 而后依据 event 的 filter 状态来决定 KQueueChannel 的具体操作,是 writeReady,readReady 或者 readEOF。

最初就是执行 runAllTasks 办法了,runAllTasks 的逻辑很简略,就是从 taskQueue 中读取工作而后执行。

KQueueServerSocketChannel 和 KQueueSocketChannel

KQueueServerSocketChannel 是用在 server 端的 channel:

public final class KQueueServerSocketChannel extends AbstractKQueueServerChannel implements ServerSocketChannel {

KQueueServerSocketChannel 继承自 AbstractKQueueServerChannel, 除了构造函数之外,最重要的一个办法就是 newChildChannel:

    @Override
    protected Channel newChildChannel(int fd, byte[] address, int offset, int len) throws Exception {return new KQueueSocketChannel(this, new BsdSocket(fd), address(address, offset, len));
    }

这个办法用来创立一个新的 child channel。从下面的代码中,咱们能够看到生成的 child channel 是一个 KQueueSocketChannel 的实例。

它的构造函数承受三个参数,别离是 parent channel,BsdSocket 和 InetSocketAddress。

    KQueueSocketChannel(Channel parent, BsdSocket fd, InetSocketAddress remoteAddress) {super(parent, fd, remoteAddress);
        config = new KQueueSocketChannelConfig(this);
    }

这里的 fd 是 socket accept acceptedAddress 的后果:

int acceptFd = socket.accept(acceptedAddress);

上面是 KQueueSocketChannel 的定义:

public final class KQueueSocketChannel extends AbstractKQueueStreamChannel implements SocketChannel {

KQueueSocketChannel 和 KQueueServerSocketChannel 的关系是父子的关系,在 KQueueSocketChannel 中有一个 parent 办法,用来返回 ServerSocketChannel 对象, 这也是后面提到的 newChildChannel 办法中传入 KQueueSocketChannel 构造函数中的 serverChannel:

public ServerSocketChannel parent() {return (ServerSocketChannel) super.parent();}

KQueueSocketChannel 还有一个个性就是反对 tcp fastopen, 它的实质是调用 BsdSocket 的 connectx 办法,在建设连贯的同时传递数据:

int bytesSent = socket.connectx((InetSocketAddress) localAddress, (InetSocketAddress) remoteAddress, iov, true);

总结

以上就是 KqueueEventLoop 和 KqueueSocketChannel 的具体介绍,基本上和 NIO 没有太大的区别,只不过性能依据优良。

更多内容请参考 http://www.flydean.com/53-1-netty-kqueue-transport/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」, 懂技术,更懂你!

正文完
 0