关于go:为什么SyncPool不需要加锁却能保证线程安全

47次阅读

共计 9605 个字符,预计需要花费 25 分钟才能阅读完成。

1. 简介

咱们在 Sync.Pool: 进步 go 语言程序性能的要害一步 一文中,曾经理解了应用 sync.Pool 来实现对象的复用以缩小对象的频繁创立和销毁,以及应用 sync.Pool 的一些常见注意事项。

在这篇文章中,咱们将分析 sync.Pool 外部实现中,介绍了 sync.Pool 比拟奇妙的外部设计思路以及其实现形式。在这个过程中,也间接介绍了为何不加锁也可能实现线程平安。

次要会波及到 Go 语言中实现并发的 GMP 模型以及其根本的调度原理,以及本地缓存的设计,无锁队列的应用这几个局部的内容,综上这几个方面的内容实现了不加锁也可能保障线程平安。

2. GMP 之间的绑定关系

为了之可能帮忙咱们后续更好得了解 sync.Pool 的设计与实现,这里须要对 GMP 模型进行简略的介绍。GMP 模型是 Go 语言中的一种合作式调度模型,其中 G 示意 Goroutine,M 能够了解为内核线程,P 为逻辑处理器,简略了解其保护了一条 Goroutine 队列。

2.1 M 和 P 的关系

在 GMP 模型中,M 和 P 能够动静绑定,一个 M 能够在运行时绑定到任意一个 P 上,而一个 P 也能够与任意一个 M 绑定。这种绑定形式是动静的,能够依据理论状况进行灵便调整,从而实现更加高效的协程调度。

只管 M 和 P 能够动静绑定,但在特定工夫点,一个 M 只会对应一个 P。这是因为 M 是操作系统线程,而 P 是 Go 语言的逻辑处理器,Go 语言的逻辑处理器须要在某个操作系统线程中运行,并且是被该逻辑处理器 (P) 独自占用的。

P 的数量个别是和 CPU 核数保持一致,每个 P 占用一个 CPU 外围来执行,能够通过 runtime.GOMAXPROCS 函数来批改。不过在大多数状况下,不须要手动批改,Go 语言的调度器会依据理论状况主动进行调整。

2.2 P 和 G 的关系

刚创立的 Goroutine 会被放入以后线程对应 P 的本地队列中期待被执行。如果本地队列已满,则会放入全局队列中,供其余线程的 P 来抢占执行。

当 P 闲暇时,会尝试从全局队列中获取 Goroutine 来执行。如果全局队列中没有 Goroutine,则会从其余处理器的本地运行队列中 ” 偷取 ” 一些 Goroutine 来执行。

如果协程执行过程中遇到阻塞操作(比方期待 I / O 或者锁),处理器(P)会立刻将协程移出本地运行队列,并执行其余协程,直到被阻塞的协程能够继续执行为止。被阻塞的协程会被放到相应的期待队列中期待事件产生后再次被唤醒并退出到运行队列中,但不肯定还是放回原来的处理器 (P) 的期待队列中。

从上述过程能够看出,G 和 P 的绑定关系是动静绑定的,在不同的工夫点,同一个 G 可能在不同的 P 上执行,同时,在不同的工夫点,P 也会调度执行不同的 G。

2.3 总结

每个 P 在某个时刻只能绑定一个 M,而每个 G 在某个时刻也只存在于某个 P 的期待队列中,期待被调度执行。这是 GMP 模型的根本调度原理,也是 Go 语言高效调度的外围所在。通过动静绑定和灵便调度,能够充分利用多核处理器的计算能力,从而实现高并发、高效率的协程调度。

通过对 GMP 模型的根本理解,可能帮忙咱们后续更好得了解 sync.Pool 的设计与实现。

3.Sync.Pool 与 GMP 模型

3.1 sync.Pool 性能问题

这里咱们回到sync.Pool, 能够简略应用切片,存储可复用的对象,在须要时从中取出对象,用完之后再从新放回池子中,实现对象的重复使用。

当多个协程同时从 sync.Pool 中取对象时,会存在并发问题,因而须要实现并发平安。一种简略的实现形式是加锁,每个协程在取数据前先加锁,而后获取数据,再解锁,实现串行读取的成果。然而这种形式在并发比拟大的场景下容易导致大量协程进入阻塞状态,从而进一步升高性能。

