关于golang:golang-网络连接库-cellnet2

8次阅读

共计 3475 个字符,预计需要花费 9 分钟才能阅读完成。

本章将介绍 tcp 库的 peer 端,本节次要讲述 connector 端创立和收发信息


connector

type tcpConnector struct {
    peer.SessionManager

    peer.CorePeerProperty
    peer.CoreContextSet
    peer.CoreRunningTag
    peer.CoreProcBundle
    peer.CoreTCPSocketOption
    peer.CoreCaptureIOPanic

    defaultSes *tcpSession

    tryConnTimes int // 尝试连贯次数

    sesEndSignal sync.WaitGroup

    reconDur time.Duration
}

tcpSession 是负责解决音讯接管。tcpSession 的构造定义

type tcpSession struct {
    peer.CoreContextSet
    peer.CoreSessionIdentify
    *peer.CoreProcBundle

    pInterface cellnet.Peer

    // Socket 原始连贯
    conn      net.Conn
    connGuard sync.RWMutex

    // 退出同步器
    exitSync sync.WaitGroup

    // 发送队列
    sendQueue *cellnet.Pipe

    cleanupGuard sync.Mutex

    endNotify func()

    closing int64
}

tcpConnector 初始化连贯。conn 实例保留在 tcpSession

// 连接器,传入连贯地址和发送封包次数
func (self *tcpConnector) connect(address string) {self.SetRunning(true)

    for {
        self.tryConnTimes++

        // 尝试用 Socket 连贯地址
        conn, err := net.Dial("tcp", address)

        self.defaultSes.setConn(conn)

        // 产生谬误时退出
        if err != nil {

            if self.tryConnTimes <= reportConnectFailedLimitTimes {log.Errorf("#tcp.connect failed(%s) %v", self.Name(), err.Error())

                if self.tryConnTimes == reportConnectFailedLimitTimes {log.Errorf("(%s) continue reconnecting, but mute log", self.Name())
                }
            }

            // 没重连就退出
            if self.ReconnectDuration() == 0 || self.IsStopping() {

                self.ProcEvent(&cellnet.RecvMsgEvent{
                    Ses: self.defaultSes,
                    Msg: &cellnet.SessionConnectError{},})
                break
            }

            // 有重连就期待
            time.Sleep(self.ReconnectDuration())

            // 持续连贯
            continue
        }

        self.sesEndSignal.Add(1)

        self.ApplySocketOption(conn)

        self.defaultSes.Start()

        self.tryConnTimes = 0

        self.ProcEvent(&cellnet.RecvMsgEvent{Ses: self.defaultSes, Msg: &cellnet.SessionConnected{}})

        self.sesEndSignal.Wait()

        self.defaultSes.setConn(nil)

        // 没重连就退出 / 被动退出
        if self.IsStopping() || self.ReconnectDuration() == 0 {break}

        // 有重连就期待
        time.Sleep(self.ReconnectDuration())

        // 持续连贯
        continue

    }

    self.SetRunning(false)

    self.EndStopping()}
 self.defaultSes.Start() 会开启两个 goroutine,一个是发送循环,一个是接管循环 
// 发送循环
func (self *tcpSession) sendLoop() {var writeList []interface{}

    var capturePanic bool

    if i, ok := self.Peer().(cellnet.PeerCaptureIOPanic); ok {capturePanic = i.CaptureIOPanic()
    }

    for {writeList = writeList[0:0]
        exit := self.sendQueue.Pick(&writeList)

        // 遍历要发送的数据
        for _, msg := range writeList {

            if capturePanic {self.protectedSendMessage(&cellnet.SendMsgEvent{Ses: self, Msg: msg})
            } else {self.SendMessage(&cellnet.SendMsgEvent{Ses: self, Msg: msg})
            }
        }

        if exit {break}
    }

    // 残缺敞开
    conn := self.Conn()
    if conn != nil {conn.Close()
    }

    // 告诉实现
    self.exitSync.Done()}
// 接管循环
func (self *tcpSession) recvLoop() {

    var capturePanic bool

    if i, ok := self.Peer().(cellnet.PeerCaptureIOPanic); ok {capturePanic = i.CaptureIOPanic()
    }

    for self.Conn() != nil {var msg interface{}
        var err error

        if capturePanic {msg, err = self.protectedReadMessage()
        } else {msg, err = self.ReadMessage(self)
        }

        if err != nil {if !util.IsEOFOrNetReadError(err) {

                var ip string
                if self.conn != nil {addr := self.conn.RemoteAddr()
                    if addr != nil {ip = addr.String()
                    }
                }

                log.Errorf("session closed, sesid: %d, err: %s ip: %s", self.ID(), err, ip)
            }

            self.sendQueue.Add(nil)

            // 标记为手动敞开起因
            closedMsg := &cellnet.SessionClosed{}
            if self.IsManualClosed() {closedMsg.Reason = cellnet.CloseReason_Manual}

            self.ProcEvent(&cellnet.RecvMsgEvent{Ses: self, Msg: closedMsg})
            break
        }

        self.ProcEvent(&cellnet.RecvMsgEvent{Ses: self, Msg: msg})
    }

    // 告诉实现
    self.exitSync.Done()}

self.ProcEvent 函数会调用 proc.BindProcessorHandler 的第三个参数,也就是用户自定义的数据处理函数,上面是一个例子。用户能够依据本人的须要自定义数据处理函数

proc.BindProcessorHandler(peerIns, "tcp.ltv", func(ev cellnet.Event) {switch msg := ev.Message().(type) {
        case *cellnet.SessionConnected: // 曾经连贯上
            fmt.Println("client connected")
            ev.Session().Send(&TestEchoACK{
                Msg:   "hello",
                Value: 1234,
            })
        case *TestEchoACK: // 收到服务器发送的音讯

            fmt.Printf("client recv %+v\n", msg)

            // 实现操作
            done <- struct{}{}

        case *cellnet.SessionClosed:
            fmt.Println("client closed")
        }
    })

以上就是一条信息处理的次要步骤。

正文完
 0