领有垃圾回收个性的语言里,gc 产生时都会带来性能损耗,为了缩小 gc 影响,通常的做法是缩小小块对象内存频繁申请,让每次产生垃圾回收时 scan 和 clean 沉闷对象尽可能的少。sync.Pool能够帮忙在程序构建了对象池,提供对象可复用能力,自身是可伸缩且并发平安的。

次要构造体 Pool 对外导出两个办法:GetPutGet 是用来从 Pool 中获取可用对象 ,如果可用对象为空,则会通过New 预约义的 func 创立新对象。Put 是将对象放入 Pool 中,提供下次获取


func (p *Pool) Get() interface{} {
    if race.Enabled {race.Disable()
    l, pid := p.pin()
    x := l.private
    l.private = nil
    if x == nil {
        // Try to pop the head of the local shard. We prefer
        // the head over the tail for temporal locality of
        // reuse.
        x, _ = l.shared.popHead()
        if x == nil {x = p.getSlow(pid)
    if race.Enabled {race.Enable()
        if x != nil {race.Acquire(poolRaceAddr(x))
    if x == nil && p.New != nil {x = p.New()
    return x

首先看下 GET 办法的逻辑(在看前须要对 gmp 调度模型有大抵理解)

  • 通过 pin 拿到 poolLocal 和以后 goroutine 绑定运行的 P 的 id。每个 goroutine 创立后会挂在 P 构造体上;运行时,须要绑定 P 能力在 M 上执行。因而,对 private 指向的 poolLocal 操作无需加锁,都是线程平安的
  • 设置x,并且清空private
  • x为空阐明本地对象未设置,因为 P 上存在多个 G,如果一个工夫片内协程 1 把公有对象获取后置空,下一时间片 g2 再去获取就是 nil。此时须要去share 中获取头部元素,share是在多个 P 间共享的,读写都须要 加锁,然而这里并未加锁,具体起因等下讲
  • 如果 share 中也返回空,调用 getSlow() 函数获取,等下具体看外部实现
  • runtime_procUnpin()办法,稍后咱们具体看
  • 最初如果还是未找到可复用的对象, 并且设置了 New 的 func,初始化一个新对象

Poollocal 字段示意 poolLocal 指针。获取时,优先查看 private 域是否为空,为空时再从 share 中读取,还是空的话从其余 P 中窃取一个,相似 goroutine 的调度机制。


方才的几个问题,咱们具体看下。首先,pin办法获取以后 PpoolLocal, 办法逻辑比较简单

func (p *Pool) pin() *poolLocal {pid := runtime_procPin()
    s := atomic.LoadUintptr(&p.localSize) // load-acquire
    l := p.local                          // load-consume
    if uintptr(pid) < s {return indexLocal(l, pid)
    return p.pinSlow()}

runtime_procPin返回了以后的 pid,实现细节看看 runtime 外部

//go:linkname sync_runtime_procPin sync.runtime_procPin
func sync_runtime_procPin() int {return procPin()
//go:linkname sync_runtime_procUnpin sync.runtime_procUnpin
func sync_runtime_procUnpin() {procUnpin()
func procPin() int {_g_ := getg()
    mp := _g_.m

    return int(mp.p.ptr().id)
func procUnpin() {_g_ := getg()
  • pin获取以后 goroutine 的地址,让 g 对应的 m 构造体中 locks 字段 ++,返回 p 的 id。unPin则是对 mlocks字段 –,为什么要这么做?

协程产生调度的机会之一:如果某个 g 长时间占用 cpu 资源,便会产生抢占式调度,能够抢占的根据就是 locks == 0。其实实质是为了禁止产生抢占。

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {_g_ := getg()

    // 调度时,会判断 `locks` 是否为 0。if _g_.m.locks != 0 {throw("schedule: holding locks")

为什么要禁止调度呢? 因为调度是把 mp的绑定关系解除,让 p 去绑定其余线程,执行其余线程的代码段。在 get 时,首先是获取以后 goroutine 绑定的 p 的 private,不禁止调度的话,前面的获取都不是以后协程的运行时的 p,会净化其余p 上的数据,引起未知谬误。



type poolChain struct {
    head *poolChainElt
    tail *poolChainElt


poolChain.popHead获取时,首先从 poolDequeuepopHead办法获取,未获取到时,找到 prev 节点,持续反复查找,直到返回 nil。

func (c *poolChain) popHead() (interface{}, bool) {
    d := c.head
    for d != nil {if val, ok := d.popHead(); ok {return val, ok}
        // There may still be unconsumed elements in the
        // previous dequeue, so try backing up.
        d = loadPoolChainElt(&d.prev)
    return nil, false

这里留神辨别 poolChainpoolDequeue,两个构造存在同名的办法,然而构造和逻辑齐全不同

type poolChain struct {
    // head is the poolDequeue to push to. This is only accessed
    // by the producer, so doesn't need to be synchronized.
    head *poolChainElt

    // tail is the poolDequeue to popTail from. This is accessed
    // by consumers, so reads and writes must be atomic.
    tail *poolChainElt
type poolChainElt struct {
    next, prev *poolChainElt
type poolDequeue struct {
    headTail uint64
    vals []eface}

须要阐明下:poolChainElt组成的链表构造和咱们常见的链表方向相同,从 head -> tail 的方向是 prev,反之是next;poolDequeue 是一个环形链表,headTail 字段保留首尾地址,其中高 32 位示意 head,低 32 位示意 tail.


func (d *poolDequeue) popHead() (interface{}, bool) {
    var slot *eface
    for {ptrs := atomic.LoadUint64(&d.headTail)
        head, tail := d.unpack(ptrs)
        if tail == head {return nil, false}
        ptrs2 := d.pack(head, tail)
        if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {slot = &d.vals[head&uint32(len(d.vals)-1)]

    val := *(*interface{})(unsafe.Pointer(slot))
    if val == dequeueNil(nil) {val = nil}
    *slot = eface{}
    return val, true
  • 看到 if tail == head ,如果首位地址雷同阐明链表整体为空,证实poolDequeue 的确是环形链表;
  • head--pack(head, tail) 失去新的地址 ptrs2,如果 ptrs == ptrs2,批改 headTail 地址;
  • 把 slot 转成 interface{}类型的 value;


如果从 sharedpopHead中没拿到可服用的对象,须要通过 getSlow 来获取

func (p *Pool) getSlow(pid int) interface{} {size := atomic.LoadUintptr(&p.localSize) // load-acquire
    locals := p.local                        // load-consume
    // 遍历 locals,从其余 P 上的尾部窃取
    for i := 0; i < int(size); i++ {l := indexLocal(locals, (pid+i+1)%int(size))
        if x, _ := l.shared.popTail(); x != nil {return x}

    size = atomic.LoadUintptr(&p.victimSize)
    if uintptr(pid) >= size {return nil}
    // 尝试从 victim 指向的 poolLocal 中,依照先 private -> shared 的程序获取
    locals = p.victim
    l := indexLocal(locals, pid)
    if x := l.private; x != nil {
        l.private = nil
        return x
    for i := 0; i < int(size); i++ {l := indexLocal(locals, (pid+i)%int(size))
        if x, _ := l.shared.popTail(); x != nil {return x}

    atomic.StoreUintptr(&p.victimSize, 0)

    return nil

通过遍历 locals 获取对象,应用到 victim 字段指向的 []poolLocal。这里其实援用了一种叫做Victim Cache 的机制,具体解释详见这里。


func (c *poolChain) popTail() (interface{}, bool) {d := loadPoolChainElt(&c.tail)
    if d == nil {return nil, false}

    for {d2 := loadPoolChainElt(&d.next)

        if val, ok := d.popTail(); ok {return val, ok}

        if d2 == nil {return nil, false}
        if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {storePoolChainElt(&d2.prev, nil)
        d = d2
  • d2dnext节点,d曾经为链表尾部了,这里也应证了咱们方才说到的 poolChain 链表的首尾方向和失常的链表是相同的(至于为啥要这么设计,我也是比拟懵逼)。如果 d2 为空证实曾经到了链表的头部,所以间接返回;
  • 从尾部节点 get 胜利时间接返回,曾经返回的这个地位,期待着下次 get 遍历时再删除。因为是从其余的 P 上窃取,可能产生同时多个协程获取对象,须要保障并发平安;
  • 为什么 popHead 不去删除链表节点,两个起因吧。第一个,popHead 只有以后协程在本人的 P 上操作,popTail 是窃取,如果在 popHead 中操作,也须要原子操作,作者应该是心愿把 get 阶段的开销降到最低;第二个,因为 poolChain 构造自身是链表,无论在哪一步做后果都是一样,不如对立放在尾部获取时删除。


func (d *poolDequeue) popTail() (interface{}, bool) {
    var slot *eface
    for {ptrs := atomic.LoadUint64(&d.headTail)
        head, tail := d.unpack(ptrs)
        if tail == head {return nil, false}
        ptrs2 := d.pack(head, tail+1)
        if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {slot = &d.vals[tail&uint32(len(d.vals)-1)]

    val := *(*interface{})(unsafe.Pointer(slot))
    if val == dequeueNil(nil) {val = nil}
    slot.val = nil
    atomic.StorePointer(&slot.typ, nil)

    return val, true

poolDequeue.popHead 办法逻辑根本差不多,因为 popTail 存在多个协程同时遍历,须要通过 CAS 获取,最初设置 slot 为空。


func (p *Pool) Put(x interface{}) {
    if x == nil {return}
    if race.Enabled {if fastrand()%4 == 0 {
            // Randomly drop x on floor.
    l, _ := p.pin()
    if l.private == nil {
        l.private = x
        x = nil
    if x != nil {l.shared.pushHead(x)
    if race.Enabled {race.Enable()

put办法相干逻辑和 get 很像,先设置 poolLocalprivate,如果 private 已有,通过 shared.pushHead 写入。


func (c *poolChain) pushHead(val interface{}) {
    d := c.head
    if d == nil {
        // 初始化环,数量为 2 的幂
        const initSize = 8
        d = new(poolChainElt)
        d.vals = make([]eface, initSize)
        c.head = d
        storePoolChainElt(&c.tail, d)

    if d.pushHead(val) {return}

    // 如果环已满,依照 2 倍大小创立新的 ring。留神这里有最大数量限度
    newSize := len(d.vals) * 2
    if newSize >= dequeueLimit {
        // Can't make it any bigger.
        newSize = dequeueLimit

    d2 := &poolChainElt{prev: d}
    d2.vals = make([]eface, newSize)
    c.head = d2
    storePoolChainElt(&d.next, d2)

如果节点是空,则创立一个新的 poolChainElt 对象作为头节点, 而后调用 pushHead 放入到环状队列中. 如果搁置失败,那么创立一个 2 倍大小且不超过 dequeueLimit(2 的 30 次方)的 poolChainElt 节点。所有的 vals 长度必须为 2 的整数幂。

func (d *poolDequeue) pushHead(val interface{}) bool {ptrs := atomic.LoadUint64(&d.headTail)
    head, tail := d.unpack(ptrs)
    if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {return false}
    slot := &d.vals[head&uint32(len(d.vals)-1)]
    typ := atomic.LoadPointer(&slot.typ)
    if typ != nil {return false}

    if val == nil {val = dequeueNil(nil)
    *(*interface{})(unsafe.Pointer(slot)) = val
    atomic.AddUint64(&d.headTail, 1<<dequeueBits)
    return true

首先判断 ring 是否大小已满,而后找到 head 地位对应的 slot 判断 typ 是否为空,因为 popTail 是先设置 val,再将 typ 设置为 nil,有抵触会间接返回。




注册了全局清理的 func,在每次 gc 开始时运行。既然每次 gc 都会清理 pool 内对象,那么对象复用的劣势在哪里呢?
poolCleanup在每次 gc 时,会将 allPools 里的对象写入 oldPools 对象后再革除本身对象。那么就是说,如果申请的对象,会通过两次 gc 后,才会被彻底回收。p.local会先设置为p.victim,是不是有点相似新生代、老生代的感觉。

func init() {runtime_registerPoolCleanup(poolCleanup)
func poolCleanup() {
    for _, p := range oldPools {
        p.victim = nil
        p.victimSize = 0

    // Move primary cache to victim cache.
    for _, p := range allPools {
        p.victim = p.local
        p.victimSize = p.localSize
        p.local = nil
        p.localSize = 0

    oldPools, allPools = allPools, nil

能够看出,在 gc 产生不频繁的场景,sync.Pool对象复用就能够缩小内存的频繁申请和回收。


