If you haven't read the previous post, I suggest starting there. I originally planned to cover the Linux kernel and even read a few books for it, but my C is too rusty and reading that code gave me a real headache, so I gave up on that for now; for everyday business development there is no need to go that deep anyway. Stopping at the point where the underlying APIs get called feels just right to me. I'm not very good at turning source code into a story, so this post is a bit dry; bear with me~~
demo
```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class ServerConnect {

    public static void main(String[] args) {
        selector();
    }

    public static void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
        SocketChannel sc = ssChannel.accept();
        sc.configureBlocking(false);
        sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(1024));
    }

    public static void handleRead(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer buf = (ByteBuffer) key.attachment();
        long bytesRead = sc.read(buf);
        while (bytesRead > 0) {
            buf.flip();
            while (buf.hasRemaining()) {
                System.out.print((char) buf.get());
            }
            buf.clear();
            bytesRead = sc.read(buf);
        }
        if (bytesRead == -1) {
            sc.close();
        }
    }

    public static void handleWrite(SelectionKey key) throws IOException {
        ByteBuffer buf = (ByteBuffer) key.attachment();
        buf.flip();
        SocketChannel sc = (SocketChannel) key.channel();
        while (buf.hasRemaining()) {
            sc.write(buf);
        }
        buf.compact();
    }

    public static void selector() {
        Selector selector = null;
        ServerSocketChannel ssc = null;
        try {
            selector = Selector.open();
            ssc = ServerSocketChannel.open();
            ssc.socket().bind(new InetSocketAddress(8080));
            ssc.configureBlocking(false);
            ssc.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                if (selector.select(3000) == 0) {
                    continue;
                }
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    if (key.isAcceptable()) {
                        handleAccept(key);
                    }
                    if (key.isReadable()) {
                        handleRead(key);
                    }
                    if (key.isWritable() && key.isValid()) {
                        handleWrite(key);
                    }
                    if (key.isConnectable()) {
                        System.out.println("isConnectable = true");
                    }
                    iter.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (selector != null) {
                    selector.close();
                }
                if (ssc != null) {
                    ssc.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
```
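To poke at the demo server you can use `telnet localhost 8080`, or a minimal NIO client along these lines (a quick sketch, not part of the original demo; the class name is just for illustration):

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class ClientConnect {
    public static void main(String[] args) throws Exception {
        try (SocketChannel sc = SocketChannel.open(new InetSocketAddress("localhost", 8080))) {
            // the server's handleRead will print whatever we send
            sc.write(ByteBuffer.wrap("hello nio\n".getBytes(StandardCharsets.UTF_8)));
        }
    }
}
```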
Source code analysis
Selector.open()
Here a SelectorProvider comes into play. It is a template method whose implementation differs per operating system, which is annoying: the JDK I had locally does not ship the EPollSelectorProvider sources, so the only option was to grab a copy of the source from GitHub and mirror it to Gitee. If you are interested in JDK 8 you can download it yourself; readers who want other versions will have to hunt for them on their own.
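If you just want to confirm which implementation your JVM picks at runtime, a quick check (output varies by platform; on Linux it is typically the EPoll variants) looks like this:

```java
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

public class WhichSelector {
    public static void main(String[] args) throws Exception {
        // on Linux this usually prints sun.nio.ch.EPollSelectorProvider
        System.out.println(SelectorProvider.provider().getClass().getName());
        // ...and the Selector itself is then sun.nio.ch.EPollSelectorImpl
        try (Selector sel = Selector.open()) {
            System.out.println(sel.getClass().getName());
        }
    }
}
```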
```java
EPollSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    long pipeFds = IOUtil.makePipe(false);
    fd0 = (int) (pipeFds >>> 32);
    fd1 = (int) pipeFds;
    pollWrapper = new EPollArrayWrapper();
    pollWrapper.initInterrupt(fd0, fd1);
    fdToKey = new HashMap<>();
}
```
IOUtil.makePipe(false)
`IOUtil.makePipe(false)` is a JNI method; the corresponding native source is /jdk8u_jdk/src/solaris/native/sun/nio/ch/IOUtil.c.
- Under the hood it calls `int pipe(int pipefd[2]);`. The pipe() function creates a pipe and returns the two file descriptors through the pipefd array.
- pipefd[0] is the read end of the pipe
- pipefd[1] is the write end of the pipe
```c
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking)
{
    int fd[2];

    // call pipe()
    if (pipe(fd) < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "Pipe failed");
        return 0;
    }
    if (blocking == JNI_FALSE) {
        if ((configureBlocking(fd[0], JNI_FALSE) < 0)
            || (configureBlocking(fd[1], JNI_FALSE) < 0)) {
            JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");
            close(fd[0]);
            close(fd[1]);
            return 0;
        }
    }
    // pack the two file descriptors into a single long and return it
    return ((jlong) fd[0] << 32) | (jlong) fd[1];
}
```
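The high 32 bits carry the read end and the low 32 bits the write end, which is exactly how EPollSelectorImpl unpacks them above. A tiny standalone sketch of the same packing trick (descriptor values are made up for the example):

```java
public class PackedFds {
    public static void main(String[] args) {
        // hypothetical descriptor values, just to illustrate the packing
        int readFd = 42, writeFd = 43;

        long packed = ((long) readFd << 32) | (writeFd & 0xFFFFFFFFL);

        int fd0 = (int) (packed >>> 32); // read end, as in EPollSelectorImpl
        int fd1 = (int) packed;          // write end
        System.out.println(fd0 + " / " + fd1); // 42 / 43
    }
}
```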
new EPollArrayWrapper()
The class comment explains this class pretty clearly, so here is just a short summary: it is the class that manipulates the epoll-related structures, which is why it is essentially all JNI methods inside.
```java
EPollArrayWrapper() throws IOException {
    // creates the epoll file descriptor
    epfd = epollCreate();

    // the epoll_event array passed to epoll_wait
    int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
    pollArray = new AllocatedNativeObject(allocationSize, true);
    pollArrayAddress = pollArray.address();

    // eventHigh needed when using file descriptors > 64k
    if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
        eventsHigh = new HashMap<>();
}
```
```c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
    /*
     * epoll_create expects a size as a hint to the kernel about how to
     * dimension internal structures. We can't predict the size in advance.
     */
    // create the epoll instance
    int epfd = epoll_create(256);
    if (epfd < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
    }
    return epfd;
}
```
new AllocatedNativeObject(allocationSize, true);
Next comes `pollArray = new AllocatedNativeObject(allocationSize, true);`. The comment on pollArray describes it as the epoll_event array holding results from epoll_wait. Looking at the source, though, it is not really an array; it just carves out a block of native memory.
```java
protected NativeObject(int var1, boolean var2) {
    // ... (omitted)
    if (!var2) {
        this.allocationAddress = unsafe.allocateMemory((long) var1);
        this.address = this.allocationAddress;
    } else {
        // get the memory page size
        int var3 = pageSize();
        // allocate var1 + pageSize bytes and get the base address back
        long var4 = unsafe.allocateMemory((long) (var1 + var3));
        this.allocationAddress = var4;
        // round the usable address up to the next page boundary
        this.address = var4 + (long) var3 - (var4 & (long) (var3 - 1));
    }
}
```
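The last line is plain page alignment: allocate one extra page, then round the base address up to the next page boundary. A small, self-contained illustration of the arithmetic (page size and address values here are made up for the example):

```java
public class PageAlign {
    public static void main(String[] args) {
        long pageSize = 4096;           // assumed page size for the example
        long raw = 0x7f3a_1000_0123L;   // pretend allocateMemory returned this

        // same expression as in NativeObject
        long aligned = raw + pageSize - (raw & (pageSize - 1));

        System.out.printf("raw     = %#x%n", raw);
        System.out.printf("aligned = %#x%n", aligned);  // next multiple of 4096
        System.out.println(aligned % pageSize == 0);    // true
    }
}
```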
pollWrapper.initInterrupt(fd0, fd1)
fd0 is the read-end file descriptor of the pipe we requested earlier. The previous post introduced the epoll_ctl() function; here is a quick refresher. `epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);` adds fd0 to epfd with interest in "fd0 is readable", and that's it~~
```java
void initInterrupt(int fd0, int fd1) {
    outgoingInterruptFD = fd1;
    incomingInterruptFD = fd0;
    epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}
```
```c
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
                                           jint opcode, jint fd, jint events)
{
    struct epoll_event event;
    int res;

    event.events = events;
    event.data.fd = fd;

    RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);

    /*
     * A channel may be registered with several Selectors. When each Selector
     * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
     * list to remove the file descriptor from epoll. The "last" Selector will
     * close the file descriptor which automatically unregisters it from each
     * epoll descriptor. To avoid costly synchronization between Selectors we
     * allow pending updates to be processed, ignoring errors. The errors are
     * harmless as the last update for the file descriptor is guaranteed to
     * be EPOLL_CTL_DEL.
     */
    if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
    }
}
```
ServerSocketChannel.open()
Next we come to the channel. Internally this calls sun.nio.ch.ServerSocketChannelImpl#ServerSocketChannelImpl(java.nio.channels.spi.SelectorProvider).
```java
ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
    super(var1);
    this.fd = Net.serverSocket(true);
    this.fdVal = IOUtil.fdVal(this.fd);
    this.state = 0;
}
```
Net.serverSocket(true)
The method name alone tells you this is where the socket is created. I'll skip the various protocol rules and options here; dig into them if you are interested.
```java
static FileDescriptor serverSocket(boolean var0) {
    return IOUtil.newFD(socket0(isIPv6Available(), var0, true, fastLoopback));
}
```
```c
JNIEXPORT int JNICALL
Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean preferIPv6,
                            jboolean stream, jboolean reuse, jboolean ignored)
{
    int fd;
    int type = (stream ? SOCK_STREAM : SOCK_DGRAM);
#ifdef AF_INET6
    int domain = (ipv6_available() && preferIPv6) ? AF_INET6 : AF_INET;
#else
    int domain = AF_INET;
#endif

    // create the socket
    fd = socket(domain, type, 0);
    if (fd < 0) {
        return handleSocketError(env, errno);
    }

#ifdef AF_INET6
    /* Disable IPV6_V6ONLY to ensure dual-socket support */
    if (domain == AF_INET6) {
        int arg = 0;
        if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char*)&arg,
                       sizeof(int)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set IPV6_V6ONLY");
            close(fd);
            return -1;
        }
    }
#endif

    if (reuse) {
        int arg = 1;
        if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg,
                       sizeof(arg)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set SO_REUSEADDR");
            close(fd);
            return -1;
        }
    }

#if defined(__linux__)
    if (type == SOCK_DGRAM) {
        int arg = 0;
        int level = (domain == AF_INET6) ? IPPROTO_IPV6 : IPPROTO_IP;
        if ((setsockopt(fd, level, IP_MULTICAST_ALL, (char*)&arg, sizeof(arg)) < 0) &&
            (errno != ENOPROTOOPT)) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set IP_MULTICAST_ALL");
            close(fd);
            return -1;
        }
    }
#endif

#if defined(__linux__) && defined(AF_INET6)
    /* By default, Linux uses the route default */
    if (domain == AF_INET6 && type == SOCK_DGRAM) {
        int arg = 1;
        if (setsockopt(fd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &arg,
                       sizeof(arg)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set IPV6_MULTICAST_HOPS");
            close(fd);
            return -1;
        }
    }
#endif
    return fd;
}
```
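Note the `reuse` branch: serverSocket passes reuse = true, so SO_REUSEADDR is switched on right after the descriptor is created. The same socket option is also reachable from user code through the public NIO API; a small sketch:

```java
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;

public class ReuseAddr {
    public static void main(String[] args) throws Exception {
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            // maps to the same SO_REUSEADDR option that socket0 already set
            ssc.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            ssc.bind(new InetSocketAddress(8080));
            System.out.println(ssc.getOption(StandardSocketOptions.SO_REUSEADDR));
        }
    }
}
```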
newFD here simply creates the corresponding Java object, and setfdVal() stores the file descriptor value into that Java object.
```java
public static FileDescriptor newFD(int var0) {
    FileDescriptor var1 = new FileDescriptor();
    setfdVal(var1, var0);
    return var1;
}
```
```c
JNIEXPORT void JNICALL
Java_sun_nio_ch_IOUtil_setfdVal(JNIEnv *env, jclass clazz, jobject fdo, jint val)
{
    (*env)->SetIntField(env, fdo, fd_fdID, val);
}
```
ssc.socket().bind()
Skipping the boring intermediate calls, this eventually lands in sun.nio.ch.ServerSocketChannelImpl#bind.
```java
public ServerSocketChannel bind(SocketAddress var1, int var2) throws IOException {
    synchronized (this.lock) {
        if (!this.isOpen()) {
            throw new ClosedChannelException();
        } else if (this.isBound()) {
            throw new AlreadyBoundException();
        } else {
            // resolve the address/port to bind to
            InetSocketAddress var4 = var1 == null ? new InetSocketAddress(0) : Net.checkAddress(var1);
            SecurityManager var5 = System.getSecurityManager();
            if (var5 != null) {
                var5.checkListen(var4.getPort());
            }

            NetHooks.beforeTcpBind(this.fd, var4.getAddress(), var4.getPort());
            // this is where bind() and listen() are finally called
            Net.bind(this.fd, var4.getAddress(), var4.getPort());
            Net.listen(this.fd, var2 < 1 ? 50 : var2);
            synchronized (this.stateLock) {
                this.localAddress = Net.localAddress(this.fd);
            }

            return this;
        }
    }
}
```
bind()
As you can see, it is the fd wrapped inside the channel (the socket file descriptor) that subsequently gets bound to the port.
```java
public static void bind(FileDescriptor var0, InetAddress var1, int var2) throws IOException {
    bind(UNSPEC, var0, var1, var2);
}

static void bind(ProtocolFamily var0, FileDescriptor var1, InetAddress var2, int var3) throws IOException {
    boolean var4 = isIPv6Available() && var0 != StandardProtocolFamily.INET;
    bind0(var1, var4, exclusiveBind, var2, var3);
}
```
```c
JNIEXPORT void JNICALL
Java_sun_nio_ch_Net_bind0(JNIEnv *env, jclass clazz, jobject fdo, jboolean preferIPv6,
                          jboolean useExclBind, jobject iao, int port)
{
    SOCKADDR sa;
    int sa_len = SOCKADDR_LEN;
    int rv = 0;

    if (NET_InetAddressToSockaddr(env, iao, port, (struct sockaddr *)&sa,
                                  &sa_len, preferIPv6) != 0) {
        return;
    }

    rv = NET_Bind(fdval(env, fdo), (struct sockaddr *)&sa, sa_len);
    if (rv != 0) {
        handleSocketError(env, errno);
    }
}

int
NET_Bind(int fd, struct sockaddr *him, int len)
{
    // ... (omitted)
    rv = bind(fd, him, len);
    // ... (omitted)
    return rv;
}
```
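Also worth noticing in the Java code above: the backlog passed to Net.listen defaults to 50 when the caller supplies a value below 1. If you want to control it yourself, ServerSocketChannel exposes a bind overload that takes the backlog; a small usage sketch:

```java
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class BindWithBacklog {
    public static void main(String[] args) throws Exception {
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            // the second argument ends up as the backlog passed to listen()
            ssc.bind(new InetSocketAddress(8080), 128);
            System.out.println("listening on " + ssc.getLocalAddress());
        }
    }
}
```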
listen()
listen() is very straightforward: it is effectively a direct call into the native method.
static native void listen(FileDescriptor var0, int var1) throws IOException;
```c
JNIEXPORT void JNICALL
Java_sun_nio_ch_Net_listen(JNIEnv *env, jclass cl, jobject fdo, jint backlog)
{
    if (listen(fdval(env, fdo), backlog) < 0)
        handleSocketError(env, errno);
}
```
ssc.configureBlocking(false)
This sets the file descriptor to non-blocking mode; a brief look and we move on.
```c
static int
configureBlocking(int fd, jboolean blocking)
{
    int flags = fcntl(fd, F_GETFL);
    int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);

    return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags);
}
```
ssc.register(selector, SelectionKey.OP_ACCEPT);
Before going further, a quick recap:

- `selector`: essentially the epoll instance, with the read-end file descriptor created by pipe() already added to it
- `channel`: wraps the socket we created, records its file descriptor, has bound the local port, and has started listening for requests
```java
public final SelectionKey register(Selector sel, int ops, Object att)
    throws ClosedChannelException
{
    synchronized (regLock) {
        // ... (omitted)
        if (k == null) {
            // New registration
            synchronized (keyLock) {
                if (!isOpen())
                    throw new ClosedChannelException();
                // att is null here, so ignore it for now
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
        }
        return k;
    }
}
```
```java
protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
    if (!(var1 instanceof SelChImpl)) {
        throw new IllegalSelectorException();
    } else {
        SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
        var4.attach(var3);
        synchronized (this.publicKeys) {
            // dispatches to the OS-specific implementation
            this.implRegister(var4);
        }
        var4.interestOps(var2);
        return var4;
    }
}
```
implRegister()
`SelectionKey` wraps the channel internally. As we saw above, the socket created by the channel and the epoll instance have not been explicitly linked yet; this is where the link is made. Below is the epoll implementation. `fdToKey`, created when the selector was initialized, is a `Map<Integer, SelectionKeyImpl>` that associates the file descriptor with its SelectionKey; the fd also ends up in `EPollArrayWrapper::eventsLow`, with events = 0 at this point.
```java
protected void implRegister(SelectionKeyImpl ski) {
    if (closed)
        throw new ClosedSelectorException();
    SelChImpl ch = ski.channel;
    int fd = Integer.valueOf(ch.getFDVal());
    fdToKey.put(fd, ski);
    pollWrapper.add(fd);
    keys.add(ski);
}

// pollWrapper.add(fd) eventually calls:
private void setUpdateEvents(int fd, byte events, boolean force) {
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        if ((eventsLow[fd] != KILLED) || force) {
            eventsLow[fd] = events;
        }
    } else {
        Integer key = Integer.valueOf(fd);
        if (!isEventsHighKilled(key) || force) {
            eventsHigh.put(key, Byte.valueOf(events));
        }
    }
}
```
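The eventsLow/eventsHigh split is a space-for-speed trick: small descriptor numbers index straight into a flat array, and only descriptors above MAX_UPDATE_ARRAY_SIZE fall back to a map. A stripped-down, illustrative sketch of the same idea (the 64K threshold mirrors the comment in the JDK code but is assumed here):

```java
import java.util.HashMap;
import java.util.Map;

// illustrative only: stores one "interest events" byte per file descriptor
class EventStore {
    private static final int LOW_LIMIT = 64 * 1024;   // assumed threshold
    private final byte[] eventsLow = new byte[LOW_LIMIT];
    private final Map<Integer, Byte> eventsHigh = new HashMap<>();

    void set(int fd, byte events) {
        if (fd < LOW_LIMIT) {
            eventsLow[fd] = events;      // O(1) array write for common fds
        } else {
            eventsHigh.put(fd, events);  // rare, very large fds go to the map
        }
    }

    byte get(int fd) {
        return fd < LOW_LIMIT ? eventsLow[fd] : eventsHigh.getOrDefault(fd, (byte) 0);
    }
}
```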
The subsequent `var4.interestOps(var2);` changes the event in the eventsLow array from 0 to POLLIN; at this point epollCtl() has not yet been called to add the fd to the epoll instance.
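In other words, registration with epoll is lazy: interestOps() only records the wish, and the actual epoll_ctl happens on the next select() (see the poll() section below). That is also why toggling interest ops from user code is cheap; a typical pattern, sketched as tiny helpers around a SelectionKey:

```java
import java.nio.channels.SelectionKey;

final class InterestOps {
    // enable write interest; the epoll_ctl(ADD/MOD) happens lazily,
    // on the next selector.select() call
    static void enableWrite(SelectionKey key) {
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
    }

    // drop write interest again once pending data has been flushed
    static void disableWrite(SelectionKey key) {
        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
    }
}
```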
selector.select(3000)
Let's start directly from sun.nio.ch.EPollSelectorImpl#doSelect.
```java
protected int doSelect(long timeout) throws IOException {
    if (closed)
        throw new ClosedSelectorException();
    processDeregisterQueue();
    try {
        begin();
        pollWrapper.poll(timeout);
    } finally {
        end();
    }
    processDeregisterQueue();
    int numKeysUpdated = updateSelectedKeys();
    if (pollWrapper.interrupted()) {
        // Clear the wakeup pipe
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}
```
poll()
Here it is easy to see that on every poll(), any sockets that have not been registered yet are first added to the epoll instance via epollCtl().
```java
int poll(long timeout) throws IOException {
    updateRegistrations();
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
    // My guess is that this block handles interrupts (wakeup), because even though
    // interrupted is set to true here, sun.nio.ch.EPollSelectorImpl#doSelect resets
    // it to false. Corrections welcome.
    for (int i=0; i<updated; i++) {
        if (getDescriptor(i) == incomingInterruptFD) {
            interruptedIndex = i;
            interrupted = true;
            break;
        }
    }
    return updated;
}

private void updateRegistrations() {
    synchronized (updateLock) {
        int j = 0;
        while (j < updateCount) {
            int fd = updateDescriptors[j];
            short events = getUpdateEvents(fd);
            boolean isRegistered = registered.get(fd);
            int opcode = 0;

            if (events != KILLED) {
                if (isRegistered) {
                    opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                } else {
                    opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
                }
                if (opcode != 0) {
                    // here the fd is added to (or updated in) the epoll instance
                    epollCtl(epfd, opcode, fd, events);
                    if (opcode == EPOLL_CTL_ADD) {
                        registered.set(fd);
                    } else if (opcode == EPOLL_CTL_DEL) {
                        registered.clear(fd);
                    }
                }
            }
            j++;
        }
        updateCount = 0;
    }
}
```
epollWait
This is where we drop into the native method. And now we know what the memory block allocated earlier (pollArrayAddress) is for: it is passed down as the pointer to the epoll_event array.
```c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
                                            jlong address, jint numfds,
                                            jlong timeout, jint epfd)
{
    struct epoll_event *events = jlong_to_ptr(address);
    int res;

    if (timeout <= 0) {           /* Indefinite or no wait */
        RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
    } else {                      /* Bounded wait; bounded restarts */
        res = iepoll(epfd, events, numfds, timeout);
    }

    if (res < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
    }
    return res;
}

static int
iepoll(int epfd, struct epoll_event *events, int numfds, jlong timeout)
{
    jlong start, now;
    int remaining = timeout;
    struct timeval t;
    int diff;

    gettimeofday(&t, NULL);
    start = t.tv_sec * 1000 + t.tv_usec / 1000;

    for (;;) {
        int res = epoll_wait(epfd, events, numfds, remaining);
        if (res < 0 && errno == EINTR) {
            if (remaining >= 0) {
                gettimeofday(&t, NULL);
                now = t.tv_sec * 1000 + t.tv_usec / 1000;
                diff = now - start;
                remaining -= diff;
                if (diff < 0 || remaining <= 0) {
                    return 0;
                }
                start = now;
            }
        } else {
            return res;
        }
    }
}
```
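After epoll_wait returns, the kernel has filled that native block with epoll_event entries, and the Java side reads them back field by field via getDescriptor(i)/getEventOps(i). A rough sketch of how such fixed-offset access over a native block works; the offsets, stride, and the use of ByteBuffer here are illustrative only (the JDK uses its internal NativeObject accessors and computes the real layout natively):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class EventArraySketch {
    // illustrative layout: events at offset 0, data.fd at offset 4, 12 bytes per entry
    static final int SIZE_EPOLLEVENT = 12;
    static final int EVENT_OFFSET = 0;
    static final int FD_OFFSET = 4;

    public static void main(String[] args) {
        // stand-in for the native block behind pollArrayAddress
        ByteBuffer pollArray = ByteBuffer.allocateDirect(SIZE_EPOLLEVENT * 4)
                                         .order(ByteOrder.nativeOrder());

        // pretend the kernel reported "fd 42 is readable" in slot 0
        pollArray.putInt(0 * SIZE_EPOLLEVENT + EVENT_OFFSET, 0x001 /* EPOLLIN */);
        pollArray.putInt(0 * SIZE_EPOLLEVENT + FD_OFFSET, 42);

        // what getEventOps(i) / getDescriptor(i) conceptually do
        int i = 0;
        int events = pollArray.getInt(i * SIZE_EPOLLEVENT + EVENT_OFFSET);
        int fd     = pollArray.getInt(i * SIZE_EPOLLEVENT + FD_OFFSET);
        System.out.printf("fd=%d events=%#x%n", fd, events);
    }
}
```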
I suspect the following fragment is the interrupt (wakeup) handling, because even though interrupted is set to true here, sun.nio.ch.EPollSelectorImpl#doSelect later resets it to false; I'm leaving this as an open question for now. Corrections from anyone who knows better are welcome.
```java
// sun.nio.ch.EPollArrayWrapper#poll
for (int i=0; i<updated; i++) {
    if (getDescriptor(i) == incomingInterruptFD) {
        interruptedIndex = i;
        interrupted = true;
        break;
    }
}

// sun.nio.ch.EPollSelectorImpl#doSelect
if (pollWrapper.interrupted()) {
    // Clear the wakeup pipe
    pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
    synchronized (interruptLock) {
        pollWrapper.clearInterrupted();
        IOUtil.drain(fd0);
        interruptTriggered = false;
    }
}
```
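For what it's worth, my reading is that this pair is the plumbing behind Selector.wakeup(): wakeup() writes to the write end of the pipe (fd1), which makes the read end (fd0, registered with epoll in initInterrupt) readable, so a blocked epoll_wait returns immediately; the code above then notices that the woken descriptor is the interrupt fd, drains the pipe, and clears the flag. Treat that as my interpretation rather than gospel. From user code the effect looks like this:

```java
import java.nio.channels.Selector;

public class WakeupDemo {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();

        Thread waker = new Thread(() -> {
            try {
                Thread.sleep(1000);
                // on the epoll Selector this should end up writing to the
                // pipe's write end (fd1), forcing epoll_wait to return
                selector.wakeup();
            } catch (InterruptedException ignored) { }
        });
        waker.start();

        long start = System.currentTimeMillis();
        int n = selector.select();   // would otherwise block indefinitely
        System.out.println("select returned " + n + " after "
                + (System.currentTimeMillis() - start) + " ms");
        selector.close();
    }
}
```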
updateSelectedKeys()
```java
private int updateSelectedKeys() {
    int entries = pollWrapper.updated;
    int numKeysUpdated = 0;
    for (int i=0; i<entries; i++) {
        // read the epoll_event entry out of pollArrayAddress
        int nextFD = pollWrapper.getDescriptor(i);
        // look up the corresponding SelectionKey
        SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
        // ski is null in the case of an interrupt
        if (ski != null) {
            int rOps = pollWrapper.getEventOps(i);
            // not present the first time around
            if (selectedKeys.contains(ski)) {
                if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                    numKeysUpdated++;
                }
            } else {
                ski.channel.translateAndSetReadyOps(rOps, ski);
                if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                    // add it to the selectedKeys set
                    selectedKeys.add(ski);
                    numKeysUpdated++;
                }
            }
        }
    }
    return numKeysUpdated;
}
```
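translateAndSetReadyOps is where raw epoll bits (EPOLLIN, EPOLLOUT, ...) get turned into the SelectionKey ready ops the demo checks with isReadable()/isWritable(). A simplified, illustrative sketch of that mapping for a connected socket; the real translation also handles OP_CONNECT, errors, and hangups, and the masking-by-interest shown here is a simplification:

```java
import java.nio.channels.SelectionKey;

final class ReadyOpsSketch {
    // epoll bit values as defined on Linux, hard-coded here for the example
    static final int EPOLLIN  = 0x001;
    static final int EPOLLOUT = 0x004;

    // map epoll result bits to NIO ready ops, masked by what the key asked for
    static int translate(int epollOps, int interestOps) {
        int readyOps = 0;
        if ((epollOps & EPOLLIN)  != 0) readyOps |= SelectionKey.OP_READ;
        if ((epollOps & EPOLLOUT) != 0) readyOps |= SelectionKey.OP_WRITE;
        return readyOps & interestOps;
    }

    public static void main(String[] args) {
        int ready = translate(EPOLLIN | EPOLLOUT, SelectionKey.OP_READ);
        System.out.println(ready == SelectionKey.OP_READ); // true: only OP_READ was requested
    }
}
```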
The rest
At this point the overall picture is clear and there is no need to analyze the rest in detail; it maps almost one-to-one onto the previous post, and since you readers are smart, I won't waste your time.
```java
if (key.isReadable()) {
    handleRead(key);
}
if (key.isWritable() && key.isValid()) {
    handleWrite(key);
}
if (key.isConnectable()) {
    System.out.println("isConnectable = true");
}
```

```c
/* the C counterpart from the previous post */
// a new connection: add the new socket to efd
if (ep[i].data.fd == listenfd) {
    connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &clilen);
    tep.events = EPOLLIN;
    tep.data.fd = connfd;
    ret = epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &tep);
}
// otherwise, read the data
else {
    connfd = ep[i].data.fd;
    int bytes = read(connfd, buf, MAXLEN);
    // the client closed the connection
    if (bytes == 0) {
        ret = epoll_ctl(efd, EPOLL_CTL_DEL, connfd, NULL);
        close(connfd);
        printf("client[%d] closed\n", i);
    } else {
        for (int j = 0; j < bytes; ++j) {
            buf[j] = toupper(buf[j]);
        }
        // send the data back to the client
        write(connfd, buf, bytes);
    }
}
```
Summary
- Channel: a wrapper around the socket
- Selector: a wrapper around the two epoll structures, the epoll instance and the epoll_event array
- SelectionKey: ties the two above together, so that given a file descriptor the corresponding socket can be found quickly
References
epoll: https://zh.wikipedia.org/wiki...
Linux man pages: https://man7.org/linux/man-pa...