Background
I have recently been reading <<Go微服务实战>> (刘金亮, 2021.1).
This series of notes works through the material as golang exercises.
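Case requirements (chat server)
- Users can connect to the server.
- Users can set their own user name.
- Users can send messages to the server, and the server broadcasts each message to the other users.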
Goal (Day 4)
- Diagnose and fix the memory leak
Diagnosis
- Starting from the day 3 code, use go tool pprof to inspect the heap profile
$ go tool pprof ~/chat_server_mem.profile
File: chat_server.test
Type: inuse_space
Time: Mar 10, 2021 at 7:35am (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 9495.99kB, 100% of 9495.99kB total
Showing top 10 nodes out of 12
      flat  flat%   sum%        cum   cum%
 7287.48kB 76.74% 76.74%  7287.48kB 76.74%  time.startTimer
 1184.27kB 12.47% 89.21%  1184.27kB 12.47%  runtime/pprof.StartCPUProfile
  512.19kB  5.39% 94.61%   512.19kB  5.39%  runtime.malg
  512.05kB  5.39%   100%  7799.53kB 82.13%  learning/gooop/chat_server.(*tChatClient).beginWrite
         0     0%   100%  1184.27kB 12.47%  command-line-arguments.Test_ChatServer
         0     0%   100%   512.19kB  5.39%  runtime.mstart
         0     0%   100%   512.19kB  5.39%  runtime.newproc.func1
         0     0%   100%   512.19kB  5.39%  runtime.newproc1
         0     0%   100%   512.19kB  5.39%  runtime.systemstack
         0     0%   100%  1184.27kB 12.47%  testing.tRunner
(pprof)
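- There appear to be two suspected leak points: time.startTimer and (*tChatClient).beginWrite
- (*tChatClient).beginWrite is the only business code in the list, and its cum% (82.13%) is higher than that of time.startTimer (76.74%). Its cumulative 7799.53kB equals its own flat 512.05kB plus the 7287.48kB of time.startTimer, which suggests the timers are created from inside it
So it is reasonable to suspect that:
- (*tChatClient).beginWrite is the root of the memory leak
- the main cause of the leak is too many calls to time.startTimer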
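Code review
Reviewing the code of tChatClient.beginWrite, there are two places that may keep allocating memory:
- Logging.Logf, which keeps appending log entries.
  - Fix: rework Logging to cap the number of log entries (use a queue with bounded capacity)
- time.After is called on every iteration of the for loop, which creates a large number of timers.
  - Fix: stop using time.After, and detect read timeouts with a dedicated goroutine and timer instead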
func (me *tChatClient) beginWrite() {
	Logging.Logf("tChatClient.beginWrite, %v, serverFlag=%v", me.name, me.serverFlag)
	writer := io.Writer(me.conn)
	for {
		select {
		case <-me.closeChan:
			Logging.Logf("tChatClient.beginWrite, <- closeChan, %v, serverFlag=%v", me.name, me.serverFlag)
			_ = me.conn.Close()
			me.closeFlag = 2
			me.postConnClosed()
			return

		case msg := <-me.sendChan:
			atomic.AddInt32(&me.pendingSend, -1)
			_, e := writer.Write([]byte(msg.Encode()))
			if e != nil {
				Logging.Logf("tChatClient.beginWrite, write error, %v, serverFlag=%v", me.name, me.serverFlag)
				me.closeConn()
			} else {
				me.sendLogs = append(me.sendLogs, msg)
			}
			break

		// suspect: every pass through the loop calls time.After, and each call creates a new timer
		case <-time.After(time.Duration(5) * time.Second):
			me.postRecvTimeout()
			break
		}
	}
}
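Reworking Logging
The main change is to turn the log array into a log queue with a capacity limit, so that collecting diagnostic logs can no longer make memory grow without bound.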
package chat_server

import (
	"fmt"
	"sync"
)

type ILoggingService interface {
	Logf(f string, args ...interface{})
	AllLogs() []string
}

type tLoggingService struct {
	mutex    *sync.Mutex
	logs     []string
	capacity int
	rindex   int
	windex   int
}

var gMaxLogs = 10_000
var gEmptyString = ""

func newLoggingService() ILoggingService {
	return &tLoggingService{
		mutex: new(sync.Mutex),
		// the buffer is twice the capacity, so the live window [rindex, windex)
		// can slide forward before it has to be moved back to offset 0
		logs: make([]string, gMaxLogs*2),
		//logs: make([]string, 0),
		capacity: gMaxLogs,
		rindex:   0,
		windex:   0,
	}
}

// size returns the number of live log entries
func (me *tLoggingService) size() int {
	return me.windex - me.rindex
}

func (me *tLoggingService) Logf(f string, args ...interface{}) {
	log := fmt.Sprintf(f, args...)

	me.mutex.Lock()
	//me.logs = append(me.logs, log)
	me.ensureSpace()
	me.logs[me.windex] = log
	me.windex++
	me.mutex.Unlock()

	fmt.Println(log)
}

func (me *tLoggingService) ensureSpace() {
	for me.size() >= me.capacity {
		// dequeue head items
		me.logs[me.rindex] = gEmptyString
		me.rindex++
	}

	if me.rindex >= me.capacity {
		// move data to offset 0
		for i, n := 0, me.size(); i < n; i++ {
			me.logs[i], me.logs[i+me.rindex] = me.logs[i+me.rindex], gEmptyString
		}

		// reset read and write index
		me.windex, me.rindex = me.windex-me.rindex, 0
	}
}

func (me *tLoggingService) AllLogs() []string {
	// return a copy of the live entries only, taken under the lock;
	// returning me.logs directly would expose the empty slots of the
	// pre-allocated buffer and race with concurrent Logf calls
	me.mutex.Lock()
	defer me.mutex.Unlock()
	return append([]string(nil), me.logs[me.rindex:me.windex]...)
}

var Logging = newLoggingService()
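The queue above bounds memory by sliding a window over a double-sized buffer and compacting it from time to time. For comparison, here is a minimal sketch of another way to get the same bound, a ring buffer that overwrites the oldest entry once it is full. This is not the author's implementation. The type boundedLog and its methods are hypothetical names, and the capacity is assumed to be greater than zero.

```go
package main

import (
	"fmt"
	"sync"
)

// boundedLog (hypothetical) keeps at most len(items) log lines.
type boundedLog struct {
	mu    sync.Mutex
	items []string
	head  int // index of the oldest entry
	size  int // number of live entries
}

// newBoundedLog assumes capacity > 0.
func newBoundedLog(capacity int) *boundedLog {
	return &boundedLog{items: make([]string, capacity)}
}

// Append stores line, discarding the oldest entry when the buffer is full.
func (b *boundedLog) Append(line string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.size < len(b.items) {
		b.items[(b.head+b.size)%len(b.items)] = line
		b.size++
		return
	}
	// Full: overwrite the oldest slot and advance head.
	b.items[b.head] = line
	b.head = (b.head + 1) % len(b.items)
}

// Snapshot returns a copy of the live entries, oldest first.
func (b *boundedLog) Snapshot() []string {
	b.mu.Lock()
	defer b.mu.Unlock()

	out := make([]string, 0, b.size)
	for i := 0; i < b.size; i++ {
		out = append(out, b.items[(b.head+i)%len(b.items)])
	}
	return out
}

func main() {
	logs := newBoundedLog(3)
	for _, s := range []string{"a", "b", "c", "d", "e"} {
		logs.Append(s)
	}
	fmt.Println(logs.Snapshot()) // prints [c d e]
}
```

The ring buffer never moves entries, at the cost of a modulo on every access. The sliding-window version in the notes keeps the live entries contiguous, which makes slicing them out simple.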
Reworking tChatClient
- Remove the time.After call from the write loop
- Use a dedicated goroutine and a read counter to detect read timeouts
func (me *tChatClient) open() {
	if !atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {
		return
	}

	go me.beginWrite()
	go me.beginRead()

	// read-timeout watchdog
	go me.beginWatchRecvTimeout()
}

func (me *tChatClient) beginWatchRecvTimeout() {
	duration := time.Duration(5)
	// use a ticker that is stopped on exit; a ticker created by time.Tick
	// cannot be stopped, so each closed client would leave one behind
	ticker := time.NewTicker(duration * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		if me.isClosed() {
			break
		}

		// note: timeoutCounter is also reset by beginRead on another goroutine,
		// and that access is not synchronized here
		me.timeoutCounter++
		if me.timeoutCounter >= 3 {
			me.postRecvTimeout()
		}
	}
}

func (me *tChatClient) beginWrite() {
	Logging.Logf("tChatClient.beginWrite, %v, serverFlag=%v", me.name, me.serverFlag)
	writer := io.Writer(me.conn)
	for {
		select {
		case <-me.closeChan:
			Logging.Logf("tChatClient.beginWrite, <- closeChan, %v, serverFlag=%v", me.name, me.serverFlag)
			_ = me.conn.Close()
			me.closeFlag = 2
			me.postConnClosed()
			return

		case msg := <-me.sendChan:
			atomic.AddInt32(&me.pendingSend, -1)
			_, e := writer.Write([]byte(msg.Encode()))
			if e != nil {
				Logging.Logf("tChatClient.beginWrite, write error, %v, serverFlag=%v", me.name, me.serverFlag)
				me.closeConn()
			} else {
				me.sendLogs = append(me.sendLogs, msg)
			}
			break

			//case <- time.After(time.Duration(5) * time.Second):
			//	me.postRecvTimeout()
			//	break
		}
	}
}

func (me *tChatClient) beginRead() {
	reader := bufio.NewReader(me.conn)
	for {
		line, err := reader.ReadString('\n')
		if err != nil {
			Logging.Logf("tChatClient.beginRead, read error, %v, serverFlag=%v", me.name, me.serverFlag)
			me.closeConn()
			break
		}

		// reset the read-timeout counter
		me.timeoutCounter = 0

		ok, msg := MsgDecoder.Decode(line)
		if ok {
			fn := me.recvHandler
			if fn != nil {
				fn(me, msg)
			}

			me.recvLogs = append(me.recvLogs, msg)
		}
	}
}
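Retest
- Rerun the tests and check pprof again: memory usage now looks much cleaner, and no leak point caused by business code shows up any more, so the fix works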
$ go tool pprof ~/chat_server_mem.profile
File: chat_server.test
Type: inuse_space
Time: Mar 10, 2021 at 7:55am (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 2.66MB, 100% of 2.66MB total
      flat  flat%   sum%        cum   cum%
    1.50MB 56.47% 56.47%     1.50MB 56.47%  runtime.malg
    1.16MB 43.53%   100%     1.16MB 43.53%  runtime/pprof.StartCPUProfile
         0     0%   100%     1.16MB 43.53%  command-line-arguments.Test_ChatServer
         0     0%   100%     1.50MB 56.47%  runtime.mstart
         0     0%   100%     1.50MB 56.47%  runtime.newproc.func1
         0     0%   100%     1.50MB 56.47%  runtime.newproc1
         0     0%   100%     1.50MB 56.47%  runtime.systemstack
         0     0%   100%     1.16MB 43.53%  testing.tRunner
(pprof)
(end)