本章将介绍 tcp 库的 peer 端,本节主要讲述 connector 端的创建和收发消息。
connector
// tcpConnector is the connector-side TCP peer. It embeds the shared building
// blocks from the peer package and owns the single session that connect
// hands every dialed connection to.
type tcpConnector struct {
peer.SessionManager
peer.CorePeerProperty
peer.CoreContextSet
peer.CoreRunningTag
peer.CoreProcBundle
peer.CoreTCPSocketOption
peer.CoreCaptureIOPanic
// defaultSes is the session connect stores each dialed connection in and starts.
defaultSes *tcpSession
tryConnTimes int // number of dial attempts; connect increments it per attempt and resets it to 0 after a successful dial
// sesEndSignal is what connect blocks on after starting the session.
// NOTE(review): presumably the session's end notification calls Done — confirm.
sesEndSignal sync.WaitGroup
// NOTE(review): presumably the value behind ReconnectDuration(); not referenced in the visible code — confirm.
reconDur time.Duration
}
tcpSession 负责处理消息接收。tcpSession 的结构定义如下:
// tcpSession is one TCP session: it holds the underlying connection and the
// send queue, and its sendLoop / recvLoop methods move messages over that
// connection.
type tcpSession struct {
peer.CoreContextSet
peer.CoreSessionIdentify
*peer.CoreProcBundle
pInterface cellnet.Peer
// Raw socket connection.
conn net.Conn
// NOTE(review): presumably guards conn inside Conn()/setConn() — confirm against those accessors.
connGuard sync.RWMutex
// Exit synchronizer; sendLoop and recvLoop each call Done on it when they return.
exitSync sync.WaitGroup
// Send queue; sendLoop picks batches from it, and recvLoop adds nil to it after a read error.
sendQueue *cellnet.Pipe
cleanupGuard sync.Mutex
// NOTE(review): presumably invoked when the session has ended; not referenced in the visible code — confirm.
endNotify func()
// NOTE(review): looks like a closing flag; not referenced in the visible code — confirm how it is accessed.
closing int64
}
tcpConnector 初始化连接。conn 实例保存在 tcpSession 中。
// connect dials address over TCP and runs the default session on the
// resulting connection, looping for as long as reconnection is enabled.
//
// The connector is marked running on entry; on exit the running flag is
// cleared and EndStopping is called. When a dial fails and no retry will
// follow, a SessionConnectError event is posted through ProcEvent.
func (self *tcpConnector) connect(address string) {
	self.SetRunning(true)

	for {
		self.tryConnTimes++

		// Dial the address; whatever comes back (nil on failure) is stored
		// in the default session.
		conn, dialErr := net.Dial("tcp", address)
		self.defaultSes.setConn(conn)

		if dialErr != nil {
			// Log failures only up to the limit, and announce on the last
			// logged attempt that further failures will be silent.
			attempt := self.tryConnTimes
			if attempt <= reportConnectFailedLimitTimes {
				log.Errorf("#tcp.connect failed(%s) %v", self.Name(), dialErr.Error())
			}
			if attempt == reportConnectFailedLimitTimes {
				log.Errorf("(%s) continue reconnecting, but mute log", self.Name())
			}

			// Reconnection enabled and not stopping: wait, then dial again.
			if self.ReconnectDuration() != 0 && !self.IsStopping() {
				time.Sleep(self.ReconnectDuration())
				continue
			}

			// No retry: report the connect error and leave the loop.
			self.ProcEvent(&cellnet.RecvMsgEvent{
				Ses: self.defaultSes,
				Msg: &cellnet.SessionConnectError{},
			})
			break
		}

		// Connected: start the session and tell the handler about it.
		self.sesEndSignal.Add(1)
		self.ApplySocketOption(conn)
		self.defaultSes.Start()
		self.tryConnTimes = 0

		connected := &cellnet.RecvMsgEvent{Ses: self.defaultSes, Msg: &cellnet.SessionConnected{}}
		self.ProcEvent(connected)

		// Block until the session has ended, then drop the connection.
		self.sesEndSignal.Wait()
		self.defaultSes.setConn(nil)

		// Still running with reconnection enabled: wait, then dial again.
		if !self.IsStopping() && self.ReconnectDuration() != 0 {
			time.Sleep(self.ReconnectDuration())
			continue
		}

		// Stopping, or reconnection disabled.
		break
	}

	self.SetRunning(false)
	self.EndStopping()
}
self.defaultSes.Start() 会开启两个 goroutine,一个是发送循环,一个是接收循环。
// sendLoop is the session's send loop. It repeatedly picks a batch of
// queued messages from sendQueue and sends each one, until Pick reports
// that the loop should exit. It then closes the connection (if any) and
// signals exitSync.
func (self *tcpSession) sendLoop() {
	var pending []interface{}

	// Use the panic-protected send path only when the owning peer supports
	// the option and has it switched on.
	capturer, supported := self.Peer().(cellnet.PeerCaptureIOPanic)
	capturePanic := supported && capturer.CaptureIOPanic()

	for exit := false; !exit; {
		// Reuse the batch slice's backing storage between picks.
		pending = pending[:0]
		exit = self.sendQueue.Pick(&pending)

		// Send everything picked in this batch, even when exit was signalled.
		for _, msg := range pending {
			ev := &cellnet.SendMsgEvent{Ses: self, Msg: msg}
			if !capturePanic {
				self.SendMessage(ev)
			} else {
				self.protectedSendMessage(ev)
			}
		}
	}

	// Fully close the connection.
	if conn := self.Conn(); conn != nil {
		conn.Close()
	}

	// Notify that this loop has finished.
	self.exitSync.Done()
}
// recvLoop is the session's receive loop. While the session has a
// connection it reads one message at a time and posts it through ProcEvent.
//
// On a read error it logs the error (unless util.IsEOFOrNetReadError
// reports it as an EOF / network read error), adds nil to the send queue,
// posts a SessionClosed event (with Reason set to CloseReason_Manual when
// the session was closed manually) and stops. exitSync is signalled on
// return.
func (self *tcpSession) recvLoop() {
	var capturePanic bool
	if i, ok := self.Peer().(cellnet.PeerCaptureIOPanic); ok {
		capturePanic = i.CaptureIOPanic()
	}

	for self.Conn() != nil {
		var msg interface{}
		var err error
		if capturePanic {
			msg, err = self.protectedReadMessage()
		} else {
			msg, err = self.ReadMessage(self)
		}

		if err != nil {
			if !util.IsEOFOrNetReadError(err) {
				var ip string
				// Take one snapshot through the Conn() accessor (the same one
				// the loop condition uses) instead of reading the conn field
				// directly twice: the field could change between the nil
				// check and the RemoteAddr call.
				if conn := self.Conn(); conn != nil {
					if addr := conn.RemoteAddr(); addr != nil {
						ip = addr.String()
					}
				}
				log.Errorf("session closed, sesid: %d, err: %s ip: %s", self.ID(), err, ip)
			}

			// NOTE(review): the nil entry presumably tells sendLoop to exit — confirm against Pipe.Pick.
			self.sendQueue.Add(nil)

			// Mark the close reason as manual when the session was closed manually.
			closedMsg := &cellnet.SessionClosed{}
			if self.IsManualClosed() {
				closedMsg.Reason = cellnet.CloseReason_Manual
			}
			self.ProcEvent(&cellnet.RecvMsgEvent{Ses: self, Msg: closedMsg})
			break
		}

		self.ProcEvent(&cellnet.RecvMsgEvent{Ses: self, Msg: msg})
	}

	// Notify that this loop has finished.
	self.exitSync.Done()
}
self.ProcEvent 函数会调用 proc.BindProcessorHandler 的第三个参数,也就是用户自定义的数据处理函数,下面是一个例子。用户可以根据自己的需要自定义数据处理函数。
// Bind a user-defined handler to the peer using the "tcp.ltv" processor; the
// callback dispatches on the concrete type of the incoming message.
proc.BindProcessorHandler(peerIns, "tcp.ltv", func(ev cellnet.Event) {
	switch received := ev.Message().(type) {
	case *cellnet.SessionClosed:
		fmt.Println("client closed")

	case *cellnet.SessionConnected:
		// Connection established: send the first message.
		fmt.Println("client connected")
		greeting := &TestEchoACK{Msg: "hello", Value: 1234}
		ev.Session().Send(greeting)

	case *TestEchoACK:
		// A message sent by the server has arrived.
		fmt.Printf("client recv %+v\n", received)
		// Signal that the exchange is complete.
		done <- struct{}{}
	}
})
以上就是一条消息处理的主要步骤。