协程池次要是为了缩小 go 协程频繁创立、销毁带来的性能损耗,尽管能够忽略不计,然而网上说非凡状况还是有用的。
那这个协程池通俗易懂来讲,比方老板给员工分配任务:
老板领了一堆工作,得找工人干活呀,那领导就拿出一个工作,给一个闲暇的员工 A,再把下一个工作,给另外一个闲暇的员工 B。
这时候 A 或者 B,指不定谁先忙完了
如果有人忙完了,领导就把下一个工作,给先忙完的人。A/B 就是协程池外面的两个协程
上面这段代码,实现了如下性能
- 协程池数量下限管制、最大闲暇工夫设置
- 定时清理闲暇协程清理,开释内存
- 工作散发
- 协程复用
package gopool
import (
"context"
"log" "sync" "time")
type Task func()
// boss 老板
type GoPool struct {
MaxWorkerIdleTime time.Duration // worker 最大闲暇工夫
MaxWorkerNum int32 // 协程最大数量
TaskEntryChan chan *Task // 工作入列
Workers []*worker // 已创立 worker
FreeWorkerChan chan *worker // 闲暇 worker
Lock sync.Mutex
}
const (
WorkerStatusStop = 1
WorkerStatusLive = 0
)
// 干活的人
type worker struct {
Pool *GoPool
StartTime time.Time // 开始工夫
TaskChan chan *Task // 执行队列
LastWorkTime time.Time // 最初执行工夫
Ctx context.Context
Cancel context.CancelFunc
Status int32 // 被过期删掉的标记
}
var defaultPool = func() *GoPool {return NewPool()
}()
// 初始化
func NewPool() *GoPool {
g := &GoPool{
MaxWorkerIdleTime: 10 * time.Second,
MaxWorkerNum: 20,
TaskEntryChan: make(chan *Task, 2000),
FreeWorkerChan: make(chan *worker, 2000),
}
// 散发工作
go g.dispatchTask()
// 清理闲暇 worker
go g.fireWorker()
return g
}
// 定期清理闲暇 worker
func (g *GoPool) fireWorker() {
for {
select {
// 10 秒执行一次
case <-time.After(10 * time.Second):
for k, w := range g.Workers {if time.Now().Sub(w.LastWorkTime) > g.MaxWorkerIdleTime {log.Printf("overtime %v %p", k, w)
// 终止协程
w.Cancel()
// 清理 Free
w.Status = WorkerStatusStop
}
}
g.Lock.Lock()
g.Workers = g.cleanWorker(g.Workers)
g.Lock.Unlock()}
}
}
// 递归清理无用 worker
func (g *GoPool) cleanWorker(workers []*worker) []*worker {
for k, w := range workers {if time.Now().Sub(w.LastWorkTime) > g.MaxWorkerIdleTime {workers = append(workers[:k], workers[k+1:]...) // 删除两头 1 个元素
return g.cleanWorker(workers)
}
}
return workers
}
// 散发工作
func (g *GoPool) dispatchTask() {
for {
select {
case t := <-g.TaskEntryChan:
log.Printf("dispatch task %p", t)
// 获取 worker
w := g.fetchWorker()
// 将工作扔给 worker
w.accept(t)
}
}
}
// 获取可用 worker
func (g *GoPool) fetchWorker() *worker {
for {
select {
// 获取闲暇 worker
case w := <-g.FreeWorkerChan:
if w.Status == WorkerStatusLive {return w}
default:
// 创立新的 worker
if int32(len(g.Workers)) < g.MaxWorkerNum {
w := &worker{
Pool: g,
StartTime: time.Now(),
LastWorkTime: time.Now(),
TaskChan: make(chan *Task, 1),
Ctx: context.Background(),
Status: WorkerStatusLive,
}
ctx, cancel := context.WithCancel(w.Ctx)
w.Cancel = cancel
// 接到工作本人去执行吧
go w.execute(ctx)
g.Lock.Lock()
g.Workers = append(g.Workers, w)
g.Lock.Unlock()
g.FreeWorkerChan <- w
log.Printf("worker create %p", w)
}
}
}
}
// 增加工作
func (g *GoPool) addTask(t Task) {
// 将工作放到入口工作队列
g.TaskEntryChan <- &t
}
// 接受任务
func (w *worker) accept(t *Task) {
// 每个 worker 本人的工作队列
w.TaskChan <- t
}
// 执行工作
func (w *worker) execute(ctx context.Context) {
for {
select {
case t := <-w.TaskChan:
// 执行
(*t)()
// 记录工作状态
w.LastWorkTime = time.Now()
w.Pool.FreeWorkerChan <- w
case <-ctx.Done():
log.Printf("worker done %p", w)
return
}
}
}
// 执行
func SafeGo(t Task) {defaultPool.addTask(t)
}