乐趣区

关于golang:聊聊tempodb的Pool

本文次要钻研一下 tempodb 的 Pool

Pool

tempo/tempodb/pool/pool.go

type Pool struct {
    cfg  *Config
    size *atomic.Int32

    workQueue  chan *job
    shutdownCh chan struct{}}

Pool 定义了 cfg、size、workQueue、shutdownCh 属性

job

tempo/tempodb/pool/pool.go

type job struct {
    ctx     context.Context
    cancel  context.CancelFunc
    payload interface{}
    fn      JobFunc

    wg        *sync.WaitGroup
    resultsCh chan []byte
    stop      *atomic.Bool
    err       *atomic.Error
}

type JobFunc func(ctx context.Context, payload interface{}) ([]byte, error)

job 定义了 ctx、cancel、payload、JobFunc、wg、resultsCh、stop、err 属性;JobFunc 接管 payload,返回 []byte 类型的后果

Config

tempo/tempodb/pool/pool.go

type Config struct {
    MaxWorkers int `yaml:"max_workers"`
    QueueDepth int `yaml:"queue_depth"`
}

// default is concurrency disabled
func defaultConfig() *Config {
    return &Config{
        MaxWorkers: 30,
        QueueDepth: 10000,
    }
}

Config 能够指定 MaxWorkers、QueueDepth 两个属性;defaultConfig 默认的配置是 MaxWorkers 为 30,QueueDepth 为 10000

NewPool

tempo/tempodb/pool/pool.go

func NewPool(cfg *Config) *Pool {
    if cfg == nil {cfg = defaultConfig()
    }

    q := make(chan *job, cfg.QueueDepth)
    p := &Pool{
        cfg:        cfg,
        workQueue:  q,
        size:       atomic.NewInt32(0),
        shutdownCh: make(chan struct{}),
    }

    for i := 0; i < cfg.MaxWorkers; i++ {go p.worker(q)
    }

    p.reportQueueLength()

    metricQueryQueueMax.Set(float64(cfg.QueueDepth))

    return p
}

NewPool 依据 Config 创立 Pool,同时依据 cfg.MaxWorkers 启动对应个数的 p.worker(q),而后执行 p.reportQueueLength()

worker

tempo/tempodb/pool/pool.go

func (p *Pool) worker(j <-chan *job) {
    for {
        select {
        case <-p.shutdownCh:
            return
        case j, ok := <-j:
            if !ok {return}
            runJob(j)
            p.size.Dec()}
    }
}

worker 办法通过 for 循环进行 select,若是 p.shutdownCh 则间接 return 跳出循环;若是接管到新 job 则执行 runJob 及 p.size.Dec()

runJob

tempo/tempodb/pool/pool.go

func runJob(job *job) {defer job.wg.Done()

    if job.stop.Load() {return}

    msg, err := job.fn(job.ctx, job.payload)
    if msg != nil {job.stop.Store(true) // one job was successful.  stop all others
        // Commenting out job cancellations for now because of a resource leak suspected in the GCS golang client.
        // Issue logged here: https://github.com/googleapis/google-cloud-go/issues/3018
        // job.cancel()
        select {
        case job.resultsCh <- msg:
        default: // if we hit default it means that something else already returned a good result.  /shrug
        }
    }
    if err != nil {job.err.Store(err)
    }
}

runJob 办法先注册 job.wg.Done()的 defer,而后判断 job.stop,若为 true 间接 return;之后执行 job.fn,若 msg 不为 nil 则标记 job.stop 为 true,而后写入 msg 到 job.resultsCh;若 err 不为 nil 则执行 job.err.Store

Shutdown

tempo/tempodb/pool/pool.go

func (p *Pool) Shutdown() {close(p.workQueue)
    close(p.shutdownCh)
}

Shutdown 办法敞开 p.workQueue、p.shutdownCh 这两个 channel

RunJobs

tempo/tempodb/pool/pool.go

