共计 14985 个字符,预计需要花费 38 分钟才能阅读完成。
简介
继上一篇 Go 每日一库之 ants,这篇文章咱们来一起看看 ants
的源码。
Pool
通过上篇文章,咱们晓得 ants
池有两种创立形式:
p, _ := ants.NewPool(cap)
:这种形式创立的池子对象须要调用p.Submit(task)
提交工作,工作是一个无参数无返回值的函数;p, _ := ants.NewPoolWithFunc(cap, func(interface{}))
:这种形式创立的池子对象须要指定池函数,并且应用p.Invoke(arg)
调用池函数。arg
就是传给池函数func(interface{})
的参数。
在 ants
中这两种池子应用不同的构造来示意:ants.Pool
和 ants.PoolWithFunc
。咱们先来介绍Pool
。PoolWithFunc
构造也是相似的,介绍完 Pool
之后,咱们再简略比拟一下它们。
Pool
构造定义在文件 pool.go
中:
// src/github.com/panjf2000/ants/pool.go
type Pool struct {
capacity int32
running int32
workers workerArray
state int32
lock sync.Locker
cond *sync.Cond
workerCache sync.Pool
blockingNum int
options *Options
}
各个字段含意如下:
capacity
:池容量,示意ants
最多能创立的 goroutine 数量。如果为正数,示意容量无限度;running
:曾经创立的 worker goroutine 的数量;workers
:寄存一组 worker 对象,workerArray
只是一个接口,示意一个 worker 容器,前面详述;state
:记录池子以后的状态,是否已敞开(CLOSED
);lock
:锁。ants
本人实现了一个自旋锁。用于同步并发操作;cond
:条件变量。解决工作期待和唤醒;workerCache
:应用sync.Pool
对象池治理和创立worker
对象,晋升性能;blockingNum
:阻塞期待的工作数量;options
:选项。上一篇文章曾经具体介绍过了。
这里明确一个概念,ants
中为每个工作都是由 worker 对象来解决的,每个 worker 对象会对应创立一个 goroutine 来解决工作。ants
中应用 goWorker
示意 worker:
// src/github.com/panjf2000/ants/worker.go
type goWorker struct {
pool *Pool
task chan func()
recycleTime time.Time
}
后文具体介绍这一块内容,当初咱们只须要晓得 Pool.workers
字段就是寄存 goWorker
对象的容器。
Pool
创立
创立 Pool
对象需调用 ants.NewPool(size, options)
函数。省略了一些解决选项的代码,最终代码如下:
// src/github.com/panjf2000/ants/pool.go
func NewPool(size int, options ...Option) (*Pool, error) {
// ...
p := &Pool{capacity: int32(size),
lock: internal.NewSpinLock(),
options: opts,
}
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
if p.options.PreAlloc {
if size == -1 {return nil, ErrInvalidPreAllocSize}
p.workers = newWorkerArray(loopQueueType, size)
} else {p.workers = newWorkerArray(stackType, 0)
}
p.cond = sync.NewCond(p.lock)
go p.purgePeriodically()
return p, nil
}
代码不难理解:
- 创立
Pool
对象,设置容量,创立一个自旋锁来初始化lock
字段,设置选项; - 设置
workerCache
这个sync.Pool
对象的New
办法,在调用sync.Pool
对象的Get()
办法时,如果它没有缓存的 worker 对象了,则调用这个办法创立一个; - 依据是否设置了预调配选项,创立不同类型的 workers;
- 应用
p.lock
锁创立一个条件变量; - 最初启动一个 goroutine 用于定期清理过期的 worker。
Pool.workers
字段为 workerArray
类型,这实际上是一个接口,示意一个 worker 容器:
type workerArray interface {len() int
isEmpty() bool
insert(worker *goWorker) error
detach() *goWorker
retrieveExpiry(duration time.Duration) []*goWorker
reset()}
每个办法从名字上很好了解含意:
len() int
:worker 数量;isEmpty() bool
:worker 数量是否为 0;insert(worker *goWorker) error
:goroutine 工作执行完结后,将相应的 worker 放回workerArray
中;detach() *goWorker
:从workerArray
中取出一个 worker;retrieveExpiry(duration time.Duration) []*goWorker
:取出所有的过期 worker;reset()
:重置容器。
workerArray
在 ants
中有两种实现,即 workerStack
和loopQueue
。
workerStack
咱们先来介绍一下 workerStack
,它位于文件worker_stack.go
中:
// src/github.com/panjf2000/ants/worker_stack.go
type workerStack struct {items []*goWorker
expiry []*goWorker
size int
}
func newWorkerStack(size int) *workerStack {
return &workerStack{items: make([]*goWorker, 0, size),
size: size,
}
}
items
:闲暇的worker
;expiry
:过期的worker
。
goroutine 实现工作之后,Pool
池会将相应的 worker 放回 workerStack
,调用workerStack.insert()
间接 append
到items
中即可:
func (wq *workerStack) insert(worker *goWorker) error {wq.items = append(wq.items, worker)
return nil
}
新工作到来时,会调用 workerStack.detach()
从容器中取出一个闲暇的 worker:
func (wq *workerStack) detach() *goWorker {l := wq.len()
if l == 0 {return nil}
w := wq.items[l-1]
wq.items[l-1] = nil // avoid memory leaks
wq.items = wq.items[:l-1]
return w
}
这里总是返回最初一个 worker,每次 insert()
也是 append
到最初,合乎栈后进先出的特点,故称为workerStack
。
这里有一个细节,因为切片的底层构造是数组,只有有援用数组的指针,数组中的元素就不会开释。这里取出切片最初一个元素后,将对应数组元素的指针设置为nil
,被动开释这个援用。
下面说过新建 Pool
对象时会创立一个 goroutine 定期检查和清理过期的 worker。通过调用 workerArray.retrieveExpiry()
获取过期的 worker 列表。workerStack
实现如下:
func (wq *workerStack) retrieveExpiry(duration time.Duration) []*goWorker {n := wq.len()
if n == 0 {return nil}
expiryTime := time.Now().Add(-duration)
index := wq.binarySearch(0, n-1, expiryTime)
wq.expiry = wq.expiry[:0]
if index != -1 {wq.expiry = append(wq.expiry, wq.items[:index+1]...)
m := copy(wq.items, wq.items[index+1:])
for i := m; i < n; i++ {wq.items[i] = nil
}
wq.items = wq.items[:m]
}
return wq.expiry
}
实现应用二分查找法找到已过期的最近一个 worker。因为过期工夫是依照 goroutine 执行工作后的闲暇工夫计算的,而 workerStack.insert()
入队程序决定了,它们的过期工夫是从早到晚的。所以能够应用二分查找:
func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int {
var mid int
for l <= r {mid = (l + r) / 2
if expiryTime.Before(wq.items[mid].recycleTime) {r = mid - 1} else {l = mid + 1}
}
return r
}
二分查找的是最近过期的 worker,行将过期的 worker 的前一个。它和在它之前的 worker 曾经全副过期了。
如果找到索引 index
,将items
从结尾到 index
(包含)的所有 worker 复制到expiry
字段中。而后将 index
之后的所有未过期 worker 复制到切片头部,这里应用了 copy
函数。copy
返回理论复制的数量,即未过期的 worker 数量 m
。而后将切片items
从m
开始所有的元素置为 nil
,防止内存透露,因为它们曾经被复制到头部了。最初裁剪items
切片,返回过期 worker 切片。
loopQueue
loopQueue
实现基于循环队列,构造定义在文件 worker_loop_queue
中:
type loopQueue struct {items []*goWorker
expiry []*goWorker
head int
tail int
size int
isFull bool
}
func newWorkerLoopQueue(size int) *loopQueue {
return &loopQueue{items: make([]*goWorker, size),
size: size,
}
}
因为是循环队列,这里先创立好了一个长度为 size
的切片。循环队列有一个队列头指针head
,指向第一个有元素的地位,一个队列尾指针tail
,指向下一个能够寄存元素的地位。所以一开始状态如下:
在 tail
处增加元素,增加后 tail
指针后移。在 head
处取出元素,取出后 head
指针也后移。进行一段时间操作后,队列状态如下:
head
或 tail
指针到队列尾了,须要回绕。所以可能呈现这种状况:
当 tail
指针赶上 head
指针了,阐明队列就满了:
当 head
指针赶上 tail
指针了,队列再次为空:
依据示意图,咱们再来看 loopQueue
的操作方法就很简略了。
因为 head
和tail
相等的状况有可能是队列空,也有可能是队列满,所以 loopQueue
中减少一个 isFull
字段以示辨别。goroutine 实现工作之后,会将对应的 worker 对象放回 loopQueue
,执行的是insert()
办法:
func (wq *loopQueue) insert(worker *goWorker) error {
if wq.size == 0 {return errQueueIsReleased}
if wq.isFull {return errQueueIsFull}
wq.items[wq.tail] = worker
wq.tail++
if wq.tail == wq.size {wq.tail = 0}
if wq.tail == wq.head {wq.isFull = true}
return nil
}
这个办法执行的就是循环队列的入队流程,留神如果插入后 tail==head
了,阐明队列满了,设置 isFull
字段。
新工作到来调用 loopQueeue.detach()
办法获取一个闲暇的 worker 构造:
func (wq *loopQueue) detach() *goWorker {if wq.isEmpty() {return nil}
w := wq.items[wq.head]
wq.items[wq.head] = nil
wq.head++
if wq.head == wq.size {wq.head = 0}
wq.isFull = false
return w
}
这个办法对应的是循环队列的出队流程,留神每次出队后,队列必定不满了,isFull
要重置为false
。
与 workerStack
构造一样,先入的 worker 对象过期工夫早,后入的晚,获取过期 worker 的办法与 workerStack
中相似,只是没有应用二分查找了。这里就不赘述了。
再看 Pool
创立
介绍完两种 workerArray
的实现之后,再来看 Pool
的创立函数中 workers
字段的设置:
if p.options.PreAlloc {
if size == -1 {return nil, ErrInvalidPreAllocSize}
p.workers = newWorkerArray(loopQueueType, size)
} else {p.workers = newWorkerArray(stackType, 0)
}
newWorkerArray()
定义在文件 worker_array.go
中:
type arrayType int
const (
stackType arrayType = 1 << iota
loopQueueType
)
func newWorkerArray(aType arrayType, size int) workerArray {
switch aType {
case stackType:
return newWorkerStack(size)
case loopQueueType:
return newWorkerLoopQueue(size)
default:
return newWorkerStack(size)
}
}
即如果设置了预调配选项,就采纳 loopQueue
构造。否则就采纳 stack
的构造。
worker 构造
介绍完 Pool
的创立和构造,咱们来看看 worker 的构造。在 ants
中 worker 用构造体 goWorker
示意,定义在文件 worker.go
中。它的构造非常简单:
// src/github.com/panjf2000/ants/worker.go
type goWorker struct {
pool *Pool
task chan func()
recycleTime time.Time
}
具体字段含意很显著:
pool
:持有 goroutine 池的援用;task
:工作通道,通过这个通道将类型为func ()
的函数作为工作发送给goWorker
;recyleTime
:这个字段记录goWorker
什么时候被放回池中(即什么时候开始闲暇)。其实现工作后,在将其放回 goroutine 池的时候设置。
goWorker
创立时会调用 run()
办法,run()
办法中启动一个新 goroutine 解决工作。run()
主体流程非常简单:
func (w *goWorker) run() {go func() {
for f := range w.task {
if f == nil {return}
f()
if ok := w.pool.revertWorker(w); !ok {return}
}
}()}
这个办法启动一个新的 goroutine,而后不停地从 task
通道中接管工作,而后执行工作,工作执行实现之后调用池对象的 revertWorker()
办法将该 goWorker
对象放回池中,以便下次取出解决新的工作。revertWorker()
办法前面会详细分析。
这里留神,实际上 for f := range w.task
这个循环直到通道 task
敞开或取出为 nil
的工作才会终止。所以这个 goroutine 始终在运行,这正是 ants
高性能的关键所在。每个 goWorker
只会启动一次 goroutine,后续反复利用这个 goroutine。goroutine 每次只执行一个工作就会被放回池中。
还有一个细节,如果放回操作失败,则会调用return
,这会让 goroutine 运行完结,避免 goroutine 透露。
这里 f == nil
为 true 时return
,也是一个细节点,咱们前面讲池敞开的时候会具体介绍。
上面咱们看看 run()
办法的异样解决:
defer func() {w.pool.workerCache.Put(w)
if p := recover(); p != nil {
if ph := w.pool.options.PanicHandler; ph != nil {ph(p)
} else {w.pool.options.Logger.Printf("worker exits from a panic: %v\n", p)
var buf [4096]byte
n := runtime.Stack(buf[:], false)
w.pool.options.Logger.Printf("worker exits from panic: %s\n", string(buf[:n]))
}
}
w.pool.cond.Signal()}()
简略来说,就是在 defer
中通过 recover()
函数捕捉工作执行过程中抛出的 panic
。这时工作执行失败,goroutine 也完结了。然而goWorker
对象还是能够反复利用,所以 defer
函数一开始调用 w.pool.workerCache.Put(w)
将goWorker
对象放回 sync.Pool
池中。
接着就是解决 panic
,如果选项中指定了panic
处理器,间接调用这个处理器。否则,ants
调用选项中设置的 Logger
记录一些日志,如堆栈,panic
信息等。
最初须要调用 w.pool.cond.Signal()
告诉当初有闲暇的 goWorker
了。因为咱们理论运行的 goWorker
数量因为 panic
少了一个,而池中可能有其余工作在期待解决。
提交工作
接下来,通过提交工作就能够串起整个流程。由上一篇文章咱们晓得,能够调用池对象的 Submit()
办法提交工作:
func (p *Pool) Submit(task func()) error {if p.IsClosed() {return ErrPoolClosed}
var w *goWorker
if w = p.retrieveWorker(); w == nil {return ErrPoolOverload}
w.task <- task
return nil
}
首先判断池是否已敞开,而后调用 retrieveWorker()
办法获取一个闲暇的 worker,而后将工作 task
发送到 worker 的工作通道。上面是 retrieveWorker()
实现:
func (p *Pool) retrieveWorker() (w *goWorker) {p.lock.Lock()
w = p.workers.detach()
if w != nil {p.lock.Unlock()
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {p.lock.Unlock()
spawnWorker()} else {
if p.options.Nonblocking {p.lock.Unlock()
return
}
Reentry:
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {p.lock.Unlock()
return
}
p.blockingNum++
p.cond.Wait()
p.blockingNum--
var nw int
if nw = p.Running(); nw == 0 {p.lock.Unlock()
if !p.IsClosed() {spawnWorker()
}
return
}
if w = p.workers.detach(); w == nil {
if nw < capacity {p.lock.Unlock()
spawnWorker()
return
}
goto Reentry
}
p.lock.Unlock()}
return
}
这个办法略微有点简单,咱们一点点来看。首先调用 p.workers.detach()
获取 goWorker
对象。p.workers
是 loopQueue
或者 workerStack
对象,它们都实现了 detach()
办法,后面曾经介绍过了。
如果返回了一个 goWorker
对象,阐明有闲暇 goroutine,间接返回。
否则,池容量还没用完(即容量大于正在工作的 goWorker
数量),则调用 spawnWorker()
新建一个 goWorker
,执行其run()
办法:
spawnWorker := func() {w = p.workerCache.Get().(*goWorker)
w.run()}
否则,池容量已用完。如果设置了 非阻塞 选项,则间接返回。否则,如果设置了最大阻塞队列长度下限,且以后阻塞期待的工作数量曾经达到这个下限,间接返回。否则,阻塞期待数量 +1,调用 p.cond.Wait()
期待。
而后 goWorker.run()
实现一个工作后,调用池的 revertWorker()
办法放回goWorker
:
func (p *Pool) revertWorker(worker *goWorker) bool {if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {return false}
worker.recycleTime = time.Now()
p.lock.Lock()
if p.IsClosed() {p.lock.Unlock()
return false
}
err := p.workers.insert(worker)
if err != nil {p.lock.Unlock()
return false
}
p.cond.Signal()
p.lock.Unlock()
return true
}
这里设置了 goWorker
的recycleTime
字段,用于断定过期。而后将 goWorker
放回池。workers
的 insert()
办法后面也曾经剖析过了。
接着调用 p.cond.Signal()
唤醒之前 retrieveWorker()
办法中的期待。retrieveWorker()
办法继续执行,阻塞期待数量 -1,这里判断以后 goWorker
的数量(也即 goroutine 数量)。如果数量等于 0,很有可能池子刚刚执行了 Release()
敞开,这时须要判断池是否处于敞开状态,如果是则间接返回。否则,调用 spawnWorker()
创立一个新的 goWorker
并执行其 run()
办法。
如果以后 goWorker
数量不为 0,则调用 p.workers.detach()
取出一个闲暇的 goWorker
返回。这个操作有可能失败,因为可能同时有多个 goroutine 在期待,唤醒的时候只有局部 goroutine 能获取到goWorker
。如果失败了,其容量还未用完,间接创立新的goWorker
,反之从新执行阻塞期待逻辑。
这里有很多加锁和解锁的逻辑,再加上和信号量混在一起很难看明确。其实只须要晓得一点就很简略了,那就是 p.cond.Wait()
外部会将以后 goroutine 挂起,而后解开它持有的锁,即会调用 p.lock.Unlock()
。这也是为什么revertWorker()
中p.lock.Lock()
加锁能胜利的起因。而后 p.cond.Signal()
或p.cond.Broadcast()
会唤醒因为 p.cond.Wait()
而挂起的 goroutine,然而须要 Signal()/Broadcast()
所在 goroutine 调用解锁办法。
最初,放上整体流程图:
清理过期goWorker
在 NewPool()
函数中会启动一个 goroutine 定期清理过期的goWorker
:
func (p *Pool) purgePeriodically() {heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer heartbeat.Stop()
for range heartbeat.C {if p.IsClosed() {break}
p.lock.Lock()
expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
p.lock.Unlock()
for i := range expiredWorkers {expiredWorkers[i].task <- nil
expiredWorkers[i] = nil
}
if p.Running() == 0 {p.cond.Broadcast()
}
}
}
如果池子已敞开,间接退出 goroutine。由选项 ExpiryDuration
来设置清理的距离,如果没有设置该选项,采纳默认值 1s:
// src/github.com/panjf2000/ants/pool.go
func NewPool(size int, options ...Option) (*Pool, error) {
if expiry := opts.ExpiryDuration; expiry < 0 {return nil, ErrInvalidPoolExpiry} else if expiry == 0 {opts.ExpiryDuration = DefaultCleanIntervalTime}
}
// src/github.com/panjf2000/ants/pool.go
const (DefaultCleanIntervalTime = time.Second)
而后就是每个清理周期,调用 p.workers.retrieveExpiry()
办法,取出过期的 goWorker
。 因为由这些 goWorker
启动的 goroutine 还阻塞在通道 task
上,所以要向该通道发送一个 nil
值,而 goWorker.run()
办法中接管到一个值为 nil
的工作会return
,完结 goroutine,防止了 goroutine 透露。
如果所有 goWorker
都被清理掉了,可能这时还有 goroutine 阻塞在 retrieveWorker()
办法中的 p.cond.Wait()
上,所以这里须要调用 p.cond.Broadcast()
唤醒这些 goroutine。
容量动静批改
在运行过程中,能够动静批改池的容量。调用 p.Tune(size int)
办法:
func (p *Pool) Tune(size int) {if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {return}
atomic.StoreInt32(&p.capacity, int32(size))
}
这里只是简略设置了一下新的容量,不影响以后正在执行的goWorker
,而且如果设置了预调配选项,容量不能再次设置。
下次执行 revertWorker()
的时候就会以新的容量判断是否能放回,下次执行 retrieveWorker()
的时候也会以新容量判断是否能创立新goWorker
。
敞开和重新启动Pool
应用实现之后,须要敞开 Pool
,防止 goroutine 透露。调用池对象的Release()
办法敞开:
func (p *Pool) Release() {atomic.StoreInt32(&p.state, CLOSED)
p.lock.Lock()
p.workers.reset()
p.lock.Unlock()
p.cond.Broadcast()}
调用 p.workers.reset()
完结 loopQueue
或wokerStack
中的 goroutine,做一些清理工作,同时为了避免有 goroutine 阻塞在 p.cond.Wait()
上,执行一次p.cond.Broadcast()
。
workerStack
与 loopQueue
的reset()
基本相同,即发送 nil
到task
通道从而完结 goroutine,而后重置各个字段:
// loopQueue 版本
func (wq *loopQueue) reset() {if wq.isEmpty() {return}
Releasing:
if w := wq.detach(); w != nil {
w.task <- nil
goto Releasing
}
wq.items = wq.items[:0]
wq.size = 0
wq.head = 0
wq.tail = 0
}
// stack 版本
func (wq *workerStack) reset() {for i := 0; i < wq.len(); i++ {wq.items[i].task <- nil
wq.items[i] = nil
}
wq.items = wq.items[:0]
}
池敞开后还能够调用 Reboot()
重启:
func (p *Pool) Reboot() {if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {go p.purgePeriodically()
}
}
因为 p.purgePeriodically()
在p.Release()
之后检测到池敞开就间接退出了,这里须要从新开启一个 goroutine 定期清理。
PoolWithFunc
和WorkWithFunc
上一篇文章中咱们还介绍了另一种形式创立 Pool
,即NewPoolWithFunc()
,指定一个函数。前面提交工作时调用p.Invoke()
提供参数就能够执行该函数了。这种形式创立的 Pool 和 Woker 构造如下:
type PoolWithFunc struct {workers []*goWorkerWithFunc
poolFunc func(interface{})
}
type goWorkerWithFunc struct {
pool *PoolWithFunc
args chan interface{}
recycleTime time.Time
}
与后面介绍的 Pool
和goWorker
大体类似,只是 PoolWithFunc
保留了传入的函数对象,应用数组保留 worker。goWorkerWithFunc
以 interface{}
为args
通道的数据类型,其实也好了解,因为曾经有函数了,只须要传入数据作为参数就能够运行了:
func (w *goWorkerWithFunc) run() {go func() {
for args := range w.args {
if args == nil {return}
w.pool.poolFunc(args)
if ok := w.pool.revertWorker(w); !ok {return}
}
}()}
从通道接管函数参数,执行池中保留的函数对象。
其余细节
task
缓冲通道
还记得创立 p.workerCache
这个 sync.Pool
对象的代码么:
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
在 sync.Pool
中没有 goWorker
对象时,调用 New()
办法创立一个,留神到这里创立的 task
通道应用 workerChanCap
作为容量。这个变量定义在 ants.go
文件中:
var (
// workerChanCap determines whether the channel of a worker should be a buffered channel
// to get the best performance. Inspired by fasthttp at
// https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139
workerChanCap = func() int {
// Use blocking channel if GOMAXPROCS=1.
// This switches context from sender to receiver immediately,
// which results in higher performance (under go1.5 at least).
if runtime.GOMAXPROCS(0) == 1 {return 0}
// Use non-blocking workerChan if GOMAXPROCS>1,
// since otherwise the sender might be dragged down if the receiver is CPU-bound.
return 1
}())
为了不便对照,我把正文也放上来了。ants
参考了驰名的 Web 框架 fasthttp
的实现。当 GOMAXPROCS
为 1 时(即操作系统线程数为 1),向通道 task
发送会挂起发送 goroutine,将执行流程转向接管 goroutine,这能晋升接管解决性能。如果 GOMAXPROCS
大于 1,ants
应用带缓冲的通道,为了避免接管 goroutine 是 CPU 密集的,导致发送 goroutine 被阻塞。上面是 fasthttp
中的相干代码:
// src/github.com/valyala/fasthttp/workerpool.go
var workerChanCap = func() int {
// Use blocking workerChan if GOMAXPROCS=1.
// This immediately switches Serve to WorkerFunc, which results
// in higher performance (under go1.5 at least).
if runtime.GOMAXPROCS(0) == 1 {return 0}
// Use non-blocking workerChan if GOMAXPROCS>1,
// since otherwise the Serve caller (Acceptor) may lag accepting
// new connections if WorkerFunc is CPU-bound.
return 1
}()
自旋锁
ants
利用 atomic.CompareAndSwapUint32()
这个原子操作实现了一个自旋锁。与其余类型的锁不同,自旋锁在加锁失败之后不会立即进入期待,而是会持续尝试。这对于很快就能取得锁的利用来说能极大晋升性能,因为能防止加锁和解锁导致的线程切换:
type spinLock uint32
func (sl *spinLock) Lock() {
backoff := 1
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
for i := 0; i < backoff; i++ {runtime.Gosched()
}
backoff <<= 1
}
}
func (sl *spinLock) Unlock() {atomic.StoreUint32((*uint32)(sl), 0)
}
// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {return new(spinLock)
}
另外这里应用了 指数退却 ,先等 1 个循环周期,通过runtime.Gosched()
通知运行时切换其余 goroutine 运行。如果还是获取不到锁,就再等 2 个周期。如果还是不行,再等 4,8,16… 以此类推。这能够避免短时间内获取不到锁,导致 CPU 工夫的节约。
总结
ants
源码短小精悍,没有援用其余任何第三方库。各种细节解决,各种性能优化的点都是值得咱们细细品味的。强烈建议大家读一读源码。浏览优良的源码,能极大地提高本身的编码素养。
大家如果发现好玩、好用的 Go 语言库,欢送到 Go 每日一库 GitHub 上提交 issue😄
参考
- ants GitHub:github.com/panjf2000/ants
- Go 每日一库 GitHub:https://github.com/darjun/go-daily-lib
我
我的博客:https://darjun.github.io
欢送关注我的微信公众号【GoUpUp】,独特学习,一起提高~