关于golang:Go-每日一库之-ants源码赏析

18次阅读

共计 14985 个字符,预计需要花费 38 分钟才能阅读完成。

简介

继上一篇 Go 每日一库之 ants,这篇文章咱们来一起看看 ants 的源码。

Pool

通过上篇文章,咱们晓得 ants 池有两种创立形式:

  • p, _ := ants.NewPool(cap):这种形式创立的池子对象须要调用 p.Submit(task) 提交工作,工作是一个无参数无返回值的函数;
  • p, _ := ants.NewPoolWithFunc(cap, func(interface{})):这种形式创立的池子对象须要指定池函数,并且应用 p.Invoke(arg) 调用池函数。arg就是传给池函数 func(interface{}) 的参数。

ants 中这两种池子应用不同的构造来示意:ants.Poolants.PoolWithFunc。咱们先来介绍PoolPoolWithFunc 构造也是相似的,介绍完 Pool 之后,咱们再简略比拟一下它们。

Pool构造定义在文件 pool.go 中:

// src/github.com/panjf2000/ants/pool.go
type Pool struct {
  capacity int32
  running int32
  workers workerArray
  state int32
  lock sync.Locker
  cond *sync.Cond
  workerCache sync.Pool
  blockingNum int
  options *Options
}

各个字段含意如下:

  • capacity:池容量,示意 ants 最多能创立的 goroutine 数量。如果为正数,示意容量无限度;
  • running:曾经创立的 worker goroutine 的数量;
  • workers:寄存一组 worker 对象,workerArray只是一个接口,示意一个 worker 容器,前面详述;
  • state:记录池子以后的状态,是否已敞开(CLOSED);
  • lock:锁。ants本人实现了一个自旋锁。用于同步并发操作;
  • cond:条件变量。解决工作期待和唤醒;
  • workerCache:应用 sync.Pool 对象池治理和创立 worker 对象,晋升性能;
  • blockingNum:阻塞期待的工作数量;
  • options:选项。上一篇文章曾经具体介绍过了。

这里明确一个概念,ants中为每个工作都是由 worker 对象来解决的,每个 worker 对象会对应创立一个 goroutine 来解决工作。ants中应用 goWorker 示意 worker:

// src/github.com/panjf2000/ants/worker.go
type goWorker struct {
  pool *Pool
  task chan func()
  recycleTime time.Time
}

后文具体介绍这一块内容,当初咱们只须要晓得 Pool.workers 字段就是寄存 goWorker 对象的容器。

Pool创立

创立 Pool 对象需调用 ants.NewPool(size, options) 函数。省略了一些解决选项的代码,最终代码如下:

// src/github.com/panjf2000/ants/pool.go
func NewPool(size int, options ...Option) (*Pool, error) {
  // ...
  p := &Pool{capacity: int32(size),
    lock:     internal.NewSpinLock(),
    options:  opts,
  }
  p.workerCache.New = func() interface{} {
    return &goWorker{
      pool: p,
      task: make(chan func(), workerChanCap),
    }
  }
  if p.options.PreAlloc {
    if size == -1 {return nil, ErrInvalidPreAllocSize}
    p.workers = newWorkerArray(loopQueueType, size)
  } else {p.workers = newWorkerArray(stackType, 0)
  }

  p.cond = sync.NewCond(p.lock)

  go p.purgePeriodically()
  return p, nil
}

代码不难理解:

  • 创立 Pool 对象,设置容量,创立一个自旋锁来初始化 lock 字段,设置选项;
  • 设置 workerCache 这个 sync.Pool 对象的 New 办法,在调用 sync.Pool 对象的 Get() 办法时,如果它没有缓存的 worker 对象了,则调用这个办法创立一个;
  • 依据是否设置了预调配选项,创立不同类型的 workers;
  • 应用 p.lock 锁创立一个条件变量;
  • 最初启动一个 goroutine 用于定期清理过期的 worker。

