关于golang:深度解密Go语言之syncPool

42次阅读

共计 22940 个字符,预计需要花费 58 分钟才能阅读完成。

https://juejin.cn/post/691837…

最近在工作中碰到了 GC 的问题:我的项目中大量反复地创立许多对象,造成 GC 的工作量微小,CPU 频繁掉底。筹备应用 sync.Pool 来缓存对象,加重 GC 的耗费。为了用起来更顺畅,我顺便钻研了一番,造成此文。本文从应用到源码解析,循序渐进,一一道来。

本文基于 Go 1.14

  • 是什么
  • 有什么用
  • 怎么用
  • 简略的例子
  • fmt 包如何用
  • pool\_test
  • 其余
  • 源码剖析
  • Pool 构造体
  • Get
  • Put
  • pack/unpack
  • GC
  • 总结
  • 参考资料

是什么

sync.Pool 是 sync 包下的一个组件,能够作为保留长期取还对象的一个“池子”。集体感觉它的名字有肯定的误导性,因为 Pool 里装的对象能够被无告诉地被回收,可能 sync.Cache 是一个更适合的名字。

有什么用

对于很多须要反复调配、回收内存的中央,sync.Pool 是一个很好的抉择。频繁地调配、回收内存会给 GC 带来肯定的累赘,重大的时候会引起 CPU 的毛刺,而 sync.Pool 能够将临时不必的对象缓存起来,待下次须要的时候间接应用,不必再次通过内存调配,复用对象的内存,加重 GC 的压力,晋升零碎的性能。

怎么用

首先,sync.Pool 是协程平安的,这对于使用者来说是极其不便的。应用前,设置好对象的 New 函数,用于在 Pool 里没有缓存的对象时,创立一个。之后,在程序的任何中央、任何时候仅通过 Get()Put() 办法就能够取、还对象了。

上面是 2018 年的时候,《Go 夜读》上对于 sync.Pool 的分享,对于实用场景:

当多个 goroutine 都须要创立同⼀个对象的时候,如果 goroutine 数过多,导致对象的创立数⽬剧增,进⽽导致 GC 压⼒增大。造成“并发⼤-占⽤内存⼤-GC 迟缓-解决并发能⼒升高-并发更⼤”这样的恶性循环。

在这个时候,须要有⼀个对象池,每个 goroutine 不再⾃⼰独自创建对象,⽽是从对象池中获取出⼀个对象(如果池中曾经有的话)。

因而要害思维就是对象的复用,防止反复创立、销毁,上面咱们来看看如何应用。

简略的例子

首先来看一个简略的例子:

package main
import (
  "fmt"
  "sync"
)

var pool \*sync.Pool

type Person struct {Name string}

func initPool() {
  pool = &sync.Pool {New: func()interface{} {fmt.Println("Creating a new Person")
      return new(Person)
    },
  }
}

func main() {initPool()

  p := pool.Get().(\*Person)
  fmt.Println("首次从 pool 里获取:", p)

  p.Name = "first"
  fmt.Printf("设置 p.Name = %s\\n", p.Name)

  pool.Put(p)

  fmt.Println("Pool 里已有一个对象:&{first},调用 Get:", pool.Get().(\*Person))
  fmt.Println("Pool 没有对象了,调用 Get:", pool.Get().(\*Person))
}

运行后果:

Creating a new Person
首次从 pool 里获取:&{}
设置 p.Name = first
Pool 里已有一个对象:&{first},Get:  &{first}
Creating a new Person
Pool 没有对象了,Get:  &{}

首先,须要初始化 Pool,惟一须要的就是设置好 New 函数。当调用 Get 办法时,如果池子里缓存了对象,就间接返回缓存的对象。如果没有存货,则调用 New 函数创立一个新的对象。

另外,咱们发现 Get 办法取出来的对象和上次 Put 进去的对象实际上是同一个,Pool 没有做任何“清空”的解决。但咱们不该当对此有任何假如,因为在理论的并发应用场景中,无奈保障这种程序,最好的做法是在 Put 前,将对象清空。

fmt 包如何用

这部分次要看 fmt.Printf 如何应用:

func Printf(format string, a ...interface{}) (n int, err error) {return Fprintf(os.Stdout, format, a...)
}

持续看 Fprintf

func Fprintf(w io.Writer, format string, a ...interface{}) (n int, err error) {p := newPrinter()
  p.doPrintf(format, a)
  n, err = w.Write(p.buf)
  p.free()
  return
}

Fprintf 函数的参数是一个 io.WriterPrintf 传的是 os.Stdout,相当于间接输入到规范输入。这里的 newPrinter 用的就是 Pool:

// newPrinter allocates a new pp struct or grabs a cached one.
func newPrinter() \*pp {p := ppFree.Get().(\*pp)
  p.panicking = false
  p.erroring = false
  p.wrapErrs = false
  p.fmt.init(&p.buf)
  return p
}

var ppFree = sync.Pool{New: func() interface{} { return new(pp) },
}

回到 Fprintf 函数,拿到 pp 指针后,会做一些 format 的操作,并且将 p.buf 外面的内容写入 w。最初,调用 free 函数,将 pp 指针偿还到 Pool 中:

// free saves used pp structs in ppFree; avoids an allocation per invocation.
func (p \*pp) free() {if cap(p.buf) > 64<<10 {return}

  p.buf = p.buf\[:0\]
  p.arg = nil
  p.value = reflect.Value{}
  p.wrappedErr = nil
  ppFree.Put(p)
}

偿还到 Pool 前将对象的一些字段清零,这样,通过 Get 拿到缓存的对象时,就能够平安地应用了。

pool\_test

通过 test 文件学习源码是一个很好的路径,因为它代表了“官网”的用法。更重要的是,测试用例会成心测试一些“坑”,学习这些坑,也会让本人在应用的时候就能学会防止。