func (p *Pool) RunJobs(ctx context.Context, payloads []interface{}, fn JobFunc) ([]byte, error) {ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    totalJobs := len(payloads)

    // sanity check before we even attempt to start adding jobs
    if int(p.size.Load())+totalJobs > p.cfg.QueueDepth {return nil, fmt.Errorf("queue doesn't have room for %d jobs", len(payloads))
    }

    resultsCh := make(chan []byte, 1) // way for jobs to send back results
    err := atomic.NewError(nil)       // way for jobs to send back an error
    stop := atomic.NewBool(false)     // way to signal to the jobs to quit
    wg := &sync.WaitGroup{}           // way to wait for all jobs to complete

    // add each job one at a time.  even though we checked length above these might still fail
    for _, payload := range payloads {wg.Add(1)
        j := &job{
            ctx:       ctx,
            cancel:    cancel,
            fn:        fn,
            payload:   payload,
            wg:        wg,
            resultsCh: resultsCh,
            stop:      stop,
            err:       err,
        }

        select {
        case p.workQueue <- j:
            p.size.Inc()
        default:
            wg.Done()
            stop.Store(true)
            return nil, fmt.Errorf("failed to add a job to work queue")
        }
    }

    // wait for all jobs to finish
    wg.Wait()

    // see if anything ended up in the results channel
    var msg []byte
    select {
    case msg = <-resultsCh:
    default:
    }

    // ignore err if msg != nil.  otherwise errors like "context cancelled"
    //  will take precedence over the err
    if msg != nil {return msg, nil}

    return nil, err.Load()}

RunJobs 办法遍历 payloads 创立 job,而后放到 p.workQueue;它应用 WaitGroup 来期待所有 job 执行实现,最初接管 msg 返回

小结

tempodb 提供了一个 job 的 pool,NewPool 依据 Config 创立 Pool,同时依据 cfg.MaxWorkers 启动对应个数的 p.worker(q),而后执行 p.reportQueueLength();RunJobs 办法用于提交 jobs 并期待后果;Shutdown 办法用于敞开 pool 的 workQueue、shutdownCh 这两个 channel。