因而,为了进步程序的性能,咱们须要寻找一种缩小并发抵触的形式。有什么形式可能缩小并发抵触呢?

3.2 基于 GMP 模型的改良

回到 GMP 模型,从第二节对 GMP 模型的介绍中,咱们晓得协程 (G) 须要在逻辑处理器 (P) 上执行,而逻辑处理器的数量是无限的,个别与 CPU 外围数雷同。而之前的 sync.Pool 实现形式是所有 P 竞争同一份数据,容易导致大量协程进入阻塞状态,影响程序性能。

那咱们这里,是不是可能将 sync.Pool 分成多个小的存储池,每个 P 都用领有一个小的存储池呢? 在每个小存储池中别离应用独立的锁进行并发管制。这样能够防止多个协程同时竞争同一个全局锁的状况,升高锁的粒度,从而缩小并发抵触。

协程运行时都须要绑定一个逻辑处理器(P),此时每个 P 都有本人的数据缓存,须要对象时从绑定的 P 的缓存中获取,用完后从新放回。这种实现形式缩小了协程竞争同一份数据的状况,只有在同一个逻辑处理器上的协程才存在竞争,从而缩小并发抵触,晋升性能。

3.3 能不能齐全不加锁

在下面的实现中,处于不同的 P 上的协程都是操作不同的数据,此时并不会呈现并发问题。惟一可能呈现并发问题的中央,为协程在获取缓存对象时,逻辑处理器中途调度其余协程来执行,此时才可能导致的并发问题。那这里能不能防止并发呢?

那如果可能将协程固定到逻辑处理器 P 上,并且不容许被抢占,也就是该 P 上永远都是执行某一个协程,直到胜利获取缓存对象后,才容许逻辑处理器去调度执行其余协程,那么就能够完全避免并发抵触的问题了。

因而,如果咱们可能做到协程在读取缓冲池中的数据时,可能齐全占用逻辑处理器 P,不会被抢占,此时就不会呈现并发了,也不须要加锁了。

侥幸的是,runtime 包中提供了 runtime_procPin 调用,能够将以后协程固定到协程所在逻辑处理器 P 上,并且不容许被抢占,也就是逻辑处理器 P 始终都被以后协程所独享。在获取缓存对象时,咱们能够应用 runtime_procPin 将以后协程固定到逻辑处理器 P 上,而后从该逻辑处理器 P 的缓存中获取对象。这样做不仅能够防止并发抵触,还能够防止上下文切换和锁竞争等性能问题。

4. sync.Pool 初步实现

上面来看看以后 sync.Pool 的局部代码,其原理便是上述所提到的形式。具体来说,每个逻辑处理器 P 保留一份数据,并利用 runtime_procPin 来防止同一逻辑处理器 P 中的协程产生并发抵触。

须要留神的是,上面所展现的代码只是局部代码,并不蕴含残缺的实现。然而这些代码涵盖了后面所提到的实现形式。同时,为了讲述不便,也批改局部实现,后文会阐明以后 sync.Pool 以后真正的实现。

4.1 sync.Pool 构造体定义

type Pool struct {
   // 指向 poolLocal 构造体切片的地址,长度与 cpu 外围数保持一致
   local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
   // 记录以后 poolLocal 切片的长度
   localSize uintptr        // size of the local array
   // New optionally specifies a function to generate
   // a value when Get would otherwise return nil.
   // It may not be changed concurrently with calls to Get.
   New func() any}

