本章将介绍tcp库的peer端,本节次要讲述connector端创立和收发信息
connector
type tcpConnector struct { peer.SessionManager peer.CorePeerProperty peer.CoreContextSet peer.CoreRunningTag peer.CoreProcBundle peer.CoreTCPSocketOption peer.CoreCaptureIOPanic defaultSes *tcpSession tryConnTimes int // 尝试连贯次数 sesEndSignal sync.WaitGroup reconDur time.Duration}
tcpSession是负责解决音讯接管。tcpSession的构造定义
type tcpSession struct { peer.CoreContextSet peer.CoreSessionIdentify *peer.CoreProcBundle pInterface cellnet.Peer // Socket原始连贯 conn net.Conn connGuard sync.RWMutex // 退出同步器 exitSync sync.WaitGroup // 发送队列 sendQueue *cellnet.Pipe cleanupGuard sync.Mutex endNotify func() closing int64}
tcpConnector 初始化连贯。conn实例保留在tcpSession
// 连接器,传入连贯地址和发送封包次数func (self *tcpConnector) connect(address string) { self.SetRunning(true) for { self.tryConnTimes++ // 尝试用Socket连贯地址 conn, err := net.Dial("tcp", address) self.defaultSes.setConn(conn) // 产生谬误时退出 if err != nil { if self.tryConnTimes <= reportConnectFailedLimitTimes { log.Errorf("#tcp.connect failed(%s) %v", self.Name(), err.Error()) if self.tryConnTimes == reportConnectFailedLimitTimes { log.Errorf("(%s) continue reconnecting, but mute log", self.Name()) } } // 没重连就退出 if self.ReconnectDuration() == 0 || self.IsStopping() { self.ProcEvent(&cellnet.RecvMsgEvent{ Ses: self.defaultSes, Msg: &cellnet.SessionConnectError{}, }) break } // 有重连就期待 time.Sleep(self.ReconnectDuration()) // 持续连贯 continue } self.sesEndSignal.Add(1) self.ApplySocketOption(conn) self.defaultSes.Start() self.tryConnTimes = 0 self.ProcEvent(&cellnet.RecvMsgEvent{Ses: self.defaultSes, Msg: &cellnet.SessionConnected{}}) self.sesEndSignal.Wait() self.defaultSes.setConn(nil) // 没重连就退出/被动退出 if self.IsStopping() || self.ReconnectDuration() == 0 { break } // 有重连就期待 time.Sleep(self.ReconnectDuration()) // 持续连贯 continue } self.SetRunning(false) self.EndStopping()}
self.defaultSes.Start()会开启两个goroutine,一个是发送循环,一个是接管循环
// 发送循环func (self *tcpSession) sendLoop() { var writeList []interface{} var capturePanic bool if i, ok := self.Peer().(cellnet.PeerCaptureIOPanic); ok { capturePanic = i.CaptureIOPanic() } for { writeList = writeList[0:0] exit := self.sendQueue.Pick(&writeList) // 遍历要发送的数据 for _, msg := range writeList { if capturePanic { self.protectedSendMessage(&cellnet.SendMsgEvent{Ses: self, Msg: msg}) } else { self.SendMessage(&cellnet.SendMsgEvent{Ses: self, Msg: msg}) } } if exit { break } } // 残缺敞开 conn := self.Conn() if conn != nil { conn.Close() } // 告诉实现 self.exitSync.Done()}
// 接管循环func (self *tcpSession) recvLoop() { var capturePanic bool if i, ok := self.Peer().(cellnet.PeerCaptureIOPanic); ok { capturePanic = i.CaptureIOPanic() } for self.Conn() != nil { var msg interface{} var err error if capturePanic { msg, err = self.protectedReadMessage() } else { msg, err = self.ReadMessage(self) } if err != nil { if !util.IsEOFOrNetReadError(err) { var ip string if self.conn != nil { addr := self.conn.RemoteAddr() if addr != nil { ip = addr.String() } } log.Errorf("session closed, sesid: %d, err: %s ip: %s", self.ID(), err, ip) } self.sendQueue.Add(nil) // 标记为手动敞开起因 closedMsg := &cellnet.SessionClosed{} if self.IsManualClosed() { closedMsg.Reason = cellnet.CloseReason_Manual } self.ProcEvent(&cellnet.RecvMsgEvent{Ses: self, Msg: closedMsg}) break } self.ProcEvent(&cellnet.RecvMsgEvent{Ses: self, Msg: msg}) } // 告诉实现 self.exitSync.Done()}
self.ProcEvent函数会调用proc.BindProcessorHandler的第三个参数,也就是用户自定义的数据处理函数,上面是一个例子。用户能够依据本人的须要自定义数据处理函数
proc.BindProcessorHandler(peerIns, "tcp.ltv", func(ev cellnet.Event) { switch msg := ev.Message().(type) { case *cellnet.SessionConnected: // 曾经连贯上 fmt.Println("client connected") ev.Session().Send(&TestEchoACK{ Msg: "hello", Value: 1234, }) case *TestEchoACK: //收到服务器发送的音讯 fmt.Printf("client recv %+v\n", msg) // 实现操作 done <- struct{}{} case *cellnet.SessionClosed: fmt.Println("client closed") } })
以上就是一条信息处理的次要步骤。