Go 中的网络轮询器(1)–Epoll 在 Go 中的形象
Go 用 netpoll 实现对不同操作系统的 I / O 多路复用的形象(或者说是封装),Go 中多路复用与平台无关。编译器在编译时依据平台的不同应用不同的多路复用器进行编译。
无关 netpoll 的函数
epoll
、kqueue
、solaries
等多路复用模块都要实现以下五个函数
// 新建多路复用器
// Initialize the poller. Only called once.
// func netpollinit()
// 监听文件描述符上的边缘触发事件,创立事件并退出监听
// Disable notifications for fd. Return an errno value.
// func netpollopen(fd uintptr, pd *pollDesc) int32
// 监听多路复用器,返回一组曾经就绪的 goroutine
// func netpoll(delta int64) gList
// Poll the network. If delta < 0, block indefinitely. If delta == 0,
// poll without blocking. If delta > 0, block for up to delta nanoseconds.
// Return a list of goroutines built by calling netpollready.
// 唤醒多路复用器
// func netpollBreak()
// Wake up the network poller, assumed to be blocked in netpoll.
// 判断文件描述符是否被多路复用器应用
// func netpollIsPollDescriptor(fd uintptr) bool
// Reports whether fd is a file descriptor used by the poller.
上面是 Linux 上的 epoll 的相干实现
数据结构
操作系统中的 I / O 多路复用器会监控文件描述符的可读可写,而 Go 中的 netpoll 会监控 pollDesc
构造体的状态,它封装了操作系统的文件描述符。
// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
// socket 详细信息
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock
// 文件描述符
fd uintptr // constant for pollDesc usage lifetime
// atomicInfo holds bits from closing, rd, and wd,
// which are only ever written while holding the lock,
// summarized for use by netpollcheckerr,
// which cannot acquire the lock.
// After writing these fields under lock in a way that
// might change the summary, code must call publishInfo
// before releasing the lock.
// Code that changes fields and then calls netpollunblock
// (while still holding the lock) must call publishInfo
// before calling netpollunblock, because publishInfo is what
// stops netpollblock from blocking anew
// (by changing the result of netpollcheckerr).
// atomicInfo also holds the eventErr bit,
// recording whether a poll event on the fd got an error;
// atomicInfo is the only source of truth for that bit.
atomicInfo atomic.Uint32 // atomic pollInfo
// rg, wg are accessed atomically and hold g pointers.
// (Using atomic.Uintptr here is similar to using guintptr elsewhere.)
// rg,wg 为二进制的信号量
rg atomic.Uintptr // pdReady, pdWait, G waiting for read or nil
wg atomic.Uintptr // pdReady, pdWait, G waiting for write or nil
// 互斥锁
lock mutex // protects the following fields
closing bool
user uint32 // user settable cookie
// 文件形容被重用
rseq uintptr // protects from stale read timers
// 期待文件描述符的计时器
rt timer // read deadline timer (set if rt.f != nil)
// 期待文件描述符可读的截止日期
rd int64 // read deadline (a nanotime in the future, -1 when expired)
// 计时器被重置
wseq uintptr // protects from stale write timers
// 期待文件描述符的计时器
wt timer // write deadline timer
// 期待文件描述符可写的截止日期
wd int64 // write deadline (a nanotime in the future, -1 when expired)
self *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}
netpollinit
- 调用
epollcreate1
创立epoll
文件描述符,该描述符会在整个程序的生命周期中应用 - 调用
nonblockingPipe
创立一个用于通信的管道 - 注册事件
- 应用
epollctl
将用于读取数据的文件描述符打包成epollevent
事件退出监听
func netpollinit() {
// 创立 epoll 文件描述符,该描述符会在整个程序的生命周期中应用
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd < 0 {
// 拿到 epollId
epfd = epollcreate(1024)
if epfd < 0 {println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
}
closeonexec(epfd)
}
// 创立 Linux 中的管道,目标是敞开 epoll
r, w, errno := nonblockingPipe()
if errno != 0 {println("runtime: pipe failed with", -errno)
throw("runtime: pipe failed")
}
// 注册事件
ev := epollevent{events: _EPOLLIN,}
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
// 应用 epollctl 将用于读取数据的文件描述符打包成 epollevent 事件退出监听
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
if errno != 0 {println("runtime: epollctl failed with", -errno)
throw("runtime: epollctl failed")
}
// 管道的读写
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
netpollBreak
初始化的管道为咱们提供了中断多路复用期待文件描述符中事件的办法,`netpollBreak
会向管道中写入数据唤醒 epoll
。
因为目前的计时器由网络轮询器治理和触发,它可能让网络轮询器立即返回并让运行时查看是否有须要触发的计时器。
func netpollBreak() {if atomic.Cas(&netpollWakeSig, 0, 1) {
for {
var b byte
// 向管道写入数据
n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
if n == 1 {break}
if n == -_EINTR {continue}
if n == -_EAGAIN {return}
println("runtime: netpollBreak write failed with", -n)
throw("runtime: netpollBreak write failed")
}
}
}
netpollopen
// 将 socket 可读,可写,断开事件注册到 Epoll 中
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
// 调用 epollctl 向全局的轮询文件描述符 epfd 中退出新的轮询事件监听文件描述符的可读和可写状态:return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
netpollclose
func netpollclose(fd uintptr) int32 {
var ev epollevent
// 从全局的 epfd 中删除监听的文件描述符
return -epollctl(epfd, _EPOLL_CTL_DEL, int32(fd), &ev)
}
netpoll
netpoll
轮询网络查问产生了什么事件。
- 依据传入的
delay
计算epoll
零碎调用须要期待的工夫; - 调用
epollwait
期待可读或者可写事件的产生; - 在循环中顺次解决
epollevent
事件;
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
// 查问产生了什么事件
// 返回协程链表
func netpoll(delay int64) gList {
if epfd == -1 {return gList{}
}
// delay 的单位是纳秒所以须要将纳秒转换成毫秒
var waitms int32
if delay < 0 {waitms = -1} else if delay == 0 {waitms = 0} else if delay < 1e6 {waitms = 1} else if delay < 1e15 {waitms = int32(delay / 1e6)
} else {
// An arbitrary cap on how long to wait for a timer.
// 1e9 ms == ~11.5 days.
waitms = 1e9
}
var events [128]epollevent
retry:
// 调用 epollwait 期待文件描述符转换成可读或者可写
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if waitms > 0 {return gList{}
}
goto retry
}
// 协程链表
var toRun gList
// n > 0 ==> 呈现了待处理的事件
// 在循环中顺次处理事件
for i := int32(0); i < n; i++ {ev := &events[i]
if ev.events == 0 {continue}
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
if ev.events != _EPOLLIN {println("runtime: netpoll: break fd ready for", ev.events)
throw("runtime: netpoll: break fd ready for something unexpected")
}
if delay != 0 {
// netpollBreak could be picked up by a
// nonblocking poll. Only read the byte
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
atomic.Store(&netpollWakeSig, 0)
}
continue
}
var mode int32
// 可读
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {mode += 'r'}
// 可写
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {mode += 'w'}
if mode != 0 {pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.setEventErr(ev.events == _EPOLLERR)
netpollready(&toRun, pd, mode)
}
}
return toRun
}
func EpollWait(epfd int, events []EpollEvent, msec int) (n int, err error) {
var _p0 unsafe.Pointer
if len(events) > 0 {_p0 = unsafe.Pointer(&events[0])
} else {_p0 = unsafe.Pointer(&_zero)
}
r0, _, e1 := Syscall6(SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0)
n = int(r0)
if e1 != 0 {err = errnoErr(e1)
}
return
}
// netpollready is called by the platform-specific netpoll function.
// It declares that the fd associated with pd is ready for I/O.
// The toRun argument is used to build a list of goroutines to return
// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
// whether the fd is ready for reading or writing or both.
//
// This may run while the world is stopped, so write barriers are not allowed.
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {wg = netpollunblock(pd, 'w', true)
}
if rg != nil {toRun.push(rg)
}
if wg != nil {toRun.push(wg)
}
}
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
if mode == 'w' {gpp = &pd.wg}
// 将 pollDesc 中的读或者写信号量转换成 pdReady 并返回其中存储的 Goroutine;for {old := gpp.Load()
if old == pdReady {return nil}
if old == 0 && !ioready {
// Only set pdReady for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {new = pdReady}
if gpp.CompareAndSwap(old, new) {
if old == pdWait {old = 0}
return (*g)(unsafe.Pointer(old))
}
}
}