doc

  • [cortex](## 序

本文次要钻研一下 tempodb 的 Pool

Pool

tempo/tempodb/pool/pool.go

type Pool struct {
    cfg  *Config
    size *atomic.Int32

    workQueue  chan *job
    shutdownCh chan struct{}}

Pool 定义了 cfg、size、workQueue、shutdownCh 属性

job

tempo/tempodb/pool/pool.go

type job struct {
    ctx     context.Context
    cancel  context.CancelFunc
    payload interface{}
    fn      JobFunc

    wg        *sync.WaitGroup
    resultsCh chan []byte
    stop      *atomic.Bool
    err       *atomic.Error
}

type JobFunc func(ctx context.Context, payload interface{}) ([]byte, error)

job 定义了 ctx、cancel、payload、JobFunc、wg、resultsCh、stop、err 属性;JobFunc 接管 payload,返回 []byte 类型的后果

Config

tempo/tempodb/pool/pool.go

type Config struct {
    MaxWorkers int `yaml:"max_workers"`
    QueueDepth int `yaml:"queue_depth"`
}

// default is concurrency disabled
func defaultConfig() *Config {
    return &Config{
        MaxWorkers: 30,
        QueueDepth: 10000,
    }
}

Config 能够指定 MaxWorkers、QueueDepth 两个属性;defaultConfig 默认的配置是 MaxWorkers 为 30,QueueDepth 为 10000

NewPool

tempo/tempodb/pool/pool.go

func NewPool(cfg *Config) *Pool {
    if cfg == nil {cfg = defaultConfig()
    }

    q := make(chan *job, cfg.QueueDepth)
    p := &Pool{
        cfg:        cfg,
        workQueue:  q,
        size:       atomic.NewInt32(0),
        shutdownCh: make(chan struct{}),
    }

    for i := 0; i < cfg.MaxWorkers; i++ {go p.worker(q)
    }

    p.reportQueueLength()

    metricQueryQueueMax.Set(float64(cfg.QueueDepth))

    return p
}

NewPool 依据 Config 创立 Pool,同时依据 cfg.MaxWorkers 启动对应个数的 p.worker(q),而后执行 p.reportQueueLength()

worker

tempo/tempodb/pool/pool.go

func (p *Pool) worker(j <-chan *job) {
    for {
        select {
        case <-p.shutdownCh:
            return
        case j, ok := <-j:
            if !ok {return}
            runJob(j)
            p.size.Dec()}
    }
}

worker 办法通过 for 循环进行 select,若是 p.shutdownCh 则间接 return 跳出循环;若是接管到新 job 则执行 runJob 及 p.size.Dec()

runJob

tempo/tempodb/pool/pool.go

func runJob(job *job) {defer job.wg.Done()

    if job.stop.Load() {return}

    msg, err := job.fn(job.ctx, job.payload)
    if msg != nil {job.stop.Store(true) // one job was successful.  stop all others
        // Commenting out job cancellations for now because of a resource leak suspected in the GCS golang client.
        // Issue logged here: https://github.com/googleapis/google-cloud-go/issues/3018
        // job.cancel()
        select {
        case job.resultsCh <- msg:
        default: // if we hit default it means that something else already returned a good result.  /shrug
        }
    }
    if err != nil {job.err.Store(err)
    }
}

runJob 办法先注册 job.wg.Done()的 defer,而后判断 job.stop,若为 true 间接 return;之后执行 job.fn,若 msg 不为 nil 则标记 job.stop 为 true,而后写入 msg 到 job.resultsCh;若 err 不为 nil 则执行 job.err.Store

Shutdown

tempo/tempodb/pool/pool.go

func (p *Pool) Shutdown() {close(p.workQueue)
    close(p.shutdownCh)
}

Shutdown 办法敞开 p.workQueue、p.shutdownCh 这两个 channel

RunJobs

tempo/tempodb/pool/pool.go

func (p *Pool) RunJobs(ctx context.Context, payloads []interface{}, fn JobFunc) ([]byte, error) {ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    totalJobs := len(payloads)

    // sanity check before we even attempt to start adding jobs
    if int(p.size.Load())+totalJobs > p.cfg.QueueDepth {return nil, fmt.Errorf("queue doesn't have room for %d jobs", len(payloads))
    }

    resultsCh := make(chan []byte, 1) // way for jobs to send back results
    err := atomic.NewError(nil)       // way for jobs to send back an error
    stop := atomic.NewBool(false)     // way to signal to the jobs to quit
    wg := &sync.WaitGroup{}           // way to wait for all jobs to complete

    // add each job one at a time.  even though we checked length above these might still fail
    for _, payload := range payloads {wg.Add(1)
        j := &job{
            ctx:       ctx,
            cancel:    cancel,
            fn:        fn,
            payload:   payload,
            wg:        wg,
            resultsCh: resultsCh,
            stop:      stop,
            err:       err,
        }

        select {
        case p.workQueue <- j:
            p.size.Inc()
        default:
            wg.Done()
            stop.Store(true)
            return nil, fmt.Errorf("failed to add a job to work queue")
        }
    }

    // wait for all jobs to finish
    wg.Wait()

    // see if anything ended up in the results channel
    var msg []byte
    select {
    case msg = <-resultsCh:
    default:
    }

    // ignore err if msg != nil.  otherwise errors like "context cancelled"
    //  will take precedence over the err
    if msg != nil {return msg, nil}

    return nil, err.Load()}

RunJobs 办法遍历 payloads 创立 job,而后放到 p.workQueue;它应用 WaitGroup 来期待所有 job 执行实现,最初接管 msg 返回

小结

tempodb 提供了一个 job 的 pool,NewPool 依据 Config 创立 Pool,同时依据 cfg.MaxWorkers 启动对应个数的 p.worker(q),而后执行 p.reportQueueLength();RunJobs 办法用于提交 jobs 并期待后果;Shutdown 办法用于敞开 pool 的 workQueue、shutdownCh 这两个 channel。

doc

  • tempo
退出移动版