关于golang:聊聊tempo的ExclusiveQueues

48次阅读

共计 2042 个字符,预计需要花费 6 分钟才能阅读完成。

本文次要钻研一下 tempo 的 ExclusiveQueues

ExclusiveQueues

tempo/pkg/flushqueues/exclusivequeues.go

type ExclusiveQueues struct {queues     []*util.PriorityQueue
    index      *atomic.Int32
    activeKeys sync.Map
}

ExclusiveQueues 定义了 queues、index、activeKeys 属性

New

tempo/pkg/flushqueues/exclusivequeues.go

// New creates a new set of flush queues with a prom gauge to track current depth
func New(queues int, metric prometheus.Gauge) *ExclusiveQueues {
    f := &ExclusiveQueues{queues: make([]*util.PriorityQueue, queues),
        index:  atomic.NewInt32(0),
    }

    for j := 0; j < queues; j++ {f.queues[j] = util.NewPriorityQueue(metric)
    }

    return f
}

New 办法先创立 ExclusiveQueues,而后依据指定的 queue 个数通过 util.NewPriorityQueue(metric)创立 PriorityQueue

Enqueue

tempo/pkg/flushqueues/exclusivequeues.go

// Enqueue adds the op to the next queue and prevents any other items to be added with this key
func (f *ExclusiveQueues) Enqueue(op util.Op) {_, ok := f.activeKeys.Load(op.Key())
    if ok {return}

    f.activeKeys.Store(op.Key(), struct{}{})
    f.Requeue(op)
}

Enqueue 办法先从 activeKeys 查找指定的 key,若曾经存在则提前返回,不存在则放入 activeKeys 中,而后执行 f.Requeue(op)

Requeue

tempo/pkg/flushqueues/exclusivequeues.go

// Requeue adds an op that is presumed to already be covered by activeKeys
func (f *ExclusiveQueues) Requeue(op util.Op) {flushQueueIndex := int(f.index.Inc()) % len(f.queues)
    f.queues[flushQueueIndex].Enqueue(op)
}

Requeue 办法首先通过 int(f.index.Inc()) % len(f.queues) 计算 flushQueueIndex,而后找到对应的 queue,执行 Enqueue 办法

Dequeue

tempo/pkg/flushqueues/exclusivequeues.go

// Dequeue removes the next op from the requested queue.  After dequeueing the calling
//  process either needs to call ClearKey or Requeue
func (f *ExclusiveQueues) Dequeue(q int) util.Op {return f.queues[q].Dequeue()}

Dequeue 办法执行 f.queues[q]对应 queue 的 Dequeue

Clear

tempo/pkg/flushqueues/exclusivequeues.go

// Clear unblocks the requested op.  This should be called only after a flush has been successful
func (f *ExclusiveQueues) Clear(op util.Op) {f.activeKeys.Delete(op.Key())
}

Clear 办法将指定 key 从 activeKeys 中移除

Stop

tempo/pkg/flushqueues/exclusivequeues.go

// Stop closes all queues
func (f *ExclusiveQueues) Stop() {
    for _, q := range f.queues {q.Close()
    }
}

Stop 办法遍历 f.queues,挨个执行 q.Close()

小结

tempo 的 ExclusiveQueues 定义了 queues、index、activeKeys 属性;它提供了 Enqueue、Requeue、Dequeue、Clear、Stop 办法。

doc

  • tempo

正文完
 0