Pool.workers字段为 workerArray 类型,这实际上是一个接口,示意一个 worker 容器:

type workerArray interface {len() int
  isEmpty() bool
  insert(worker *goWorker) error
  detach() *goWorker
  retrieveExpiry(duration time.Duration) []*goWorker
  reset()}

每个办法从名字上很好了解含意:

  • len() int:worker 数量;
  • isEmpty() bool:worker 数量是否为 0;
  • insert(worker *goWorker) error:goroutine 工作执行完结后,将相应的 worker 放回 workerArray 中;
  • detach() *goWorker:从 workerArray 中取出一个 worker;
  • retrieveExpiry(duration time.Duration) []*goWorker:取出所有的过期 worker;
  • reset():重置容器。

workerArrayants 中有两种实现,即 workerStackloopQueue

workerStack

咱们先来介绍一下 workerStack,它位于文件worker_stack.go 中:

// src/github.com/panjf2000/ants/worker_stack.go
type workerStack struct {items  []*goWorker
  expiry []*goWorker
  size   int
}

func newWorkerStack(size int) *workerStack {
  return &workerStack{items: make([]*goWorker, 0, size),
    size:  size,
  }
}
  • items:闲暇的worker
  • expiry:过期的worker

goroutine 实现工作之后,Pool池会将相应的 worker 放回 workerStack,调用workerStack.insert() 间接 appenditems中即可:

func (wq *workerStack) insert(worker *goWorker) error {wq.items = append(wq.items, worker)
  return nil
}

新工作到来时,会调用 workerStack.detach() 从容器中取出一个闲暇的 worker:

func (wq *workerStack) detach() *goWorker {l := wq.len()
  if l == 0 {return nil}

  w := wq.items[l-1]
  wq.items[l-1] = nil // avoid memory leaks
  wq.items = wq.items[:l-1]

  return w
}

这里总是返回最初一个 worker,每次 insert() 也是 append 到最初,合乎栈后进先出的特点,故称为workerStack

这里有一个细节,因为切片的底层构造是数组,只有有援用数组的指针,数组中的元素就不会开释。这里取出切片最初一个元素后,将对应数组元素的指针设置为nil,被动开释这个援用。

下面说过新建 Pool 对象时会创立一个 goroutine 定期检查和清理过期的 worker。通过调用 workerArray.retrieveExpiry() 获取过期的 worker 列表。workerStack实现如下:

func (wq *workerStack) retrieveExpiry(duration time.Duration) []*goWorker {n := wq.len()
  if n == 0 {return nil}

  expiryTime := time.Now().Add(-duration)
  index := wq.binarySearch(0, n-1, expiryTime)

  wq.expiry = wq.expiry[:0]
  if index != -1 {wq.expiry = append(wq.expiry, wq.items[:index+1]...)
    m := copy(wq.items, wq.items[index+1:])
    for i := m; i < n; i++ {wq.items[i] = nil
    }
    wq.items = wq.items[:m]
  }
  return wq.expiry
}

实现应用二分查找法找到已过期的最近一个 worker。因为过期工夫是依照 goroutine 执行工作后的闲暇工夫计算的,而 workerStack.insert() 入队程序决定了,它们的过期工夫是从早到晚的。所以能够应用二分查找:

func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int {
  var mid int
  for l <= r {mid = (l + r) / 2
    if expiryTime.Before(wq.items[mid].recycleTime) {r = mid - 1} else {l = mid + 1}
  }
  return r
}

二分查找的是最近过期的 worker,行将过期的 worker 的前一个。它和在它之前的 worker 曾经全副过期了。

如果找到索引 index,将items 从结尾到 index(包含)的所有 worker 复制到expiry 字段中。而后将 index 之后的所有未过期 worker 复制到切片头部,这里应用了 copy 函数。copy返回理论复制的数量,即未过期的 worker 数量 m。而后将切片itemsm开始所有的元素置为 nil,防止内存透露,因为它们曾经被复制到头部了。最初裁剪items 切片,返回过期 worker 切片。

