关于golang:Netpoll导读

80次阅读

共计 5319 个字符,预计需要花费 14 分钟才能阅读完成。

Netpoll 是由 字节跳动 开发的高性能 NIO(Non-blocking I/O) 网络库,专一于 RPC 场景。
详情:https://github.com/cloudwego/…

  1. 要理解一个像 Netty 这样我的项目的源码是一件十分艰难的事件。我的项目自身源码量大,波及的底层常识多,加上利用了各种设计模式,都会对源码浏览减少难度。对于这样的工程最好是依据作者提供的导图先理解个大略,而后充沛浏览文档,头脑中构建一个思维框架。一开始就冲进去直怼源码是一件十分怯懦的事件。这样的我的项目往往是一个大工程,汇聚了很多卓越开源作者的心血,想一口气读完,而且丝毫不差的了解外面的设计思路是不事实的,浏览源码往往咱们只须要浏览它一个次要流程的过程是怎么产生构建的,而不须要字字斟酌的去读,这样容易陷在码海里。通过第一遍理清流程之后,能够第二遍零碎的浏览,而后再去反思作者这样设计的益处。
  2. 源码目录
    netpoll 源码文件全在同一个目录下,高深莫测,有多少文件。
  3. 次要架构
  1. 万事开头难,从服务启动开始
func main() {
    // 创立 Listener
    listener, err := netpoll.CreateListener(network, address)
    if err != nil {panic("create netpoll listener failed")
    }
    // 创立 EventLoop
    eventLoop, _ := netpoll.NewEventLoop(
        handler,   //
        netpoll.WithOnPrepare(prepare),
        netpoll.WithReadTimeout(time.Second),
    )
    // 运行 Server
    eventLoop.Serve(listener)
}
// 扩大 net.Listener
type Listener interface {
   net.Listener
   Fd() (fd int)
}

这个步骤是开启 MianReactor 作用在于监听端口,获取操作符地址

func CreateListener(network, addr string) (l Listener, err error) {
   if network == "udp" {
      // TODO: udp listener.
      return udpListener(network, addr)
   }
   // tcp, tcp4, tcp6, unix
   // 调用 go 原生
   ln, err := net.Listen(network, addr)
   if err != nil {return nil, err}
   return ConvertListener(ln)
}

// 转换成定义的 Listener,向零碎设置 Nonblock

func ConvertListener(l net.Listener) (nl Listener, err error) {if tmp, ok := l.(Listener); ok {return tmp, nil}
    ln := &listener{}
    ln.ln = l
    ln.addr = l.Addr()
    err = ln.parseFD()
    if err != nil {return nil, err}
    return ln, syscall.SetNonblock(ln.fd, true)
}

// 获取操作符

func (ln *listener) parseFD() (err error) {switch netln := ln.ln.(type) {
    case *net.TCPListener:
        ln.file, err = netln.File()
    case *net.UnixListener:
        ln.file, err = netln.File()
    default:
        return errors.New("listener type can't support")
    }
    if err != nil {return err}
    ln.fd = int(ln.file.Fd())
    return nil
}

// 看一些构造体,接口的具体定义。
//EventLoop 接口

type EventLoop interface {Serve(ln net.Listener) error            // 开启服务
   Shutdown(ctx context.Context) error     // 中断服务
}

//

type eventLoop struct {
   sync.Mutex                    //
   opt     *options              // 参数配置项
   prepare OnPrepare             // handler 封装
   svr     *server               // 形象的服务实体
   stop    chan error
}

//

type server struct {
   operator    FDOperator            // 文件操作符
   ln          Listener              // 这就是你创立的那个 listener
   prepare     OnPrepare             // 这就是你创立的那个 handler
   quit        func(err error)       //
   connections sync.Map              // key=fd, value=connection
}

// 创立 server 后,启动服务

func (evl *eventLoop) Serve(ln net.Listener) error {npln, err := ConvertListener(ln)
   if err != nil {return err}
   evl.Lock()
   evl.svr = newServer(npln, evl.prepare, evl.quit)
   evl.svr.Run()
   evl.Unlock()
   return evl.waitQuit()}

// 这里正式让服务跑起来

func (s *server) Run() (err error) {
   s.operator = FDOperator{FD:     s.ln.Fd(),
      OnRead: s.OnRead,
      OnHup:  s.OnHup,
   }
    // 通过负载平衡算法选取一个 Poll
   s.operator.poll = pollmanager.Pick()
    // 将 poll 和零碎操作符事件关联,PollReadable 用于监听连贯的开启或者敞开
   err = s.operator.Control(PollReadable)
   if err != nil {s.quit(err)
   }
   return err
}

//eventLoop 的个数 通过获取 runtime.GOMAXPROCS(0)来比拟设置
//pollmanager.Pick()
//eventLoop 的启动是通过 poll_manager.go 中的 init() 办法启动

