序
本文主要研究一下rocketmq-client-go的QueueSelector
QueueSelector
rocketmq-client-go-v2.0.0/producer/selector.go
// QueueSelector chooses one message queue out of a set of candidates.
type QueueSelector interface {
	// Select returns a queue for msg; queues holds the candidates to choose from.
	Select(msg *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue
}
- QueueSelector接口,定义了Select方法
manualQueueSelector
rocketmq-client-go-v2.0.0/producer/selector.go
type manualQueueSelector struct{}func NewManualQueueSelector() QueueSelector { return new(manualQueueSelector)}func (manualQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue { return message.Queue}
- manualQueueSelector的Select方法忽略传入的queues,直接返回message.Queue
randomQueueSelector
rocketmq-client-go-v2.0.0/producer/selector.go
type randomQueueSelector struct { rander *rand.Rand}func NewRandomQueueSelector() QueueSelector { s := new(randomQueueSelector) s.rander = rand.New(rand.NewSource(time.Now().UTC().UnixNano())) return s}func (r randomQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue { i := r.rander.Intn(len(queues)) return queues[i]}
- NewRandomQueueSelector方法先创建randomQueueSelector,然后以当前时间的纳秒数为种子设置其rander;Select方法通过r.rander.Intn(len(queues))随机选择index,然后从queues取值
roundRobinQueueSelector
rocketmq-client-go-v2.0.0/producer/selector.go
type roundRobinQueueSelector struct { sync.Locker indexer map[string]*int32}func NewRoundRobinQueueSelector() QueueSelector { s := &roundRobinQueueSelector{ Locker: new(sync.Mutex), indexer: map[string]*int32{}, } return s}func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue { t := message.Topic if _, exist := r.indexer[t]; !exist { r.Lock() if _, exist := r.indexer[t]; !exist { var v = int32(0) r.indexer[t] = &v } r.Unlock() } index := r.indexer[t] i := atomic.AddInt32(index, 1) if i < 0 { i = -i atomic.StoreInt32(index, 0) } qIndex := int(i) % len(queues) return queues[qIndex]}
- roundRobinQueueSelector的Select方法按message.Topic在indexer中维护一个int32计数器,每次调用通过atomic.AddInt32加1,qIndex为
int(i) % len(queues)
hashQueueSelector
rocketmq-client-go-v2.0.0/producer/selector.go
type hashQueueSelector struct { random QueueSelector}func NewHashQueueSelector() QueueSelector { return &hashQueueSelector{ random: NewRandomQueueSelector(), }}// hashQueueSelector choose the queue by hash if message having sharding key, otherwise choose queue by random instead.func (h *hashQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue { key := message.GetShardingKey() if len(key) == 0 { return h.random.Select(message, queues) } hasher := fnv.New32a() _, err := hasher.Write([]byte(key)) if err != nil { return nil } queueId := int(hasher.Sum32()) % len(queues) if queueId < 0 { queueId = -queueId } return queues[queueId]}
- hashQueueSelector通过
int(hasher.Sum32()) % len(queues)
来计算queue的index
小结
rocketmq-client-go的selector.go定义了QueueSelector接口,以及manualQueueSelector、randomQueueSelector、roundRobinQueueSelector、hashQueueSelector四种实现
doc
- selector.go