pool_test 文件里共有 7 个测试,4 个 BechMark。

TestPoolTestPoolNew 比较简单,次要是测试 Get/Put 的性能。咱们来看下 TestPoolNew

func TestPoolNew(t \*testing.T) {
  // disable GC so we can control when it happens.
  defer debug.SetGCPercent(debug.SetGCPercent(-1))

  i := 0
  p := Pool{New: func() interface{} {
      i++
      return i
    },
  }
  if v := p.Get(); v != 1 {t.Fatalf("got %v; want 1", v)
  }
  if v := p.Get(); v != 2 {t.Fatalf("got %v; want 2", v)
  }

  // Make sure that the goroutine doesn't migrate to another P
  // between Put and Get calls.
  Runtime\_procPin()
  p.Put(42)
  if v := p.Get(); v != 42 {t.Fatalf("got %v; want 42", v)
  }
  Runtime\_procUnpin()

  if v := p.Get(); v != 3 {t.Fatalf("got %v; want 3", v)
  }
}

首先设置了 GC=-1,作用就是进行 GC。那为啥要用 defer?函数都跑完了,还要 defer 干啥。留神到,debug.SetGCPercent 这个函数被调用了两次,而且这个函数返回的是上一次 GC 的值。因而,defer 在这里的用处是还原到调用此函数之前的 GC 设置,也就是复原现场。

接着,调置了 Pool 的 New 函数:间接返回一个 int,变且每次调用 New,都会自增 1。而后,间断调用了两次 Get 函数,因为这个时候 Pool 里没有缓存的对象,因而每次都会调用 New 创立一个,所以第一次返回 1,第二次返回 2。

而后,调用 Runtime_procPin() 避免 goroutine 被强占,目标是爱护接下来的一次 Put 和 Get 操作,使得它们操作的对象都是同一个 P 的“池子”。并且,这次调用 Get 的时候并没有调用 New,因为之前有一次 Put 的操作。

最初,再次调用 Get 操作,因为没有“存货”,因而还是会再次调用 New 创立一个对象。

TestPoolGCTestPoolRelease 则次要测试 GC 对 Pool 里对象的影响。这里用了一个函数,用于计数有多少对象会被 GC 回收:

runtime.SetFinalizer(v, func(vv \*string) {atomic.AddUint32(&fin, 1)
})

当垃圾回收检测到 v 是一个不可达的对象时,并且 v 又有一个关联的 Finalizer,就会另起一个 goroutine 调用设置的 finalizer 函数,也就是下面代码里的参数 func。这样,就会让对象 v 从新可达,从而在这次 GC 过程中不被回收。之后,解绑对象 v 和它所关联的 Finalizer,当下次 GC 再次检测到对象 v 不可达时,才会被回收。

TestPoolStress 从名字看,次要是想测一下“压力”,具体操作就是起了 10 个 goroutine 一直地向 Pool 里 Put 对象,而后又 Get 对象,看是否会出错。

TestPoolDequeueTestPoolChain,都调用了 testPoolDequeue,这是具体干活的。它须要传入一个 PoolDequeue 接口:

// poolDequeue testing.
type PoolDequeue interface {PushHead(val interface{}) bool
  PopHead() (interface{}, bool)
  PopTail() (interface{}, bool)
}

PoolDequeue 是一个双端队列,能够从头部入队元素,从头部和尾部出队元素。调用函数时,前者传入 NewPoolDequeue(16),后者传入 NewPoolChain(),底层其实都是 poolDequeue 这个构造体。具体来看 testPoolDequeue 做了什么:

双端队列

总共起了 10 个 goroutine:1 个生产者,9 个消费者。生产者一直地从队列头 pushHead 元素到双端队列里去,并且每 push 10 次,就 popHead 一次;消费者则始终从队列尾取元素。不论是从队列头还是从队列尾取元素,都会在 map 里做标记,最初测验每个元素是不是只被取出过一次。

剩下的就是 Benchmark 测试了。第一个 BenchmarkPool 比较简单,就是不停地 Put/Get,测试性能。

BenchmarkPoolSTW 函数会先关掉 GC,再向 pool 里 put 10 个对象,而后强制触发 GC,记录 GC 的进展工夫,并且做一个排序,计算 P50 和 P95 的 STW 工夫。这个函数能够退出集体的代码库了:

func BenchmarkPoolSTW(b \*testing.B) {
  // Take control of GC.
  defer debug.SetGCPercent(debug.SetGCPercent(-1))

  var mstats runtime.MemStats
  var pauses \[\]uint64

  var p Pool
  for i := 0; i < b.N; i++ {
    // Put a large number of items into a pool.
    const N = 100000
    var item interface{} = 42
    for i := 0; i < N; i++ {p.Put(item)
    }
    // Do a GC.
    runtime.GC()
    // Record pause time.
    runtime.ReadMemStats(&mstats)
    pauses = append(pauses, mstats.PauseNs\[(mstats.NumGC+255)%256\])
  }

  // Get pause time stats.
  sort.Slice(pauses, func(i, j int) bool {return pauses\[i\] < pauses\[j\] })
  var total uint64
  for \_, ns := range pauses {total += ns}
  // ns/op for this benchmark is average STW time.
  b.ReportMetric(float64(total)/float64(b.N), "ns/op")
  b.ReportMetric(float64(pauses\[len(pauses)\*95/100\]), "p95-ns/STW")
  b.ReportMetric(float64(pauses\[len(pauses)\*50/100\]), "p50-ns/STW")
}

我在 mac 上跑了一下:

go test -v -run=none -bench=BenchmarkPoolSTW

失去输入:

goos: darwin
goarch: amd64
pkg: sync
BenchmarkPoolSTW-12    361    3708 ns/op    3583 p50-ns/STW    5008 p95-ns/STW
PASS
ok      sync    1.481s

