协程池次要是为了缩小 go 协程频繁创立、销毁带来的性能损耗,尽管能够忽略不计,然而网上说非凡状况还是有用的。

那这个协程池通俗易懂来讲,比方老板给员工分配任务:

老板领了一堆工作,得找工人干活呀, 那领导就拿出一个工作,给一个闲暇的员工 A,再把下一个工作,给另外一个闲暇的员工 B 。

这时候 A 或者 B,指不定谁先忙完了

如果有人忙完了,领导就把下一个工作,给先忙完的人。A/B 就是协程池外面的两个协程

上面这段代码,实现了如下性能

  1. 协程池数量下限管制、最大闲暇工夫设置
  2. 定时清理闲暇协程清理,开释内存
  3. 工作散发
  4. 协程复用
package gopoolimport (   "context" "log" "sync" "time")type Task func()// boss 老板type GoPool struct {   MaxWorkerIdleTime time.Duration // worker 最大闲暇工夫 MaxWorkerNum int32 // 协程最大数量 TaskEntryChan chan *Task // 工作入列 Workers []*worker // 已创立worker FreeWorkerChan chan *worker // 闲暇worker Lock sync.Mutex}const (   WorkerStatusStop = 1 WorkerStatusLive = 0)// 干活的人type worker struct {   Pool *GoPool StartTime time.Time // 开始工夫 TaskChan chan *Task // 执行队列 LastWorkTime time.Time // 最初执行工夫 Ctx context.Context Cancel context.CancelFunc Status int32 // 被过期删掉的标记}var defaultPool = func() *GoPool {   return NewPool()}()// 初始化func NewPool() *GoPool {   g := &GoPool{      MaxWorkerIdleTime: 10 * time.Second,      MaxWorkerNum:      20,      TaskEntryChan:     make(chan *Task, 2000),      FreeWorkerChan:    make(chan *worker, 2000),   }   // 散发工作 go g.dispatchTask()   //清理闲暇worker go g.fireWorker()   return g}// 定期清理闲暇workerfunc (g *GoPool) fireWorker() {   for {      select {      // 10秒执行一次 case <-time.After(10 * time.Second):         for k, w := range g.Workers {            if time.Now().Sub(w.LastWorkTime) > g.MaxWorkerIdleTime {               log.Printf("overtime %v %p", k, w)               // 终止协程 w.Cancel()               // 清理Free w.Status = WorkerStatusStop            }         }         g.Lock.Lock()         g.Workers = g.cleanWorker(g.Workers)         g.Lock.Unlock()      }   }}// 递归清理无用workerfunc (g *GoPool) cleanWorker(workers []*worker) []*worker {   for k, w := range workers {      if time.Now().Sub(w.LastWorkTime) > g.MaxWorkerIdleTime {         workers = append(workers[:k], workers[k+1:]...) // 删除两头1个元素 return g.cleanWorker(workers)      }   }   return workers}// 散发工作func (g *GoPool) dispatchTask() {   for {      select {      case t := <-g.TaskEntryChan:         log.Printf("dispatch task %p", t)         // 获取worker w := g.fetchWorker()         // 将工作扔给worker w.accept(t)      }   }}// 获取可用workerfunc (g *GoPool) fetchWorker() *worker {   for {      select {      // 获取闲暇worker case w := <-g.FreeWorkerChan:         if w.Status == WorkerStatusLive {            return w         }      default:         // 创立新的worker if int32(len(g.Workers)) < g.MaxWorkerNum {            w := &worker{               Pool:         g,               StartTime:    time.Now(),               LastWorkTime: time.Now(),               TaskChan:     make(chan *Task, 1),               Ctx:          context.Background(),               Status:       WorkerStatusLive,            }            ctx, cancel := context.WithCancel(w.Ctx)            w.Cancel = cancel            // 接到工作本人去执行吧 go w.execute(ctx)            g.Lock.Lock()            g.Workers = append(g.Workers, w)            g.Lock.Unlock()            g.FreeWorkerChan <- w            log.Printf("worker create %p", w)         }      }   }}// 增加工作func (g *GoPool) addTask(t Task) {   // 将工作放到入口工作队列 g.TaskEntryChan <- &t}// 接受任务func (w *worker) accept(t *Task) {   // 每个worker本人的工作队列 w.TaskChan <- t}// 执行工作func (w *worker) execute(ctx context.Context) {   for {      select {      case t := <-w.TaskChan:         // 执行 (*t)()         // 记录工作状态 w.LastWorkTime = time.Now()         w.Pool.FreeWorkerChan <- w      case <-ctx.Done():         log.Printf("worker done %p", w)         return }   }}// 执行func SafeGo(t Task) {   defaultPool.addTask(t)}