loopQueue

loopQueue实现基于循环队列,构造定义在文件 worker_loop_queue 中:

type loopQueue struct {items  []*goWorker
  expiry []*goWorker
  head   int
  tail   int
  size   int
  isFull bool
}

func newWorkerLoopQueue(size int) *loopQueue {
  return &loopQueue{items: make([]*goWorker, size),
    size:  size,
  }
}

因为是循环队列,这里先创立好了一个长度为 size 的切片。循环队列有一个队列头指针head,指向第一个有元素的地位,一个队列尾指针tail,指向下一个能够寄存元素的地位。所以一开始状态如下:

tail 处增加元素,增加后 tail 指针后移。在 head 处取出元素,取出后 head 指针也后移。进行一段时间操作后,队列状态如下:

headtail 指针到队列尾了,须要回绕。所以可能呈现这种状况:

tail 指针赶上 head 指针了,阐明队列就满了:

head 指针赶上 tail 指针了,队列再次为空:

依据示意图,咱们再来看 loopQueue 的操作方法就很简略了。

因为 headtail相等的状况有可能是队列空,也有可能是队列满,所以 loopQueue 中减少一个 isFull 字段以示辨别。goroutine 实现工作之后,会将对应的 worker 对象放回 loopQueue,执行的是insert() 办法:

func (wq *loopQueue) insert(worker *goWorker) error {
  if wq.size == 0 {return errQueueIsReleased}

  if wq.isFull {return errQueueIsFull}
  wq.items[wq.tail] = worker
  wq.tail++

  if wq.tail == wq.size {wq.tail = 0}
  if wq.tail == wq.head {wq.isFull = true}

  return nil
}

这个办法执行的就是循环队列的入队流程,留神如果插入后 tail==head 了,阐明队列满了,设置 isFull 字段。

新工作到来调用 loopQueeue.detach() 办法获取一个闲暇的 worker 构造:

func (wq *loopQueue) detach() *goWorker {if wq.isEmpty() {return nil}

  w := wq.items[wq.head]
  wq.items[wq.head] = nil
  wq.head++
  if wq.head == wq.size {wq.head = 0}
  wq.isFull = false

  return w
}

这个办法对应的是循环队列的出队流程,留神每次出队后,队列必定不满了,isFull要重置为false

workerStack 构造一样,先入的 worker 对象过期工夫早,后入的晚,获取过期 worker 的办法与 workerStack 中相似,只是没有应用二分查找了。这里就不赘述了。

再看 Pool 创立

介绍完两种 workerArray 的实现之后,再来看 Pool 的创立函数中 workers 字段的设置:

if p.options.PreAlloc {
  if size == -1 {return nil, ErrInvalidPreAllocSize}
  p.workers = newWorkerArray(loopQueueType, size)
} else {p.workers = newWorkerArray(stackType, 0)
}

newWorkerArray()定义在文件 worker_array.go 中:

type arrayType int

const (
  stackType arrayType = 1 << iota
  loopQueueType
)

func newWorkerArray(aType arrayType, size int) workerArray {
  switch aType {
  case stackType:
    return newWorkerStack(size)
  case loopQueueType:
    return newWorkerLoopQueue(size)
  default:
    return newWorkerStack(size)
  }
}

即如果设置了预调配选项,就采纳 loopQueue 构造。否则就采纳 stack 的构造。

worker 构造

介绍完 Pool 的创立和构造,咱们来看看 worker 的构造。在 ants 中 worker 用构造体 goWorker 示意,定义在文件 worker.go 中。它的构造非常简单:

// src/github.com/panjf2000/ants/worker.go
type goWorker struct {
  pool *Pool
  task chan func()
  recycleTime time.Time
}