最初一个 BenchmarkPoolExpensiveNew 测试当 New 的代价很高时,Pool 的体现。也能够退出集体的代码库。

其余

规范库中 encoding/json 也用到了 sync.Pool 来晋升性能。驰名的 gin 框架,对 context 取用也到了 sync.Pool

来看下 gin 如何应用 sync.Pool。设置 New 函数:

engine.pool.New = func() interface{} {return engine.allocateContext()
}

func (engine \*Engine) allocateContext() \*Context {return &Context{engine: engine, KeysMutex: &sync.RWMutex{}}
}

应用:

// ServeHTTP conforms to the http.Handler interface.
func (engine \*Engine) ServeHTTP(w http.ResponseWriter, req \*http.Request) {c := engine.pool.Get().(\*Context)
  c.writermem.reset(w)
  c.Request = req
  c.reset()

  engine.handleHTTPRequest(c)

  engine.pool.Put(c)
}

先调用 Get 取出来缓存的对象,而后会做一些 reset 操作,再执行 handleHTTPRequest,最初再 Put 回 Pool。

另外,Echo 框架也使⽤了 sync.Pool 来治理 context,并且⼏乎达到了零堆内存调配:

It leverages sync pool to reuse memory and achieve zero dynamic memory allocation with no GC overhead.

源码剖析

Pool 构造体

首先来看 Pool 的构造体:

type Pool struct {
  noCopy noCopy

    // 每个 P 的本地队列,理论类型为 \[P\]poolLocal
  local     unsafe.Pointer // local fixed-size per-P pool, actual type is \[P\]poolLocal
  // \[P\]poolLocal 的大小
  localSize uintptr        // size of the local array

  victim     unsafe.Pointer // local from previous cycle
  victimSize uintptr        // size of victims array
  // 自定义的对象创立回调函数,当 pool 中无可用对象时会调用此函数
  New func() interface{}
}

因为 Pool 不心愿被复制,所以构造体里有一个 noCopy 的字段,应用 go vet 工具能够检测到用户代码是否复制了 Pool。

noCopy 是 go1.7 开始引入的一个动态查看机制。它不仅仅工作在运行时或规范库,同时也对用户代码无效。

用户只需实现这样的不耗费内存、仅用于动态剖析的构造,来保障一个对象在第一次应用后不会产生复制。

实现非常简单:

// noCopy 用于嵌入一个构造体中来保障其第一次应用后不会被复制
//
// 见 https://golang.org/issues/8005#issuecomment-190753527
type noCopy struct{}

// Lock 是一个空操作用来给 \`go ve\` 的 -copylocks 动态剖析
func (\*noCopy) Lock()   {}
func (\*noCopy) Unlock() {}

local 字段存储指向 [P]poolLocal 数组(严格来说,它是一个切片)的指针,localSize 则示意 local 数组的大小。拜访时,P 的 id 对应 [P]poolLocal 下标索引。通过这样的设计,多个 goroutine 应用同一个 Pool 时,缩小了竞争,晋升了性能。

在一轮 GC 到来时,victim 和 victimSize 会别离“接管”local 和 localSize。victim 的机制用于缩小 GC 后冷启动导致的性能抖动,让调配对象更平滑。

Victim Cache 原本是计算机架构外面的一个概念,是 CPU 硬件解决缓存的一种技术,sync.Pool 引入的用意在于升高 GC 压力的同时进步命中率。

当 Pool 没有缓存的对象时,调用 New 办法生成一个新的对象。

type poolLocal struct {
  poolLocalInternal

  // 将 poolLocal 补齐至两个缓存行的倍数,避免 false sharing,
  // 每个缓存行具备 64 bytes,即 512 bit
  // 目前咱们的处理器个别领有 32 \* 1024 / 64 = 512 条缓存行
  // 伪共享,仅占位用,避免在 cache line 上调配多个 poolLocalInternal
  pad \[128 - unsafe.Sizeof(poolLocalInternal{})%128\]byte
}

// Local per-P Pool appendix.
type poolLocalInternal struct {
    // P 的公有缓存区,应用时无须要加锁
  private interface{}
  // 公共缓存区。本地 P 能够 pushHead/popHead;其余 P 则只能 popTail
  shared  poolChain
}

字段 pad 次要是避免 false sharing,董大的《什么是 cpu cache》里讲得比拟好:

古代 cpu 中,cache 都划分成以 cache line (cache block) 为单位,在 x86\_64 体系下个别都是 64 字节,cache line 是操作的最小单元。

程序即便只想读内存中的 1 个字节数据,也要同时把左近 63 节字加载到 cache 中,如果读取超个 64 字节,那么就要加载到多个 cache line 中。

简略来说,如果没有 pad 字段,那么当须要拜访 0 号索引的 poolLocal 时,CPU 同时会把 0 号和 1 号索引同时加载到 cpu cache。在只批改 0 号索引的状况下,会让 1 号索引的 poolLocal 生效。这样,当其余线程想要读取 1 号索引时,产生 cache miss,还得从新再加载,对性能有损。减少一个 pad,补齐缓存行,让相干的字段能独立地加载到缓存行就不会呈现 false sharding 了。

poolChain 是一个双端队列的实现:

type poolChain struct {
  // 只有生产者会 push to,不必加锁
  head \*poolChainElt

  // 读写须要原子管制。pop from
  tail \*poolChainElt
}

type poolChainElt struct {
  poolDequeue

  // next 被 producer 写,consumer 读。所以只会从 nil 变成 non-nil
  // prev 被 consumer 写,producer 读。所以只会从 non-nil 变成 nil
  next, prev \*poolChainElt
}