type poolLocal struct {
   // 上文所说的小缓冲池的实现
   private []any       // Can be used only by the respective P.}

其中,New 函数用于创立一个新的对象,以便向池中增加对象。当池中没有可用对象时,会调用该函数。local 是指向 poolLocal 切片的指针,poolLocal 即为上文提到的小缓冲池,每个逻辑处理器都有一个。

4.2 Put 办法

func (p *Pool) Put(x any) {
   if x == nil {return}
   // 这里调用 pin 办法, 获取到 poolLocal
   l, _ := p.pin()
   // 将对象从新放入逻辑解决的小缓冲池当中
   l.private = l.private.append(x)
   // 这个为解除 Proccssor 的固定,Processor 可能调度其余协程去执行
   runtime_procUnpin()}

Put办法是用于将对象从新放入缓冲池,首先调用 pin 办法获取到 poolLocal, 而后将对象放入到poolLocal 当中,而后再通过 runtime_procUnpin 调用,解除对以后 P 的绑定。

能够发现,其中比拟重要的逻辑,是调用 pin 办法获取到 P 对应的 poolLocal, 上面咱们来看pin 办法的实现。

func (p *Pool) pin() (*poolLocal, int) {
    // 调用 runtime_procPin, 占用 Processor, 不会被抢占
   pid := runtime_procPin()
   // 获取 localSize 字段的值
   s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
   // 获取 poolLocal 切片
   l := p.local                              // load-consume
   // pid 为协程编号,如果 pid < localSize 的值,阐明属于该 processor 的缓冲池曾经创立好了
   if uintptr(pid) < s {
      // 依据 pid 获取对应的缓冲池
      return indexLocal(l, pid), pid
   }
   // 否则走上面逻辑
   return p.pinSlow()}
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
   // 间接通过 Processor 的编号,计算出偏移量获取到对应的 poolLocal
   lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
   return (*poolLocal)(lp)
}

在函数开始时,pin办法通过调用 runtime_procPin 办法,占用以后 goroutine 所在的 P,不容许其余 goroutine 抢占该 P,这里可能防止处于同一期待队列中的协程呈现并发读取 poolLocal 的数据的问题。

同时 runtime_procPin 办法也会返回以后 P 的编号,在零碎外部是惟一的,从 0 开始顺次递增的整数。其将可能作为 poolLocal 的切片下标,来读取poolLocal

接下来,通过原子操作 runtime_LoadAcquintptr 读取 localSize 字段的值,该字段示意以后 poolLocal 实例切片的长度。如果以后的 P 编号小于 localSize 的值,则示意该 P 的 poolLocal 实例曾经被创立,能够间接获取该 P 对应的 poolLocal 实例并返回。

如果 P 编号大于等于 localSize 的值,此时阐明该 P 对应的 poolLocal 还没创立,通过调用 pinSlow() 办法进行初始化。上面持续来看 pinSlow 办法的具体实现。

func (p *Pool) pinSlow(pid int) (*poolLocal, int) {
   // 获取 processor 数量
   size := runtime.GOMAXPROCS(0)
   // 创立一个新的 poolLocal 切片
   local := make([]poolLocal, size)
   // 将切片地址存储到 local 字段当中
   atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
   // 将切片数组长度 存储到 localSize 当中
   runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
   // 依据 pid, 获取到对应的 poolLocal
   return &local[pid], pid
}

首先调用 runtime.GOMAXPROCS(0) 获取以后程序可用的 processor 数量,并基于这个数量创立了一个新的 poolLocal 切片 local,这里也印证了咱们之前所说的,每一个 Processor 都有一个小缓冲池。

接着,应用原子操作 atomic.StorePointer 将指向 p.local 的指针批改为指向新创建的 local 切片的第一个元素的指针。而后应用 runtime_StoreReluintptr 原子操作将 p.localSize 批改为 size。到此为止,便实现了 poolLocal 切片的初始化操作。

最初返回以后 processor 对应的 poolLocal 指针和它的编号 pid。因为这里新创建的 local 切片是局部变量,在 pinSlow 函数返回后,它就无奈被拜访了。然而,因为咱们曾经将 p.local 批改为指向这个切片的第一个元素的指针,所以其余 processor 在调用 pin 办法时,就能获取到新创建的 poolLocal

4.3 Get 办法

func (p *Pool) Get() any {l, pid := p.pin()
   var x any
   if n := len(l.private); n > 0 {x = l.private[n-1]
       l.private[n-1] = nil // Just to be safe
       l.private = l.private[:n-1]
   }
   runtime_procUnpin()
   if x == nil && p.New != nil {x = p.New()
   }
   return x
}

首先调用 pin() 办法,取得以后协程绑定的缓存池 local 和协程编号 pid。接着从local.private 中尝试取出对象,如果取出来是空,阐明缓冲池中没有对象,此时调用 runtime_procUnpin() 办法解绑协程与处理器的绑定。

如果没有从缓冲池中胜利获取对象,并且 Pool 构造体的 New 字段非空,则调用 New 字段所指向的函数创立一个新对象并返回。

4.4 总结

