乐趣区

关于go:23-GolangGo并发编程网络IO

  咱们都晓得用户程序读写 socket 的时候,可能阻塞以后协程,那么是不是阐明 Go 语言采纳阻塞形式调用 socket 相干零碎调用呢?你有没有想过,Go 语言又是如何实现高性能网络 IO 呢?有没有应用传说中的 IO 多路复用,如 epoll 呢?

摸索 Go 语言网络 IO

  HTTP 服务必定波及到 socket 的读写吧,而且 Go 语言启动一个 HTTP 服务还是非常简单的,几行代码就能够搞定,后面也不须要反向代理服务如 Nginx,咱们写一个简略的 HTTP 服务来测试:

package main

import (
    "fmt"
    "net/http"
)

func main() {http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {writer.Write([]byte("hello world"))
    })

    server := &http.Server{Addr: "0.0.0.0:80",}
    err := server.ListenAndServe()
    fmt.Println(err)
}

//curl http://127.0.0.1:10086/ping
//hello world

  能够临时不了解 http.Server,只须要晓得这是 Go 语言提供的 HTTP 服务;咱们启动的 HTTP 服务监听 80 端口,所有申请都返回 ”hello world”。程序挺简略的,然而如何验证咱们提出的疑难呢?Go 语言层面的 socket 读写,最终必定会转化为具体的零碎调用吧,有一个工具 strace,能够监听过程所有的零碎调用,咱们先通过 strace 简略看一下。

# ps aux | grep test
root     27435  0.0  0.0 219452  4636 pts/0    Sl+  11:00   0:00 ./test

# strace -p 27435
strace: Process 27435 attached
epoll_pwait(5, [{EPOLLIN, {u32=1030856456, u64=140403511762696}}], 128, -1, NULL, 0) = 1
futex(0x9234d0, FUTEX_WAKE_PRIVATE, 1)  = 1
accept4(3, {sa_family=AF_INET6, sin6_port=htons(56447), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, [28], SOCK_CLOEXEC|SOCK_NONBLOCK) = 4
epoll_ctl(5, EPOLL_CTL_ADD, 4, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=1030856248, u64=140403511762488}}) = 0
getsockname(4, {sa_family=AF_INET6, sin6_port=htons(10086), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, [28]) = 0
setsockopt(4, SOL_TCP, TCP_NODELAY, [1], 4) = 0
setsockopt(4, SOL_SOCKET, SO_KEEPALIVE, [1], 4) = 0
setsockopt(4, SOL_TCP, TCP_KEEPINTVL, [180], 4) = 0
setsockopt(4, SOL_TCP, TCP_KEEPIDLE, [180], 4) = 0
futex(0xc000036848, FUTEX_WAKE_PRIVATE, 1) = 1
accept4(3, 0xc0000abac0, 0xc0000abaa4, SOCK_CLOEXEC|SOCK_NONBLOCK) = -1 EAGAIN (Resource temporarily unavailable)
futex(0x923d48, FUTEX_WAIT_PRIVATE, 0, NULL) = 0
nanosleep({0, 3000}, NULL)

  strace 应用起来还是非常简单的,ps 命令查出来过程的 pid,而后 strace -p pid 就能够了,同时咱们手动 curl 申请一下。能够很分明的看到 epoll_pwait,epoll_ctl,accept4 等零碎调用,很显著,Go 应用了 IO 多路复用 epoll(不同零碎 Linux,Windows,Mac 不一样)。另外,留神第二个 accept4 零碎调用,返回 EAGAIN,并且第四个参数蕴含标识 SOCK_NONBLOCK,看到这根本也能猜到,Go 语言采取的是非阻塞形式调用 socket 相干零碎调用。

  Linux 零碎,高性能网络 IO 通常应用 epoll,epoll 能够同时监听多个 socket fd 是否可读或者可写(socket 缓冲区有数据了就是可读,socket 缓冲区有空间了就是可写)。epoll 应用也比较简单,咱们不做过多介绍,读者能够本人查阅相干材料,理解下 epoll 基于红黑树 + 双向列表实现,以及程度触发边缘触发等概念。epoll 只有三个 API:

// 创立 epoll 对象
int epoll_create(int size)
// 增加 / 批改 / 删除监听的 socket,包含能够设置监听 socket 的可读还是可写
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 阻塞期待,直到监听的多个 socket 可读或者可写;events 就是返回的事件列表
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

网络 IO 与调度器 schedule

  咱们能够猜想下 Go 语言网络 IO 流程:socket 读写采取的都是非阻塞式,如果不可读或者不可写,会立刻返回 EAGAIN,此时 Go 语言会将该 socket 增加到 epoll 对象监听,同时阻塞用户协程切换到调度器 schedule。而等到适合的机会,再调用 epoll_wait 获取可读或者可写的 socket,从而复原这些因为 socket 读写阻塞的用户协程。

  什么时候是适合的机会呢?还记得咱们上一篇文章介绍的调度器 schedule 吗,调度器在获取可执行协程时,还会尝试检测一下,以后是否有协程曾经解除阻塞了,其中就包含检测监听的 socket 是否可读或者可写。这些逻辑都在 runtime.findrunnable 函数内能够看到:

func findrunnable() (gp *g, inheritTime bool) {
    // 本地队列
    if gp, inheritTime := runqget(_p_); gp != nil {return gp, inheritTime}

    // 全局队列
    if sched.runqsize != 0 {lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {return gp, false}
    }

    // 检测是否有 socket 可读或者可写
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {if list := netpoll(0); !list.empty() { // non-blocking
            gp := list.pop()
            injectglist(&list)
            casgstatus(gp, _Gwaiting, _Grunnable)
            return gp, false
        }
    }
}

  netpoll 对应的就是咱们提到的 epoll_wait,留神这里输出参数是 0(超时工夫为 0,不会阻塞),即不论是否存在 socket 可读或者可写,都立刻返回,而且返回的就是 gList,解除阻塞的协程列表。injectglist 函数将协程增加到全局队列,或者是 P 的本地队列。

  然而咱们也能够看到,什么时候检测是否有 socket 可读或者可写呢?在查找以后 P 的本地队列,以及查找全局队列之后。那问题来了,如果这两个队列始终有协程怎么办?是不是就始终不会检测 socket 了状态了,也就是说这些协程会始终这么阻塞了。这必定不行啊,那怎么办?别忘了咱们还有一个辅助线程 sysmon,这个函数也会以 10ms 周期检测的.

func sysmon() {
    delay = 10 * 1000  // up to 10ms
    
    for {usleep(delay)

        lastpoll := int64(atomic.Load64(&sched.lastpoll))
        if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
            list := netpoll(0) // non-blocking - returns list of goroutines
            if !list.empty() {incidlelocked(-1)
                injectglist(&list)
                incidlelocked(1)
            }
        }
    }
}

  与调度器 schedule 相似,同样超时工夫为 0,不会阻塞;同样的将解除阻塞的协程增加到全局队列,或者是 P 的本地队列。

  接下来该钻研 socket 读写操作的流程了,当然必定与咱们的猜想相似,非阻塞读写,如果不可读或者不可写,立刻返回 EAGAIN,于是将该 socket 增加到 epoll 对象监听,并且阻塞以后协程,并切换到调度器 schedule。一方面,咱们能够从上往下,如从 server.ListenAndServe 往底层逐层去摸索,钻研 socket 读写的实现;另一方面,咱们曾经晓得底层肯定会走到 epoll_ctl,只是咱们不晓得 Go 语言对立封装的办法名称,简略浏览下 runtime 包下的文件,能够找到 runtime/netpoll_epoll.go,依据名称根本就能判断这是对 epoll 的封装,这下简略了,关上调试模式(Goland、dlv 都能够调试),打断点,再查看调用栈,socket 读写操作的调用链霎时就分明了。