type poolDequeue struct {
  // The head index is stored in the most-significant bits so
  // that we can atomically add to it and the overflow is
  // harmless.
  // headTail 蕴含一个 32 位的 head 和一个 32 位的 tail 指针。这两个值都和 len(vals)-1 取模过。// tail 是队列中最老的数据,head 指向下一个将要填充的 slot
    // slots 的无效范畴是 \[tail, head),由 consumers 持有。headTail uint64

  // vals 是一个存储 interface{} 的环形队列,它的 size 必须是 2 的幂
  // 如果 slot 为空,则 vals\[i\].typ 为空;否则,非空。// 一个 slot 在这时宣告无效:tail 不指向它了,vals\[i\].typ 为 nil
  // 由 consumer 设置成 nil,由 producer 读
  vals \[\]eface
}

poolDequeue 被实现为单生产者、多消费者的固定大小的无锁(atomic 实现)Ring 式队列(底层存储应用数组,应用两个指针标记 head、tail)。生产者能够从 head 插入、head 删除,而消费者仅可从 tail 删除。

headTail 指向队列的头和尾,通过位运算将 head 和 tail 存入 headTail 变量中。

咱们用一幅图来残缺地形容 Pool 构造体:

Pool 构造体

联合木白的技术私厨的《请问 sync.Pool 有什么毛病?》里的一张图,对于双端队列的了解会更容易一些:

Pool 构造体

咱们看到 Pool 并没有间接应用 poolDequeue,起因是它的大小是固定的,而 Pool 的大小是没有限度的。因而,在 poolDequeue 之上包装了一下,变成了一个 poolChainElt 的双向链表,能够动静增长。

Get

间接上源码:

func (p \*Pool) Get() interface{} {
    // ......
  l, pid := p.pin()
  x := l.private
  l.private = nil
  if x == nil {x, \_ = l.shared.popHead()
    if x == nil {x = p.getSlow(pid)
    }
  }
  runtime\_procUnpin()
    // ......
  if x == nil && p.New != nil {x = p.New()
  }
  return x
}

省略号的内容是 race 相干的,属于浏览源码过程中的一些乐音,临时正文掉。这样,Get 的整个过程就十分清晰了:

  1. 首先,调用 p.pin() 函数将以后的 goroutine 和 P 绑定,禁止被抢占,返回以后 P 对应的 poolLocal,以及 pid。
  2. 而后间接取 l.private,赋值给 x,并置 l.private 为 nil。
  3. 判断 x 是否为空,若为空,则尝试从 l.shared 的头部 pop 一个对象进去,同时赋值给 x。
  4. 如果 x 依然为空,则调用 getSlow 尝试从其余 P 的 shared 双端队列尾部“偷”一个对象进去。
  5. Pool 的相干操作做完了,调用 runtime_procUnpin() 解除非抢占。
  6. 最初如果还是没有取到缓存的对象,那就间接调用事后设置好的 New 函数,创立一个进去。

我用一张流程图来展现整个过程:

Get 流程图

整体流程梳理完了,咱们再来看一下其中的一些要害函数。

pin

先来看 Pool.pin()

// src/sync/pool.go

// 调用方必须在实现取值后调用 runtime\_procUnpin() 来勾销抢占。func (p \*Pool) pin() (\*poolLocal, int) {pid := runtime\_procPin()
  s := atomic.LoadUintptr(&p.localSize) // load-acquire
  l := p.local                          // load-consume
  // 因为可能存在动静的 P(运行时调整 P 的个数)if uintptr(pid) < s {return indexLocal(l, pid), pid
  }
  return p.pinSlow()}

pin 的作用就是将以后 groutine 和 P 绑定在一起,禁止抢占。并且返回对应的 poolLocal 以及 P 的 id。

如果 G 被抢占,则 G 的状态从 running 变成 runnable,会被放回 P 的 localq 或 globaq,期待下一次调度。下次再执行时,就不肯定是和当初的 P 相结合了。因为之后会用到 pid,如果被抢占了,有可能接下来应用的 pid 与所绑定的 P 并非同一个。

“绑定”的工作最终交给了 procPin

// src/runtime/proc.go

func procPin() int {\_g\_ := getg()
  mp := \_g\_.m

  mp.locks++
  return int(mp.p.ptr().id)
}

实现的代码很简洁:将以后 goroutine 绑定的 m 上的一个锁字段 locks 值加 1,即实现了“绑定”。对于 pin 的原理,能够参考《golang 的对象池 sync.pool 源码解读》,文章详细分析了为什么执行 procPin 之后,不可抢占,且 GC 不会打扫 Pool 里的对象。

咱们再回到 p.pin(),原子操作取出 p.localSizep.local,如果以后 pid 小于 p.localSize,则间接取 poolLocal 数组中的 pid 索引处的元素。否则,阐明 Pool 还没有创立 poolLocal,调用 p.pinSlow() 实现创立工作。

func (p \*Pool) pinSlow() (\*poolLocal, int) {
  // Retry under the mutex.
  // Can not lock the mutex while pinned.
  runtime\_procUnpin()
  allPoolsMu.Lock()
  defer allPoolsMu.Unlock()
  pid := runtime\_procPin()
  // poolCleanup won't be called while we are pinned.
  // 没有应用原子操作,因为曾经加了全局锁了
  s := p.localSize
  l := p.local
  // 因为 pinSlow 中途可能曾经被其余的线程调用,因而这时候须要再次对 pid 进行查看。如果 pid 在 p.local 大小范畴内,则不必创立 poolLocal 切片,间接返回。if uintptr(pid) < s {return indexLocal(l, pid), pid
  }
  if p.local == nil {allPools = append(allPools, p)
  }
  // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
  // 以后 P 的数量
  size := runtime.GOMAXPROCS(0)
  local := make(\[\]poolLocal, size)
  // 旧的 local 会被回收
  atomic.StorePointer(&p.local, unsafe.Pointer(&local\[0\])) // store-release
  atomic.StoreUintptr(&p.localSize, uintptr(size))         // store-release
  return &local\[pid\], pid
}

