关于go:23-GolangGo并发编程网络IO

42次阅读

共计 8440 个字符,预计需要花费 22 分钟才能阅读完成。

  咱们都晓得用户程序读写 socket 的时候,可能阻塞以后协程,那么是不是阐明 Go 语言采纳阻塞形式调用 socket 相干零碎调用呢?你有没有想过,Go 语言又是如何实现高性能网络 IO 呢?有没有应用传说中的 IO 多路复用,如 epoll 呢?

摸索 Go 语言网络 IO

  HTTP 服务必定波及到 socket 的读写吧,而且 Go 语言启动一个 HTTP 服务还是非常简单的,几行代码就能够搞定,后面也不须要反向代理服务如 Nginx,咱们写一个简略的 HTTP 服务来测试:

package main

import (
    "fmt"
    "net/http"
)

func main() {http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {writer.Write([]byte("hello world"))
    })

    server := &http.Server{Addr: "0.0.0.0:80",}
    err := server.ListenAndServe()
    fmt.Println(err)
}

//curl http://127.0.0.1:10086/ping
//hello world

  能够临时不了解 http.Server,只须要晓得这是 Go 语言提供的 HTTP 服务;咱们启动的 HTTP 服务监听 80 端口,所有申请都返回 ”hello world”。程序挺简略的,然而如何验证咱们提出的疑难呢?Go 语言层面的 socket 读写,最终必定会转化为具体的零碎调用吧,有一个工具 strace,能够监听过程所有的零碎调用,咱们先通过 strace 简略看一下。

# ps aux | grep test
root     27435  0.0  0.0 219452  4636 pts/0    Sl+  11:00   0:00 ./test

# strace -p 27435
strace: Process 27435 attached
epoll_pwait(5, [{EPOLLIN, {u32=1030856456, u64=140403511762696}}], 128, -1, NULL, 0) = 1
futex(0x9234d0, FUTEX_WAKE_PRIVATE, 1)  = 1
accept4(3, {sa_family=AF_INET6, sin6_port=htons(56447), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, [28], SOCK_CLOEXEC|SOCK_NONBLOCK) = 4
epoll_ctl(5, EPOLL_CTL_ADD, 4, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=1030856248, u64=140403511762488}}) = 0
getsockname(4, {sa_family=AF_INET6, sin6_port=htons(10086), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, [28]) = 0
setsockopt(4, SOL_TCP, TCP_NODELAY, [1], 4) = 0
setsockopt(4, SOL_SOCKET, SO_KEEPALIVE, [1], 4) = 0
setsockopt(4, SOL_TCP, TCP_KEEPINTVL, [180], 4) = 0
setsockopt(4, SOL_TCP, TCP_KEEPIDLE, [180], 4) = 0
futex(0xc000036848, FUTEX_WAKE_PRIVATE, 1) = 1
accept4(3, 0xc0000abac0, 0xc0000abaa4, SOCK_CLOEXEC|SOCK_NONBLOCK) = -1 EAGAIN (Resource temporarily unavailable)
futex(0x923d48, FUTEX_WAIT_PRIVATE, 0, NULL) = 0
nanosleep({0, 3000}, NULL)

  strace 应用起来还是非常简单的,ps 命令查出来过程的 pid,而后 strace -p pid 就能够了,同时咱们手动 curl 申请一下。能够很分明的看到 epoll_pwait,epoll_ctl,accept4 等零碎调用,很显著,Go 应用了 IO 多路复用 epoll(不同零碎 Linux,Windows,Mac 不一样)。另外,留神第二个 accept4 零碎调用,返回 EAGAIN,并且第四个参数蕴含标识 SOCK_NONBLOCK,看到这根本也能猜到,Go 语言采取的是非阻塞形式调用 socket 相干零碎调用。

  Linux 零碎,高性能网络 IO 通常应用 epoll,epoll 能够同时监听多个 socket fd 是否可读或者可写(socket 缓冲区有数据了就是可读,socket 缓冲区有空间了就是可写)。epoll 应用也比较简单,咱们不做过多介绍,读者能够本人查阅相干材料,理解下 epoll 基于红黑树 + 双向列表实现,以及程度触发边缘触发等概念。epoll 只有三个 API:

// 创立 epoll 对象
int epoll_create(int size)
// 增加 / 批改 / 删除监听的 socket,包含能够设置监听 socket 的可读还是可写
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 阻塞期待,直到监听的多个 socket 可读或者可写;events 就是返回的事件列表
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

网络 IO 与调度器 schedule

  咱们能够猜想下 Go 语言网络 IO 流程:socket 读写采取的都是非阻塞式,如果不可读或者不可写,会立刻返回 EAGAIN,此时 Go 语言会将该 socket 增加到 epoll 对象监听,同时阻塞用户协程切换到调度器 schedule。而等到适合的机会,再调用 epoll_wait 获取可读或者可写的 socket,从而复原这些因为 socket 读写阻塞的用户协程。

  什么时候是适合的机会呢?还记得咱们上一篇文章介绍的调度器 schedule 吗,调度器在获取可执行协程时,还会尝试检测一下,以后是否有协程曾经解除阻塞了,其中就包含检测监听的 socket 是否可读或者可写。这些逻辑都在 runtime.findrunnable 函数内能够看到:

func findrunnable() (gp *g, inheritTime bool) {
    // 本地队列
    if gp, inheritTime := runqget(_p_); gp != nil {return gp, inheritTime}

    // 全局队列
    if sched.runqsize != 0 {lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {return gp, false}
    }

    // 检测是否有 socket 可读或者可写
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {if list := netpoll(0); !list.empty() { // non-blocking
            gp := list.pop()
            injectglist(&list)
            casgstatus(gp, _Gwaiting, _Grunnable)
            return gp, false
        }
    }
}

  netpoll 对应的就是咱们提到的 epoll_wait,留神这里输出参数是 0(超时工夫为 0,不会阻塞),即不论是否存在 socket 可读或者可写,都立刻返回,而且返回的就是 gList,解除阻塞的协程列表。injectglist 函数将协程增加到全局队列,或者是 P 的本地队列。

  然而咱们也能够看到,什么时候检测是否有 socket 可读或者可写呢?在查找以后 P 的本地队列,以及查找全局队列之后。那问题来了,如果这两个队列始终有协程怎么办?是不是就始终不会检测 socket 了状态了,也就是说这些协程会始终这么阻塞了。这必定不行啊,那怎么办?别忘了咱们还有一个辅助线程 sysmon,这个函数也会以 10ms 周期检测的.

func sysmon() {
    delay = 10 * 1000  // up to 10ms
    
    for {usleep(delay)

        lastpoll := int64(atomic.Load64(&sched.lastpoll))
        if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
            list := netpoll(0) // non-blocking - returns list of goroutines
            if !list.empty() {incidlelocked(-1)
                injectglist(&list)
                incidlelocked(1)
            }
        }
    }
}

  与调度器 schedule 相似,同样超时工夫为 0,不会阻塞;同样的将解除阻塞的协程增加到全局队列,或者是 P 的本地队列。

  接下来该钻研 socket 读写操作的流程了,当然必定与咱们的猜想相似,非阻塞读写,如果不可读或者不可写,立刻返回 EAGAIN,于是将该 socket 增加到 epoll 对象监听,并且阻塞以后协程,并切换到调度器 schedule。一方面,咱们能够从上往下,如从 server.ListenAndServe 往底层逐层去摸索,钻研 socket 读写的实现;另一方面,咱们曾经晓得底层肯定会走到 epoll_ctl,只是咱们不晓得 Go 语言对立封装的办法名称,简略浏览下 runtime 包下的文件,能够找到 runtime/netpoll_epoll.go,依据名称根本就能判断这是对 epoll 的封装,这下简略了,关上调试模式(Goland、dlv 都能够调试),打断点,再查看调用栈,socket 读写操作的调用链霎时就分明了。