0  0x00000000010304ea in runtime.netpollopen
    at /Users/xxx/Documents/go1.18/src/runtime/netpoll_epoll.go:64
 1  0x000000000105cdf4 in internal/poll.runtime_pollOpen
    at /Users/xxx/Documents/go1.18/src/runtime/netpoll.go:239
 2  0x000000000109e32d in internal/poll.(*pollDesc).init
    at /Users/xxx/Documents/go1.18/src/internal/poll/fd_poll_runtime.go:39
 3  0x000000000109eca6 in internal/poll.(*FD).Init
    at /Users/xxx/Documents/go1.18/src/internal/poll/fd_unix.go:63
 4  0x0000000001150078 in net.(*netFD).init
    at /Users/xxx/Documents/go1.18/src/net/fd_unix.go:41
 5  0x0000000001150078 in net.(*netFD).accept
    at /Users/xxx/Documents/go1.18/src/net/fd_unix.go:184
 6  0x000000000115f5a8 in net.(*TCPListener).accept
    at /Users/xxx/Documents/go1.18/src/net/tcpsock_posix.go:139
 7  0x000000000115e91d in net.(*TCPListener).Accept
    at /Users/xxx/Documents/go1.18/src/net/tcpsock.go:288
 8  0x00000000011ff56a in net/http.(*onceCloseListener).Accept
    at <autogenerated>:1
 9  0x00000000011f3145 in net/http.(*Server).Serve
    at /Users/xxx/Documents/go1.18/src/net/http/server.go:3039
10  0x00000000011f2d7d in net/http.(*Server).ListenAndServe
    at /Users/xxx/Documents/go1.18/src/net/http/server.go:2968

  有了这个调用栈,socket 读写操作的整个流程基本上没有太大问题了,这里就不再赘述了。咱们能够简略看一下 Accept 的逻辑,是不是之前咱们说的,非阻塞读写,如果不可读或者不可写,立刻返回 EAGAIN,同时将该 socket 增加到 epoll 对象监听,以及阻塞以后协程,并切换到调度器 schedule。

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {if err := fd.readLock(); err != nil {return -1, nil, "", err}
    defer fd.readUnlock()

    if err := fd.pd.prepareRead(fd.isFile); err != nil {return -1, nil, "", err}
    for {s, rsa, errcall, err := accept(fd.Sysfd)
        if err == nil {return s, rsa, "", err}
        switch err {
        case syscall.EINTR:
            continue
        case syscall.EAGAIN:
            if fd.pd.pollable() {if err = fd.pd.waitRead(fd.isFile); err == nil {continue}
            }
        case syscall.ECONNABORTED:
            // This means that a socket on the listen
            // queue was closed before we Accept()ed it;
            // it's a silly error, so try again.
            continue
        }
        return -1, nil, errcall, err
    }
}

  for 循环始终尝试执行 accept,如果返回 EAGAIN;函数 waitRead 底层就是监听读 socket 事件,并且阻塞协程以及切换到调度器 schedule。