因为要上一把大锁 allPoolsMu,所以函数名带有 slow。咱们晓得,锁粒度越大,竞争越多,天然就越“slow”。不过要想上锁的话,得先解除“绑定”,锁上之后,再执行“绑定”。起因是锁越大,被阻塞的概率就越大,如果还占着 P,那就浪费资源。

在解除绑定后,pinSlow 可能被其余的线程调用过了,p.local 可能会发生变化。因而这时候须要再次对 pid 进行查看。如果 pid 在 p.localSize 大小范畴内,则不必再创立 poolLocal 切片,间接返回。

之后,依据 P 的个数,应用 make 创立切片,蕴含 runtime.GOMAXPROCS(0) 个 poolLocal,并且应用原子操作设置 p.local 和 p.localSize。

最初,返回 p.local 对应 pid 索引处的元素。

对于这把大锁 allPoolsMu,曹大在《几个 Go 零碎可能遇到的锁问题》里讲了一个例子。第三方库用了 sync.Pool,外部有一个构造体 fasttemplate.Template,蕴含 sync.Pool 字段。而 rd 在应用时,每个申请都会新建这样一个构造体。于是,解决每个申请时,都会尝试从一个空的 Pool 里取缓存的对象,最初 goroutine 都阻塞在了这把大锁上,因为都在尝试执行:allPools = append(allPools, p),从而造成性能问题。

popHead

回到 Get 函数,再来看另一个要害的函数:poolChain.popHead()

func (c \*poolChain) popHead() (interface{}, bool) {
  d := c.head
  for d != nil {if val, ok := d.popHead(); ok {return val, ok}
    // There may still be unconsumed elements in the
    // previous dequeue, so try backing up.
    d = loadPoolChainElt(&d.prev)
  }
  return nil, false
}

popHead 函数只会被 producer 调用。首先拿到头节点:c.head,如果头节点不为空的话,尝试调用头节点的 popHead 办法。留神这两个 popHead 办法实际上并不相同,一个是 poolChain 的,一个是 poolDequeue 的,有纳闷的,无妨回头再看一下 Pool 构造体的图。咱们来看 poolDequeue.popHead()

// /usr/local/go/src/sync/poolqueue.go

func (d \*poolDequeue) popHead() (interface{}, bool) {
  var slot \*eface
  for {ptrs := atomic.LoadUint64(&d.headTail)
    head, tail := d.unpack(ptrs)
    // 判断队列是否为空
    if tail == head {
      // Queue is empty.
      return nil, false
    }

    // head 地位是队头的前一个地位,所以此处要先退一位。// 在读出 slot 的 value 之前就把 head 值减 1,勾销对这个 slot 的管制
    head--
    ptrs2 := d.pack(head, tail)
    if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
      // We successfully took back slot.
      slot = &d.vals\[head&uint32(len(d.vals)-1)\]
      break
    }
  }

    // 取出 val
  val := \*(\*interface{})(unsafe.Pointer(slot))
  if val == dequeueNil(nil) {val = nil}

  // 重置 slot,typ 和 val 均为 nil
  // 这里清空的形式与 popTail 不同,与 pushHead 没有竞争关系,所以不必太小心
  \*slot = eface{}
  return val, true
}

此函数会删掉并且返回 queue 的头节点。但如果 queue 为空的话,返回 false。这里的 queue 存储的实际上就是 Pool 里缓存的对象。

整个函数的外围是一个有限循环,这是 Go 中罕用的无锁化编程模式。

首先调用 unpack 函数拆散出 head 和 tail 指针,如果 head 和 tail 相等,即首尾相等,那么这个队列就是空的,间接就返回 nil,false

否则,将 head 指针后移一位,即 head 值减 1,而后调用 pack 打包 head 和 tail 指针。应用 atomic.CompareAndSwapUint64 比拟 headTail 在这之间是否有变动,如果没变动,相当于获取到了这把锁,那就更新 headTail 的值。并且把 vals 相应索引处的元素赋值给 slot。

因为 vals 长度理论是只能是 2 的 n 次幂,因而 len(d.vals)-1 实际上失去的值的低 n 位是全 1,它再与 head 相与,理论就是取 head 低 n 位的值。

失去相应 slot 的元素后,通过类型转换并判断是否是 dequeueNil,如果是,阐明没取到缓存的对象,返回 nil。

// /usr/local/go/src/sync/poolqueue.go
// 因为应用 nil 代表空的 slots,因而应用 dequeueNil 示意 interface{}(nil)
type dequeueNil \*struct{}

最初,返回 val 之前,将 slot“归零”:*slot = eface{}

回到 poolChain.popHead(),调用 poolDequeue.popHead() 拿到缓存的对象后,间接返回。否则,将 d 从新指向 d.prev,持续尝试获取缓存的对象。

getSlow

如果在 shared 里没有获取到缓存对象,则持续调用 Pool.getSlow(),尝试从其余 P 的 poolLocal 偷取:

func (p \*Pool) getSlow(pid int) interface{} {
  // See the comment in pin regarding ordering of the loads.
  size := atomic.LoadUintptr(&p.localSize) // load-acquire
  locals := p.local                        // load-consume
  // Try to steal one element from other procs.
  // 从其余 P 中窃取对象
  for i := 0; i < int(size); i++ {l := indexLocal(locals, (pid+i+1)%int(size))
    if x, \_ := l.shared.popTail(); x != nil {return x}
  }

  // 尝试从 victim cache 中取对象。这产生在尝试从其余 P 的 poolLocal 偷去失败后,// 因为这样能够使 victim 中的对象更容易被回收。size = atomic.LoadUintptr(&p.victimSize)
  if uintptr(pid) >= size {return nil}
  locals = p.victim
  l := indexLocal(locals, pid)
  if x := l.private; x != nil {
    l.private = nil
    return x
  }
  for i := 0; i < int(size); i++ {l := indexLocal(locals, (pid+i)%int(size))
    if x, \_ := l.shared.popTail(); x != nil {return x}
  }

  // 清空 victim cache。下次就不必再从这里找了
  atomic.StoreUintptr(&p.victimSize, 0)

  return nil
}

