本文主要研究一下tempodb的Pool

Pool

tempo/tempodb/pool/pool.go

type Pool struct {    cfg  *Config    size *atomic.Int32    workQueue  chan *job    shutdownCh chan struct{}}
Pool定义了cfg、size、workQueue、shutdownCh属性

job

tempo/tempodb/pool/pool.go

type job struct {    ctx     context.Context    cancel  context.CancelFunc    payload interface{}    fn      JobFunc    wg        *sync.WaitGroup    resultsCh chan []byte    stop      *atomic.Bool    err       *atomic.Error}type JobFunc func(ctx context.Context, payload interface{}) ([]byte, error)
job定义了ctx、cancel、payload、fn(JobFunc类型)、wg、resultsCh、stop、err属性;JobFunc接收payload,返回[]byte类型的结果

Config

tempo/tempodb/pool/pool.go

// Config holds the tunables for a Pool.
type Config struct {
	MaxWorkers int `yaml:"max_workers"`
	QueueDepth int `yaml:"queue_depth"`
}

// defaultConfig returns the settings NewPool falls back to when it is given a
// nil Config: 30 workers and a queue depth of 10000.
func defaultConfig() *Config {
	return &Config{
		MaxWorkers: 30,
		QueueDepth: 10000,
	}
}
Config可以指定MaxWorkers、QueueDepth两个属性;defaultConfig默认的配置是MaxWorkers为30,QueueDepth为10000

NewPool

tempo/tempodb/pool/pool.go

func NewPool(cfg *Config) *Pool {    if cfg == nil {        cfg = defaultConfig()    }    q := make(chan *job, cfg.QueueDepth)    p := &Pool{        cfg:        cfg,        workQueue:  q,        size:       atomic.NewInt32(0),        shutdownCh: make(chan struct{}),    }    for i := 0; i < cfg.MaxWorkers; i++ {        go p.worker(q)    }    p.reportQueueLength()    metricQueryQueueMax.Set(float64(cfg.QueueDepth))    return p}
NewPool根据Config创建Pool,同时根据cfg.MaxWorkers启动对应个数的p.worker(q),然后执行p.reportQueueLength()

worker

tempo/tempodb/pool/pool.go

func (p *Pool) worker(j <-chan *job) {    for {        select {        case <-p.shutdownCh:            return        case j, ok := <-j:            if !ok {                return            }            runJob(j)            p.size.Dec()        }    }}
worker方法通过for循环进行select,若是p.shutdownCh则直接return跳出循环;若是接收到新job则执行runJob及p.size.Dec()

runJob

tempo/tempodb/pool/pool.go

