关于golang:聊聊gost的GenericTaskPool

39次阅读

共计 5744 个字符,预计需要花费 15 分钟才能阅读完成。

本文主要研究一下 gost 的 GenericTaskPool

GenericTaskPool

gost/sync/task_pool.go

// GenericTaskPool is the contract shared by the task pool implementations.
type GenericTaskPool interface {
	// AddTask hands t to the pool, waiting for an idle worker, and reports
	// whether the task was accepted.
	AddTask(t task) bool

	// AddTaskAlways queues t when possible and otherwise runs it immediately.
	AddTaskAlways(t task)

	// AddTaskBalance places t on an idle queue.
	AddTaskBalance(t task)

	// Close shuts the task pool down.
	Close()

	// IsClosed reports whether the pool has been closed.
	IsClosed() bool
}

GenericTaskPool 接口定义了 AddTask、AddTaskAlways、AddTaskBalance、Close、IsClosed 方法

TaskPool

gost/sync/task_pool.go

// TaskPool is a GenericTaskPool that spreads tasks over several queues,
// each of which is consumed by one or more workers.
type TaskPool struct {
	TaskPoolOptions

	// idx is the round-robin cursor; it is advanced atomically to pick a queue.
	idx uint32
	// qArray holds the task queues.
	qArray []chan task
	// wg tracks the running workers; Close waits on it.
	wg sync.WaitGroup

	// once is not used in the visible methods; presumably it guards stop —
	// TODO confirm.
	once sync.Once
	// done becomes receivable once the pool has been stopped.
	done chan struct{}
}

// AddTask enqueues t on the next queue in round-robin order, waiting for
// space when that queue is full.
//
// It returns true once t has been queued, and false when the pool has been
// stopped, either before the call or while the call was waiting for space.
func (p *TaskPool) AddTask(t task) (ok bool) {
	id := atomic.AddUint32(&p.idx, 1) % uint32(p.tQNumber)

	// Check done on its own first so that a stopped pool always rejects the
	// task, even when the selected queue still has room.
	select {
	case <-p.done:
		return false
	default:
	}

	// Keep watching done while waiting for space. A bare send here would stay
	// blocked on a full queue after the workers have exited, and would panic
	// with "send on closed channel" once Close closes the queue.
	select {
	case <-p.done:
		return false
	case p.qArray[id] <- t:
		return true
	}
}

// AddTaskAlways never waits for queue space: it offers t to the next queue in
// round-robin order and, when that queue is full, hands t to goSafely instead.
// It does not consult the pool's done channel.
func (p *TaskPool) AddTaskAlways(t task) {
	next := atomic.AddUint32(&p.idx, 1)
	queue := p.qArray[next%uint32(p.tQNumber)]

	select {
	case queue <- t:
		// queued for a worker
	default:
		// no room in the selected queue
		goSafely(t)
	}
}

// AddTaskBalance offers t to randomly chosen queues without blocking and
// hands it to goSafely when none of the probed queues has room.
//
// It probes len(qArray)/2 queues, but always at least one when a queue
// exists. Without that floor a single-queue pool would never be probed, and
// every task would bypass the workers.
func (p *TaskPool) AddTaskBalance(t task) {
	length := len(p.qArray)

	attempts := length / 2
	if attempts == 0 && length > 0 {
		attempts = 1
	}

	for i := 0; i < attempts; i++ {
		select {
		case p.qArray[rand.Intn(length)] <- t:
			return
		default:
			// this queue is full; probe another one
		}
	}

	// no idle queue found
	goSafely(t)
}

// IsClosed reports whether the pool has been stopped, by polling the done
// channel without blocking.
func (p *TaskPool) IsClosed() bool {
	closed := false
	select {
	case <-p.done:
		closed = true
	default:
	}
	return closed
}

// Close calls stop, waits for every worker to return, and only then closes
// each task queue.
func (p *TaskPool) Close() {
	p.stop()
	p.wg.Wait()

	for _, q := range p.qArray {
		close(q)
	}
}

TaskPool 定义了 TaskPoolOptions、idx、qArray、wg、once、done 属性;它实现了 GenericTaskPool 接口;AddTask 方法先递增 idx,然后根据 tQNumber 计算 id,在 pool 是 done 的时候会返回 false,其他情况往 qArray[id] 写入 task 并返回 true;AddTaskAlways 方法会忽略 pool 的关闭信息;AddTaskBalance 方法会尝试 len/2 次随机往 qArray 写入 task,都写入不成功则 goSafely 执行;IsClosed 主要是读取 done 的 channel 信息;Close 方法执行 stop 及 wg.Wait(),最后遍历 qArray 挨个执行 close

NewTaskPool

gost/sync/task_pool.go

