本章将介绍tcp库的peer端,本节主要讲述connector端的创建和消息收发。
connector
// tcpConnector is the connector-side TCP peer. It embeds the shared peer
// building blocks and owns the session that connect attaches dialed
// connections to.
type tcpConnector struct {
peer.SessionManager
peer.CorePeerProperty
peer.CoreContextSet
peer.CoreRunningTag
peer.CoreProcBundle
peer.CoreTCPSocketOption
peer.CoreCaptureIOPanic
// defaultSes is the session connect hands each dialed connection to and starts.
defaultSes *tcpSession
tryConnTimes int // number of connection attempts; connect bumps it per dial and resets it to 0 after a successful one
// sesEndSignal is what connect blocks on after starting the session; the
// matching Done is presumably issued when the session ends - TODO confirm.
sesEndSignal sync.WaitGroup
// reconDur is presumably the interval reported by ReconnectDuration() - TODO confirm.
reconDur time.Duration
}
tcpSession负责处理消息的接收。tcpSession的结构定义如下:
// tcpSession is one TCP session: it holds the underlying connection, the
// outgoing message queue, and the synchronisation state used by its send
// and receive loops.
type tcpSession struct {
peer.CoreContextSet
peer.CoreSessionIdentify
*peer.CoreProcBundle
// pInterface is presumably the peer that owns this session - TODO confirm.
pInterface cellnet.Peer
// Raw socket connection.
conn net.Conn
// connGuard presumably protects conn against concurrent access - TODO confirm.
connGuard sync.RWMutex
// Exit synchroniser; sendLoop and recvLoop each call Done on it when they finish.
exitSync sync.WaitGroup
// Send queue; sendLoop picks batches from it, recvLoop adds nil to it on a read error.
sendQueue *cellnet.Pipe
// NOTE(review): the three fields below are not used in the visible code;
// they look like cleanup/shutdown bookkeeping - confirm against the rest of the file.
cleanupGuard sync.Mutex
endNotify func()
closing int64
}
tcpConnector初始化连接,conn实例保存在tcpSession中。
// connect runs the connector's dial loop against address. The loop ends when
// the peer is stopping or when no reconnect interval is configured; after it
// ends the running flag is cleared and EndStopping is called.
//
// Each pass dials once. A failed dial is logged for the first
// reportConnectFailedLimitTimes attempts and muted afterwards; the loop then
// either posts SessionConnectError and ends, or sleeps for the reconnect
// interval and retries. A successful dial starts the session, posts
// SessionConnected, and blocks until sesEndSignal is released before
// deciding whether to reconnect.
func (self *tcpConnector) connect(address string) {
	self.SetRunning(true)

	for {
		self.tryConnTimes++

		// Dial the remote address; the result is handed to the session
		// before the error is inspected.
		conn, dialErr := net.Dial("tcp", address)
		self.defaultSes.setConn(conn)

		if dialErr == nil {
			// Connected: run the session and wait for it to finish.
			self.sesEndSignal.Add(1)
			self.ApplySocketOption(conn)
			self.defaultSes.Start()
			self.tryConnTimes = 0
			self.ProcEvent(&cellnet.RecvMsgEvent{
				Ses: self.defaultSes,
				Msg: &cellnet.SessionConnected{},
			})

			self.sesEndSignal.Wait()
			self.defaultSes.setConn(nil)

			// Leave when stopping was requested or reconnecting is disabled.
			if self.IsStopping() || self.ReconnectDuration() == 0 {
				break
			}
		} else {
			// Report the failure until the attempt limit is reached, then go quiet.
			if attempts := self.tryConnTimes; attempts <= reportConnectFailedLimitTimes {
				log.Errorf("#tcp.connect failed(%s) %v", self.Name(), dialErr.Error())
				if attempts == reportConnectFailedLimitTimes {
					log.Errorf("(%s) continue reconnecting, but mute log", self.Name())
				}
			}

			// Without a reconnect interval (or while stopping) surface the error and leave.
			if self.ReconnectDuration() == 0 || self.IsStopping() {
				self.ProcEvent(&cellnet.RecvMsgEvent{
					Ses: self.defaultSes,
					Msg: &cellnet.SessionConnectError{},
				})
				break
			}
		}

		// Reconnecting is enabled: wait one interval, then dial again.
		time.Sleep(self.ReconnectDuration())
	}

	self.SetRunning(false)
	self.EndStopping()
}
self.defaultSes.Start()会开启两个goroutine,一个是发送循环,一个是接收循环。
// sendLoop drains the session's send queue, sending every picked message,
// until the queue reports exit. It then closes the connection (if one is
// set) and marks this loop as finished on exitSync.
func (self *tcpSession) sendLoop() {
	// Ask the owning peer once whether sends go through protectedSendMessage.
	guarded := false
	if p, ok := self.Peer().(cellnet.PeerCaptureIOPanic); ok {
		guarded = p.CaptureIOPanic()
	}

	var batch []interface{}
	for done := false; !done; {
		// Reuse the batch's backing array between rounds.
		batch = batch[:0]
		done = self.sendQueue.Pick(&batch)

		// Messages picked together with the exit flag are still sent.
		for _, m := range batch {
			ev := &cellnet.SendMsgEvent{Ses: self, Msg: m}
			if guarded {
				self.protectedSendMessage(ev)
			} else {
				self.SendMessage(ev)
			}
		}
	}

	// Full close of the connection once the queue has been drained.
	if c := self.Conn(); c != nil {
		c.Close()
	}

	// Signal completion.
	self.exitSync.Done()
}
// recvLoop reads messages from the connection and forwards each one to the
// processor via ProcEvent for as long as the session has a connection.
//
// On a read error it logs the failure (unless util.IsEOFOrNetReadError
// classifies it as an ordinary EOF/network read error), adds nil to the send
// queue, posts a SessionClosed event - tagged CloseReason_Manual when the
// session was closed manually - and stops. In every case it finishes by
// marking this loop as done on exitSync.
func (self *tcpSession) recvLoop() {
	// Ask the owning peer once whether reads go through protectedReadMessage.
	var capturePanic bool
	if i, ok := self.Peer().(cellnet.PeerCaptureIOPanic); ok {
		capturePanic = i.CaptureIOPanic()
	}

	for self.Conn() != nil {
		var msg interface{}
		var err error

		if capturePanic {
			msg, err = self.protectedReadMessage()
		} else {
			msg, err = self.ReadMessage(self)
		}

		if err != nil {
			if !util.IsEOFOrNetReadError(err) {
				var ip string
				// Take a single snapshot through the Conn() accessor instead of
				// reading the conn field twice: the field can be replaced (set to
				// nil) from another goroutine between a nil check and the
				// RemoteAddr call, and every other read in this file goes through
				// the accessor.
				if conn := self.Conn(); conn != nil {
					if addr := conn.RemoteAddr(); addr != nil {
						ip = addr.String()
					}
				}
				log.Errorf("session closed, sesid: %d, err: %s ip: %s", self.ID(), err, ip)
			}

			// NOTE(review): the nil entry presumably tells sendLoop to exit - confirm against cellnet.Pipe.
			self.sendQueue.Add(nil)

			// Mark the close reason as manual when the session was closed on purpose.
			closedMsg := &cellnet.SessionClosed{}
			if self.IsManualClosed() {
				closedMsg.Reason = cellnet.CloseReason_Manual
			}
			self.ProcEvent(&cellnet.RecvMsgEvent{Ses: self, Msg: closedMsg})
			break
		}

		self.ProcEvent(&cellnet.RecvMsgEvent{Ses: self, Msg: msg})
	}

	// Signal completion.
	self.exitSync.Done()
}
self.ProcEvent函数会调用proc.BindProcessorHandler的第三个参数,也就是用户自定义的数据处理函数,下面是一个例子。用户可以根据自己的需要自定义数据处理函数。
// Bind the "tcp.ltv" processor to peerIns together with a user callback that
// dispatches on the concrete type of each event's message.
proc.BindProcessorHandler(peerIns, "tcp.ltv", func(event cellnet.Event) {
	switch m := event.Message().(type) {
	case *cellnet.SessionClosed:
		fmt.Println("client closed")

	case *TestEchoACK:
		// Message received from the server.
		fmt.Printf("client recv %+v\n", m)
		// Signal that the exchange is complete.
		done <- struct{}{}

	case *cellnet.SessionConnected:
		// Connection established: send the first message.
		fmt.Println("client connected")
		event.Session().Send(&TestEchoACK{
			Msg:   "hello",
			Value: 1234,
		})
	}
})
以上就是一条消息处理的主要步骤。
发表回复