乐趣区

关于golang:聊聊golang的tunny

本文次要钻研一下 tunny

Worker

type Worker interface {
    // Process will synchronously perform a job and return the result.
    Process(interface{}) interface{}

    // BlockUntilReady is called before each job is processed and must block the
    // calling goroutine until the Worker is ready to process the next job.
    BlockUntilReady()

    // Interrupt is called when a job is cancelled. The worker is responsible
    // for unblocking the Process implementation.
    Interrupt()

    // Terminate is called when a Worker is removed from the processing pool
    // and is responsible for cleaning up any held resources.
    Terminate()}

Worker 接口定义了 Process、BlockUntilReady、Interrupt、Terminate 办法

closureWorker

type closureWorker struct {processor func(interface{}) interface{}}

func (w *closureWorker) Process(payload interface{}) interface{} {return w.processor(payload)
}

func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt()       {}
func (w *closureWorker) Terminate()       {}

closureWorker 定义了 processor 属性,它实现了 Worker 接口的 Process、BlockUntilReady、Interrupt、Terminate 办法,其中 Process 办法委托给 processor

callbackWorker

type callbackWorker struct{}

func (w *callbackWorker) Process(payload interface{}) interface{} {f, ok := payload.(func())
    if !ok {return ErrJobNotFunc}
    f()
    return nil
}

func (w *callbackWorker) BlockUntilReady() {}
func (w *callbackWorker) Interrupt()       {}
func (w *callbackWorker) Terminate()       {}

callbackWorker 定义了 processor 属性,它实现了 Worker 接口的 Process、BlockUntilReady、Interrupt、Terminate 办法,其中 Process 办法执行的是 payload 函数

Pool

type Pool struct {
    queuedJobs int64

    ctor    func() Worker
    workers []*workerWrapper
    reqChan chan workRequest

    workerMut sync.Mutex
}

func New(n int, ctor func() Worker) *Pool {
    p := &Pool{
        ctor:    ctor,
        reqChan: make(chan workRequest),
    }
    p.SetSize(n)

    return p
}

func NewFunc(n int, f func(interface{}) interface{}) *Pool {return New(n, func() Worker {
        return &closureWorker{processor: f,}
    })
}

func NewCallback(n int) *Pool {return New(n, func() Worker {return &callbackWorker{}
    })
}

Pool 定义了 queuedJobs、ctor、workers、reqChan、workerMut 属性;New 办法依据 n 和 ctor 创立 Pool;NewFunc 办法依据 n 和 f 来创立 closureWorker;NewCallback 办法创立 callbackWorker

Process

func (p *Pool) Process(payload interface{}) interface{} {atomic.AddInt64(&p.queuedJobs, 1)

    request, open := <-p.reqChan
    if !open {panic(ErrPoolNotRunning)
    }

    request.jobChan <- payload

    payload, open = <-request.retChan
    if !open {panic(ErrWorkerClosed)
    }

    atomic.AddInt64(&p.queuedJobs, -1)
    return payload
}

Process 办法首先递增 queuedJobs,而后从 reqChan 读取 request,而后往 jobChan 写入 payload,之后再期待 retChan,最初递加 queuedJobs

SetSize

func (p *Pool) SetSize(n int) {p.workerMut.Lock()
    defer p.workerMut.Unlock()

    lWorkers := len(p.workers)
    if lWorkers == n {return}

    // Add extra workers if N > len(workers)
    for i := lWorkers; i < n; i++ {p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
    }

    // Asynchronously stop all workers > N
    for i := n; i < lWorkers; i++ {p.workers[i].stop()}

    // Synchronously wait for all workers > N to stop
    for i := n; i < lWorkers; i++ {p.workers[i].join()}

    // Remove stopped workers from slice
    p.workers = p.workers[:n]
}

SetSize 办法首先通过 workerMut 加锁,而后依据 lWorkers 创立 newWorkerWrapper,之后执行 worker.stop,再执行 worker.join(),而后清空 workers

Close

func (p *Pool) Close() {p.SetSize(0)
    close(p.reqChan)
}

Close 办法执行 SetSize(0) 及 close(p.reqChan)

实例

func TestFuncJob(t *testing.T) {pool := NewFunc(10, func(in interface{}) interface{} {intVal := in.(int)
        return intVal * 2
    })
    defer pool.Close()

    for i := 0; i < 10; i++ {ret := pool.Process(10)
        if exp, act := 20, ret.(int); exp != act {t.Errorf("Wrong result: %v != %v", act, exp)
        }
    }
}

TestFuncJob 通过 NewFunc 创立 pool,

小结

tunny 的 Worker 接口定义了 Process、BlockUntilReady、Interrupt、Terminate 办法;NewFunc 办法创立的是 closureWorker,NewCallback 办法创立的是 callbackWorker。

doc

  • tunny
退出移动版