0  0x00000000010304ea in runtime.netpollopen
    at /Users/xxx/Documents/go1.18/src/runtime/netpoll_epoll.go:64
 1  0x000000000105cdf4 in internal/poll.runtime_pollOpen
    at /Users/xxx/Documents/go1.18/src/runtime/netpoll.go:239
 2  0x000000000109e32d in internal/poll.(*pollDesc).init
    at /Users/xxx/Documents/go1.18/src/internal/poll/fd_poll_runtime.go:39
 3  0x000000000109eca6 in internal/poll.(*FD).Init
    at /Users/xxx/Documents/go1.18/src/internal/poll/fd_unix.go:63
 4  0x0000000001150078 in net.(*netFD).init
    at /Users/xxx/Documents/go1.18/src/net/fd_unix.go:41
 5  0x0000000001150078 in net.(*netFD).accept
    at /Users/xxx/Documents/go1.18/src/net/fd_unix.go:184
 6  0x000000000115f5a8 in net.(*TCPListener).accept
    at /Users/xxx/Documents/go1.18/src/net/tcpsock_posix.go:139
 7  0x000000000115e91d in net.(*TCPListener).Accept
    at /Users/xxx/Documents/go1.18/src/net/tcpsock.go:288
 8  0x00000000011ff56a in net/http.(*onceCloseListener).Accept
    at <autogenerated>:1
 9  0x00000000011f3145 in net/http.(*Server).Serve
    at /Users/xxx/Documents/go1.18/src/net/http/server.go:3039
10  0x00000000011f2d7d in net/http.(*Server).ListenAndServe
    at /Users/xxx/Documents/go1.18/src/net/http/server.go:2968

  有了这个调用栈,socket 读写操作的整个流程基本上没有太大问题了,这里就不再赘述了。咱们能够简略看一下 Accept 的逻辑,是不是之前咱们说的,非阻塞读写,如果不可读或者不可写,立刻返回 EAGAIN,同时将该 socket 增加到 epoll 对象监听,以及阻塞以后协程,并切换到调度器 schedule。

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {if err := fd.readLock(); err != nil {return -1, nil, "", err}
    defer fd.readUnlock()

    if err := fd.pd.prepareRead(fd.isFile); err != nil {return -1, nil, "", err}
    for {s, rsa, errcall, err := accept(fd.Sysfd)
        if err == nil {return s, rsa, "", err}
        switch err {
        case syscall.EINTR:
            continue
        case syscall.EAGAIN:
            if fd.pd.pollable() {if err = fd.pd.waitRead(fd.isFile); err == nil {continue}
            }
        case syscall.ECONNABORTED:
            // This means that a socket on the listen
            // queue was closed before we Accept()ed it;
            // it's a silly error, so try again.
            continue
        }
        return -1, nil, errcall, err
    }
}

  for 循环始终尝试执行 accept,如果返回 EAGAIN;函数 waitRead 底层就是监听读 socket 事件,并且阻塞协程以及切换到调度器 schedule。

