关于golang:Netpoll导读

Netpoll 是由 字节跳动 开发的高性能 NIO(Non-blocking I/O) 网络库,专一于 RPC 场景。
详情:https://github.com/cloudwego/…

  1. 要理解一个像Netty这样我的项目的源码是一件十分艰难的事件。我的项目自身源码量大,波及的底层常识多,加上利用了各种设计模式,都会对源码浏览减少难度。对于这样的工程最好是依据作者提供的导图先理解个大略,而后充沛浏览文档,头脑中构建一个思维框架。一开始就冲进去直怼源码是一件十分怯懦的事件。这样的我的项目往往是一个大工程,汇聚了很多卓越开源作者的心血,想一口气读完,而且丝毫不差的了解外面的设计思路是不事实的,浏览源码往往咱们只须要浏览它一个次要流程的过程是怎么产生构建的,而不须要字字斟酌的去读,这样容易陷在码海里。通过第一遍理清流程之后,能够第二遍零碎的浏览,而后再去反思作者这样设计的益处。
  2. 源码目录
    netpoll源码文件全在同一个目录下,高深莫测,有多少文件。
  3. 次要架构
  1. 万事开头难,从服务启动开始
func main() {
    //创立 Listener
    listener, err := netpoll.CreateListener(network, address)
    if err != nil {
        panic("create netpoll listener failed")
    }
    //创立 EventLoop
    eventLoop, _ := netpoll.NewEventLoop(
        handler,   //
        netpoll.WithOnPrepare(prepare),
        netpoll.WithReadTimeout(time.Second),
    )
    //运行 Server
    eventLoop.Serve(listener)
}
//扩大net.Listener
type Listener interface {
   net.Listener
   Fd() (fd int)
}

这个步骤是开启MianReactor 作用在于监听端口,获取操作符地址

func CreateListener(network, addr string) (l Listener, err error) {
   if network == "udp" {
      // TODO: udp listener.
      return udpListener(network, addr)
   }
   // tcp, tcp4, tcp6, unix
   //调用go原生
   ln, err := net.Listen(network, addr)
   if err != nil {
      return nil, err
   }
   return ConvertListener(ln)
}

//转换成定义的Listener ,向零碎设置Nonblock

func ConvertListener(l net.Listener) (nl Listener, err error) {
    if tmp, ok := l.(Listener); ok {
        return tmp, nil
    }
    ln := &listener{}
    ln.ln = l
    ln.addr = l.Addr()
    err = ln.parseFD()
    if err != nil {
        return nil, err
    }
    return ln, syscall.SetNonblock(ln.fd, true)
}

//获取操作符

func (ln *listener) parseFD() (err error) {
    switch netln := ln.ln.(type) {
    case *net.TCPListener:
        ln.file, err = netln.File()
    case *net.UnixListener:
        ln.file, err = netln.File()
    default:
        return errors.New("listener type can't support")
    }
    if err != nil {
        return err
    }
    ln.fd = int(ln.file.Fd())
    return nil
}

//看一些构造体,接口的具体定义。
//EventLoop接口

type EventLoop interface {
   Serve(ln net.Listener) error            //开启服务
   Shutdown(ctx context.Context) error     //中断服务
}

//

type eventLoop struct {
   sync.Mutex                    //
   opt     *options              // 参数配置项
   prepare OnPrepare             // handler封装
   svr     *server               //形象的服务实体
   stop    chan error
}

//

type server struct {
   operator    FDOperator            // 文件操作符
   ln          Listener              // 这就是你创立的那个listener
   prepare     OnPrepare             // 这就是你创立的那个handler
   quit        func(err error)       //
   connections sync.Map              // key=fd, value=connection
}

//创立server后,启动服务

func (evl *eventLoop) Serve(ln net.Listener) error {
   npln, err := ConvertListener(ln)
   if err != nil {
      return err
   }
   evl.Lock()
   evl.svr = newServer(npln, evl.prepare, evl.quit)
   evl.svr.Run()
   evl.Unlock()
   return evl.waitQuit()
}

//这里正式让服务跑起来

func (s *server) Run() (err error) {
   s.operator = FDOperator{
      FD:     s.ln.Fd(),
      OnRead: s.OnRead,
      OnHup:  s.OnHup,
   }
    //通过负载平衡算法选取一个Poll
   s.operator.poll = pollmanager.Pick()
    //将poll和零碎操作符事件关联,PollReadable 用于监听连贯的开启或者敞开
   err = s.operator.Control(PollReadable)
   if err != nil {
      s.quit(err)
   }
   return err
}

//eventLoop的个数 通过获取 runtime.GOMAXPROCS(0)来比拟设置
//pollmanager.Pick()
//eventLoop的启动是通过 poll_manager.go中的init()办法启动

