本文主要研究一下rocketmq-client-go的QueueSelector

QueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

type QueueSelector interface {    Select(*primitive.Message, []*primitive.MessageQueue) *primitive.MessageQueue}
  • QueueSelector接口,定义了Select方法

manualQueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

type manualQueueSelector struct{}func NewManualQueueSelector() QueueSelector {    return new(manualQueueSelector)}func (manualQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {    return message.Queue}
  • manualQueueSelector的select方法直接返回message.Queue

NewRandomQueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

type randomQueueSelector struct {    rander *rand.Rand}func NewRandomQueueSelector() QueueSelector {    s := new(randomQueueSelector)    s.rander = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))    return s}func (r randomQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {    i := r.rander.Intn(len(queues))    return queues[i]}
  • NewRandomQueueSelector方法先创建randomQueueSelector,然后设置其rander;Select方法通过r.rander.Intn(len(queues))随机选择index,然后从queue取值

roundRobinQueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

type roundRobinQueueSelector struct {    sync.Locker    indexer map[string]*int32}func NewRoundRobinQueueSelector() QueueSelector {    s := &roundRobinQueueSelector{        Locker:  new(sync.Mutex),        indexer: map[string]*int32{},    }    return s}func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {    t := message.Topic    if _, exist := r.indexer[t]; !exist {        r.Lock()        if _, exist := r.indexer[t]; !exist {            var v = int32(0)            r.indexer[t] = &v        }        r.Unlock()    }    index := r.indexer[t]    i := atomic.AddInt32(index, 1)    if i < 0 {        i = -i        atomic.StoreInt32(index, 0)    }    qIndex := int(i) % len(queues)    return queues[qIndex]}
  • roundRobinQueueSelector的qIndex为int(i) % len(queues)

hashQueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

type hashQueueSelector struct {    random QueueSelector}func NewHashQueueSelector() QueueSelector {    return &hashQueueSelector{        random: NewRandomQueueSelector(),    }}// hashQueueSelector choose the queue by hash if message having sharding key, otherwise choose queue by random instead.func (h *hashQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {    key := message.GetShardingKey()    if len(key) == 0 {        return h.random.Select(message, queues)    }    hasher := fnv.New32a()    _, err := hasher.Write([]byte(key))    if err != nil {        return nil    }    queueId := int(hasher.Sum32()) % len(queues)    if queueId < 0 {        queueId = -queueId    }    return queues[queueId]}
  • hashQueueSelector通过int(hasher.Sum32()) % len(queues)来计算queue的index

小结

rocketmq-client-go的selector.go定义了manualQueueSelector、roundRobinQueueSelector、hashQueueSelector

doc

  • selector.go