序
本文次要钻研一下 tunny
Worker
type Worker interface {
// Process will synchronously perform a job and return the result.
Process(interface{}) interface{}
// BlockUntilReady is called before each job is processed and must block the
// calling goroutine until the Worker is ready to process the next job.
BlockUntilReady()
// Interrupt is called when a job is cancelled. The worker is responsible
// for unblocking the Process implementation.
Interrupt()
// Terminate is called when a Worker is removed from the processing pool
// and is responsible for cleaning up any held resources.
Terminate()}
Worker 接口定义了 Process、BlockUntilReady、Interrupt、Terminate 办法
closureWorker
type closureWorker struct {processor func(interface{}) interface{}}
func (w *closureWorker) Process(payload interface{}) interface{} {return w.processor(payload)
}
func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt() {}
func (w *closureWorker) Terminate() {}
closureWorker 定义了 processor 属性,它实现了 Worker 接口的 Process、BlockUntilReady、Interrupt、Terminate 办法,其中 Process 办法委托给 processor
callbackWorker
type callbackWorker struct{}
func (w *callbackWorker) Process(payload interface{}) interface{} {f, ok := payload.(func())
if !ok {return ErrJobNotFunc}
f()
return nil
}
func (w *callbackWorker) BlockUntilReady() {}
func (w *callbackWorker) Interrupt() {}
func (w *callbackWorker) Terminate() {}
callbackWorker 定义了 processor 属性,它实现了 Worker 接口的 Process、BlockUntilReady、Interrupt、Terminate 办法,其中 Process 办法执行的是 payload 函数
Pool
type Pool struct {
queuedJobs int64
ctor func() Worker
workers []*workerWrapper
reqChan chan workRequest
workerMut sync.Mutex
}
func New(n int, ctor func() Worker) *Pool {
p := &Pool{
ctor: ctor,
reqChan: make(chan workRequest),
}
p.SetSize(n)
return p
}
func NewFunc(n int, f func(interface{}) interface{}) *Pool {return New(n, func() Worker {
return &closureWorker{processor: f,}
})
}
func NewCallback(n int) *Pool {return New(n, func() Worker {return &callbackWorker{}
})
}
Pool 定义了 queuedJobs、ctor、workers、reqChan、workerMut 属性;New 办法依据 n 和 ctor 创立 Pool;NewFunc 办法依据 n 和 f 来创立 closureWorker;NewCallback 办法创立 callbackWorker
Process
func (p *Pool) Process(payload interface{}) interface{} {atomic.AddInt64(&p.queuedJobs, 1)
request, open := <-p.reqChan
if !open {panic(ErrPoolNotRunning)
}
request.jobChan <- payload
payload, open = <-request.retChan
if !open {panic(ErrWorkerClosed)
}
atomic.AddInt64(&p.queuedJobs, -1)
return payload
}
Process 办法首先递增 queuedJobs,而后从 reqChan 读取 request,而后往 jobChan 写入 payload,之后再期待 retChan,最初递加 queuedJobs
SetSize
func (p *Pool) SetSize(n int) {p.workerMut.Lock()
defer p.workerMut.Unlock()
lWorkers := len(p.workers)
if lWorkers == n {return}
// Add extra workers if N > len(workers)
for i := lWorkers; i < n; i++ {p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
}
// Asynchronously stop all workers > N
for i := n; i < lWorkers; i++ {p.workers[i].stop()}
// Synchronously wait for all workers > N to stop
for i := n; i < lWorkers; i++ {p.workers[i].join()}
// Remove stopped workers from slice
p.workers = p.workers[:n]
}
SetSize 办法首先通过 workerMut 加锁,而后依据 lWorkers 创立 newWorkerWrapper,之后执行 worker.stop,再执行 worker.join(),而后清空 workers
Close
func (p *Pool) Close() {p.SetSize(0)
close(p.reqChan)
}
Close 办法执行 SetSize(0) 及 close(p.reqChan)
实例
func TestFuncJob(t *testing.T) {pool := NewFunc(10, func(in interface{}) interface{} {intVal := in.(int)
return intVal * 2
})
defer pool.Close()
for i := 0; i < 10; i++ {ret := pool.Process(10)
if exp, act := 20, ret.(int); exp != act {t.Errorf("Wrong result: %v != %v", act, exp)
}
}
}
TestFuncJob 通过 NewFunc 创立 pool,
小结
tunny 的 Worker 接口定义了 Process、BlockUntilReady、Interrupt、Terminate 办法;NewFunc 办法创立的是 closureWorker,NewCallback 办法创立的是 callbackWorker。
doc
- tunny