从索引为 pid+1 的 poolLocal 处开始,尝试调用 shared.popTail() 获取缓存对象。如果没有拿到,则从 victim 里找,和 poolLocal 的逻辑相似。

最初,切实没找到,就把 victimSize 置 0,避免起初的“人”再到 victim 里找。

在 Get 函数的最初,通过这一番操作还是没找到缓存的对象,就调用 New 函数创立一个新的对象。

popTail

最初,还剩一个 popTail 函数:

func (c \*poolChain) popTail() (interface{}, bool) {d := loadPoolChainElt(&c.tail)
  if d == nil {return nil, false}

  for {d2 := loadPoolChainElt(&d.next)

    if val, ok := d.popTail(); ok {return val, ok}

    if d2 == nil {
      // 双向链表只有一个尾节点,当初为空
      return nil, false
    }

    // 双向链表的尾节点里的双端队列被“掏空”,所以持续看下一个节点。// 并且因为尾节点曾经被“掏空”,所以要甩掉它。这样,下次 popHead 就不会再看它有没有缓存对象了。if atomic.CompareAndSwapPointer((\*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
      // 甩掉尾节点
      storePoolChainElt(&d2.prev, nil)
    }
    d = d2
  }
}

for 循环的一开始,就把 d.next 加载到了 d2。因为 d 可能会短暂为空,但如果 d2 在 pop 或者 pop fails 之前就不为空的话,阐明 d 就会永恒为空了。在这种状况下,能够平安地将 d 这个结点“甩掉”。

最初,将 c.tail 更新为 d2,能够避免下次 popTail 的时候查看一个空的 dequeue;而将 d2.prev 设置为 nil,能够避免下次 popHead 时查看一个空的 dequeue

咱们再看一下外围的 poolDequeue.popTail

// src/sync/poolqueue.go:147

func (d \*poolDequeue) popTail() (interface{}, bool) {
  var slot \*eface
  for {ptrs := atomic.LoadUint64(&d.headTail)
    head, tail := d.unpack(ptrs)
    // 判断队列是否空
    if tail == head {
      // Queue is empty.
      return nil, false
    }

    // 先搞定 head 和 tail 指针地位。如果搞定,那么这个 slot 就归属咱们了
    ptrs2 := d.pack(head, tail+1)
    if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
      // Success.
      slot = &d.vals\[tail&uint32(len(d.vals)-1)\]
      break
    }
  }

  // We now own slot.
  val := \*(\*interface{})(unsafe.Pointer(slot))
  if val == dequeueNil(nil) {val = nil}

  slot.val = nil
  atomic.StorePointer(&slot.typ, nil)
  // At this point pushHead owns the slot.

  return val, true
}

popTail 从队列尾部移除一个元素,如果队列为空,返回 false。此函数可能同时被多个 消费者 调用。

函数的外围是一个有限循环,又是一个无锁编程。先解出 head,tail 指针值,如果两者相等,阐明队列为空。

因为要从尾部移除一个元素,所以 tail 指针后退 1,而后应用原子操作设置 headTail。

最初,将要移除的 slot 的 val 和 typ“归零”:

slot.val = nil
atomic.StorePointer(&slot.typ, nil)

Put

// src/sync/pool.go

// Put 将对象增加到 Pool 
func (p \*Pool) Put(x interface{}) {
  if x == nil {return}
  // ……
  l, \_ := p.pin()
  if l.private == nil {
    l.private = x
    x = nil
  }
  if x != nil {l.shared.pushHead(x)
  }
  runtime\_procUnpin()
    //…… 
}

同样删掉了 race 相干的函数,看起来清新多了。整个 Put 的逻辑也很清晰:

  1. 先绑定 g 和 P,而后尝试将 x 赋值给 private 字段。
  2. 如果失败,就调用 pushHead 办法尝试将其放入 shared 字段所保护的双端队列中。

同样用流程图来展现整个过程:

Put 流程图

pushHead

咱们来看 pushHead 的源码,比拟清晰:

// src/sync/poolqueue.go

func (c \*poolChain) pushHead(val interface{}) {
  d := c.head
  if d == nil {
    // poolDequeue 初始长度为 8
    const initSize = 8 // Must be a power of 2
    d = new(poolChainElt)
    d.vals = make(\[\]eface, initSize)
    c.head = d
    storePoolChainElt(&c.tail, d)
  }

  if d.pushHead(val) {return}

    // 前一个 poolDequeue 长度的 2 倍
  newSize := len(d.vals) \* 2
  if newSize >= dequeueLimit {
    // Can't make it any bigger.
    newSize = dequeueLimit
  }

    // 首尾相连,形成链表
  d2 := &poolChainElt{prev: d}
  d2.vals = make(\[\]eface, newSize)
  c.head = d2
  storePoolChainElt(&d.next, d2)
  d2.pushHead(val)
}

如果 c.head 为空,就要创立一个 poolChainElt,作为首结点,当然也是尾节点。它治理的双端队列的长度,初始为 8,放满之后,再创立一个 poolChainElt 节点时,双端队列的长度就要翻倍。当然,有一个最大长度限度(2^30):

const dequeueBits = 32
const dequeueLimit = (1 << dequeueBits) / 4

调用 poolDequeue.pushHead 尝试将对象放到 poolDeque 里去:

// src/sync/poolqueue.go