到此为止,sync.Pool曾经通过联合 GMP 模型的特点,给每一个 P 设置一份缓存数据,当逻辑处理器上的协程须要从 sync.Pool 获取可重用对象时,此时将从逻辑处理器 P 对应的缓存中取出对象,防止了不同逻辑处理器的竞争。

此外,也通过调用 runtime_procPin 办法,让协程可能在某段时间独占该逻辑处理器,防止了锁的竞争和不必要的上下文切换的耗费,从而晋升性能。

5. sync.Pool 实现优化

5.1 问题形容

sync.Pool 的初步实现中,咱们让每一个逻辑处理器 P 都领有一个小缓冲池,让各个逻辑处理器 P 上的协程从 sync.Pool 获取对象时不会竞争,从而晋升性能。

当初可能存在的问题,在于每个 Processor 都保留一份缓存数据,那么当某个 Processor 上的 goroutine 须要应用缓存时,可能会发现它所在的 Processor 上的缓存池为空的,而其余 Processor 上的缓存对象却没有被利用。这样就节约了其余 Processor 上的资源。

回到 sync.Pool 的设计初衷来看,首先是晋升程序性能,缩小反复创立和销毁对象的开销;其次是缩小内存压力,通过对象复用,从而升高程序 GC 频次。从这两个方面来看,下面 sync.Pool 的初步实现其实存在一些优化空间的。

这里就陷入了一个两难的地步,如果多个 Processor 共享同一个缓冲池,会存在容易导致大量协程进入阻塞状态,进一步升高性能。每个 Processor 都保留一份缓存数据的话,此时也容易陷入资源节约的问题。那能怎么办呢?

5.2 实现优化

很多时候,可能并没有美中不足的事件,咱们往往须要折中。比方下面多个 Processor 共享同一个缓冲池,会升高性能;而每个 Processor 都保留一份缓存数据的话,容易陷入资源节约的问题。

这个时候,咱们能够折中一下,不采纳齐全共享的模式,也不采纳齐全独占的模式。而 采纳局部独有、局部共享 的模式。每个 Processor 独占一部分缓存,能够防止不同 Processor 之间的竞争,进步并发性能。同时,每个 Processor 也能够共享其余 Processor 上的缓存,防止了节约。绝对于齐全共享和齐全独立的模式,这种设计形式是不是可能更好地均衡并发性能和缓存利用效率。

同时,也能够基于局部独有,局部共享的模式的根底上,再对其进行优化。对于共享局部的资源,能够应用多个缓冲池来存储,是将其给了所有的 Processor,每个 Processor 保留一部分共享数据。

当 Processor 读取数据时,此时先从本身的公有缓冲中读取,读取不到再到本身的共享缓存中读取,读取不到才到其余 Processor 读取其共享局部。这样子可能防止了多个 Processor 同时竞争一个池导致的性能问题。同时,共享局部也能够被充分利用,防止了资源节约。

6.Sync.Pool 最终实现

6.1 sync.Pool 构造体定义

type Pool struct {
   noCopy noCopy
   // 1. 指向 poolLocal 切片的指针
   local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
   // 2. 对应 local 切片的长度
   localSize uintptr        // size of the local array
   // 3. 缓存池中没对象时,调用设置的 New 函数来创建对象
   New func() any
   // 局部与此次讲述无关内容, 未被蕴含进来
   // ....
}