读写超时

  下面咱们简略介绍了 socket 读写操作根本流程,调度器 Schedule 以及辅助线程 sysmon 检测 socket 根本流程。还有两个问题,咱们没有提到:1)socket 可读或者可写时,如何关联到协程呢?怎么晓得哪些协程因为这个 socket 阻塞了呢?2)高性能服务,socket 读写操作必定是须要设置正当的超时工夫的,不然如果依赖服务变慢,用户协程也会跟着长时间阻塞。socket 读写超时,怎么实现呢?

  咱们先答复第一个问题,在回顾下 epoll 的三个 API,其中波及到一个构造体 epoll_event,不仅蕴含了 socket fd,还蕴含一个 void 类型指针,通常指向用户自定义数据。Go 语言也是这么做的,自定义了构造 runtime.pollDesc:

type pollDesc struct {
    fd   uintptr   // constant for pollDesc usage lifetime
    
    // 指向读 socket 阻塞的协程
    rg atomic.Uintptr // pdReady, pdWait, G waiting for read or nil
    // 指向写 socket 阻塞的协程
    wg atomic.Uintptr // pdReady, pdWait, G waiting for write or nil

    // 读超时定时器
    rt      timer     // read deadline timer (set if rt.f != nil)
    rd      int64     // read deadline (a nanotime in the future, -1 when expired)
    // 写超时定时器
    wt      timer     // write deadline timer
    wd      int64     // write deadline (a nanotime in the future, -1 when expired)
}

  pollDesc 构造蕴含了读写 socket 阻塞的协程指针,这样一来,在通过 epoll_ctl 监听 socket 时,使得 epoll_event 指向 pollDesc 构造就行了,epoll_wait 返回事件列表之后,就能解析进去构造 pollDesc,从而解除对应协程的阻塞。

  另外,咱们也能看到 pollDesc 构造还蕴含了设置的读写超时工夫,以及超时定时器。通过这定义也根本能确定,socket 超时是基于定时器实现的。如果你认真钻研过上一大节介绍的 socket 读写操作流程,应该就能在 internal/poll/fd_poll_runtime.go 发现还有其余一些函数申明,包含 runtime_pollSetDeadline,设置超时工夫。Go 语言解决 HTTP 申请时,默认是有读写超时工夫的,同样的,咱们能够输入该流程的调用栈:

0  0x000000000105d0ef in internal/poll.runtime_pollSetDeadline
   at /Users/xxx/Documents/go1.18/src/runtime/netpoll.go:323
1  0x000000000109e95e in internal/poll.setDeadlineImpl
   at /Users/xxx/Documents/go1.18/src/internal/poll/fd_poll_runtime.go:160
2  0x000000000115a0c8 in internal/poll.(*FD).SetReadDeadline
   at /Users/xxx/Documents/go1.18/src/internal/poll/fd_poll_runtime.go:137
3  0x000000000115a0c8 in net.(*netFD).SetReadDeadline
   at /Users/xxx/Documents/go1.18/src/net/fd_posix.go:142
4  0x000000000115a0c8 in net.(*conn).SetReadDeadline
   at /Users/xxx/Documents/go1.18/src/net/net.go:250
5  0x00000000011ea591 in net/http.(*conn).readRequest
   at /Users/xxx/Documents/go1.18/src/net/http/server.go:975
6  0x00000000011ee9ab in net/http.(*conn).serve
   at /Users/xxx/Documents/go1.18/src/net/http/server.go:1891
7  0x00000000011f352e in net/http.(*Server).Serve.func3
   at /Users/xxx/Documents/go1.18/src/net/http/server.go:3071

  咱们曾经晓得,超时工夫是通过定时器实现的,所以函数 poll_runtime_pollSetDeadline 最终其实也是增加了定时器而已(定时器将在下一篇文章介绍),而定时器的处理函数为 netpollReadDeadline 或 netpollDeadline 或 netpollWriteDeadline(依据读写操作不同)。超时了怎么办?一来必定是设置超时标识,二来如果以后有协程因为 socket 阻塞还需唤醒该协程。

总结

  Go 语言高性能网络 IO 其实还是基于 IO 多路复用技术(如 epoll)实现的,读写 socket 都是非阻塞操作,如果不可读或者不可写,则将该 socket 增加到 epoll 对象监听。而调度器 schedule,以及辅助线程 sysmon 也会不定时检测是否有 socket 又扭转状态可读或者可写了,从而复原这些因为 socket 读写而阻塞的协程。

正文完
 0