本章将介绍tcp库的peer端,本节次要讲述connector端创立和收发信息


connector

type tcpConnector struct {    peer.SessionManager    peer.CorePeerProperty    peer.CoreContextSet    peer.CoreRunningTag    peer.CoreProcBundle    peer.CoreTCPSocketOption    peer.CoreCaptureIOPanic    defaultSes *tcpSession    tryConnTimes int // 尝试连贯次数    sesEndSignal sync.WaitGroup    reconDur time.Duration}

tcpSession是负责解决音讯接管。tcpSession的构造定义

type tcpSession struct {    peer.CoreContextSet    peer.CoreSessionIdentify    *peer.CoreProcBundle    pInterface cellnet.Peer    // Socket原始连贯    conn      net.Conn    connGuard sync.RWMutex    // 退出同步器    exitSync sync.WaitGroup    // 发送队列    sendQueue *cellnet.Pipe    cleanupGuard sync.Mutex    endNotify func()    closing int64}

tcpConnector 初始化连贯。conn实例保留在tcpSession

// 连接器,传入连贯地址和发送封包次数func (self *tcpConnector) connect(address string) {    self.SetRunning(true)    for {        self.tryConnTimes++        // 尝试用Socket连贯地址        conn, err := net.Dial("tcp", address)        self.defaultSes.setConn(conn)        // 产生谬误时退出        if err != nil {            if self.tryConnTimes <= reportConnectFailedLimitTimes {                log.Errorf("#tcp.connect failed(%s) %v", self.Name(), err.Error())                if self.tryConnTimes == reportConnectFailedLimitTimes {                    log.Errorf("(%s) continue reconnecting, but mute log", self.Name())                }            }            // 没重连就退出            if self.ReconnectDuration() == 0 || self.IsStopping() {                self.ProcEvent(&cellnet.RecvMsgEvent{                    Ses: self.defaultSes,                    Msg: &cellnet.SessionConnectError{},                })                break            }            // 有重连就期待            time.Sleep(self.ReconnectDuration())            // 持续连贯            continue        }        self.sesEndSignal.Add(1)        self.ApplySocketOption(conn)        self.defaultSes.Start()        self.tryConnTimes = 0        self.ProcEvent(&cellnet.RecvMsgEvent{Ses: self.defaultSes, Msg: &cellnet.SessionConnected{}})        self.sesEndSignal.Wait()        self.defaultSes.setConn(nil)        // 没重连就退出/被动退出        if self.IsStopping() || self.ReconnectDuration() == 0 {            break        }        // 有重连就期待        time.Sleep(self.ReconnectDuration())        // 持续连贯        continue    }    self.SetRunning(false)    self.EndStopping()}
 self.defaultSes.Start()会开启两个goroutine,一个是发送循环,一个是接管循环
// 发送循环func (self *tcpSession) sendLoop() {    var writeList []interface{}    var capturePanic bool    if i, ok := self.Peer().(cellnet.PeerCaptureIOPanic); ok {        capturePanic = i.CaptureIOPanic()    }    for {        writeList = writeList[0:0]        exit := self.sendQueue.Pick(&writeList)        // 遍历要发送的数据        for _, msg := range writeList {            if capturePanic {                self.protectedSendMessage(&cellnet.SendMsgEvent{Ses: self, Msg: msg})            } else {                self.SendMessage(&cellnet.SendMsgEvent{Ses: self, Msg: msg})            }        }        if exit {            break        }    }    // 残缺敞开    conn := self.Conn()    if conn != nil {        conn.Close()    }    // 告诉实现    self.exitSync.Done()}
// 接管循环func (self *tcpSession) recvLoop() {    var capturePanic bool    if i, ok := self.Peer().(cellnet.PeerCaptureIOPanic); ok {        capturePanic = i.CaptureIOPanic()    }    for self.Conn() != nil {        var msg interface{}        var err error        if capturePanic {            msg, err = self.protectedReadMessage()        } else {            msg, err = self.ReadMessage(self)        }        if err != nil {            if !util.IsEOFOrNetReadError(err) {                var ip string                if self.conn != nil {                    addr := self.conn.RemoteAddr()                    if addr != nil {                        ip = addr.String()                    }                }                log.Errorf("session closed, sesid: %d, err: %s ip: %s", self.ID(), err, ip)            }            self.sendQueue.Add(nil)            // 标记为手动敞开起因            closedMsg := &cellnet.SessionClosed{}            if self.IsManualClosed() {                closedMsg.Reason = cellnet.CloseReason_Manual            }            self.ProcEvent(&cellnet.RecvMsgEvent{Ses: self, Msg: closedMsg})            break        }        self.ProcEvent(&cellnet.RecvMsgEvent{Ses: self, Msg: msg})    }    // 告诉实现    self.exitSync.Done()}

self.ProcEvent函数会调用proc.BindProcessorHandler的第三个参数,也就是用户自定义的数据处理函数,上面是一个例子。用户能够依据本人的须要自定义数据处理函数

proc.BindProcessorHandler(peerIns, "tcp.ltv", func(ev cellnet.Event) {        switch msg := ev.Message().(type) {        case *cellnet.SessionConnected: // 曾经连贯上            fmt.Println("client connected")            ev.Session().Send(&TestEchoACK{                Msg:   "hello",                Value: 1234,            })        case *TestEchoACK: //收到服务器发送的音讯            fmt.Printf("client recv %+v\n", msg)            // 实现操作            done <- struct{}{}        case *cellnet.SessionClosed:            fmt.Println("client closed")        }    })

以上就是一条信息处理的次要步骤。