前言
咱们为什么要读源码?因为咱们只有深刻到实现原理,能力理解他的劣势,架构和外围原理能帮忙咱们疾速定位问题。防止反复造轮子,借鉴思维。明天咱们就来看下 sync.pool 的源码
type Pool struct {
noCopy noCopy
local unsafe.Pointer // 本地固定大小的池子。等价于每个 P 一个池子 [p] p 是索引 ID
localSize uintptr // 本地数组大小
// New optionally specifies a function to generate // a value when Get would otherwise return nil. // It may not be changed concurrently with calls to Get. New func() interface{}
}
// 本地 P index 索引
type poolLocalInternal struct {private interface{} // 公有对象只能被创立时的 P 用。shared []interface{} // 共享对象 能被其余 P 调用
Mutex // Protects shared.
}
func (p *Pool) Put(x interface{}) {
if x == nil {return}
if race.Enabled {if fastrand()%4 == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()}
l := p.pin()
if l.private == nil {
l.private = x
x = nil
}
runtime_procUnpin()
if x != nil {l.Lock()
l.shared = append(l.shared, x)
l.Unlock()}
if race.Enabled {race.Enable()
}
}
// 获取以后 P 的 localPool
func (p *Pool) pin() *poolLocal {pid := runtime_procPin()
// In pinSlow we store to localSize and then to local, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between. // Thus here we must observe local at least as large localSize. // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness). s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {return indexLocal(l, pid)
}
return p.pinSlow()}
//
func (p *Pool) pinSlow() *poolLocal {
// 重试
// 当被锁定时不能 +mutex. runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
// poolCleanup 不会被调用 当咱们被锁定时
s := p.localSize
l := p.local
// 以后 pid 小于 size 应用 pid 去本地 local 索引到 localPool 对象
if uintptr(pid) < s {return indexLocal(l, pid)
}
if p.local == nil {allPools = append(allPools, p)
}
// 如果 GCs 的时候 GOMAXPROCS 变动。咱们会重新分配数组 并遗弃旧的
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid]
}
以上就是 PUT 的大抵流程。
//get 也是调用 p.pin 获取本地 local. 而后获取 private, 如果 nil, 则 +lock 从 shared 查找,不然从其余 P 的 localPool 偷取。func (p *Pool) Get() interface{} {
if race.Enabled {race.Disable()
}
l := p.pin()// 定位 local
x := l.private // 公有对象
l.private = nil //clear
runtime_procUnpin()
if x == nil { // 公有对象为空
l.Lock()
last := len(l.shared) - 1 // 从 share 尾部开始
if last >= 0 {x = l.shared[last]
l.shared = l.shared[:last]
}
l.Unlock()
if x == nil {x = p.getSlow() // 上面看 slow
}
}
if race.Enabled {race.Enable()
if x != nil {race.Acquire(poolRaceAddr(x))
}
}
if x == nil && p.New != nil {x = p.New() // 所有 P 的 share 中都没找到,那么新建
}
return x
}
func (p *Pool) getSlow() (x interface{}) {
// 获取以后 size
size := atomic.LoadUintptr(&p.localSize) // load-acquire
local := p.local // load-consume
// Try to steal one element from other procs. pid := runtime_procPin()
runtime_procUnpin()
for i := 0; i < int(size); i++ { // 循环 size 次
l := indexLocal(local, (pid+i+1)%int(size)) // 定位从以后 P +1 %size 开始,就是从以后 p 往后走一圈。l.Lock() // 加锁
last := len(l.shared) - 1
// 查看每个 P 的 shared 开端是否存在这个值,存在就返回。if last >= 0 {x = l.shared[last]
l.shared = l.shared[:last]
l.Unlock()
break
}
l.Unlock()}
return x
}
以上是 GET 操作
1.14 poolCleanup
咱们间接看 1.14 版本的 poolCleanup, 下面的 get,put 均是 12.5 版本
这个 Cleanup 的思路很好,引入 victim 和 local 概念,在我看来就是 0 / 1 切换思维
思路: Put 新对象放在 local 中,Get 从 victim 拿, 拿不到再从 local 拿
GC 的时候执行 poolCleanup, 先删除 victim。而后将以后池子中的对象(旧对象) 移到 victim 中。
func poolCleanup() {
// This function is called with the world stopped, at the beginning of a garbage collection.
// It must not allocate and probably should not call any runtime functions.
// Because the world is stopped, no pool user can be in a // pinned section (in effect, this has all Ps pinned).
// Drop victim caches from all pools. for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// Move primary cache to victim cache.
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// The pools with non-empty primary caches now have non-empty
// victim caches and no pools have primary caches. oldPools, allPools = allPools, nil
}
比照
我看的 1.12.5 版本的 sync.pool 实现基于 mutex 来 lock. 保障多 goroutine 平安. 看的最新 1.14 版本引入双链表 移除 mutex 改善共享拜访
所以咱们在应用 12.5 版本以下的时候要留神 GC 引起的 sync.pool 的全副清空带来的毛刺。另外适宜 sync.pool 的场景是对象频繁创立
比方 我当初有个推送工作 100 万人群 / 次。构造体是
type Manual struct {
core.BaseTask
core.BaseClass
ManualFormat *model.ManualFormat
ManualAppId []int
Cfg *baseConfig.TomlConfig
IsAllPush bool
}
每次都要对人群渲染。此时用 sync.pool 能缩小大量 GC 的压力。也要留神到引发 GC 的两个条件. 第一条,2 分钟触发一次。第二条, 内存达到肯定阈值触发一次。
参考资料
https://mp.weixin.qq.com/s/Oc…