具体字段含意很显著:

  • pool:持有 goroutine 池的援用;
  • task:工作通道,通过这个通道将类型为 func () 的函数作为工作发送给goWorker
  • recyleTime:这个字段记录 goWorker 什么时候被放回池中(即什么时候开始闲暇)。其实现工作后,在将其放回 goroutine 池的时候设置。

goWorker创立时会调用 run() 办法,run()办法中启动一个新 goroutine 解决工作。run()主体流程非常简单:

func (w *goWorker) run() {go func() {
    for f := range w.task {
      if f == nil {return}
      f()
      if ok := w.pool.revertWorker(w); !ok {return}
    }
  }()}

这个办法启动一个新的 goroutine,而后不停地从 task 通道中接管工作,而后执行工作,工作执行实现之后调用池对象的 revertWorker() 办法将该 goWorker 对象放回池中,以便下次取出解决新的工作。revertWorker()办法前面会详细分析。

这里留神,实际上 for f := range w.task 这个循环直到通道 task 敞开或取出为 nil 的工作才会终止。所以这个 goroutine 始终在运行,这正是 ants 高性能的关键所在。每个 goWorker 只会启动一次 goroutine,后续反复利用这个 goroutine。goroutine 每次只执行一个工作就会被放回池中。

还有一个细节,如果放回操作失败,则会调用return,这会让 goroutine 运行完结,避免 goroutine 透露

这里 f == nil 为 true 时return,也是一个细节点,咱们前面讲池敞开的时候会具体介绍。

上面咱们看看 run() 办法的异样解决:

defer func() {w.pool.workerCache.Put(w)
  if p := recover(); p != nil {
    if ph := w.pool.options.PanicHandler; ph != nil {ph(p)
    } else {w.pool.options.Logger.Printf("worker exits from a panic: %v\n", p)
      var buf [4096]byte
      n := runtime.Stack(buf[:], false)
      w.pool.options.Logger.Printf("worker exits from panic: %s\n", string(buf[:n]))
    }
  }
  w.pool.cond.Signal()}()

简略来说,就是在 defer 中通过 recover() 函数捕捉工作执行过程中抛出的 panic。这时工作执行失败,goroutine 也完结了。然而goWorker 对象还是能够反复利用,所以 defer 函数一开始调用 w.pool.workerCache.Put(w)goWorker对象放回 sync.Pool 池中。

接着就是解决 panic,如果选项中指定了panic 处理器,间接调用这个处理器。否则,ants调用选项中设置的 Logger 记录一些日志,如堆栈,panic信息等。

最初须要调用 w.pool.cond.Signal() 告诉当初有闲暇的 goWorker 了。因为咱们理论运行的 goWorker 数量因为 panic 少了一个,而池中可能有其余工作在期待解决。

提交工作

接下来,通过提交工作就能够串起整个流程。由上一篇文章咱们晓得,能够调用池对象的 Submit() 办法提交工作:

func (p *Pool) Submit(task func()) error {if p.IsClosed() {return ErrPoolClosed}
  var w *goWorker
  if w = p.retrieveWorker(); w == nil {return ErrPoolOverload}
  w.task <- task
  return nil
}

首先判断池是否已敞开,而后调用 retrieveWorker() 办法获取一个闲暇的 worker,而后将工作 task 发送到 worker 的工作通道。上面是 retrieveWorker() 实现:

func (p *Pool) retrieveWorker() (w *goWorker) {p.lock.Lock()

  w = p.workers.detach()
  if w != nil {p.lock.Unlock()
  } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {p.lock.Unlock()
    spawnWorker()} else {
    if p.options.Nonblocking {p.lock.Unlock()
      return
    }
  Reentry:
    if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {p.lock.Unlock()
      return
    }
    p.blockingNum++
    p.cond.Wait()
    p.blockingNum--
    var nw int
    if nw = p.Running(); nw == 0 {p.lock.Unlock()
      if !p.IsClosed() {spawnWorker()
      }
      return
    }
    if w = p.workers.detach(); w == nil {
      if nw < capacity {p.lock.Unlock()
        spawnWorker()
        return
      }
      goto Reentry
    }

    p.lock.Unlock()}
  return
}