读写超时

  下面咱们简略介绍了 socket 读写操作根本流程,调度器 Schedule 以及辅助线程 sysmon 检测 socket 根本流程。还有两个问题,咱们没有提到:1)socket 可读或者可写时,如何关联到协程呢?怎么晓得哪些协程因为这个 socket 阻塞了呢?2)高性能服务,socket 读写操作必定是须要设置正当的超时工夫的,不然如果依赖服务变慢,用户协程也会跟着长时间阻塞。socket 读写超时,怎么实现呢?

  咱们先答复第一个问题,在回顾下 epoll 的三个 API,其中波及到一个构造体 epoll_event,不仅蕴含了 socket fd,还蕴含一个 void 类型指针,通常指向用户自定义数据。Go 语言也是这么做的,自定义了构造 runtime.pollDesc:

type pollDesc struct {
    fd   uintptr   // constant for pollDesc usage lifetime
    
    // 指向读 socket 阻塞的协程
    rg atomic.Uintptr // pdReady, pdWait, G waiting for read or nil
    // 指向写 socket 阻塞的协程
    wg atomic.Uintptr // pdReady, pdWait, G waiting for write or nil

    // 读超时定时器
    rt      timer     // read deadline timer (set if rt.f != nil)
    rd      int64     // read deadline (a nanotime in the future, -1 when expired)
    // 写超时定时器
    wt      timer     // write deadline timer
    wd      int64     // write deadline (a nanotime in the future, -1 when expired)
}

  pollDesc 构造蕴含了读写 socket 阻塞的协程指针,这样一来,在通过 epoll_ctl 监听 socket 时,使得 epoll_event 指向 pollDesc 构造就行了,epoll_wait 返回事件列表之后,就能解析进去构造 pollDesc,从而解除对应协程的阻塞。

  另外,咱们也能看到 pollDesc 构造还蕴含了设置的读写超时工夫,以及超时定时器。通过这定义也根本能确定,socket 超时是基于定时器实现的。如果你认真钻研过上一大节介绍的 socket 读写操作流程,应该就能在 internal/poll/fd_poll_runtime.go 发现还有其余一些函数申明,包含 runtime_pollSetDeadline,设置超时工夫。Go 语言解决 HTTP 申请时,默认是有读写超时工夫的,同样的,咱们能够输入该流程的调用栈:

0  0x000000000105d0ef in internal/poll.runtime_pollSetDeadline
   at /Users/xxx/Documents/go1.18/src/runtime/netpoll.go:323
1  0x000000000109e95e in internal/poll.setDeadlineImpl
   at /Users/xxx/Documents/go1.18/src/internal/poll/fd_poll_runtime.go:160
2  0x000000000115a0c8 in internal/poll.(*FD).SetReadDeadline
   at /Users/xxx/Documents/go1.18/src/internal/poll/fd_poll_runtime.go:137
3  0x000000000115a0c8 in net.(*netFD).SetReadDeadline
   at /Users/xxx/Documents/go1.18/src/net/fd_posix.go:142
4  0x000000000115a0c8 in net.(*conn).SetReadDeadline
   at /Users/xxx/Documents/go1.18/src/net/net.go:250
5  0x00000000011ea591 in net/http.(*conn).readRequest
   at /Users/xxx/Documents/go1.18/src/net/http/server.go:975
6  0x00000000011ee9ab in net/http.(*conn).serve
   at /Users/xxx/Documents/go1.18/src/net/http/server.go:1891
7  0x00000000011f352e in net/http.(*Server).Serve.func3
   at /Users/xxx/Documents/go1.18/src/net/http/server.go:3071

  咱们曾经晓得,超时工夫是通过定时器实现的,所以函数 poll_runtime_pollSetDeadline 最终其实也是增加了定时器而已(定时器将在下一篇文章介绍),而定时器的处理函数为 netpollReadDeadline 或 netpollDeadline 或 netpollWriteDeadline(依据读写操作不同)。超时了怎么办?一来必定是设置超时标识,二来如果以后有协程因为 socket 阻塞还需唤醒该协程。

总结

  Go 语言高性能网络 IO 其实还是基于 IO 多路复用技术(如 epoll)实现的,读写 socket 都是非阻塞操作,如果不可读或者不可写,则将该 socket 增加到 epoll 对象监听。而调度器 schedule,以及辅助线程 sysmon 也会不定时检测是否有 socket 又扭转状态可读或者可写了,从而复原这些因为 socket 读写而阻塞的协程。

退出移动版