This article takes a look at the main parts of tunny, a goroutine pool library for Go.

Worker

type Worker interface {
    // Process will synchronously perform a job and return the result.
    Process(interface{}) interface{}

    // BlockUntilReady is called before each job is processed and must block the
    // calling goroutine until the Worker is ready to process the next job.
    BlockUntilReady()

    // Interrupt is called when a job is cancelled. The worker is responsible
    // for unblocking the Process implementation.
    Interrupt()

    // Terminate is called when a Worker is removed from the processing pool
    // and is responsible for cleaning up any held resources.
    Terminate()
}
The Worker interface defines four methods: Process, BlockUntilReady, Interrupt, and Terminate.
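To make the interface concrete, here is a minimal sketch of a custom Worker that keeps per-worker state. The interface is repeated locally so the example compiles on its own; countingWorker and its handled field are hypothetical names and are not part of tunny.

```go
package main

import "fmt"

// Worker repeats the interface quoted above so this sketch is self-contained.
type Worker interface {
    Process(interface{}) interface{}
    BlockUntilReady()
    Interrupt()
    Terminate()
}

// countingWorker is a hypothetical Worker that counts the jobs it has handled.
type countingWorker struct {
    handled int
}

// Process returns the length of a string payload, or an error value for any
// other payload type instead of panicking on a failed type assertion.
func (w *countingWorker) Process(payload interface{}) interface{} {
    w.handled++
    s, ok := payload.(string)
    if !ok {
        return fmt.Errorf("expected string payload, got %T", payload)
    }
    return len(s)
}

// The remaining methods are no-ops because this worker is always ready and
// holds no resources that need releasing.
func (w *countingWorker) BlockUntilReady() {}
func (w *countingWorker) Interrupt()       {}
func (w *countingWorker) Terminate()       {}

// Compile-time check that *countingWorker satisfies Worker.
var _ Worker = (*countingWorker)(nil)

func main() {
    var w Worker = &countingWorker{}
    fmt.Println(w.Process("tunny"))
    fmt.Println(w.Process(42))
}
```

A constructor such as func() Worker { return &countingWorker{} } would give every worker its own counter, which is the kind of per-worker state a plain closure cannot easily provide.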

closureWorker

type closureWorker struct {
    processor func(interface{}) interface{}
}

func (w *closureWorker) Process(payload interface{}) interface{} {
    return w.processor(payload)
}

func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt()       {}
func (w *closureWorker) Terminate()       {}
closureWorker has a single field, processor, and implements all four methods of the Worker interface. Process delegates to processor; BlockUntilReady, Interrupt, and Terminate are no-ops.

callbackWorker

type callbackWorker struct{}

func (w *callbackWorker) Process(payload interface{}) interface{} {
    f, ok := payload.(func())
    if !ok {
        return ErrJobNotFunc
    }
    f()
    return nil
}

func (w *callbackWorker) BlockUntilReady() {}
func (w *callbackWorker) Interrupt()       {}
func (w *callbackWorker) Terminate()       {}
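callbackWorker is an empty struct with no fields, and it also implements all four methods of the Worker interface. Its Process method treats the payload itself as the job: it asserts that the payload is a func(), runs it, and returns nil, or returns ErrJobNotFunc if the payload has any other type.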
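The type assertion in callbackWorker.Process is strict: only a value of exactly type func() passes. The following sketch reproduces that logic outside the library to show the behaviour; runCallback and errJobNotFunc are local stand-ins for the method and for tunny's ErrJobNotFunc, and the error text is an assumption.

```go
package main

import (
    "errors"
    "fmt"
)

// errJobNotFunc is a local stand-in for tunny's ErrJobNotFunc.
var errJobNotFunc = errors.New("payload is not a func()")

// runCallback mirrors the body of callbackWorker.Process shown above.
func runCallback(payload interface{}) interface{} {
    f, ok := payload.(func())
    if !ok {
        return errJobNotFunc
    }
    f()
    return nil
}

func main() {
    ran := false
    result := runCallback(func() { ran = true })
    fmt.Println(result, ran)

    // A function with a different signature does not match func(),
    // so it is rejected rather than called.
    fmt.Println(runCallback(func() int { return 1 }))
}
```

Because the callback returns nothing, any result has to be captured by the closure itself, as ran is here.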

Pool

type Pool struct {
    queuedJobs int64

    ctor    func() Worker
    workers []*workerWrapper
    reqChan chan workRequest

    workerMut sync.Mutex
}

func New(n int, ctor func() Worker) *Pool {
    p := &Pool{
        ctor:    ctor,
        reqChan: make(chan workRequest),
    }
    p.SetSize(n)
    return p
}

func NewFunc(n int, f func(interface{}) interface{}) *Pool {
    return New(n, func() Worker {
        return &closureWorker{
            processor: f,
        }
    })
}

func NewCallback(n int) *Pool {
    return New(n, func() Worker {
        return &callbackWorker{}
    })
}
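Pool has the fields queuedJobs, ctor, workers, reqChan, and workerMut. New builds a Pool from n and ctor, creating an unbuffered reqChan and calling SetSize(n) to start the workers. NewFunc creates a Pool whose workers are closureWorker instances wrapping f, and NewCallback creates a Pool whose workers are callbackWorker instances.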

Process

func (p *Pool) Process(payload interface{}) interface{} {
    atomic.AddInt64(&p.queuedJobs, 1)

    request, open := <-p.reqChan
    if !open {
        panic(ErrPoolNotRunning)
    }

    request.jobChan <- payload

    payload, open = <-request.retChan
    if !open {
        panic(ErrWorkerClosed)
    }

    atomic.AddInt64(&p.queuedJobs, -1)
    return payload
}
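Process first atomically increments queuedJobs, then receives a workRequest from reqChan, writes the payload to that request's jobChan, waits for the result on its retChan, and finally decrements queuedJobs. It panics with ErrPoolNotRunning if reqChan has been closed and with ErrWorkerClosed if retChan has been closed.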
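Process only shows the caller's half of the handshake. The sketch below pairs it with a simplified worker loop to show how the two sides meet on reqChan. It is an illustration under assumptions, not tunny's workerWrapper: startWorker and submit are hypothetical names, workRequest is reduced to the two channels used above, and stopping, interruption, and closed-channel handling are left out.

```go
package main

import "fmt"

// workRequest is a reduced stand-in for the value a worker publishes on
// reqChan; the field names follow the Process code above.
type workRequest struct {
    jobChan chan<- interface{}
    retChan <-chan interface{}
}

// startWorker launches a hypothetical worker loop. The goroutine never exits
// in this sketch; a real pool also needs a way to stop it.
func startWorker(reqChan chan<- workRequest, process func(interface{}) interface{}) {
    jobChan := make(chan interface{})
    retChan := make(chan interface{})
    go func() {
        for {
            // Blocks until a caller is ready to receive this worker's request.
            reqChan <- workRequest{jobChan: jobChan, retChan: retChan}
            // Wait for that caller's payload, then send back the result.
            payload := <-jobChan
            retChan <- process(payload)
        }
    }()
}

// submit is the caller's side, following the same three steps as Process.
func submit(reqChan <-chan workRequest, payload interface{}) interface{} {
    request := <-reqChan
    request.jobChan <- payload
    return <-request.retChan
}

func main() {
    reqChan := make(chan workRequest)
    startWorker(reqChan, func(in interface{}) interface{} {
        return in.(int) * 2
    })
    fmt.Println(submit(reqChan, 21))
}
```

Since reqChan is unbuffered, a caller only obtains a request from a worker that is idle at that moment, so jobs are never queued behind a busy worker while another one sits free.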

SetSize

func (p *Pool) SetSize(n int) {
    p.workerMut.Lock()
    defer p.workerMut.Unlock()

    lWorkers := len(p.workers)
    if lWorkers == n {
        return
    }

    // Add extra workers if N > len(workers)
    for i := lWorkers; i < n; i++ {
        p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
    }

    // Asynchronously stop all workers > N
    for i := n; i < lWorkers; i++ {
        p.workers[i].stop()
    }

    // Synchronously wait for all workers > N to stop
    for i := n; i < lWorkers; i++ {
        p.workers[i].join()
    }

    // Remove stopped workers from slice
    p.workers = p.workers[:n]
}
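SetSize first locks workerMut and returns early if the pool already has n workers. If n is larger than the current count, it appends new workers created with newWorkerWrapper(p.reqChan, p.ctor()). If n is smaller, it calls stop on every surplus worker, then calls join on each of them to wait until it has exited, and finally truncates the workers slice to its first n elements.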
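The shrink path signals every surplus worker before waiting on any of them, so the workers shut down concurrently rather than one after another. The sketch below reproduces the same three loops with a stub in place of workerWrapper; stubWorker, newStubWorker, and resize are hypothetical names, and the stub's channels are an assumption about how stop and join could be implemented.

```go
package main

import "fmt"

// stubWorker is a hypothetical stand-in for workerWrapper: one goroutine
// that exits when told to stop and reports when it is done.
type stubWorker struct {
    stopChan chan struct{}
    doneChan chan struct{}
}

func newStubWorker() *stubWorker {
    w := &stubWorker{
        stopChan: make(chan struct{}),
        doneChan: make(chan struct{}),
    }
    go func() {
        <-w.stopChan      // wait for the stop signal
        close(w.doneChan) // report that the goroutine has finished
    }()
    return w
}

// stop signals the worker without waiting; join waits for it to finish.
func (w *stubWorker) stop() { close(w.stopChan) }
func (w *stubWorker) join() { <-w.doneChan }

// resize follows the same steps as SetSize, minus the mutex.
func resize(workers []*stubWorker, n int) []*stubWorker {
    old := len(workers)
    for i := old; i < n; i++ { // grow
        workers = append(workers, newStubWorker())
    }
    for i := n; i < old; i++ { // signal all surplus workers first
        workers[i].stop()
    }
    for i := n; i < old; i++ { // then wait for each of them
        workers[i].join()
    }
    return workers[:n]
}

func main() {
    var workers []*stubWorker
    workers = resize(workers, 4)
    fmt.Println(len(workers))
    workers = resize(workers, 1)
    fmt.Println(len(workers))
    workers = resize(workers, 0)
    fmt.Println(len(workers))
}
```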

Close

func (p *Pool) Close() {
    p.SetSize(0)
    close(p.reqChan)
}
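Close calls SetSize(0) to stop and remove every worker, then closes reqChan. After that, any call to Process receives from a closed channel and panics with ErrPoolNotRunning.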
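The panic after Close comes from the two-value receive form: on a closed channel the second value is false. The sketch below isolates that check and shows one way a caller could convert the panic into an error with recover. takeRequest and errPoolNotRunning are hypothetical stand-ins, an int channel replaces the workRequest channel, and whether recovering is appropriate depends on the application.

```go
package main

import (
    "errors"
    "fmt"
)

// errPoolNotRunning is a local stand-in for tunny's ErrPoolNotRunning.
var errPoolNotRunning = errors.New("pool is not running")

// takeRequest performs the same closed-channel check as Process, but
// recovers the panic and returns it as an error instead.
func takeRequest(reqChan <-chan int) (req int, err error) {
    defer func() {
        if r := recover(); r != nil {
            err = fmt.Errorf("recovered: %v", r)
        }
    }()
    request, open := <-reqChan
    if !open {
        // The channel was closed, which is what Close does to reqChan.
        panic(errPoolNotRunning)
    }
    return request, nil
}

func main() {
    reqChan := make(chan int)
    close(reqChan)
    _, err := takeRequest(reqChan)
    fmt.Println(err)
}
```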

Example

func TestFuncJob(t *testing.T) {
    pool := NewFunc(10, func(in interface{}) interface{} {
        intVal := in.(int)
        return intVal * 2
    })
    defer pool.Close()

    for i := 0; i < 10; i++ {
        ret := pool.Process(10)
        if exp, act := 20, ret.(int); exp != act {
            t.Errorf("Wrong result: %v != %v", act, exp)
        }
    }
}
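TestFuncJob uses NewFunc to create a pool of 10 workers whose function doubles its int input. It then calls Process(10) ten times, checks that each result is 20, and closes the pool when the test returns.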

Summary

tunny's Worker interface defines the Process, BlockUntilReady, Interrupt, and Terminate methods. NewFunc creates a Pool backed by closureWorker instances, and NewCallback creates a Pool backed by callbackWorker instances.

doc

  • tunny