// NewTaskPool builds a TaskPool from opts and starts its workers.
//
// The options are applied in order to a zero TaskPoolOptions value, which is
// then validated (validate panics on an illegal pool size). One buffered
// queue of capacity tQLen is created for each of the tQNumber queues.
func NewTaskPool(opts ...TaskPoolOption) GenericTaskPool {
	options := TaskPoolOptions{}
	for i := range opts {
		opts[i](&options)
	}
	options.validate()

	queues := make([]chan task, options.tQNumber)
	for i := range queues {
		queues[i] = make(chan task, options.tQLen)
	}

	pool := &TaskPool{
		TaskPoolOptions: options,
		qArray:          queues,
		done:            make(chan struct{}),
	}
	pool.start()

	return pool
}

NewTaskPool 通过 TaskPoolOptions 来创建 TaskPool

TaskPoolOption

gost/sync/options.go

const (
    // defaultTaskQNumber is the queue count validate falls back to when
    // tQNumber is below 1.
    defaultTaskQNumber = 10
    // defaultTaskQLen is the per-queue buffer size validate falls back to
    // when tQLen is below 1.
    defaultTaskQLen    = 128
)

/////////////////////////////////////////
// Task Pool Options
/////////////////////////////////////////

// TaskPoolOptions collects the tunable settings of a task pool.
type TaskPoolOptions struct {
	// tQLen is the task queue length: the buffer size of each queue.
	tQLen int
	// tQNumber is the number of task queues.
	tQNumber int
	// tQPoolSize is the task pool size: the number of workers.
	tQPoolSize int
}

// validate normalizes the options in place.
//
// It panics when tQPoolSize is below 1. A tQLen or tQNumber below 1 is
// replaced by its default, and tQNumber is then capped at tQPoolSize so that
// there are never more queues than workers.
func (o *TaskPoolOptions) validate() {
	if o.tQPoolSize <= 0 {
		panic(fmt.Sprintf("illegal pool size %d", o.tQPoolSize))
	}

	if o.tQLen <= 0 {
		o.tQLen = defaultTaskQLen
	}

	if o.tQNumber <= 0 {
		o.tQNumber = defaultTaskQNumber
	}

	// The cap is applied after the default so the default is capped as well.
	if o.tQPoolSize < o.tQNumber {
		o.tQNumber = o.tQPoolSize
	}
}

// TaskPoolOption mutates a TaskPoolOptions value; NewTaskPool applies each
// option it is given, in order, before validating the result.
type TaskPoolOption func(*TaskPoolOptions)

// WithTaskPoolTaskPoolSize returns an option that stores size in tQPoolSize,
// the number of workers.
func WithTaskPoolTaskPoolSize(size int) TaskPoolOption {
	return func(opts *TaskPoolOptions) {
		opts.tQPoolSize = size
	}
}

// WithTaskPoolTaskQueueLength returns an option that stores length in tQLen,
// the buffer size of each task queue.
func WithTaskPoolTaskQueueLength(length int) TaskPoolOption {
	return func(opts *TaskPoolOptions) {
		opts.tQLen = length
	}
}

// WithTaskPoolTaskQueueNumber returns an option that stores number in
// tQNumber, the number of task queues.
func WithTaskPoolTaskQueueNumber(number int) TaskPoolOption {
	return func(opts *TaskPoolOptions) {
		opts.tQNumber = number
	}
}

TaskPoolOptions 定义了 tQLen、tQNumber、tQPoolSize 属性,提供了 WithTaskPoolTaskPoolSize、WithTaskPoolTaskQueueLength、WithTaskPoolTaskQueueNumber、validate 方法

start

gost/sync/task_pool.go

// start launches tQPoolSize workers. Worker i consumes queue i%tQNumber, so
// the workers are spread over the queues in round-robin order. Each worker is
// registered on wg before it is launched.
func (p *TaskPool) start() {
	for workerID := 0; workerID < p.tQPoolSize; workerID++ {
		p.wg.Add(1)
		p.safeRun(workerID, p.qArray[workerID%p.tQNumber])
	}
}

// safeRun passes the worker loop for (workerID, q) to gxruntime.GoSafely and
// logs any error the loop returns through the standard logger.
// NOTE(review): GoSafely presumably runs the function in a new goroutine —
// confirm against gxruntime.
func (p *TaskPool) safeRun(workerID int, q chan task) {
	worker := func() {
		if err := p.run(workerID, q); err != nil {
			// report the worker's exit error
			log.Printf("gost/TaskPool.run error: %s", err.Error())
		}
	}

	gxruntime.GoSafely(nil, false, worker, nil)
}

start 方法根据 tQPoolSize 挨个执行 safeRun;safeRun 方法通过 GoSafely 执行 p.run

run

gost/sync/task_pool.go