type manager struct {
    NumLoops int                //eventloop个数
    balance  loadbalance        // 负载平衡策略  Random、RoundRobin
    polls    []Poll             // all the polls
}
func init() {
   pollmanager = &manager{}
   pollmanager.SetLoadBalance(RoundRobin)
   pollmanager.SetNumLoops(defaultNumLoops())
}

//开启SubReactor,解决连贯,读写,敞开事件
//通过SetNumLoops调用Run()

func (m *manager) Run() error {
    for idx := len(m.polls); idx < m.NumLoops; idx++ {
        var poll = openPoll()
        m.polls = append(m.polls, poll)
                //开启
        go poll.Wait()
    }
    m.balance.Rebalance(m.polls)
    return nil
}

通过openPoll()创立 defaultPoll,defaultPoll实现了Poll接口

type Poll interface {
   Wait() error
   Close() error
   Trigger() error
   Control(operator *FDOperator, event PollEvent) error
}
type defaultPoll struct {
    pollArgs   
    fd      int         // epoll fd
    wop     *FDOperator // eventfd, wake epoll_wait
    buf     []byte      // read wfd trigger msg
    trigger uint32      // trigger flag
    Reset   func(size, caps int)
    Handler func(events []epollevent) (closed bool)
}

//这里才是真正工作的中央,后面都是筹备工作
//进入for循环阻塞,期待音讯到来

func (p *defaultPoll) Wait() (err error) {
    // init
    var caps, msec, n = barriercap, -1, 0
    p.Reset(128, caps)
    // wait
    for {
        if n == p.size && p.size < 128*1024 {
            p.Reset(p.size<<1, caps)
        }
                //linux epoll
        n, err = EpollWait(p.fd, p.events, msec)
        if err != nil && err != syscall.EINTR {
            return err
        }
        if n <= 0 {
            msec = -1
            runtime.Gosched()
            continue
        }
        msec = 0
        if p.Handler(p.events[:n]) {
            return nil
        }
    }
}

//解决监听事件
// 这个能够通过协程池来操作,以缩小零碎频繁调度协程的开销

func (p *defaultPoll) handler(events []epollevent) (closed bool) {
    var hups []*FDOperator // TODO: maybe can use sync.Pool
    for i := range events {
        var operator = *(**FDOperator)(unsafe.Pointer(&events[i].data))
        // trigger or exit gracefully
                //解决trigger事件或者exit
        if operator.FD == p.wop.FD {
            ......
            continue
        }
        .........
        default:
                       //数据进来
            if evt&syscall.EPOLLIN != 0 {
                if operator.OnRead != nil {
                                       //解决连贯
                    // for non-connection
                    operator.OnRead(p)
                } else {
                                      //解决已连贯
                    // for connection
                    var bs = operator.Inputs(p.barriers[i].bs)
                    if len(bs) > 0 {
                        var n, err = readv(operator.FD, bs, p.barriers[i].ivs)
                        operator.InputAck(n)
                        .......
                    }
                }
            }
                        //数据进来
            if evt&syscall.EPOLLOUT != 0 {
                .......
            }
        }
        operator.done()
    }
    if len(hups) > 0 {
                //从poll轮询中删除监听事件
        p.detaches(hups)
    }
    return false
}

//解决连贯事件

func (s *server) OnRead(p Poll) error {
   // accept socket
   conn, err := s.ln.Accept()
   if err != nil {
      // shut down
      if strings.Contains(err.Error(), "closed") {
         s.operator.Control(PollDetach)
         s.quit(err)
         return err
      }
      log.Println("accept conn failed:", err.Error())
      return err
   }
   if conn == nil {
      return nil
   }
   // store & register connection
    //保留连贯,在退出的时候用于敞开连贯
   var connection = &connection{}
   connection.init(conn.(Conn), s.prepare)
   if !connection.IsActive() {
      return nil
   }
   var fd = conn.(Conn).Fd()
   connection.AddCloseCallback(func(connection Connection) error {
      s.connections.Delete(fd)
      return nil
   })
   s.connections.Store(fd, connection)
   return nil
}

//解决数据
//返回给注册onRequest的中央,到这里一个次要流程就完了

func (c *connection) inputAck(n int) (err error) {
   if n < 0 {
      n = 0
   }
   leftover := atomic.AddInt32(&c.waitReadSize, int32(-n))
   err = c.inputBuffer.BookAck(n, leftover <= 0)
   c.triggerRead()
   c.onRequest()
   return err
}

buffer 解决上 设计了Nocopy Buffer ,Nocopy Buffer基于链表数组实现。
netpoll外面有许多细节处的设计,这里只是做一个十分浅显的导读。
目前看到的代码 只反对了linux,macos。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理