func runJob(job *job) {    defer job.wg.Done()    if job.stop.Load() {        return    }    msg, err := job.fn(job.ctx, job.payload)    if msg != nil {        job.stop.Store(true) // one job was successful.  stop all others        // Commenting out job cancellations for now because of a resource leak suspected in the GCS golang client.        // Issue logged here: https://github.com/googleapis/google-cloud-go/issues/3018        // job.cancel()        select {        case job.resultsCh <- msg:        default: // if we hit default it means that something else already returned a good result.  /shrug        }    }    if err != nil {        job.err.Store(err)    }}
runJob方法先注册job.wg.Done()的defer,然后判断job.stop,若为true直接return;之后执行job.fn,若msg不为nil则标记job.stop为true,然后写入msg到job.resultsCh;若err不为nil则执行job.err.Store

Shutdown

tempo/tempodb/pool/pool.go

func (p *Pool) Shutdown() {    close(p.workQueue)    close(p.shutdownCh)}
Shutdown方法关闭p.workQueue、p.shutdownCh这两个channel

RunJobs

tempo/tempodb/pool/pool.go

func (p *Pool) RunJobs(ctx context.Context, payloads []interface{}, fn JobFunc) ([]byte, error) {    ctx, cancel := context.WithCancel(ctx)    defer cancel()    totalJobs := len(payloads)    // sanity check before we even attempt to start adding jobs    if int(p.size.Load())+totalJobs > p.cfg.QueueDepth {        return nil, fmt.Errorf("queue doesn't have room for %d jobs", len(payloads))    }    resultsCh := make(chan []byte, 1) // way for jobs to send back results    err := atomic.NewError(nil)       // way for jobs to send back an error    stop := atomic.NewBool(false)     // way to signal to the jobs to quit    wg := &sync.WaitGroup{}           // way to wait for all jobs to complete    // add each job one at a time.  even though we checked length above these might still fail    for _, payload := range payloads {        wg.Add(1)        j := &job{            ctx:       ctx,            cancel:    cancel,            fn:        fn,            payload:   payload,            wg:        wg,            resultsCh: resultsCh,            stop:      stop,            err:       err,        }        select {        case p.workQueue <- j:            p.size.Inc()        default:            wg.Done()            stop.Store(true)            return nil, fmt.Errorf("failed to add a job to work queue")        }    }    // wait for all jobs to finish    wg.Wait()    // see if anything ended up in the results channel    var msg []byte    select {    case msg = <-resultsCh:    default:    }    // ignore err if msg != nil.  otherwise errors like "context cancelled"    //  will take precedence over the err    if msg != nil {        return msg, nil    }    return nil, err.Load()}
RunJobs方法遍历payloads创建job,然后放到p.workQueue;它使用WaitGroup来等待所有job执行完成,最后接收msg返回

小结

tempodb提供了一个job的pool,NewPool根据Config创建Pool,同时根据cfg.MaxWorkers启动对应个数的p.worker(q),然后执行p.reportQueueLength();RunJobs方法用于提交jobs并等待结果;Shutdown方法用于关闭pool的workQueue、shutdownCh这两个channel。

doc

  • cortex

## 序

本文主要研究一下tempodb的Pool

Pool

tempo/tempodb/pool/pool.go

type Pool struct {    cfg  *Config    size *atomic.Int32    workQueue  chan *job    shutdownCh chan struct{}}
Pool定义了cfg、size、workQueue、shutdownCh属性

job

tempo/tempodb/pool/pool.go

type job struct {    ctx     context.Context    cancel  context.CancelFunc    payload interface{}    fn      JobFunc    wg        *sync.WaitGroup    resultsCh chan []byte    stop      *atomic.Bool    err       *atomic.Error}type JobFunc func(ctx context.Context, payload interface{}) ([]byte, error)
job定义了ctx、cancel、payload、fn(JobFunc类型)、wg、resultsCh、stop、err属性;JobFunc接收payload,返回[]byte类型的结果

Config

tempo/tempodb/pool/pool.go

// Config holds the tunables for a Pool.
type Config struct {
	MaxWorkers int `yaml:"max_workers"`
	QueueDepth int `yaml:"queue_depth"`
}

// defaultConfig returns the settings NewPool falls back to when it is given a
// nil Config: 30 workers and a queue depth of 10000.
func defaultConfig() *Config {
	return &Config{
		MaxWorkers: 30,
		QueueDepth: 10000,
	}
}
Config可以指定MaxWorkers、QueueDepth两个属性;defaultConfig默认的配置是MaxWorkers为30,QueueDepth为10000

NewPool

tempo/tempodb/pool/pool.go

func NewPool(cfg *Config) *Pool {    if cfg == nil {        cfg = defaultConfig()    }    q := make(chan *job, cfg.QueueDepth)    p := &Pool{        cfg:        cfg,        workQueue:  q,        size:       atomic.NewInt32(0),        shutdownCh: make(chan struct{}),    }    for i := 0; i < cfg.MaxWorkers; i++ {        go p.worker(q)    }    p.reportQueueLength()    metricQueryQueueMax.Set(float64(cfg.QueueDepth))    return p}
NewPool根据Config创建Pool,同时根据cfg.MaxWorkers启动对应个数的p.worker(q),然后执行p.reportQueueLength()

worker

tempo/tempodb/pool/pool.go

func (p *Pool) worker(j <-chan *job) {    for {        select {        case <-p.shutdownCh:            return        case j, ok := <-j:            if !ok {                return            }            runJob(j)            p.size.Dec()        }    }}
worker方法通过for循环进行select,若是p.shutdownCh则直接return跳出循环;若是接收到新job则执行runJob及p.size.Dec()

runJob

tempo/tempodb/pool/pool.go

func runJob(job *job) {    defer job.wg.Done()    if job.stop.Load() {        return    }    msg, err := job.fn(job.ctx, job.payload)    if msg != nil {        job.stop.Store(true) // one job was successful.  stop all others        // Commenting out job cancellations for now because of a resource leak suspected in the GCS golang client.        // Issue logged here: https://github.com/googleapis/google-cloud-go/issues/3018        // job.cancel()        select {        case job.resultsCh <- msg:        default: // if we hit default it means that something else already returned a good result.  /shrug        }    }    if err != nil {        job.err.Store(err)    }}
runJob方法先注册job.wg.Done()的defer,然后判断job.stop,若为true直接return;之后执行job.fn,若msg不为nil则标记job.stop为true,然后写入msg到job.resultsCh;若err不为nil则执行job.err.Store

Shutdown

tempo/tempodb/pool/pool.go

func (p *Pool) Shutdown() {    close(p.workQueue)    close(p.shutdownCh)}
Shutdown方法关闭p.workQueue、p.shutdownCh这两个channel

RunJobs

tempo/tempodb/pool/pool.go

func (p *Pool) RunJobs(ctx context.Context, payloads []interface{}, fn JobFunc) ([]byte, error) {    ctx, cancel := context.WithCancel(ctx)    defer cancel()    totalJobs := len(payloads)    // sanity check before we even attempt to start adding jobs    if int(p.size.Load())+totalJobs > p.cfg.QueueDepth {        return nil, fmt.Errorf("queue doesn't have room for %d jobs", len(payloads))    }    resultsCh := make(chan []byte, 1) // way for jobs to send back results    err := atomic.NewError(nil)       // way for jobs to send back an error    stop := atomic.NewBool(false)     // way to signal to the jobs to quit    wg := &sync.WaitGroup{}           // way to wait for all jobs to complete    // add each job one at a time.  even though we checked length above these might still fail    for _, payload := range payloads {        wg.Add(1)        j := &job{            ctx:       ctx,            cancel:    cancel,            fn:        fn,            payload:   payload,            wg:        wg,            resultsCh: resultsCh,            stop:      stop,            err:       err,        }        select {        case p.workQueue <- j:            p.size.Inc()        default:            wg.Done()            stop.Store(true)            return nil, fmt.Errorf("failed to add a job to work queue")        }    }    // wait for all jobs to finish    wg.Wait()    // see if anything ended up in the results channel    var msg []byte    select {    case msg = <-resultsCh:    default:    }    // ignore err if msg != nil.  otherwise errors like "context cancelled"    //  will take precedence over the err    if msg != nil {        return msg, nil    }    return nil, err.Load()}
RunJobs方法遍历payloads创建job,然后放到p.workQueue;它使用WaitGroup来等待所有job执行完成,最后接收msg返回

小结

tempodb提供了一个job的pool,NewPool根据Config创建Pool,同时根据cfg.MaxWorkers启动对应个数的p.worker(q),然后执行p.reportQueueLength();RunJobs方法用于提交jobs并等待结果;Shutdown方法用于关闭pool的workQueue、shutdownCh这两个channel。

doc

  • tempo