// run is the worker loop: it executes tasks received from q until the pool's
// done channel becomes receivable.
//
// id identifies the worker in the error message. The returned error is
// non-nil when the worker exits while q still holds buffered tasks. A panic
// raised by a task is recovered and written to stderr with a stack trace, so
// one failing task does not kill the worker. wg.Done is called on return.
func (p *TaskPool) run(id int, q chan task) error {
	defer p.wg.Done()

	for {
		select {
		case <-p.done:
			// Sample the backlog once. Other workers may share q, so reading
			// len(q) again for the message could report a value that
			// contradicts the check (e.g. "length 0 is greater than 0").
			if pending := len(q); pending > 0 {
				return fmt.Errorf("task worker %d exit now while its task buffer length %d is greater than 0",
					id, pending)
			}

			return nil

		case t, ok := <-q:
			if !ok {
				// q has been closed; nothing to execute
				continue
			}

			func() {
				defer func() {
					if r := recover(); r != nil {
						fmt.Fprintf(os.Stderr, "%s goroutine panic: %v\n%s\n",
							time.Now(), r, string(debug.Stack()))
					}
				}()
				t()
			}()
		}
	}
}

run 方法通过 for 循环进行 select,若读取到 p.done 则退出循环;若是读取到 task 则执行 task

taskPoolSimple

gost/sync/task_pool.go

// taskPoolSimple is the second GenericTaskPool implementation. Its methods
// are not part of this excerpt.
type taskPoolSimple struct {
	// work is the task channel.
	work chan task
	// sem bounds the pool size (original note: "gr pool size"; presumably the
	// number of goroutines — TODO confirm).
	sem chan struct{}

	wg sync.WaitGroup

	once sync.Once
	done chan struct{}
}

taskPoolSimple 定义了 work、sem、wg、once、done 属性;它实现了 GenericTaskPool 接口;AddTask 方法先判断 done,之后写入 work 及 sem;AddTaskAlways 方法在 select 不到 channel 的时候会执行 goSafely;AddTaskBalance 方法实际执行的是 AddTaskAlways;IsClosed 方法读取 done 信息;Close 方法执行 stop 及 wg.Wait()

实例

gost/sync/task_pool_test.go

// TestTaskPool submits tasks to a TaskPool with one worker, one queue and a
// queue length of one from numCPU*numCPU concurrent producers (100 AddTask
// calls each), then closes the pool. Rejected submissions are only logged.
func TestTaskPool(t *testing.T) {
	numCPU := runtime.NumCPU()
	producers := numCPU * numCPU

	tp := NewTaskPool(
		WithTaskPoolTaskPoolSize(1),
		WithTaskPoolTaskQueueNumber(1),
		WithTaskPoolTaskQueueLength(1),
	)

	// The counter returned by newCountTask is discarded because the
	// final-count assertion below is disabled.
	countTask, _ := newCountTask()

	var wg sync.WaitGroup
	wg.Add(producers)
	for i := 0; i < producers; i++ {
		go func() {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				if !tp.AddTask(countTask) {
					t.Log(j)
				}
			}
		}()
	}
	wg.Wait()
	tp.Close()

	// Disabled in the original:
	//
	//	taskCnt := int64(numCPU * numCPU * 100)
	//	if taskCnt != atomic.LoadInt64(cnt) {
	//		t.Error("want", taskCnt, "got", *cnt)
	//	}
}

// TestTaskPoolSimple submits numCPU*numCPU*100 counting tasks to a
// NewTaskPoolSimple(1) pool from numCPU*numCPU concurrent producers and
// checks that the counter reaches the number of submitted tasks.
func TestTaskPoolSimple(t *testing.T) {
	numCPU := runtime.NumCPU()
	producers := numCPU * numCPU
	want := int64(producers * 100)

	tp := NewTaskPoolSimple(1)

	countTask, cnt := newCountTask()

	var wg sync.WaitGroup
	wg.Add(producers)
	for i := 0; i < producers; i++ {
		go func() {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				if !tp.AddTask(countTask) {
					t.Log(j)
				}
			}
		}()
	}
	wg.Wait()

	if got := atomic.LoadInt64(cnt); want != got {
		t.Error("want", want, "got", got)
	}
}

TaskPoolSimple 的创建比较简单,只需要提供 size 参数即可;TaskPool 的创建需要提供 TaskPoolOption,有 WithTaskPoolTaskPoolSize、WithTaskPoolTaskQueueNumber、WithTaskPoolTaskQueueLength 这些 option

小结

gost 的 GenericTaskPool 接口定义了 AddTask、AddTaskAlways、AddTaskBalance、Close、IsClosed 方法;这里有 TaskPool、taskPoolSimple 两个实现。

doc

  • gost

正文完
 0