// 将 val 增加到双端队列头部。如果队列已满,则返回 false。此函数只能被一个生产者调用
func (d \*poolDequeue) pushHead(val interface{}) bool {ptrs := atomic.LoadUint64(&d.headTail)
  head, tail := d.unpack(ptrs)
  if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
    // 队列满了
    return false
  }
  slot := &d.vals\[head&uint32(len(d.vals)-1)\]

  // 检测这个 slot 是否被 popTail 开释
  typ := atomic.LoadPointer(&slot.typ)
  if typ != nil {
    // 另一个 groutine 正在 popTail 这个 slot,阐明队列依然是满的
    return false
  }

  // The head slot is free, so we own it.
  if val == nil {val = dequeueNil(nil)
  }

  // slot 占位,将 val 存入 vals 中
  \*(\*interface{})(unsafe.Pointer(slot)) = val

  // head 减少 1
  atomic.AddUint64(&d.headTail, 1<<dequeueBits)
  return true
}

首先判断队列是否已满:

if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
  // Queue is full.
  return false
}

也就是将尾部指针加上 d.vals 的长度,再取低 31 位,看它是否和 head 相等。咱们晓得,d.vals 的长度实际上是固定的,因而如果队列已满,那么 if 语句的两边就是相等的。如果队列满了,间接返回 false。

否则,队列没满,通过 head 指针找到行将填充的 slot 地位:取 head 指针的低 31 位。

// Check if the head slot has been released by popTail.
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
  // Another goroutine is still cleaning up the tail, so
  // the queue is actually still full.
  // popTail 是先设置 val,再将 typ 设置为 nil。设置完 typ 之后,popHead 才能够操作这个 slot
  return false
}

下面这一段用来判断是否和 popTail 有抵触产生,如果有,则间接返回 false。

最初,将 val 赋值到 slot,并将 head 指针值加 1。

// slot 占位,将 val 存入 vals 中
\*(\*interface{})(unsafe.Pointer(slot)) = val

这里的实现比拟奇妙,slot 是 eface 类型,将 slot 转为 interface{} 类型,这样 val 能以 interface{} 赋值给 slot 让 slot.typ 和 slot.val 指向其内存块,于是 slot.typ 和 slot.val 均不为空。

pack/unpack

最初咱们再来看一下 pack 和 unpack 函数,它们实际上是一组绑定、解绑 head 和 tail 指针的两个函数。

// src/sync/poolqueue.go

const dequeueBits = 32

func (d \*poolDequeue) pack(head, tail uint32) uint64 {
  const mask = 1<<dequeueBits - 1
  return (uint64(head) << dequeueBits) |
    uint64(tail&mask)
}

mask 的低 31 位为全 1,其余位为 0,它和 tail 相与,就是只看 tail 的低 31 位。而 head 向左移 32 位之后,低 32 位为全 0。最初把两局部“或”起来,head 和 tail 就“绑定”在一起了。

相应的解绑函数:

func (d \*poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
  const mask = 1<<dequeueBits - 1
  head = uint32((ptrs >> dequeueBits) & mask)
  tail = uint32(ptrs & mask)
  return
}

取出 head 指针的办法就是将 ptrs 右移 32 位,再与 mask 相与,同样只看 head 的低 31 位。而 tail 实际上更简略,间接将 ptrs 与 mask 相与就能够了。

GC

对于 Pool 而言,并不能有限扩大,否则对象占用内存太多了,会引起内存溢出。

简直所有的池技术中,都会在某个时刻清空或革除局部缓存对象,那么在 Go 中何时清理未应用的对象呢?

答案是 GC 产生时。

在 pool.go 文件的 init 函数里,注册了 GC 产生时,如何清理 Pool 的函数:

// src/sync/pool.go

func init() {runtime\_registerPoolCleanup(poolCleanup)
}

编译器在背地做了一些动作:

// src/runtime/mgc.go

// Hooks for other packages

var poolcleanup func()

// 利用编译器标记将 sync 包中的清理注册到运行时
//go:linkname sync\_runtime\_registerPoolCleanup sync.runtime\_registerPoolCleanup
func sync\_runtime\_registerPoolCleanup(f func()) {poolcleanup = f}

具体来看下:

func poolCleanup() {
  for \_, p := range oldPools {
    p.victim = nil
    p.victimSize = 0
  }

  // Move primary cache to victim cache.
  for \_, p := range allPools {
    p.victim = p.local
    p.victimSize = p.localSize
    p.local = nil
    p.localSize = 0
  }

  oldPools, allPools = allPools, nil
}

poolCleanup 会在 STW 阶段被调用。整体看起来,比拟简洁。次要是将 local 和 victim 作替换,这样也就不致于让 GC 把所有的 Pool 都清空了,有 victim 在“兜底”。

如果 sync.Pool 的获取、开释速度稳固,那么就不会有新的池对象进行调配。如果获取的速度降落了,那么对象可能会在两个 GC 周期内被开释,而不是以前的一个 GC 周期。

鸟窝的【Go 1.13 中 sync.Pool 是如何优化的?】讲了 1.13 中的优化。

参考资料【了解 Go 1.13 中 sync.Pool 的设计与实现】手动模仿了一下调用 poolCleanup 函数前后 oldPools,allPools,p.vitcim 的变动过程,很精彩:

  1. 初始状态下,oldPools 和 allPools 均为 nil。
  1. 第 1 次调用 Get,因为 p.local 为 nil,将会在 pinSlow 中创立 p.local,而后将 p 放入 allPools,此时 allPools 长度为 1,oldPools 为 nil。
  2. 对象应用结束,第 1 次调用 Put 放回对象。
  3. 第 1 次 GC STW 阶段,allPools 中所有 p.local 将值赋值给 victim 并置为 nil。allPools 赋值给 oldPools,最初 allPools 为 nil,oldPools 长度为 1。
  4. 第 2 次调用 Get,因为 p.local 为 nil,此时会从 p.victim 外面尝试取对象。
  5. 对象应用结束,第 2 次调用 Put 放回对象,但因为 p.local 为 nil,从新创立 p.local,并将对象放回,此时 allPools 长度为 1,oldPools 长度为 1。
  6. 第 2 次 GC STW 阶段,oldPools 中所有 p.victim 置 nil,前一次的 cache 在本次 GC 时被回收,allPools 所有 p.local 将值赋值给 victim 并置为 nil,最初 allPools 为 nil,oldPools 长度为 1。

