本文次要钻研一下tunny的workerWrapper

workerWrapper

type workerWrapper struct {    worker        Worker    interruptChan chan struct{}    // reqChan is NOT owned by this type, it is used to send requests for work.    reqChan chan<- workRequest    // closeChan can be closed in order to cleanly shutdown this worker.    closeChan chan struct{}    // closedChan is closed by the run() goroutine when it exits.    closedChan chan struct{}}func newWorkerWrapper(    reqChan chan<- workRequest,    worker Worker,) *workerWrapper {    w := workerWrapper{        worker:        worker,        interruptChan: make(chan struct{}),        reqChan:       reqChan,        closeChan:     make(chan struct{}),        closedChan:    make(chan struct{}),    }    go w.run()    return &w}
workerWrapper包装了worker,定义了interruptChan、reqChan、closeChan、closedChan属性

interrupt

func (w *workerWrapper) interrupt() {    close(w.interruptChan)    w.worker.Interrupt()}
interrupt办法敞开w.interruptChan,执行w.worker.Interrupt()

run

func (w *workerWrapper) run() {    jobChan, retChan := make(chan interface{}), make(chan interface{})    defer func() {        w.worker.Terminate()        close(retChan)        close(w.closedChan)    }()    for {        // NOTE: Blocking here will prevent the worker from closing down.        w.worker.BlockUntilReady()        select {        case w.reqChan <- workRequest{            jobChan:       jobChan,            retChan:       retChan,            interruptFunc: w.interrupt,        }:            select {            case payload := <-jobChan:                result := w.worker.Process(payload)                select {                case retChan <- result:                case <-w.interruptChan:                    w.interruptChan = make(chan struct{})                }            case _, _ = <-w.interruptChan:                w.interruptChan = make(chan struct{})            }        case <-w.closeChan:            return        }    }}
run首先创立jobChan、retChan,而后for循环执行select读取reqChan,之后读取jobChan的payload,进行解决,而后写入到retChan

stop

func (w *workerWrapper) stop() {    close(w.closeChan)}
stop办法敞开w.closeChan

join

func (w *workerWrapper) join() {    <-w.closedChan}
join办法则期待w.closedChan

小结

tunny的workerWrapper包装了worker,定义了interruptChan、reqChan、closeChan、closedChan属性,它提供了interrupt、run、stop、join办法。

doc

  • tunny