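If you haven't read the previous article, I recommend starting there. I originally planned to cover the Linux kernel too and read a few books on it, but my C is too rusty and reading the code was a real headache, so I gave up for now; for writing business code there is probably no need to dig that deep anyway. Stopping at the calls into the underlying system APIs feels about right to me. I'm not great at weaving source code into a story, so the whole piece is a bit dry. Bear with me~~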

demo

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class ServerConnect {
    public static void main(String[] args) {
        selector();
    }

    public static void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
        SocketChannel sc = ssChannel.accept();
        if (sc == null) {
            // non-blocking accept: the pending connection may already be gone
            return;
        }
        sc.configureBlocking(false);
        // attach a per-connection buffer to the new key
        sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(1024));
    }

    public static void handleRead(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer buf = (ByteBuffer) key.attachment();
        long bytesRead = sc.read(buf);
        while (bytesRead > 0) {
            buf.flip();
            while (buf.hasRemaining()) {
                System.out.print((char) buf.get());
            }
            buf.clear();
            bytesRead = sc.read(buf);
        }
        if (bytesRead == -1) {
            // the peer closed the connection; closing the channel also cancels the key
            sc.close();
        }
    }

    public static void handleWrite(SelectionKey key) throws IOException {
        ByteBuffer buf = (ByteBuffer) key.attachment();
        buf.flip();
        SocketChannel sc = (SocketChannel) key.channel();
        while (buf.hasRemaining()) {
            sc.write(buf);
        }
        buf.compact();
    }

    public static void selector() {
        Selector selector = null;
        ServerSocketChannel ssc = null;
        try {
            selector = Selector.open();
            ssc = ServerSocketChannel.open();
            ssc.socket().bind(new InetSocketAddress(8080));
            ssc.configureBlocking(false);
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                if (selector.select(3000) == 0) {
                    continue;
                }
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    if (key.isAcceptable()) {
                        handleAccept(key);
                    }
                    // check isValid() first: handleRead may have closed the channel,
                    // and the is*() methods throw CancelledKeyException on a cancelled key
                    if (key.isValid() && key.isReadable()) {
                        handleRead(key);
                    }
                    if (key.isValid() && key.isWritable()) {
                        handleWrite(key);
                    }
                    if (key.isValid() && key.isConnectable()) {
                        System.out.println("isConnectable = true");
                    }
                    iter.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (selector != null) {
                    selector.close();
                }
                if (ssc != null) {
                    ssc.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Source code analysis

Selector.open()

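SelectorProvider comes into play here. It follows the template-method pattern, and each operating system ships its own implementation. That is a bit painful: my local JDK has no EPollSelectorProvider, so there was nothing for it but to grab a copy of the source from GitHub and mirror it to Gitee. It is JDK 8; download it yourself if you are interested, and if you want another version you will have to track it down yourself.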
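To see which provider your own JDK picks, a small check like the following may help. It is only a sketch: the class name ProviderCheck is made up for illustration, and the class it prints depends on the platform and JDK build.

```java
import java.nio.channels.spi.SelectorProvider;

class ProviderCheck {
    // Returns the fully qualified class name of the default SelectorProvider.
    static String providerName() {
        return SelectorProvider.provider().getClass().getName();
    }

    public static void main(String[] args) {
        // On a Linux JDK 8 I would expect an epoll-based provider here;
        // other platforms ship a different class.
        System.out.println(providerName());
    }
}
```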

EPollSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    long pipeFds = IOUtil.makePipe(false);
    fd0 = (int) (pipeFds >>> 32);
    fd1 = (int) pipeFds;
    pollWrapper = new EPollArrayWrapper();
    pollWrapper.initInterrupt(fd0, fd1);
    fdToKey = new HashMap<>();
}

IOUtil.makePipe(false)

IOUtil.makePipe(false); is a JNI method. Its native implementation lives at /jdk8u_jdk/src/solaris/native/sun/nio/ch/IOUtil.c

  • Under the hood it calls int pipe(int pipefd[2]);
  • pipe() creates a pipe and returns its file descriptors through the pipefd array.

    • pipefd[0] is the read end of the pipe
    • pipefd[1] is the write end of the pipe

JNIEXPORT jlong JNICALL
Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking)
{
    int fd[2];

    // call pipe()
    if (pipe(fd) < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "Pipe failed");
        return 0;
    }
    if (blocking == JNI_FALSE) {
        if ((configureBlocking(fd[0], JNI_FALSE) < 0)
            || (configureBlocking(fd[1], JNI_FALSE) < 0)) {
            JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");
            close(fd[0]);
            close(fd[1]);
            return 0;
        }
    }
    // pack the two file descriptors into a single long and return it
    return ((jlong) fd[0] << 32) | (jlong) fd[1];
}
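The packing trick is easy to replay in plain Java. The sketch below mirrors the native return statement and the two casts in the EPollSelectorImpl constructor; the class and method names are my own, and the low-word mask is an extra safety step that the C code does not need because file descriptors are never negative.

```java
class FdPacking {
    // fd0 goes into the high 32 bits, fd1 into the low 32 bits.
    static long pack(int fd0, int fd1) {
        return ((long) fd0 << 32) | (fd1 & 0xFFFFFFFFL);
    }

    // Same expression as "fd0 = (int) (pipeFds >>> 32)".
    static int readEnd(long packed) {
        return (int) (packed >>> 32);
    }

    // Same expression as "fd1 = (int) pipeFds".
    static int writeEnd(long packed) {
        return (int) packed;
    }

    public static void main(String[] args) {
        long packed = pack(5, 6);
        System.out.println(readEnd(packed) + " " + writeEnd(packed));
    }
}
```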

new EPollArrayWrapper()

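The class comment explains this class quite clearly, so here is just a brief summary: it manipulates the epoll-related structures, which is why almost everything in it is a JNI method.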

EPollArrayWrapper() throws IOException {
    // creates the epoll file descriptor
    epfd = epollCreate();

    // the epoll_event array passed to epoll_wait
    int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
    pollArray = new AllocatedNativeObject(allocationSize, true);
    pollArrayAddress = pollArray.address();

    // eventHigh needed when using file descriptors > 64k
    if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
        eventsHigh = new HashMap<>();
}

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
    /*
     * epoll_create expects a size as a hint to the kernel about how to
     * dimension internal structures. We can't predict the size in advance.
     */
    // create the epoll instance
    int epfd = epoll_create(256);
    if (epfd < 0) {
       JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
    }
    return epfd;
}

new AllocatedNativeObject(allocationSize, true);

Next comes pollArray = new AllocatedNativeObject(allocationSize, true);

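The comment on pollArray says it is the epoll_event array that receives the results of epoll_wait. Looking at the source, though, it is not really an array object, just a block of native memory allocated up front.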

protected NativeObject(int var1, boolean var2) {
    // omitted
    if (!var2) {
        this.allocationAddress = unsafe.allocateMemory((long)var1);
        this.address = this.allocationAddress;
    } else {
        // get the memory page size
        int var3 = pageSize();
        // allocate the memory (one extra page of slack) and keep the raw address
        long var4 = unsafe.allocateMemory((long)(var1 + var3));
        this.allocationAddress = var4;
        // move the usable address up to the next page boundary
        this.address = var4 + (long)var3 - (var4 & (long)(var3 - 1));
    }
}
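The address arithmetic in the page-aligned branch is easier to follow with concrete numbers. The sketch below copies only the formula. The class name and the sample addresses are made up, and it assumes the page size is a power of two.

```java
class PageAlign {
    // Same formula as NativeObject: base + pageSize - (base & (pageSize - 1)).
    // The result is always a multiple of pageSize and lies strictly above base,
    // which is why the constructor allocates one extra page.
    static long alignedAddress(long base, int pageSize) {
        return base + pageSize - (base & (pageSize - 1));
    }

    public static void main(String[] args) {
        System.out.println(alignedAddress(4097L, 4096)); // unaligned base
        System.out.println(alignedAddress(4096L, 4096)); // already aligned base
    }
}
```

Note that an already aligned base is still pushed up by a full page, so the extra page of slack is consumed either way.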

pollWrapper.initInterrupt(fd0, fd1)

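fd0: the file descriptor of the read end obtained from pipe() earlier.

The previous article introduced the epoll_ctl() function, so let's revisit it here: epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
This adds fd0 to epfd and watches it for readability. Over~~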

void initInterrupt(int fd0, int fd1) {
    outgoingInterruptFD = fd1;
    incomingInterruptFD = fd0;
    epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}

JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
                                           jint opcode, jint fd, jint events)
{
    struct epoll_event event;
    int res;

    event.events = events;
    event.data.fd = fd;

    RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);

    /*
     * A channel may be registered with several Selectors. When each Selector
     * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
     * list to remove the file descriptor from epoll. The "last" Selector will
     * close the file descriptor which automatically unregisters it from each
     * epoll descriptor. To avoid costly synchronization between Selectors we
     * allow pending updates to be processed, ignoring errors. The errors are
     * harmless as the last update for the file descriptor is guaranteed to
     * be EPOLL_CTL_DEL.
     */
    if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
    }
}
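The same "watch the read end of a pipe for readability" pattern can be tried from user code with the public java.nio.channels.Pipe class. This is only an analogy to what the selector does with its private pipe, not the internal code path, and the class name PipeReadySketch is made up.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class PipeReadySketch {
    // Registers the read end of a pipe for OP_READ, writes one byte to the
    // write end, and returns how many keys the next select() reports as ready.
    static int readyAfterWrite() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);
                sink.write(ByteBuffer.wrap(new byte[] {1}));
                return selector.select(1000);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("ready keys: " + readyAfterWrite());
    }
}
```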

ServerSocketChannel.open()

Now we come to the channel. Internally the call ends up in sun.nio.ch.ServerSocketChannelImpl#ServerSocketChannelImpl(java.nio.channels.spi.SelectorProvider)

ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
    super(var1);
    this.fd = Net.serverSocket(true);
    this.fdVal = IOUtil.fdVal(this.fd);
    this.state = 0;
}

Net.serverSocket(true)

The method name alone tells you roughly that this is where the socket is created. I'll skip the protocol details; look into them if you are interested.

static FileDescriptor serverSocket(boolean var0) {
    return IOUtil.newFD(socket0(isIPv6Available(), var0, true, fastLoopback));
}

JNIEXPORT int JNICALL
Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean preferIPv6,
                            jboolean stream, jboolean reuse, jboolean ignored)
{
    int fd;
    int type = (stream ? SOCK_STREAM : SOCK_DGRAM);
#ifdef AF_INET6
    int domain = (ipv6_available() && preferIPv6) ? AF_INET6 : AF_INET;
#else
    int domain = AF_INET;
#endif

    // create the socket
    fd = socket(domain, type, 0);
    if (fd < 0) {
        return handleSocketError(env, errno);
    }

#ifdef AF_INET6
    /* Disable IPV6_V6ONLY to ensure dual-socket support */
    if (domain == AF_INET6) {
        int arg = 0;
        if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char*)&arg,
                       sizeof(int)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set IPV6_V6ONLY");
            close(fd);
            return -1;
        }
    }
#endif

    if (reuse) {
        int arg = 1;
        if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg,
                       sizeof(arg)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set SO_REUSEADDR");
            close(fd);
            return -1;
        }
    }

#if defined(__linux__)
    if (type == SOCK_DGRAM) {
        int arg = 0;
        int level = (domain == AF_INET6) ? IPPROTO_IPV6 : IPPROTO_IP;
        if ((setsockopt(fd, level, IP_MULTICAST_ALL, (char*)&arg, sizeof(arg)) < 0) &&
            (errno != ENOPROTOOPT)) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set IP_MULTICAST_ALL");
            close(fd);
            return -1;
        }
    }
#endif

#if defined(__linux__) && defined(AF_INET6)
    /* By default, Linux uses the route default */
    if (domain == AF_INET6 && type == SOCK_DGRAM) {
        int arg = 1;
        if (setsockopt(fd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &arg,
                       sizeof(arg)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set IPV6_MULTICAST_HOPS");
            close(fd);
            return -1;
        }
    }
#endif
    return fd;
}

newFD here simply creates the corresponding Java object, and setfdVal() stores the file descriptor value (an int, not an address) in that Java object.

public static FileDescriptor newFD(int var0) {
    FileDescriptor var1 = new FileDescriptor();
    setfdVal(var1, var0);
    return var1;
}

JNIEXPORT void JNICALL
Java_sun_nio_ch_IOUtil_setfdVal(JNIEnv *env, jclass clazz, jobject fdo, jint val)
{
    (*env)->SetIntField(env, fdo, fd_fdID, val);
}

ssc.socket().bind()

Skipping the boring parts, we eventually reach sun.nio.ch.ServerSocketChannelImpl#bind

public ServerSocketChannel bind(SocketAddress var1, int var2) throws IOException {
    synchronized(this.lock) {
        if (!this.isOpen()) {
            throw new ClosedChannelException();
        } else if (this.isBound()) {
            throw new AlreadyBoundException();
        } else {
            // address and port (port 0 if no address was given)
            InetSocketAddress var4 = var1 == null ? new InetSocketAddress(0) : Net.checkAddress(var1);
            SecurityManager var5 = System.getSecurityManager();
            if (var5 != null) {
                var5.checkListen(var4.getPort());
            }

            NetHooks.beforeTcpBind(this.fd, var4.getAddress(), var4.getPort());
            // these end up calling the native bind and listen
            Net.bind(this.fd, var4.getAddress(), var4.getPort());
            Net.listen(this.fd, var2 < 1 ? 50 : var2);
            synchronized(this.stateLock) {
                this.localAddress = Net.localAddress(this.fd);
            }

            return this;
        }
    }
}

bind()

As you can see, the fd wrapped inside the channel (the socket file descriptor) is what gets bound to the port.

public static void bind(FileDescriptor var0, InetAddress var1, int var2) throws IOException {
    bind(UNSPEC, var0, var1, var2);
}

static void bind(ProtocolFamily var0, FileDescriptor var1, InetAddress var2, int var3) throws IOException {
    boolean var4 = isIPv6Available() && var0 != StandardProtocolFamily.INET;
    bind0(var1, var4, exclusiveBind, var2, var3);
}

JNIEXPORT void JNICALL
Java_sun_nio_ch_Net_bind0(JNIEnv *env, jclass clazz, jobject fdo, jboolean preferIPv6,
                          jboolean useExclBind, jobject iao, int port)
{
    SOCKADDR sa;
    int sa_len = SOCKADDR_LEN;
    int rv = 0;

    if (NET_InetAddressToSockaddr(env, iao, port, (struct sockaddr *)&sa, &sa_len, preferIPv6) != 0) {
      return;
    }

    // bind the socket fd to the address
    rv = NET_Bind(fdval(env, fdo), (struct sockaddr *)&sa, sa_len);
    if (rv != 0) {
        handleSocketError(env, errno);
    }
}

int
NET_Bind(int fd, struct sockaddr *him, int len)
{
    // omitted
    rv = bind(fd, him, len);
    // omitted
    return rv;
}

listen()

listen() is very straightforward: it is essentially a direct call to the native method.

static native void listen(FileDescriptor var0, int var1) throws IOException;

JNIEXPORT void JNICALL
Java_sun_nio_ch_Net_listen(JNIEnv *env, jclass cl, jobject fdo, jint backlog)
{
    if (listen(fdval(env, fdo), backlog) < 0)
        handleSocketError(env, errno);
}
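From the caller's side, bind and listen happen in one step. The sketch below binds to port 0 so the operating system picks a free port, and passes a backlog of 0, which according to the bind code above falls back to 50. The class name BindSketch is made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

class BindSketch {
    // Binds (and implicitly listens) on an ephemeral port and returns that port.
    static int bindEphemeral() {
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            // backlog < 1 means "use the implementation default"
            ssc.bind(new InetSocketAddress(0), 0);
            return ((InetSocketAddress) ssc.getLocalAddress()).getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bound to port " + bindEphemeral());
    }
}
```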

ssc.configureBlocking(false)

This puts the file descriptor into non-blocking mode (it sets O_NONBLOCK). Just a brief note here, then moving on.

static int
configureBlocking(int fd, jboolean blocking)
{
    int flags = fcntl(fd, F_GETFL);
    int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);

    // only issue F_SETFL if the flags actually change
    return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags);
}
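The visible effect of O_NONBLOCK on the Java side is that accept() no longer waits. A minimal sketch, assuming nothing connects to the freshly bound loopback port; the class name is made up for illustration:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NonBlockingAcceptSketch {
    // With no pending connection, a non-blocking accept() returns null at once.
    static boolean acceptReturnsNull() {
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            ssc.configureBlocking(false);
            SocketChannel sc = ssc.accept();
            if (sc != null) {
                sc.close();
            }
            return sc == null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accept() returned null: " + acceptReturnsNull());
    }
}
```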

ssc.register(selector, SelectionKey.OP_ACCEPT);

Before going on, a quick recap:

  • selector: roughly the epoll instance, with the read-end file descriptor created by pipe() already added to it
  • channel: wraps the created socket, records the socket file descriptor, is bound to the local port, and has started listening for requests

public final SelectionKey register(Selector sel, int ops,
                                   Object att)
    throws ClosedChannelException
{
    synchronized (regLock) {
        // omitted
        if (k == null) {
            // New registration
            synchronized (keyLock) {
                if (!isOpen())
                    throw new ClosedChannelException();
                // att is null here; ignore it for now
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
        }
        return k;
    }
}

protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
    if (!(var1 instanceof SelChImpl)) {
        throw new IllegalSelectorException();
    } else {
        SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
        var4.attach(var3);
        synchronized(this.publicKeys) {
            // the implementation differs per operating system
            this.implRegister(var4);
        }

        var4.interestOps(var2);
        return var4;
    }
}

implRegister()

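SelectionKey wraps the channel internally. As was already evident above, so far there has been no explicit link between the socket created by the channel and the epoll instance; this is where the two get associated. Below is the epoll implementation.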

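fdToKey is created when the selector is initialized. Its type is Map<Integer,SelectionKeyImpl>, and it associates a file descriptor with its SelectionKey. The fd eventually ends up in EPollArrayWrapper::eventsLow, with events = 0 at this point.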

protected void implRegister(SelectionKeyImpl ski) {
    if (closed)
        throw new ClosedSelectorException();
    SelChImpl ch = ski.channel;
    int fd = Integer.valueOf(ch.getFDVal());
    fdToKey.put(fd, ski);
    pollWrapper.add(fd);
    keys.add(ski);
}

// pollWrapper.add(fd); eventually calls
private void setUpdateEvents(int fd, byte events, boolean force) {
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        if ((eventsLow[fd] != KILLED) || force) {
            eventsLow[fd] = events;
        }
    } else {
        Integer key = Integer.valueOf(fd);
        if (!isEventsHighKilled(key) || force) {
            eventsHigh.put(key, Byte.valueOf(events));
        }
    }
}

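In the subsequent var4.interestOps(var2); the event stored in the eventsLow array changes from 0 to POLLIN. At this point epollCtl() still has not been called, so the fd has not yet been added to the epoll instance.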
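What register() leaves behind can be inspected through the public API. The sketch below only observes the key right after registration; it says nothing about the private eventsLow array, and the class name is made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class RegisterSketch {
    // Registers a listening channel and reports the state visible right afterwards.
    static String describeRegistration() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            ssc.configureBlocking(false);
            SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT);
            return "interestOps=" + key.interestOps()
                    + " registered=" + selector.keys().contains(key)
                    + " selected=" + selector.selectedKeys().size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // prints: interestOps=16 registered=true selected=0
        System.out.println(describeRegistration());
    }
}
```

The key is already in keys() with OP_ACCEPT (value 16) as its interest set, while selectedKeys() stays empty until a select call runs.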

selector.select(3000)

Let's start directly from sun.nio.ch.EPollSelectorImpl#doSelect

protected int doSelect(long timeout) throws IOException {
    if (closed)
        throw new ClosedSelectorException();
    processDeregisterQueue();
    try {
        begin();
        pollWrapper.poll(timeout);
    } finally {
        end();
    }
    processDeregisterQueue();
    int numKeysUpdated = updateSelectedKeys();
    if (pollWrapper.interrupted()) {
        // Clear the wakeup pipe
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}

poll()

Here it is easy to see that on every poll(), sockets that have not been registered yet are first added to the epoll instance through epollCtl().

int poll(long timeout) throws IOException {
    updateRegistrations();
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
    // My guess is that this part handles interruption, because even if the flag
    // is set to true, sun.nio.ch.EPollSelectorImpl#doSelect resets it to false.
    // If any expert knows, please tell me.
    for (int i=0; i<updated; i++) {
        if (getDescriptor(i) == incomingInterruptFD) {
            interruptedIndex = i;
            interrupted = true;
            break;
        }
    }
    return updated;
}

private void updateRegistrations() {
    synchronized (updateLock) {
        int j = 0;
        while (j < updateCount) {
            int fd = updateDescriptors[j];
            short events = getUpdateEvents(fd);
            boolean isRegistered = registered.get(fd);
            int opcode = 0;

            if (events != KILLED) {
                if (isRegistered) {
                    opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                } else {
                    opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
                }
                if (opcode != 0) {
                    // this is where the fd is added to the epoll instance
                    epollCtl(epfd, opcode, fd, events);
                    if (opcode == EPOLL_CTL_ADD) {
                        registered.set(fd);
                    } else if (opcode == EPOLL_CTL_DEL) {
                        registered.clear(fd);
                    }
                }
            }
            j++;
        }
        updateCount = 0;
    }
}
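The deferred-update idea in updateRegistrations() can be modeled in a few lines of plain Java. This is a toy model, not the JDK code: the class PendingUpdates and its methods are made up, the KILLED state is left out, and flush() just returns the operations it would have issued instead of calling epoll_ctl.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class PendingUpdates {
    private final Map<Integer, Integer> events = new HashMap<>();      // latest requested events per fd
    private final List<Integer> updateDescriptors = new ArrayList<>(); // fds with pending changes
    private final Set<Integer> registered = new HashSet<>();           // fds already in the epoll set

    // Called at registration / interestOps time: only records the wish.
    void setInterest(int fd, int ev) {
        events.put(fd, ev);
        updateDescriptors.add(fd);
    }

    // Called at the start of poll(): decides ADD / MOD / DEL for each pending fd.
    List<String> flush() {
        List<String> ops = new ArrayList<>();
        for (int fd : updateDescriptors) {
            int ev = events.get(fd);
            if (registered.contains(fd)) {
                if (ev != 0) {
                    ops.add("MOD " + fd);
                } else {
                    ops.add("DEL " + fd);
                    registered.remove(fd);
                }
            } else if (ev != 0) {
                ops.add("ADD " + fd);
                registered.add(fd);
            }
        }
        updateDescriptors.clear();
        return ops;
    }

    public static void main(String[] args) {
        PendingUpdates p = new PendingUpdates();
        p.setInterest(7, 1);
        System.out.println(p.flush()); // first flush adds the fd
        p.setInterest(7, 4);
        System.out.println(p.flush()); // changed interest becomes a modify
        p.setInterest(7, 0);
        System.out.println(p.flush()); // zero interest becomes a delete
    }
}
```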

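epollWait is the matching call into the native method. Now we also know what pollArrayAddress, the memory block allocated earlier, is for: it is the pointer to the epoll_event array that epoll_wait fills in.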

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
                                            jlong address, jint numfds,
                                            jlong timeout, jint epfd)
{
    struct epoll_event *events = jlong_to_ptr(address);
    int res;

    if (timeout <= 0) {           /* Indefinite or no wait */
        RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
    } else {                      /* Bounded wait; bounded restarts */
        res = iepoll(epfd, events, numfds, timeout);
    }

    if (res < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
    }
    return res;
}

static int
iepoll(int epfd, struct epoll_event *events, int numfds, jlong timeout)
{
    jlong start, now;
    int remaining = timeout;
    struct timeval t;
    int diff;

    gettimeofday(&t, NULL);
    start = t.tv_sec * 1000 + t.tv_usec / 1000;

    for (;;) {
        int res = epoll_wait(epfd, events, numfds, remaining);
        if (res < 0 && errno == EINTR) {
            if (remaining >= 0) {
                gettimeofday(&t, NULL);
                now = t.tv_sec * 1000 + t.tv_usec / 1000;
                diff = now - start;
                remaining -= diff;
                if (diff < 0 || remaining <= 0) {
                    return 0;
                }
                start = now;
            }
        } else {
            return res;
        }
    }
}

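// My guess is that this part handles interruption, because even if the flag is set to true, sun.nio.ch.EPollSelectorImpl#doSelect resets it to false. Still an open question for now; if any expert knows, please tell me.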

// sun.nio.ch.EPollArrayWrapper#poll
for (int i=0; i<updated; i++) {
    if (getDescriptor(i) == incomingInterruptFD) {
        interruptedIndex = i;
        interrupted = true;
        break;
    }
}

// sun.nio.ch.EPollSelectorImpl#doSelect
if (pollWrapper.interrupted()) {
    // Clear the wakeup pipe
    pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
    synchronized (interruptLock) {
        pollWrapper.clearInterrupted();
        IOUtil.drain(fd0);
        interruptTriggered = false;
    }
}
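One way to probe this from the outside is Selector.wakeup(). Its documented behavior is that a wakeup issued while no selection is in progress makes the next select call return immediately. My reading, which I have not traced through every line of the JDK 8 sources, is that wakeup() writes a byte to the write end of the pipe, which is what makes fd0 show up in poll() and trips the interrupted flag. The sketch below only demonstrates the documented API behavior; the class name is made up.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

class WakeupSketch {
    // Calls wakeup() first, then measures how long a 10 second select() really takes.
    static long selectAfterWakeupMillis() {
        try (Selector selector = Selector.open()) {
            selector.wakeup();
            long start = System.nanoTime();
            selector.select(10_000); // no channels registered, so only the wakeup can end it early
            return (System.nanoTime() - start) / 1_000_000;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("select returned after " + selectAfterWakeupMillis() + " ms");
    }
}
```

If that reading is right, resetting the flag in doSelect is simply the cleanup that re-arms the wakeup pipe for the next call.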

updateSelectedKeys()

private int updateSelectedKeys() {
    int entries = pollWrapper.updated;
    int numKeysUpdated = 0;
    for (int i=0; i<entries; i++) {
        // read the fd of the i-th epoll_event stored at pollArrayAddress
        int nextFD = pollWrapper.getDescriptor(i);
        // look up the matching SelectionKey
        SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
        // ski is null in the case of an interrupt
        if (ski != null) {
            int rOps = pollWrapper.getEventOps(i);
            // not the case the first time around
            if (selectedKeys.contains(ski)) {
                if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                    numKeysUpdated++;
                }
            } else {
                ski.channel.translateAndSetReadyOps(rOps, ski);
                if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                    // add to the selectedKeys set
                    selectedKeys.add(ski);
                    numKeysUpdated++;
                }
            }
        }
    }
    return numKeysUpdated;
}
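The selectedKeys.contains(ski) branch is also why the demo calls iter.remove(). The sketch below uses a Pipe as a stand-in for a socket and selects twice without removing the key. The class name is made up, and the expectation in the comment follows from the Selector contract that select returns the number of keys whose ready set was updated.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class SelectedKeysSketch {
    // Returns {first select result, second select result, size of selectedKeys}.
    static int[] selectTwiceWithoutRemove() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);
                sink.write(ByteBuffer.wrap(new byte[] {1}));
                int first = selector.select(1000);
                // the key is deliberately left in selectedKeys and the byte is not read
                int second = selector.select(200);
                return new int[] {first, second, selector.selectedKeys().size()};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = selectTwiceWithoutRemove();
        // expected: 1 0 1 (the key stays selected, but the second call reports no update)
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```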

The rest

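At this point the overall flow should be clear. There is no need to analyze what remains, since it basically matches the code from the previous article. You readers are smart enough, so I won't waste your time.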

if (key.isValid() && key.isReadable()) {
    handleRead(key);
}
if (key.isValid() && key.isWritable()) {
    handleWrite(key);
}
if (key.isValid() && key.isConnectable()) {
    System.out.println("isConnectable = true");
}

// Below is the C version
// A new connection: add the new socket to efd
if (ep[i].data.fd == listenfd)
{
    connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &clilen);
    tep.events = EPOLLIN;
    tep.data.fd = connfd;
    ret = epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &tep);
}
// Otherwise, read the data
else
{
    connfd = ep[i].data.fd;
    int bytes = read(connfd, buf, MAXLEN);
    // The client closed the connection
    if (bytes == 0) {
        ret = epoll_ctl(efd, EPOLL_CTL_DEL, connfd, NULL);
        close(connfd);
        printf("client[%d] closed\n", i);
    }
    else
    {
        for (int j = 0; j < bytes; ++j)
        {
            buf[j] = toupper(buf[j]);
        }
        // Send the data back to the client
        write(connfd, buf, bytes);
    }
}

Summary

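  • Channel: a wrapper around the socket
  • Selector: a wrapper around the epoll instance and the epoll_event array
  • SelectionKey: links the two above, so that a file descriptor can be mapped quickly to its socket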

References

epoll: https://zh.wikipedia.org/wiki...

Linux documentation: https://man7.org/linux/man-pa...