简介
在后面的章节中,咱们介绍了在netty中能够应用kequeue或者epoll来实现更为高效的native传输方式。那么kequeue和epoll和NIO传输协定有什么不同呢?
本章将会以kequeue为例进行深入探讨。
在下面咱们介绍的native的例子中,对于kqueue的类有这样几个,别离是KQueueEventLoopGroup,KQueueServerSocketChannel和KQueueSocketChannel,通过简略的替换和增加对应的依赖包,咱们能够轻松的将一般的NIO netty服务替换成为native的Kqueue服务。
是时候揭开Kqueue的机密了。
KQueueEventLoopGroup
eventLoop和eventLoopGroup是用来承受event和事件处理的。先来看下KQueueEventLoopGroup的定义:
public final class KQueueEventLoopGroup extends MultithreadEventLoopGroup
作为一个MultithreadEventLoopGroup,必须实现一个newChild办法,用来创立child EventLoop。在KQueueEventLoopGroup中,除了构造函数之外,额定须要实现的办法就是newChild:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
Integer maxEvents = (Integer) args[0];
SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
EventLoopTaskQueueFactory taskQueueFactory = null;
EventLoopTaskQueueFactory tailTaskQueueFactory = null;
int argsLength = args.length;
if (argsLength > 3) {
taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
return new KQueueEventLoop(this, executor, maxEvents,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
newChild中的所有参数都是从KQueueEventLoopGroup的构造函数中传入的。除了maxEvents,selectStrategyFactory和rejectedExecutionHandler之外,还能够接管taskQueueFactory和tailTaskQueueFactory两个参数,最初把这些参数都传到KQueueEventLoop的构造函数中去,最终返回一个KQueueEventLoop对象。
另外在应用KQueueEventLoopGroup之前咱们还须要确保Kqueue在零碎中是可用的,这个判断是通过调用KQueue.ensureAvailability();
来实现的。
KQueue.ensureAvailability首先判断是否定义了零碎属性io.netty.transport.noNative,如果定了,阐明native transport被禁用了,后续也就没有必要再进行判断了。
如果io.netty.transport.noNative没有被定义,那么会调用Native.newKQueue()
来尝试从native中获取一个kqueue的FileDescriptor,如果上述的获取过程中没有任何异样,则阐明kqueue在native办法中存在,咱们能够持续应用了。
以下是判断kqueue是否可用的代码:
static {
Throwable cause = null;
if (SystemPropertyUtil.getBoolean("io.netty.transport.noNative", false)) {
cause = new UnsupportedOperationException(
"Native transport was explicit disabled with -Dio.netty.transport.noNative=true");
} else {
FileDescriptor kqueueFd = null;
try {
kqueueFd = Native.newKQueue();
} catch (Throwable t) {
cause = t;
} finally {
if (kqueueFd != null) {
try {
kqueueFd.close();
} catch (Exception ignore) {
// ignore
}
}
}
}
UNAVAILABILITY_CAUSE = cause;
}
KQueueEventLoop
KQueueEventLoop是从KQueueEventLoopGroup中创立进去的,用来执行具体的IO工作。
先来看一下KQueueEventLoop的定义:
final class KQueueEventLoop extends SingleThreadEventLoop
不论是NIO还是KQueue或者是Epoll,因为应用了更加高级的IO技术,所以他们应用的EventLoop都是SingleThreadEventLoop,也就是说应用单线程就够了。
和KQueueEventLoopGroup一样,KQueueEventLoop也须要判断以后的零碎环境是否反对kqueue:
static {
KQueue.ensureAvailability();
}
上一节讲到了,KQueueEventLoopGroup会调用KQueueEventLoop的构造函数来返回一个eventLoop对象, 咱们先来看下KQueueEventLoop的构造函数:
KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
this.kqueueFd = Native.newKQueue();
if (maxEvents == 0) {
allowGrowing = true;
maxEvents = 4096;
} else {
allowGrowing = false;
}
this.changeList = new KQueueEventArray(maxEvents);
this.eventList = new KQueueEventArray(maxEvents);
int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
if (result < 0) {
cleanup();
throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
}
}
传入的maxEvents示意的是这个KQueueEventLoop可能承受的最大的event个数。如果maxEvents=0,则示意KQueueEventLoop的event容量能够动静扩大,并且最大值是4096。否则的话,KQueueEventLoop的event容量不能扩大。
maxEvents是作为数组的大小用来构建changeList和eventList。
KQueueEventLoop中还定义了一个map叫做channels,用来保留注册的channels:
private final IntObjectMap<AbstractKQueueChannel> channels = new IntObjectHashMap<AbstractKQueueChannel>(4096);
来看一下channel的add和remote办法:
void add(AbstractKQueueChannel ch) {
assert inEventLoop();
AbstractKQueueChannel old = channels.put(ch.fd().intValue(), ch);
assert old == null || !old.isOpen();
}
void remove(AbstractKQueueChannel ch) throws Exception {
assert inEventLoop();
int fd = ch.fd().intValue();
AbstractKQueueChannel old = channels.remove(fd);
if (old != null && old != ch) {
channels.put(fd, old);
assert !ch.isOpen();
} else if (ch.isOpen()) {
ch.unregisterFilters();
}
}
能够看到增加和删除的都是AbstractKQueueChannel,前面的章节中咱们会具体解说KQueueChannel,这里咱们只须要晓得channel map中的key是kequeue中特有的FileDescriptor的int值。
再来看一下EventLoop中最重要的run办法:
protected void run() {
for (;;) {
try {
int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
strategy = kqueueWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
if (wakenUp == 1) {
wakeup();
}
default:
}
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processReady(strategy);
}
} finally {
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
if (strategy > 0) {
processReady(strategy);
}
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
它的逻辑是先应用selectStrategy.calculateStrategy获取以后的select strategy,而后依据strategy的值来判断是否须要执行processReady办法,最初执行runAllTasks,从task queue中拿到要执行的工作去执行。
selectStrategy.calculateStrategy用来判断以后的select状态,默认状况下有三个状态,别离是:SELECT,CONTINUE,BUSY_WAIT。 这三个状态都是正数:
int SELECT = -1;
int CONTINUE = -2;
int BUSY_WAIT = -3;
别离示意以后的IO在slect的block状态,或者跳过以后IO的状态,和正在IO loop pull的状态。BUSY_WAIT是一个非阻塞的IO PULL,kqueue并不反对,所以会fallback到SELECT。
除了这三个状态之外,calculateStrategy还会返回一个正值,示意以后要执行的工作的个数。
在run办法中,如果strategy的后果是SELECT,那么最终会调用Native.keventWait办法返回以后ready的events个数,并且将ready的event放到KQueueEventArray的eventList中去。
如果ready的event个数大于零,则会调用processReady办法对这些event进行状态回调解决。
怎么解决的呢?上面是解决的外围逻辑:
AbstractKQueueChannel channel = channels.get(fd);
AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) channel.unsafe();
if (filter == Native.EVFILT_WRITE) {
unsafe.writeReady();
} else if (filter == Native.EVFILT_READ) {
unsafe.readReady(eventList.data(i));
} else if (filter == Native.EVFILT_SOCK && (eventList.fflags(i) & Native.NOTE_RDHUP) != 0) {
unsafe.readEOF();
}
这里的fd是从eventList中读取到的:
final int fd = eventList.fd(i);
依据eventList的fd,咱们能够从channels中拿到对应的KQueueChannel,而后依据event的filter状态来决定KQueueChannel的具体操作,是writeReady,readReady或者readEOF。
最初就是执行runAllTasks办法了,runAllTasks的逻辑很简略,就是从taskQueue中读取工作而后执行。
KQueueServerSocketChannel和KQueueSocketChannel
KQueueServerSocketChannel是用在server端的channel:
public final class KQueueServerSocketChannel extends AbstractKQueueServerChannel implements ServerSocketChannel {
KQueueServerSocketChannel继承自AbstractKQueueServerChannel,除了构造函数之外,最重要的一个办法就是newChildChannel:
@Override
protected Channel newChildChannel(int fd, byte[] address, int offset, int len) throws Exception {
return new KQueueSocketChannel(this, new BsdSocket(fd), address(address, offset, len));
}
这个办法用来创立一个新的child channel。从下面的代码中,咱们能够看到生成的child channel是一个KQueueSocketChannel的实例。
它的构造函数承受三个参数,别离是parent channel,BsdSocket和InetSocketAddress。
KQueueSocketChannel(Channel parent, BsdSocket fd, InetSocketAddress remoteAddress) {
super(parent, fd, remoteAddress);
config = new KQueueSocketChannelConfig(this);
}
这里的fd是socket accept acceptedAddress的后果:
int acceptFd = socket.accept(acceptedAddress);
上面是KQueueSocketChannel的定义:
public final class KQueueSocketChannel extends AbstractKQueueStreamChannel implements SocketChannel {
KQueueSocketChannel和KQueueServerSocketChannel的关系是父子的关系,在KQueueSocketChannel中有一个parent办法,用来返回ServerSocketChannel对象,这也是后面提到的newChildChannel办法中传入KQueueSocketChannel构造函数中的serverChannel:
public ServerSocketChannel parent() {
return (ServerSocketChannel) super.parent();
}
KQueueSocketChannel还有一个个性就是反对tcp fastopen,它的实质是调用BsdSocket的connectx办法,在建设连贯的同时传递数据:
int bytesSent = socket.connectx(
(InetSocketAddress) localAddress, (InetSocketAddress) remoteAddress, iov, true);
总结
以上就是KqueueEventLoop和KqueueSocketChannel的具体介绍,基本上和NIO没有太大的区别,只不过性能依据优良。
更多内容请参考 http://www.flydean.com/53-1-netty-kqueue-transport/
最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!
欢送关注我的公众号:「程序那些事」,懂技术,更懂你!
发表回复