序
本文主要研究一下tempodb的Pool
Pool
tempo/tempodb/pool/pool.go
type Pool struct {
cfg *Config
size *atomic.Int32
workQueue chan *job
shutdownCh chan struct{}
}
Pool定义了cfg、size、workQueue、shutdownCh属性
job
tempo/tempodb/pool/pool.go
type job struct {
ctx context.Context
cancel context.CancelFunc
payload interface{}
fn JobFunc
wg *sync.WaitGroup
resultsCh chan []byte
stop *atomic.Bool
err *atomic.Error
}
type JobFunc func(ctx context.Context, payload interface{}) ([]byte, error)
job定义了ctx、cancel、payload、JobFunc、wg、resultsCh、stop、err属性;JobFunc接收payload,返回[]byte
类型的结果
Config
tempo/tempodb/pool/pool.go
type Config struct {
MaxWorkers int `yaml:"max_workers"`
QueueDepth int `yaml:"queue_depth"`
}
// default is concurrency disabled
func defaultConfig() *Config {
return &Config{
MaxWorkers: 30,
QueueDepth: 10000,
}
}
Config可以指定MaxWorkers、QueueDepth两个属性;defaultConfig默认的配置是MaxWorkers为30,QueueDepth为10000
NewPool
tempo/tempodb/pool/pool.go
func NewPool(cfg *Config) *Pool {
if cfg == nil {
cfg = defaultConfig()
}
q := make(chan *job, cfg.QueueDepth)
p := &Pool{
cfg: cfg,
workQueue: q,
size: atomic.NewInt32(0),
shutdownCh: make(chan struct{}),
}
for i := 0; i < cfg.MaxWorkers; i++ {
go p.worker(q)
}
p.reportQueueLength()
metricQueryQueueMax.Set(float64(cfg.QueueDepth))
return p
}
NewPool根据Config创建Pool,同时根据cfg.MaxWorkers启动对应个数的p.worker(q),然后执行p.reportQueueLength()
worker
tempo/tempodb/pool/pool.go
func (p *Pool) worker(j <-chan *job) {
for {
select {
case <-p.shutdownCh:
return
case j, ok := <-j:
if !ok {
return
}
runJob(j)
p.size.Dec()
}
}
}
worker方法通过for循环进行select,若是p.shutdownCh则直接return跳出循环;若是接收到新job则执行runJob及p.size.Dec()
runJob
tempo/tempodb/pool/pool.go
func runJob(job *job) {
defer job.wg.Done()
if job.stop.Load() {
return
}
msg, err := job.fn(job.ctx, job.payload)
if msg != nil {
job.stop.Store(true) // one job was successful. stop all others
// Commenting out job cancellations for now because of a resource leak suspected in the GCS golang client.
// Issue logged here: https://github.com/googleapis/google-cloud-go/issues/3018
// job.cancel()
select {
case job.resultsCh <- msg:
default: // if we hit default it means that something else already returned a good result. /shrug
}
}
if err != nil {
job.err.Store(err)
}
}
runJob方法先注册job.wg.Done()的defer,然后判断job.stop,若为true直接return;之后执行job.fn,若msg不为nil则标记job.stop为true,然后写入msg到job.resultsCh;若err不为nil则执行job.err.Store
Shutdown
tempo/tempodb/pool/pool.go
func (p *Pool) Shutdown() {
close(p.workQueue)
close(p.shutdownCh)
}
Shutdown方法关闭p.workQueue、p.shutdownCh这两个channel
RunJobs
tempo/tempodb/pool/pool.go
func (p *Pool) RunJobs(ctx context.Context, payloads []interface{}, fn JobFunc) ([]byte, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
totalJobs := len(payloads)
// sanity check before we even attempt to start adding jobs
if int(p.size.Load())+totalJobs > p.cfg.QueueDepth {
return nil, fmt.Errorf("queue doesn't have room for %d jobs", len(payloads))
}
resultsCh := make(chan []byte, 1) // way for jobs to send back results
err := atomic.NewError(nil) // way for jobs to send back an error
stop := atomic.NewBool(false) // way to signal to the jobs to quit
wg := &sync.WaitGroup{} // way to wait for all jobs to complete
// add each job one at a time. even though we checked length above these might still fail
for _, payload := range payloads {
wg.Add(1)
j := &job{
ctx: ctx,
cancel: cancel,
fn: fn,
payload: payload,
wg: wg,
resultsCh: resultsCh,
stop: stop,
err: err,
}
select {
case p.workQueue <- j:
p.size.Inc()
default:
wg.Done()
stop.Store(true)
return nil, fmt.Errorf("failed to add a job to work queue")
}
}
// wait for all jobs to finish
wg.Wait()
// see if anything ended up in the results channel
var msg []byte
select {
case msg = <-resultsCh:
default:
}
// ignore err if msg != nil. otherwise errors like "context cancelled"
// will take precedence over the err
if msg != nil {
return msg, nil
}
return nil, err.Load()
}
RunJobs方法遍历payloads创建job,然后放到p.workQueue;它使用WaitGroup来等待所有job执行完成,最后接收msg返回
小结
tempodb提供了一个job的pool,NewPool根据Config创建Pool,同时根据cfg.MaxWorkers启动对应个数的p.worker(q),然后执行p.reportQueueLength();RunJobs方法用于提交jobs并等待结果;Shutdown方法用于关闭pool的workQueue、shutdownCh这两个channel。
doc
有疑问加站长微信联系(非本文作者)