Preface
This post mainly looks at the Pool in tempodb.
Pool
tempo/tempodb/pool/pool.go
type Pool struct {
	cfg        *Config
	size       *atomic.Int32
	workQueue  chan *job
	shutdownCh chan struct{}
}
Pool defines the cfg, size, workQueue, and shutdownCh fields.
job
tempo/tempodb/pool/pool.go
type job struct {
	ctx       context.Context
	cancel    context.CancelFunc
	payload   interface{}
	fn        JobFunc
	wg        *sync.WaitGroup
	resultsCh chan []byte
	stop      *atomic.Bool
	err       *atomic.Error
}

type JobFunc func(ctx context.Context, payload interface{}) ([]byte, error)
job defines the ctx, cancel, payload, fn (of type JobFunc), wg, resultsCh, stop, and err fields; a JobFunc takes a payload and returns a result of type []byte (plus an error).
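As a hedged illustration of the signature, here is a hypothetical JobFunc; everything except the JobFunc type itself (the names, the import path, and the "block" semantics) is invented for this example:

package main

import (
	"context"
	"fmt"

	"github.com/grafana/tempo/tempodb/pool"
)

// searchBlock is a hypothetical JobFunc: it treats the payload as a block ID,
// returns a non-nil []byte when it finds something, (nil, nil) when it finds
// nothing, and a non-nil error on failure.
var searchBlock pool.JobFunc = func(ctx context.Context, payload interface{}) ([]byte, error) {
	blockID, ok := payload.(string)
	if !ok {
		return nil, fmt.Errorf("unexpected payload type %T", payload)
	}
	if blockID == "block-42" {
		return []byte("hit in " + blockID), nil
	}
	return nil, nil
}

func main() {
	msg, err := searchBlock(context.Background(), "block-42")
	fmt.Println(string(msg), err) // hit in block-42 <nil>
}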
Config
tempo/tempodb/pool/pool.go
type Config struct {
	MaxWorkers int `yaml:"max_workers"`
	QueueDepth int `yaml:"queue_depth"`
}

// default is concurrency disabled
func defaultConfig() *Config {
	return &Config{
		MaxWorkers: 30,
		QueueDepth: 10000,
	}
}
Config exposes two settings, MaxWorkers and QueueDepth; defaultConfig returns a Config with MaxWorkers of 30 and QueueDepth of 10000.
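The yaml tags suggest these two settings are loaded from a YAML config. A minimal sketch of that mapping, assuming plain gopkg.in/yaml.v2 unmarshalling (tempo wires this through its own config loading, so this is only an illustration; Config is duplicated here to keep the example self-contained):

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Config is copied from tempo/tempodb/pool/pool.go for a self-contained example.
type Config struct {
	MaxWorkers int `yaml:"max_workers"`
	QueueDepth int `yaml:"queue_depth"`
}

func main() {
	raw := []byte("max_workers: 50\nqueue_depth: 20000\n")

	var cfg Config
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {MaxWorkers:50 QueueDepth:20000}
}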
NewPool
tempo/tempodb/pool/pool.go
func NewPool(cfg *Config) *Pool {
	if cfg == nil {
		cfg = defaultConfig()
	}

	q := make(chan *job, cfg.QueueDepth)
	p := &Pool{
		cfg:        cfg,
		workQueue:  q,
		size:       atomic.NewInt32(0),
		shutdownCh: make(chan struct{}),
	}

	for i := 0; i < cfg.MaxWorkers; i++ {
		go p.worker(q)
	}

	p.reportQueueLength()

	metricQueryQueueMax.Set(float64(cfg.QueueDepth))

	return p
}
NewPool builds a Pool from the Config (falling back to defaultConfig when cfg is nil), starts cfg.MaxWorkers worker goroutines via go p.worker(q), then calls p.reportQueueLength() and sets the metricQueryQueueMax gauge to the queue depth.
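A hedged construction sketch; the worker count, queue depth, and import path are assumptions used only for illustration:

package main

import "github.com/grafana/tempo/tempodb/pool"

func main() {
	// Explicit Config; passing nil to NewPool falls back to defaultConfig().
	p := pool.NewPool(&pool.Config{
		MaxWorkers: 50,    // number of worker goroutines started by NewPool
		QueueDepth: 20000, // capacity of the buffered workQueue channel
	})
	defer p.Shutdown() // Shutdown is covered below
}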
worker
tempo/tempodb/pool/pool.go
func (p *Pool) worker(j <-chan *job) {
	for {
		select {
		case <-p.shutdownCh:
			return
		case j, ok := <-j:
			if !ok {
				return
			}
			runJob(j)
			p.size.Dec()
		}
	}
}
worker selects inside a for loop: when p.shutdownCh fires it returns and exits the loop; when the job channel is closed (!ok) it also returns; when it receives a new job it runs runJob and then calls p.size.Dec().
runJob
tempo/tempodb/pool/pool.go
func runJob(job *job) {
	defer job.wg.Done()

	if job.stop.Load() {
		return
	}

	msg, err := job.fn(job.ctx, job.payload)

	if msg != nil {
		job.stop.Store(true) // one job was successful. stop all others
		// Commenting out job cancellations for now because of a resource leak suspected in the GCS golang client.
		// Issue logged here: https://github.com/googleapis/google-cloud-go/issues/3018
		// job.cancel()
		select {
		case job.resultsCh <- msg:
		default: // if we hit default it means that something else already returned a good result. /shrug
		}
	}
	if err != nil {
		job.err.Store(err)
	}
}
runJob first defers job.wg.Done(); it then checks job.stop and returns immediately if it is already true; otherwise it runs job.fn, and if msg is non-nil it marks job.stop as true and does a non-blocking send of msg to job.resultsCh; if err is non-nil it stores the error via job.err.Store.
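The shared stop flag plus the buffered-by-one resultsCh with a non-blocking send give a "first successful result wins" scheme. A minimal standalone sketch of the same pattern (not tempo code; all names are made up):

package main

import (
	"fmt"
	"sync"
)

func main() {
	resultsCh := make(chan []byte, 1) // room for exactly one result
	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			select {
			case resultsCh <- []byte(fmt.Sprintf("result from goroutine %d", i)):
			default: // someone else already delivered a result
			}
		}(i)
	}

	wg.Wait()
	fmt.Println(string(<-resultsCh)) // prints whichever goroutine won
}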
Shutdown
tempo/tempodb/pool/pool.go
func (p *Pool) Shutdown() {
	close(p.workQueue)
	close(p.shutdownCh)
}
Shutdown closes the p.workQueue and p.shutdownCh channels.
RunJobs
tempo/tempodb/pool/pool.go
func (p *Pool) RunJobs(ctx context.Context, payloads []interface{}, fn JobFunc) ([]byte, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	totalJobs := len(payloads)

	// sanity check before we even attempt to start adding jobs
	if int(p.size.Load())+totalJobs > p.cfg.QueueDepth {
		return nil, fmt.Errorf("queue doesn't have room for %d jobs", len(payloads))
	}

	resultsCh := make(chan []byte, 1) // way for jobs to send back results
	err := atomic.NewError(nil)       // way for jobs to send back an error
	stop := atomic.NewBool(false)     // way to signal to the jobs to quit
	wg := &sync.WaitGroup{}           // way to wait for all jobs to complete

	// add each job one at a time. even though we checked length above these might still fail
	for _, payload := range payloads {
		wg.Add(1)

		j := &job{
			ctx:       ctx,
			cancel:    cancel,
			fn:        fn,
			payload:   payload,
			wg:        wg,
			resultsCh: resultsCh,
			stop:      stop,
			err:       err,
		}

		select {
		case p.workQueue <- j:
			p.size.Inc()
		default:
			wg.Done()
			stop.Store(true)
			return nil, fmt.Errorf("failed to add a job to work queue")
		}
	}

	// wait for all jobs to finish
	wg.Wait()

	// see if anything ended up in the results channel
	var msg []byte
	select {
	case msg = <-resultsCh:
	default:
	}

	// ignore err if msg != nil. otherwise errors like "context cancelled"
	// will take precedence over the err
	if msg != nil {
		return msg, nil
	}
	return nil, err.Load()
}
RunJobs iterates over payloads, wraps each one into a job and pushes it onto p.workQueue (returning an error if the queue has no room); it uses a WaitGroup to wait for all jobs to complete, then reads at most one msg from resultsCh and returns it, falling back to the stored error when no result was produced.
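An end-to-end usage sketch, assuming the import path github.com/grafana/tempo/tempodb/pool; the payload values and the "search" logic inside the JobFunc are invented for illustration:

package main

import (
	"context"
	"fmt"

	"github.com/grafana/tempo/tempodb/pool"
)

func main() {
	p := pool.NewPool(nil) // nil falls back to defaultConfig(): 30 workers, queue depth 10000
	defer p.Shutdown()

	payloads := []interface{}{"block-1", "block-2", "block-3"}

	msg, err := p.RunJobs(context.Background(), payloads, func(ctx context.Context, payload interface{}) ([]byte, error) {
		id := payload.(string)
		if id == "block-2" { // pretend only one "block" contains what we are looking for
			return []byte("found in " + id), nil
		}
		return nil, nil // nil msg and nil error simply mean "nothing found here"
	})
	if err != nil {
		fmt.Println("jobs failed:", err)
		return
	}
	fmt.Println(string(msg)) // first (and only) successful result: found in block-2
}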
Summary
tempodb provides a pool for jobs: NewPool builds a Pool from the Config, starts cfg.MaxWorkers worker goroutines via go p.worker(q), and then calls p.reportQueueLength(); RunJobs submits a batch of jobs and waits for the result; Shutdown closes the pool's workQueue and shutdownCh channels.
doc
- tempo