这个办法略微有点简单,咱们一点点来看。首先调用 p.workers.detach() 获取 goWorker 对象。p.workersloopQueue 或者 workerStack 对象,它们都实现了 detach() 办法,后面曾经介绍过了。

如果返回了一个 goWorker 对象,阐明有闲暇 goroutine,间接返回。

否则,池容量还没用完(即容量大于正在工作的 goWorker 数量),则调用 spawnWorker() 新建一个 goWorker,执行其run() 办法:

spawnWorker := func() {w = p.workerCache.Get().(*goWorker)
  w.run()}

否则,池容量已用完。如果设置了 非阻塞 选项,则间接返回。否则,如果设置了最大阻塞队列长度下限,且以后阻塞期待的工作数量曾经达到这个下限,间接返回。否则,阻塞期待数量 +1,调用 p.cond.Wait() 期待。

而后 goWorker.run() 实现一个工作后,调用池的 revertWorker() 办法放回goWorker

func (p *Pool) revertWorker(worker *goWorker) bool {if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {return false}
  worker.recycleTime = time.Now()
  p.lock.Lock()

  if p.IsClosed() {p.lock.Unlock()
    return false
  }

  err := p.workers.insert(worker)
  if err != nil {p.lock.Unlock()
    return false
  }

  p.cond.Signal()
  p.lock.Unlock()
  return true
}

这里设置了 goWorkerrecycleTime字段,用于断定过期。而后将 goWorker 放回池。workersinsert() 办法后面也曾经剖析过了。

接着调用 p.cond.Signal() 唤醒之前 retrieveWorker() 办法中的期待。retrieveWorker()办法继续执行,阻塞期待数量 -1,这里判断以后 goWorker 的数量(也即 goroutine 数量)。如果数量等于 0,很有可能池子刚刚执行了 Release() 敞开,这时须要判断池是否处于敞开状态,如果是则间接返回。否则,调用 spawnWorker() 创立一个新的 goWorker 并执行其 run() 办法。

如果以后 goWorker 数量不为 0,则调用 p.workers.detach() 取出一个闲暇的 goWorker 返回。这个操作有可能失败,因为可能同时有多个 goroutine 在期待,唤醒的时候只有局部 goroutine 能获取到goWorker。如果失败了,其容量还未用完,间接创立新的goWorker,反之从新执行阻塞期待逻辑。

这里有很多加锁和解锁的逻辑,再加上和信号量混在一起很难看明确。其实只须要晓得一点就很简略了,那就是 p.cond.Wait() 外部会将以后 goroutine 挂起,而后解开它持有的锁,即会调用 p.lock.Unlock()。这也是为什么revertWorker()p.lock.Lock()加锁能胜利的起因。而后 p.cond.Signal()p.cond.Broadcast()会唤醒因为 p.cond.Wait() 而挂起的 goroutine,然而须要 Signal()/Broadcast() 所在 goroutine 调用解锁办法。

最初,放上整体流程图:

清理过期goWorker

NewPool() 函数中会启动一个 goroutine 定期清理过期的goWorker

func (p *Pool) purgePeriodically() {heartbeat := time.NewTicker(p.options.ExpiryDuration)
  defer heartbeat.Stop()

  for range heartbeat.C {if p.IsClosed() {break}

    p.lock.Lock()
    expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
    p.lock.Unlock()

    for i := range expiredWorkers {expiredWorkers[i].task <- nil
      expiredWorkers[i] = nil
    }

    if p.Running() == 0 {p.cond.Broadcast()
    }
  }
}

如果池子已敞开,间接退出 goroutine。由选项 ExpiryDuration 来设置清理的距离,如果没有设置该选项,采纳默认值 1s:

// src/github.com/panjf2000/ants/pool.go
func NewPool(size int, options ...Option) (*Pool, error) {
  if expiry := opts.ExpiryDuration; expiry < 0 {return nil, ErrInvalidPoolExpiry} else if expiry == 0 {opts.ExpiryDuration = DefaultCleanIntervalTime}
}

// src/github.com/panjf2000/ants/pool.go
const (DefaultCleanIntervalTime = time.Second)

而后就是每个清理周期,调用 p.workers.retrieveExpiry() 办法,取出过期的 goWorker 因为由这些 goWorker 启动的 goroutine 还阻塞在通道 task 上,所以要向该通道发送一个 nil 值,而 goWorker.run() 办法中接管到一个值为 nil 的工作会return,完结 goroutine,防止了 goroutine 透露

如果所有 goWorker 都被清理掉了,可能这时还有 goroutine 阻塞在 retrieveWorker() 办法中的 p.cond.Wait() 上,所以这里须要调用 p.cond.Broadcast() 唤醒这些 goroutine。

容量动静批改

在运行过程中,能够动静批改池的容量。调用 p.Tune(size int) 办法:

func (p *Pool) Tune(size int) {if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {return}
  atomic.StoreInt32(&p.capacity, int32(size))
}

这里只是简略设置了一下新的容量,不影响以后正在执行的goWorker,而且如果设置了预调配选项,容量不能再次设置。

下次执行 revertWorker() 的时候就会以新的容量判断是否能放回,下次执行 retrieveWorker() 的时候也会以新容量判断是否能创立新goWorker

敞开和重新启动Pool

应用实现之后,须要敞开 Pool,防止 goroutine 透露。调用池对象的Release() 办法敞开:

func (p *Pool) Release() {atomic.StoreInt32(&p.state, CLOSED)
  p.lock.Lock()
  p.workers.reset()
  p.lock.Unlock()
  p.cond.Broadcast()}

调用 p.workers.reset() 完结 loopQueuewokerStack中的 goroutine,做一些清理工作,同时为了避免有 goroutine 阻塞在 p.cond.Wait() 上,执行一次p.cond.Broadcast()

workerStackloopQueuereset()基本相同,即发送 niltask通道从而完结 goroutine,而后重置各个字段:

// loopQueue 版本
func (wq *loopQueue) reset() {if wq.isEmpty() {return}

Releasing:
  if w := wq.detach(); w != nil {
    w.task <- nil
    goto Releasing
  }
  wq.items = wq.items[:0]
  wq.size = 0
  wq.head = 0
  wq.tail = 0
}

// stack 版本
func (wq *workerStack) reset() {for i := 0; i < wq.len(); i++ {wq.items[i].task <- nil
    wq.items[i] = nil
  }
  wq.items = wq.items[:0]
}

池敞开后还能够调用 Reboot() 重启:

func (p *Pool) Reboot() {if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {go p.purgePeriodically()
  }
}

因为 p.purgePeriodically()p.Release()之后检测到池敞开就间接退出了,这里须要从新开启一个 goroutine 定期清理。

PoolWithFuncWorkWithFunc

上一篇文章中咱们还介绍了另一种形式创立 Pool,即NewPoolWithFunc(),指定一个函数。前面提交工作时调用p.Invoke() 提供参数就能够执行该函数了。这种形式创立的 Pool 和 Woker 构造如下:

type PoolWithFunc struct {workers []*goWorkerWithFunc
  poolFunc func(interface{})
}

type goWorkerWithFunc struct {
  pool *PoolWithFunc
  args chan interface{}
  recycleTime time.Time
}

与后面介绍的 PoolgoWorker大体类似,只是 PoolWithFunc 保留了传入的函数对象,应用数组保留 worker。goWorkerWithFuncinterface{}args通道的数据类型,其实也好了解,因为曾经有函数了,只须要传入数据作为参数就能够运行了:

