缘起
最近阅读《Go 微服务实战》(刘金亮, 2021.1)
本系列笔记拟采用 golang 练习之
案例需求 (聊天服务器)
- 用户可以连接到服务器。
- 用户可以设定自己的用户名。
- 用户可以向服务器发送消息, 同时服务器也会向其他用户广播该消息。
目标 (Day 4)
- 诊断并修复内存泄漏
诊断
- 在 day 3 的代码基础上, 使用 go tool pprof 查看 heap 日志
$ go tool pprof ~/chat_server_mem.profile
File: chat_server.test
Type: inuse_space
Time: Mar 10, 2021 at 7:35am (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 9495.99kB, 100% of 9495.99kB total
Showing top 10 nodes out of 12
flat flat% sum% cum cum%
7287.48kB 76.74% 76.74% 7287.48kB 76.74% time.startTimer
1184.27kB 12.47% 89.21% 1184.27kB 12.47% runtime/pprof.StartCPUProfile
512.19kB 5.39% 94.61% 512.19kB 5.39% runtime.malg
512.05kB 5.39% 100% 7799.53kB 82.13% learning/gooop/chat_server.(*tChatClient).beginWrite
0 0% 100% 1184.27kB 12.47% command-line-arguments.Test_ChatServer
0 0% 100% 512.19kB 5.39% runtime.mstart
0 0% 100% 512.19kB 5.39% runtime.newproc.func1
0 0% 100% 512.19kB 5.39% runtime.newproc1
0 0% 100% 512.19kB 5.39% runtime.systemstack
0 0% 100% 1184.27kB 12.47% testing.tRunner
(pprof)
- 疑似有两个泄漏点, 一个是 time.startTimer, 一个是 (*tChatClient).beginWrite
- 因为 (*tChatClient).beginWrite 才是业务代码, 且其 cum% 大于 time.startTimer 的 cum%
- 因此可以怀疑:
  - (*tChatClient).beginWrite 是内存泄漏的根源
  - 主要泄漏原因是调用了太多次 time.startTimer
复查代码
- 复查 tChatClient.beginWrite 的代码, 导致不断分配内存的点可能有两个:
  - Logging.Logf, 不断追加日志.
    - 解决办法: 改造 Logging, 限制最大日志条数 (使用容量有限的队列)
  - for 循环中不断调用 time.After, 导致大量创建 timer.
    - 解决办法: 不使用 time.After, 而使用独立的 routine 和 timer 检测读超时
// beginWrite is the client's write loop (the pre-fix version examined in the
// diagnosis). It logs its start, then loops over a select that either shuts
// the connection down when closeChan fires, writes one queued message from
// sendChan to the connection, or reports a receive timeout after 5 seconds
// without either of the other two events.
func (me *tChatClient) beginWrite() {Logging.Logf("tChatClient.beginWrite, %v, serverFlag=%v", me.name, me.serverFlag)
writer := io.Writer(me.conn)
for {
select {
// Shutdown path: close the connection, mark it closed, notify, and exit.
case <- me.closeChan:
Logging.Logf("tChatClient.beginWrite, <- closeChan, %v, serverFlag=%v", me.name, me.serverFlag)
_ = me.conn.Close()
me.closeFlag = 2
me.postConnClosed()
return
// Send path: one message was dequeued, so decrement the pending counter
// before attempting the write.
case msg := <- me.sendChan:
atomic.AddInt32(&me.pendingSend, -1)
_,e := writer.Write([]byte(msg.Encode()))
if e != nil {Logging.Logf("tChatClient.beginWrite, write error, %v, serverFlag=%v", me.name, me.serverFlag)
me.closeConn()} else {me.sendLogs = append(me.sendLogs, msg)
}
break
// Leak suspect: time.After is evaluated on every pass through the loop, so
// each iteration creates a fresh 5-second timer even when another case wins.
// This matches the time.startTimer entry attributed to beginWrite in the
// heap profile.
case <- time.After(time.Duration(5) * time.Second):
me.postRecvTimeout()
break
}
}
}
改造 Logging
主要是将日志数组改造为容量有上限的日志队列, 避免诊断日志的采集导致内存无限增长.
package chat_server
import (
"fmt"
"sync"
)
// ILoggingService is the diagnostic log sink used by the chat server.
type ILoggingService interface {
	// Logf formats a message, stores it in the bounded log queue and echoes
	// it to stdout.
	Logf(f string, args ...interface{})

	// AllLogs returns a snapshot of the retained messages, oldest first.
	AllLogs() []string
}

// tLoggingService is a bounded FIFO of log lines. The live entries occupy
// logs[rindex:windex]; the backing slice is twice the capacity so that
// entries only have to be shifted back to offset 0 occasionally instead of
// on every eviction.
type tLoggingService struct {
	mutex    *sync.Mutex // guards logs, rindex and windex
	logs     []string    // backing storage, len == 2*capacity
	capacity int         // maximum number of retained entries
	rindex   int         // index of the oldest live entry
	windex   int         // index where the next entry is written
}

// gMaxLogs is the number of log lines a logging service retains.
var gMaxLogs = 10_000

// gEmptyString overwrites vacated slots so evicted strings can be collected.
var gEmptyString = ""

// newLoggingService creates an empty logging service that retains at most
// gMaxLogs entries.
func newLoggingService() ILoggingService {
	return &tLoggingService{
		mutex:    new(sync.Mutex),
		logs:     make([]string, gMaxLogs*2),
		capacity: gMaxLogs,
		rindex:   0,
		windex:   0,
	}
}

// size reports the number of live entries. Callers must hold the mutex.
func (me *tLoggingService) size() int {
	return me.windex - me.rindex
}

// Logf formats the message, appends it to the queue (evicting the oldest
// entry when the queue is full) and prints it to stdout. Printing happens
// after the lock is released.
func (me *tLoggingService) Logf(f string, args ...interface{}) {
	log := fmt.Sprintf(f, args...)

	me.mutex.Lock()
	me.ensureSpace()
	me.logs[me.windex] = log
	me.windex++
	me.mutex.Unlock()

	fmt.Println(log)
}

// ensureSpace makes room for one more entry: it evicts the oldest entries
// until fewer than capacity remain, then, once the read index has moved into
// the second half of the buffer, shifts the live entries back to offset 0.
// Callers must hold the mutex.
func (me *tLoggingService) ensureSpace() {
	// Evict from the head; clearing the slot releases the string.
	for me.size() >= me.capacity {
		me.logs[me.rindex] = gEmptyString
		me.rindex++
	}

	if me.rindex >= me.capacity {
		// Here size() < capacity <= rindex, so the source range
		// [rindex, windex) and the destination range [0, size) never overlap
		// and the source can be cleared wholesale after the copy.
		live := me.size()
		copy(me.logs, me.logs[me.rindex:me.windex])
		for i := me.rindex; i < me.windex; i++ {
			me.logs[i] = gEmptyString
		}
		me.rindex, me.windex = 0, live
	}
}

// AllLogs returns a copy of the retained entries, oldest first. Only the
// live window is returned (not the unused or evicted slots of the backing
// buffer), and the copy is taken under the lock so concurrent Logf calls
// cannot modify the result.
func (me *tLoggingService) AllLogs() []string {
	me.mutex.Lock()
	defer me.mutex.Unlock()

	snapshot := make([]string, me.size())
	copy(snapshot, me.logs[me.rindex:me.windex])
	return snapshot
}

// Logging is the package-wide logging service instance.
var Logging = newLoggingService()
改造 tChatClient
- 去掉写循环中对 time.After 的调用
- 使用专门的 routine 和读计数器, 检测读超时的情况
// open starts the client's goroutines. The compare-and-swap on openFlag
// (0 -> 1) lets only the first call proceed; later calls do nothing.
func (me *tChatClient) open() {
	if atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {
		go me.beginWrite()
		go me.beginRead()

		// Watchdog that detects read timeouts.
		go me.beginWatchRecvTimeout()
	}
}
// beginWatchRecvTimeout is the read-timeout watchdog. Every 5 seconds it
// stops if the client is closed; otherwise it increments timeoutCounter and,
// once the counter has reached 3, calls postRecvTimeout.
func (me *tChatClient) beginWatchRecvTimeout() {
	// Use an explicit ticker instead of time.Tick so it can be stopped when
	// the watchdog exits; time.Tick offers no way to shut its ticker down,
	// which before Go 1.23 leaves one live ticker behind per closed client.
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		if me.isClosed() {
			return
		}

		// NOTE(review): timeoutCounter is also reset by beginRead, which runs
		// on a different goroutine, and neither access is synchronized here.
		// Consider an atomic counter; confirm the field's type first.
		me.timeoutCounter++
		if me.timeoutCounter >= 3 {
			me.postRecvTimeout()
		}
	}
}
// beginWrite is the client's write loop. It logs its start, then blocks on
// two events: a signal on closeChan, which closes the connection and ends
// the loop, or a message on sendChan, which is encoded and written to the
// connection. The per-iteration time.After timeout case of the earlier
// version has been removed from this select.
func (me *tChatClient) beginWrite() {
	Logging.Logf("tChatClient.beginWrite, %v, serverFlag=%v", me.name, me.serverFlag)

	var out io.Writer = me.conn
	for {
		select {
		case <-me.closeChan:
			// Shutdown: close the connection, mark it closed, notify, exit.
			Logging.Logf("tChatClient.beginWrite, <- closeChan, %v, serverFlag=%v", me.name, me.serverFlag)
			_ = me.conn.Close()
			me.closeFlag = 2
			me.postConnClosed()
			return

		case outgoing := <-me.sendChan:
			// The message has left the queue, so decrement the pending count
			// before attempting the write.
			atomic.AddInt32(&me.pendingSend, -1)

			payload := []byte(outgoing.Encode())
			if _, err := out.Write(payload); err != nil {
				Logging.Logf("tChatClient.beginWrite, write error, %v, serverFlag=%v", me.name, me.serverFlag)
				me.closeConn()
				continue
			}

			// Record successfully written messages only.
			me.sendLogs = append(me.sendLogs, outgoing)
		}
	}
}
// beginRead is the client's read loop. It reads newline-terminated lines
// from the connection; on a read error it logs, calls closeConn and stops.
// Each successful read resets timeoutCounter. Lines that decode successfully
// are passed to recvHandler (when one is set) and appended to recvLogs.
func (me *tChatClient) beginRead() {
	in := bufio.NewReader(me.conn)
	for {
		raw, readErr := in.ReadString('\n')
		if readErr != nil {
			Logging.Logf("tChatClient.beginRead, read error, %v, serverFlag=%v", me.name, me.serverFlag)
			me.closeConn()
			return
		}

		// Data arrived: reset the read-timeout counter.
		me.timeoutCounter = 0

		decoded, incoming := MsgDecoder.Decode(raw)
		if !decoded {
			continue
		}

		if handler := me.recvHandler; handler != nil {
			handler(me, incoming)
		}
		me.recvLogs = append(me.recvLogs, incoming)
	}
}
复测
- 重跑测试, 查看 pprof, 现在内存清爽多了, 已经看不到业务代码导致的泄漏点, 修复有效
$ go tool pprof ~/chat_server_mem.profile
File: chat_server.test
Type: inuse_space
Time: Mar 10, 2021 at 7:55am (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 2.66MB, 100% of 2.66MB total
flat flat% sum% cum cum%
1.50MB 56.47% 56.47% 1.50MB 56.47% runtime.malg
1.16MB 43.53% 100% 1.16MB 43.53% runtime/pprof.StartCPUProfile
0 0% 100% 1.16MB 43.53% command-line-arguments.Test_ChatServer
0 0% 100% 1.50MB 56.47% runtime.mstart
0 0% 100% 1.50MB 56.47% runtime.newproc.func1
0 0% 100% 1.50MB 56.47% runtime.newproc1
0 0% 100% 1.50MB 56.47% runtime.systemstack
0 0% 100% 1.16MB 43.53% testing.tRunner
(pprof)
(end)