type manager struct {
    NumLoops int                //eventloop 个数
    balance  loadbalance        // 负载平衡策略  Random、RoundRobin
    polls    []Poll             // all the polls}
func init() {pollmanager = &manager{}
   pollmanager.SetLoadBalance(RoundRobin)
   pollmanager.SetNumLoops(defaultNumLoops())
}

// 开启 SubReactor, 解决连贯,读写,敞开事件
// 通过 SetNumLoops 调用 Run()

func (m *manager) Run() error {for idx := len(m.polls); idx < m.NumLoops; idx++ {var poll = openPoll()
        m.polls = append(m.polls, poll)
                // 开启
        go poll.Wait()}
    m.balance.Rebalance(m.polls)
    return nil
}

通过 openPoll()创立 defaultPoll,defaultPoll 实现了 Poll 接口

type Poll interface {Wait() error
   Close() error
   Trigger() error
   Control(operator *FDOperator, event PollEvent) error
}
type defaultPoll struct {
    pollArgs   
    fd      int         // epoll fd
    wop     *FDOperator // eventfd, wake epoll_wait
    buf     []byte      // read wfd trigger msg
    trigger uint32      // trigger flag
    Reset   func(size, caps int)
    Handler func(events []epollevent) (closed bool)
}

// 这里才是真正工作的中央,后面都是筹备工作
// 进入 for 循环阻塞,期待音讯到来

func (p *defaultPoll) Wait() (err error) {
    // init
    var caps, msec, n = barriercap, -1, 0
    p.Reset(128, caps)
    // wait
    for {
        if n == p.size && p.size < 128*1024 {p.Reset(p.size<<1, caps)
        }
                //linux epoll
        n, err = EpollWait(p.fd, p.events, msec)
        if err != nil && err != syscall.EINTR {return err}
        if n <= 0 {
            msec = -1
            runtime.Gosched()
            continue
        }
        msec = 0
        if p.Handler(p.events[:n]) {return nil}
    }
}

// 解决监听事件
// 这个能够通过协程池来操作,以缩小零碎频繁调度协程的开销

func (p *defaultPoll) handler(events []epollevent) (closed bool) {var hups []*FDOperator // TODO: maybe can use sync.Pool
    for i := range events {var operator = *(**FDOperator)(unsafe.Pointer(&events[i].data))
        // trigger or exit gracefully
                // 解决 trigger 事件或者 exit
        if operator.FD == p.wop.FD {
            ......
            continue
        }
        .........
        default:
                       // 数据进来
            if evt&syscall.EPOLLIN != 0 {
                if operator.OnRead != nil {
                                       // 解决连贯
                    // for non-connection
                    operator.OnRead(p)
                } else {
                                      // 解决已连贯
                    // for connection
                    var bs = operator.Inputs(p.barriers[i].bs)
                    if len(bs) > 0 {var n, err = readv(operator.FD, bs, p.barriers[i].ivs)
                        operator.InputAck(n)
                        .......
                    }
                }
            }
                        // 数据进来
            if evt&syscall.EPOLLOUT != 0 {.......}
        }
        operator.done()}
    if len(hups) > 0 {
                // 从 poll 轮询中删除监听事件
        p.detaches(hups)
    }
    return false
}

// 解决连贯事件

func (s *server) OnRead(p Poll) error {
   // accept socket
   conn, err := s.ln.Accept()
   if err != nil {
      // shut down
      if strings.Contains(err.Error(), "closed") {s.operator.Control(PollDetach)
         s.quit(err)
         return err
      }
      log.Println("accept conn failed:", err.Error())
      return err
   }
   if conn == nil {return nil}
   // store & register connection
    // 保留连贯,在退出的时候用于敞开连贯
   var connection = &connection{}
   connection.init(conn.(Conn), s.prepare)
   if !connection.IsActive() {return nil}
   var fd = conn.(Conn).Fd()
   connection.AddCloseCallback(func(connection Connection) error {s.connections.Delete(fd)
      return nil
   })
   s.connections.Store(fd, connection)
   return nil
}

// 解决数据
// 返回给注册 onRequest 的中央,到这里一个次要流程就完了

func (c *connection) inputAck(n int) (err error) {
   if n < 0 {n = 0}
   leftover := atomic.AddInt32(&c.waitReadSize, int32(-n))
   err = c.inputBuffer.BookAck(n, leftover <= 0)
   c.triggerRead()
   c.onRequest()
   return err
}

buffer 解决上 设计了 Nocopy Buffer,Nocopy Buffer 基于链表数组实现。
netpoll 外面有许多细节处的设计,这里只是做一个十分浅显的导读。
目前看到的代码 只反对了 linux,macos。

正文完
 0