Go中的网络轮询器(1)--Epoll在Go中的形象
Go用netpoll实现对不同操作系统的I/O多路复用的形象(或者说是封装),Go中多路复用与平台无关。编译器在编译时依据平台的不同应用不同的多路复用器进行编译。
无关netpoll的函数
epoll
、kqueue
、solaries
等多路复用模块都要实现以下五个函数
// 新建多路复用器// Initialize the poller. Only called once.// func netpollinit()// 监听文件描述符上的边缘触发事件,创立事件并退出监听// Disable notifications for fd. Return an errno value.// func netpollopen(fd uintptr, pd *pollDesc) int32// 监听多路复用器,返回一组曾经就绪的goroutine// func netpoll(delta int64) gList// Poll the network. If delta < 0, block indefinitely. If delta == 0,// poll without blocking. If delta > 0, block for up to delta nanoseconds.// Return a list of goroutines built by calling netpollready.// 唤醒多路复用器// func netpollBreak()// Wake up the network poller, assumed to be blocked in netpoll.// 判断文件描述符是否被多路复用器应用// func netpollIsPollDescriptor(fd uintptr) bool// Reports whether fd is a file descriptor used by the poller.
上面是Linux上的epoll的相干实现
数据结构
操作系统中的I/O多路复用器会监控文件描述符的可读可写,而Go中的netpoll会监控pollDesc
构造体的状态,它封装了操作系统的文件描述符。
// Network poller descriptor.//// No heap pointers.////go:notinheap// socket详细信息type pollDesc struct { link *pollDesc // in pollcache, protected by pollcache.lock // 文件描述符 fd uintptr // constant for pollDesc usage lifetime // atomicInfo holds bits from closing, rd, and wd, // which are only ever written while holding the lock, // summarized for use by netpollcheckerr, // which cannot acquire the lock. // After writing these fields under lock in a way that // might change the summary, code must call publishInfo // before releasing the lock. // Code that changes fields and then calls netpollunblock // (while still holding the lock) must call publishInfo // before calling netpollunblock, because publishInfo is what // stops netpollblock from blocking anew // (by changing the result of netpollcheckerr). // atomicInfo also holds the eventErr bit, // recording whether a poll event on the fd got an error; // atomicInfo is the only source of truth for that bit. atomicInfo atomic.Uint32 // atomic pollInfo // rg, wg are accessed atomically and hold g pointers. // (Using atomic.Uintptr here is similar to using guintptr elsewhere.) // rg,wg为二进制的信号量 rg atomic.Uintptr // pdReady, pdWait, G waiting for read or nil wg atomic.Uintptr // pdReady, pdWait, G waiting for write or nil // 互斥锁 lock mutex // protects the following fields closing bool user uint32 // user settable cookie // 文件形容被重用 rseq uintptr // protects from stale read timers // 期待文件描述符的计时器 rt timer // read deadline timer (set if rt.f != nil) // 期待文件描述符可读的截止日期 rd int64 // read deadline (a nanotime in the future, -1 when expired) // 计时器被重置 wseq uintptr // protects from stale write timers // 期待文件描述符的计时器 wt timer // write deadline timer // 期待文件描述符可写的截止日期 wd int64 // write deadline (a nanotime in the future, -1 when expired) self *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.}
netpollinit
- 调用
epollcreate1
创立epoll
文件描述符,该描述符会在整个程序的生命周期中应用 - 调用
nonblockingPipe
创立一个用于通信的管道 - 注册事件
- 应用
epollctl
将用于读取数据的文件描述符打包成epollevent
事件退出监听
func netpollinit() { // 创立epoll文件描述符,该描述符会在整个程序的生命周期中应用 epfd = epollcreate1(_EPOLL_CLOEXEC) if epfd < 0 { // 拿到epollId epfd = epollcreate(1024) if epfd < 0 { println("runtime: epollcreate failed with", -epfd) throw("runtime: netpollinit failed") } closeonexec(epfd) } // 创立Linux中的管道,目标是敞开epoll r, w, errno := nonblockingPipe() if errno != 0 { println("runtime: pipe failed with", -errno) throw("runtime: pipe failed") } // 注册事件 ev := epollevent{ events: _EPOLLIN, } *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd // 应用 epollctl 将用于读取数据的文件描述符打包成 epollevent 事件退出监听 errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev) if errno != 0 { println("runtime: epollctl failed with", -errno) throw("runtime: epollctl failed") } // 管道的读写 netpollBreakRd = uintptr(r) netpollBreakWr = uintptr(w)}
netpollBreak
初始化的管道为咱们提供了中断多路复用期待文件描述符中事件的办法,`netpollBreak
会向管道中写入数据唤醒 epoll
。
因为目前的计时器由网络轮询器治理和触发,它可能让网络轮询器立即返回并让运行时查看是否有须要触发的计时器。
func netpollBreak() { if atomic.Cas(&netpollWakeSig, 0, 1) { for { var b byte // 向管道写入数据 n := write(netpollBreakWr, unsafe.Pointer(&b), 1) if n == 1 { break } if n == -_EINTR { continue } if n == -_EAGAIN { return } println("runtime: netpollBreak write failed with", -n) throw("runtime: netpollBreak write failed") } }}
netpollopen
// 将socket可读,可写,断开事件注册到Epoll中func netpollopen(fd uintptr, pd *pollDesc) int32 { var ev epollevent ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd // 调用 epollctl 向全局的轮询文件描述符 epfd 中退出新的轮询事件监听文件描述符的可读和可写状态: return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)}
netpollclose
func netpollclose(fd uintptr) int32 { var ev epollevent // 从全局的epfd中删除监听的文件描述符 return -epollctl(epfd, _EPOLL_CTL_DEL, int32(fd), &ev)}
netpoll
netpoll
轮询网络查问产生了什么事件。
- 依据传入的
delay
计算epoll
零碎调用须要期待的工夫; - 调用
epollwait
期待可读或者可写事件的产生; - 在循环中顺次解决
epollevent
事件;
// netpoll checks for ready network connections.// Returns list of goroutines that become runnable.// delay < 0: blocks indefinitely// delay == 0: does not block, just polls// delay > 0: block for up to that many nanoseconds// 查问产生了什么事件// 返回协程链表func netpoll(delay int64) gList { if epfd == -1 { return gList{} } // delay的单位是纳秒所以须要将纳秒转换成毫秒 var waitms int32 if delay < 0 { waitms = -1 } else if delay == 0 { waitms = 0 } else if delay < 1e6 { waitms = 1 } else if delay < 1e15 { waitms = int32(delay / 1e6) } else { // An arbitrary cap on how long to wait for a timer. // 1e9 ms == ~11.5 days. waitms = 1e9 } var events [128]epolleventretry: // 调用epollwait期待文件描述符转换成可读或者可写 n := epollwait(epfd, &events[0], int32(len(events)), waitms) if n < 0 { if n != -_EINTR { println("runtime: epollwait on fd", epfd, "failed with", -n) throw("runtime: netpoll failed") } // If a timed sleep was interrupted, just return to // recalculate how long we should sleep now. if waitms > 0 { return gList{} } goto retry } // 协程链表 var toRun gList // n > 0 ==> 呈现了待处理的事件 // 在循环中顺次处理事件 for i := int32(0); i < n; i++ { ev := &events[i] if ev.events == 0 { continue } if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd { if ev.events != _EPOLLIN { println("runtime: netpoll: break fd ready for", ev.events) throw("runtime: netpoll: break fd ready for something unexpected") } if delay != 0 { // netpollBreak could be picked up by a // nonblocking poll. Only read the byte // if blocking. var tmp [16]byte read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp))) atomic.Store(&netpollWakeSig, 0) } continue } var mode int32 // 可读 if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'r' } // 可写 if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'w' } if mode != 0 { pd := *(**pollDesc)(unsafe.Pointer(&ev.data)) pd.setEventErr(ev.events == _EPOLLERR) netpollready(&toRun, pd, mode) } } return toRun}func EpollWait(epfd int, events []EpollEvent, msec int) (n int, err error) { var _p0 unsafe.Pointer if len(events) > 0 { _p0 = unsafe.Pointer(&events[0]) } else { _p0 = unsafe.Pointer(&_zero) } r0, _, e1 := Syscall6(SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0) n = int(r0) if e1 != 0 { err = errnoErr(e1) } return}// netpollready is called by the platform-specific netpoll function.// It declares that the fd associated with pd is ready for I/O.// The toRun argument is used to build a list of goroutines to return// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate// whether the fd is ready for reading or writing or both.//// This may run while the world is stopped, so write barriers are not allowed.//go:nowritebarrierfunc netpollready(toRun *gList, pd *pollDesc, mode int32) { var rg, wg *g if mode == 'r' || mode == 'r'+'w' { rg = netpollunblock(pd, 'r', true) } if mode == 'w' || mode == 'r'+'w' { wg = netpollunblock(pd, 'w', true) } if rg != nil { toRun.push(rg) } if wg != nil { toRun.push(wg) }}
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g { gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } // 将pollDesc中的读或者写信号量转换成pdReady 并返回其中存储的Goroutine; for { old := gpp.Load() if old == pdReady { return nil } if old == 0 && !ioready { // Only set pdReady for ioready. runtime_pollWait // will check for timeout/cancel before waiting. return nil } var new uintptr if ioready { new = pdReady } if gpp.CompareAndSwap(old, new) { if old == pdWait { old = 0 } return (*g)(unsafe.Pointer(old)) } }}