func (w *goWorkerWithFunc) run() {go func() {
    for args := range w.args {
      if args == nil {return}
      w.pool.poolFunc(args)
      if ok := w.pool.revertWorker(w); !ok {return}
    }
  }()}

从通道接管函数参数,执行池中保留的函数对象。

其余细节

task缓冲通道

还记得创立 p.workerCache 这个 sync.Pool 对象的代码么:

p.workerCache.New = func() interface{} {
  return &goWorker{
    pool: p,
    task: make(chan func(), workerChanCap),
  }
}

sync.Pool 中没有 goWorker 对象时,调用 New() 办法创立一个,留神到这里创立的 task 通道应用 workerChanCap 作为容量。这个变量定义在 ants.go 文件中:

var (
  // workerChanCap determines whether the channel of a worker should be a buffered channel
  // to get the best performance. Inspired by fasthttp at
  // https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139
  workerChanCap = func() int {
    // Use blocking channel if GOMAXPROCS=1.
    // This switches context from sender to receiver immediately,
    // which results in higher performance (under go1.5 at least).
    if runtime.GOMAXPROCS(0) == 1 {return 0}

    // Use non-blocking workerChan if GOMAXPROCS>1,
    // since otherwise the sender might be dragged down if the receiver is CPU-bound.
    return 1
  }())

为了不便对照,我把正文也放上来了。ants参考了驰名的 Web 框架 fasthttp 的实现。当 GOMAXPROCS 为 1 时(即操作系统线程数为 1),向通道 task 发送会挂起发送 goroutine,将执行流程转向接管 goroutine,这能晋升接管解决性能。如果 GOMAXPROCS 大于 1,ants应用带缓冲的通道,为了避免接管 goroutine 是 CPU 密集的,导致发送 goroutine 被阻塞。上面是 fasthttp 中的相干代码:

// src/github.com/valyala/fasthttp/workerpool.go
var workerChanCap = func() int {
  // Use blocking workerChan if GOMAXPROCS=1.
  // This immediately switches Serve to WorkerFunc, which results
  // in higher performance (under go1.5 at least).
  if runtime.GOMAXPROCS(0) == 1 {return 0}

  // Use non-blocking workerChan if GOMAXPROCS>1,
  // since otherwise the Serve caller (Acceptor) may lag accepting
  // new connections if WorkerFunc is CPU-bound.
  return 1
}()

自旋锁

ants利用 atomic.CompareAndSwapUint32() 这个原子操作实现了一个自旋锁。与其余类型的锁不同,自旋锁在加锁失败之后不会立即进入期待,而是会持续尝试。这对于很快就能取得锁的利用来说能极大晋升性能,因为能防止加锁和解锁导致的线程切换:

type spinLock uint32

func (sl *spinLock) Lock() {
  backoff := 1
  for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
    for i := 0; i < backoff; i++ {runtime.Gosched()
    }
    backoff <<= 1
  }
}

func (sl *spinLock) Unlock() {atomic.StoreUint32((*uint32)(sl), 0)
}

// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {return new(spinLock)
}

另外这里应用了 指数退却 ,先等 1 个循环周期,通过runtime.Gosched() 通知运行时切换其余 goroutine 运行。如果还是获取不到锁,就再等 2 个周期。如果还是不行,再等 4,8,16… 以此类推。这能够避免短时间内获取不到锁,导致 CPU 工夫的节约。

总结

ants源码短小精悍,没有援用其余任何第三方库。各种细节解决,各种性能优化的点都是值得咱们细细品味的。强烈建议大家读一读源码。浏览优良的源码,能极大地提高本身的编码素养。

大家如果发现好玩、好用的 Go 语言库,欢送到 Go 每日一库 GitHub 上提交 issue😄

参考

  1. ants GitHub:github.com/panjf2000/ants
  2. Go 每日一库 GitHub:https://github.com/darjun/go-daily-lib

我的博客:https://darjun.github.io

欢送关注我的微信公众号【GoUpUp】,独特学习,一起提高~

正文完
 0