我依据这个流程画了一张图,能够了解地更清晰一些:

poolCleanup 过程

须要指出的是,allPoolsoldPools 都是切片,切片的元素是指向 Pool 的指针,Get/Put 操作不须要通过它们。在第 6 步,如果还有其余 Pool 执行了 Put 操作,allPools 这时就会有多个元素。

在 Go 1.13 之前的实现中,poolCleanup 比拟“简略粗犷”:

func poolCleanup() {
    for i, p := range allPools {allPools\[i\] = nil
        for i := 0; i < int(p.localSize); i++ {l := indexLocal(p.local, i)
            l.private = nil
            for j := range l.shared {l.shared\[j\] = nil
            }
            l.shared = nil
        }
        p.local = nil
        p.localSize = 0
    }
    allPools = \[\]\*Pool{}}

间接清空了所有 Pool 的 p.localpoolLocal.shared

通过两者的比照发现,新版的实现相比 Go 1.13 之前,GC 的粒度拉大了,因为理论回收的工夫线拉长,单位工夫内 GC 的开销减小。

由此根本明确 p.victim 的作用。它的定位是次级缓存,GC 时将对象放入其中,下一次 GC 降临之前如果有 Get 调用则会从 p.victim 中取,直到再一次 GC 来长期回收。

同时因为从 p.victim 中取出对象应用结束之后并未放回 p.victim 中,在肯定水平也减小了下一次 GC 的开销。原来 1 次 GC 的开销被拉长到 2 次且会有肯定水平的开销减小,这就是 p.victim 引入的用意。

【了解 Go 1.13 中 sync.Pool 的设计与实现】这篇文章最初还总结了 sync.Pool 的设计理念,包含:无锁、操作对象隔离、原子操作代替锁、行为隔离——链表、Victim Cache 升高 GC 开销。写得十分不错,举荐浏览。

另外,对于 sync.Pool 中锁竞争优化的文章,举荐浏览芮大神的【优化锁竞争】。

总结

本文先是介绍了 Pool 是什么,有什么作用,接着给出了 Pool 的用法以及在规范库、一些第三方库中的用法,还介绍了 pool\_test 中的一些测试用例。最初,具体解读了 sync.Pool 的源码。

本文的结尾局部,再来具体地总结一下对于 sync.Pool 的要点:

  1. 要害思维是对象的复用,防止反复创立、销毁。将临时不必的对象缓存起来,待下次须要的时候间接应用,不必再次通过内存调配,复用对象的内存,加重 GC 的压力。
  2. sync.Pool 是协程平安的,应用起来十分不便。设置好 New 函数后,调用 Get 获取,调用 Put 偿还对象。
  3. Go 语言内置的 fmt 包,encoding/json 包都能够看到 sync.Pool 的身影;ginEcho 等框架也都应用了 sync.Pool。
  4. 不要对 Get 失去的对象有任何假如,更好的做法是偿还对象时,将对象“清空”。
  5. Pool 里对象的生命周期受 GC 影响,不适宜于做连接池,因为连接池须要本人治理对象的生命周期。
  6. Pool 不能够指定⼤⼩,⼤⼩只受制于 GC 临界值。
  7. procPin 将 G 和 P 绑定,避免 G 被抢占。在绑定期间,GC 无奈清理缓存的对象。
  8. 在退出 victim 机制前,sync.Pool 里对象的最⼤缓存工夫是一个 GC 周期,当 GC 开始时,没有被引⽤的对象都会被清理掉;退出 victim 机制后,最大缓存工夫为两个 GC 周期。
  9. Victim Cache 原本是计算机架构外面的一个概念,是 CPU 硬件解决缓存的一种技术,sync.Pool 引入的用意在于升高 GC 压力的同时进步命中率。
  10. sync.Pool 的最底层应用切片加链表来实现双端队列,并将缓存的对象存储在切片中。

参考资料

【欧神 源码剖析】https://changkun.us/archives/…

【Go 夜读】https://reading.hidevops.io/r…

【夜读第 14 期视频】https://www.youtube.com/watch…

【源码剖析,伪共享】https://juejin.im/post/5d4087…

【golang 的对象池 sync.pool 源码解读】https://zhuanlan.zhihu.com/p/…

【了解 Go 1.13 中 sync.Pool 的设计与实现】https://zhuanlan.zhihu.com/p/…

【优缺点,图】http://cbsheng.github.io/post…

【xiaorui 优化锁竞争】http://xiaorui.cc/archives/5878

【性能优化之路,自定义多种规格的缓存】https://blog.cyeam.com/golang…

【sync.Pool 有什么毛病】https://mp.weixin.qq.com/s?\_\_biz=MzA4ODg0NDkzOA==&mid=2247487149&idx=1&sn=f38f2d72fd7112e19e97d5a2cd304430&source=41#wechat\_redirect

【1.12 和 1.13 的演变】https://github.com/watermelo/…\_pool\_understand.md

【董泽润 演进】https://www.jianshu.com/p/2e0…

【noCopy】https://github.com/golang/go/…

【董泽润 cpu cache】https://www.jianshu.com/p/dc4…

【gomemcache 例子】https://docs.kilvn.com/The-Go…

【鸟窝 1.13 优化】https://colobu.com/2019/10/08…

【A journey with go】https://medium.com/a-journey-…

【封装了一个计数组件】https://www.akshaydeo.com/blo…

【伪共享】http://ifeve.com/falsesharing/

举荐浏览

  • 我所了解的 sync.Pool

喜爱本文的敌人,欢送关注“Go 语言中文网”:

Go 语言中文网启用微信学习交换群,欢送加微信:274768166,投稿亦欢送

正文完
 0