// 每个 Processor 都会对应一个 poolLocal
type poolLocal struct {
   // 存储缓存对象的数据结构
   poolLocalInternal
   // 用于内存对齐
   pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

// Local per-P Pool appendix.
type poolLocalInternal struct {
   // 存储每个 Processor 独享的对象
   private any       // Can be used only by the respective P.
   // 存储 Processor 共享的对象,这个是一个无锁队列
   shared  poolChain // Local P can pushHead/popHead; any P can popTail.
}

首先阐明 poolLocal 构造体,能够认为是一个小缓冲池,每个 Processor 都会有对应的 poolLocal 对象。poolLocal中对象的存储通过 poolLocalInternal 来实现,至于 poolLocal 中的 pad 字段只是用于内存对其。

poolLocalInternal其中蕴含 private 字段和 shared 字段,private字段保留了上文所说的 Processor 独占的缓存对象,而 shared 字段,也就是咱们上文所说的共享缓冲池组成的一部分,是容许 Processor 之间互相读取的。shared字段的类型为 poolChain,是一个无锁队列,调用pushHead 可能将数据放入共享缓冲池,调用 popHead 可能从缓冲池中取出数据,无需加锁也是并发平安的,这个并非今日的重点,在此简略形容一下。

Pool构造体中 local 字段,指向了 poolLocal 构造体切片的地址,而 localSize 字段的值,为后面 poolLocal 切片的长度。

6.2 Get 办法

func (p *Pool) Get() any {l, pid := p.pin()
   x := l.private
   l.private = nil
   if x == nil {
      // Try to pop the head of the local shard. We prefer
      // the head over the tail for temporal locality of
      // reuse.
      x, _ = l.shared.popHead()
      if x == nil {x = p.getSlow(pid)
      }
   }
   runtime_procUnpin()
   if x == nil && p.New != nil {x = p.New()
   }
   return x
}

首先调用 pin() 办法,获取以后 Processor 对应的 poolLocal 对象和协程编号pid,同时占用该 Processor。

开始尝试获取对象,首先从 poolLocal 对象中获取公有缓存 private。如果公有缓存为空,则尝试从共享缓存shared 的头部弹出一个元素 x,并赋值给x。如果共享缓存也为空,则调用getSlow() 办法从其余 Processor 的共享缓存或 New 办法中获取元素 x。开释以后 Processor 的占用。如果元素x 不为空,则返回 x,否则如果New 办法不为空,则调用 New 办法生成一个新的元素 x 并返回,否则返回nil

能够看进去,在 Get 办法的外层,次要是尝试从 Proessor 对应的 poolLocal 中获取数据,读取不到,则调用 getSlow 办法,尝试从其余 Processor 的共享数据中获取。上面来看 getSlow 办法的逻辑:

func (p *Pool) getSlow(pid int) any {
   // See the comment in pin regarding ordering of the loads.
   size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
   locals := p.local                            // load-consume
   // Try to steal one element from other procs.
   for i := 0; i < int(size); i++ {
      // 获取 poolLocal
      l := indexLocal(locals, (pid+i+1)%int(size))
      // poolLocal 中的 shared 是一个无锁队列,无需加锁,也可能保障线程平安
      if x, _ := l.shared.popTail(); x != nil {return x}
   }
   // 与 sync.Pool 对象回收的相干逻辑先删除, 与此次讲述并无太大关系
   // ....

   return nil
}

getSlow办法实现较为简单,首先读取 Pool 构造体中 localSize 字段的值,得悉以后有多少个 poolLocal。而后对所有的poolLocal 进行遍历,尝试从其余 poolLocal 的共享缓存中获取数据,胜利获取则间接返回。

6.3 Put 办法

func (p *Pool) Put(x any) {
   if x == nil {return}
   l, _ := p.pin()
   if l.private == nil {
      l.private = x
      x = nil
   }
   if x != nil {l.shared.pushHead(x)
   }
   runtime_procUnpin()}

首先调用 pin 办法,获取以后 Processor 对应的 poolLocal,而后将 x 放到该poolLocalprivate字段中,也就是放到以后 Processor 的公有缓存中。如果 private 字段不为空,阐明曾经有对象放到 private 中了,那么 x 则会放到 poolLocalshared字段中,通过无锁队列的形式退出到共享资源池中。

6.4 总结

到此为止,在 sync.Pool 本来的实现上,对缓存数据的设计进行了优化,将缓存数据中辨别为公有缓存局部和共享局部。此时在肯定水平上防止不同 Processor 之间的竞争,进步并发性能。同时,每个 Processor 也能够共享其余 Processor 上的缓存,防止了内存的节约。

7. 总结

这篇文章,咱们其实次要介绍了 sync.Pool 的实现原理。

咱们首先基于 GMP 模型实现 sync.Pool 的一个实现,基于该实现,引出了 局部独有、局部共享 的模式的优化。在这个过程中,也展现了 sync.Pool 的局部源码,以便可能更好得了解 sync.Pool 的实现。

同时,基于实现的讲述,咱们也间接得解答了 sync.Pool 为何不须要加锁也保障了线程平安的问题。

这次讲述 sync.Pool 的实现过程中,并没有间接讲述 sync.Pool 源码的实现,而是一步一步得对现有实现进行优化,将其中比拟好的点给形容进去,心愿可能有所帮忙。

正文完
 0