
A look at the QueueSelector in rocketmq-client-go

This article takes a look at the QueueSelector in rocketmq-client-go.

QueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

type QueueSelector interface {
    Select(*primitive.Message, []*primitive.MessageQueue) *primitive.MessageQueue
}
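  • The QueueSelector interface defines a single Select method, which takes a message and the candidate queues and returns the queue to send to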
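
To make the contract concrete, here is a minimal sketch of a custom selector. Message and MessageQueue are simplified stand-ins for primitive.Message and primitive.MessageQueue, and lowestIDSelector is a hypothetical implementation invented for illustration; none of it comes from the library.

```go
package main

import "fmt"

// Message and MessageQueue are simplified stand-ins (assumptions) for
// primitive.Message and primitive.MessageQueue; the real types carry more.
type Message struct {
	Topic string
	Body  []byte
}

type MessageQueue struct {
	Topic      string
	BrokerName string
	QueueId    int
}

// QueueSelector mirrors the shape of the interface quoted above.
type QueueSelector interface {
	Select(*Message, []*MessageQueue) *MessageQueue
}

// lowestIDSelector is a hypothetical selector that always picks the queue
// with the smallest QueueId.
type lowestIDSelector struct{}

func (lowestIDSelector) Select(_ *Message, queues []*MessageQueue) *MessageQueue {
	if len(queues) == 0 {
		return nil // nothing to choose from
	}
	best := queues[0]
	for _, q := range queues[1:] {
		if q.QueueId < best.QueueId {
			best = q
		}
	}
	return best
}

func main() {
	var s QueueSelector = lowestIDSelector{}
	queues := []*MessageQueue{
		{Topic: "demo", BrokerName: "broker-a", QueueId: 2},
		{Topic: "demo", BrokerName: "broker-a", QueueId: 0},
		{Topic: "demo", BrokerName: "broker-a", QueueId: 1},
	}
	q := s.Select(&Message{Topic: "demo"}, queues)
	fmt.Println(q.QueueId) // prints 0
}
```

Any type with a matching Select method satisfies the interface, which is what lets the producer swap selection strategies.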

manualQueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

type manualQueueSelector struct{}

func NewManualQueueSelector() QueueSelector {
    return new(manualQueueSelector)
}

func (manualQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
    return message.Queue
}
  • The Select method of manualQueueSelector ignores the queues argument and simply returns message.Queue
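
The following sketch illustrates what that means for a caller. The Message and MessageQueue types are simplified stand-ins (assumptions) for the primitive types, and manualSelect is a hypothetical free function that copies the behavior described above.

```go
package main

import "fmt"

// Stand-ins (assumed shapes) for primitive.MessageQueue and primitive.Message.
type MessageQueue struct {
	Topic      string
	BrokerName string
	QueueId    int
}

type Message struct {
	Topic string
	Queue *MessageQueue // target queue chosen by the caller
}

// manualSelect copies the behavior described above: the candidate list is
// ignored and whatever the caller stored in Queue is returned.
func manualSelect(m *Message, _ []*MessageQueue) *MessageQueue {
	return m.Queue
}

func main() {
	queues := []*MessageQueue{{QueueId: 0}, {QueueId: 1}}

	pinned := &Message{Topic: "demo", Queue: queues[1]}
	fmt.Println(manualSelect(pinned, queues).QueueId) // prints 1

	unpinned := &Message{Topic: "demo"}
	fmt.Println(manualSelect(unpinned, queues) == nil) // prints true
}
```

In this sketch a message whose Queue was never set yields nil, so a caller relying on manual selection would need to set the queue before sending.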

randomQueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

type randomQueueSelector struct {
    rander *rand.Rand
}

func NewRandomQueueSelector() QueueSelector {
    s := new(randomQueueSelector)
    s.rander = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
    return s
}

func (r randomQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
    i := r.rander.Intn(len(queues))
    return queues[i]
}
  • NewRandomQueueSelector first creates a randomQueueSelector and then sets its rander, seeded with the current time; the Select method picks a random index via r.rander.Intn(len(queues)) and returns the queue at that index
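
As a rough illustration of the same idea, the sketch below picks a queue with rand.Intn. It is a variation, not the library code: lockedRandomPicker and its methods are hypothetical names, the mutex is added because the source returned by rand.NewSource is documented as not safe for concurrent use, and the empty-list check is there because Intn panics when its argument is not positive.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// MessageQueue is a simplified stand-in (assumption) for primitive.MessageQueue.
type MessageQueue struct{ QueueId int }

// lockedRandomPicker is a hypothetical variant that guards its *rand.Rand
// with a mutex so one instance can be shared across goroutines.
type lockedRandomPicker struct {
	mu     sync.Mutex
	rander *rand.Rand
}

func newLockedRandomPicker(seed int64) *lockedRandomPicker {
	return &lockedRandomPicker{rander: rand.New(rand.NewSource(seed))}
}

// pick returns a randomly chosen queue, or nil for an empty list.
func (p *lockedRandomPicker) pick(queues []*MessageQueue) *MessageQueue {
	if len(queues) == 0 {
		return nil
	}
	p.mu.Lock()
	i := p.rander.Intn(len(queues)) // i is in [0, len(queues))
	p.mu.Unlock()
	return queues[i]
}

func main() {
	p := newLockedRandomPicker(time.Now().UnixNano())
	queues := []*MessageQueue{{QueueId: 0}, {QueueId: 1}, {QueueId: 2}, {QueueId: 3}}

	counts := make([]int, len(queues))
	for n := 0; n < 1000; n++ {
		counts[p.pick(queues).QueueId]++
	}
	fmt.Println(counts) // roughly even; the exact numbers vary per run
}
```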

roundRobinQueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

type roundRobinQueueSelector struct {
    sync.Locker
    indexer map[string]*int32
}

func NewRoundRobinQueueSelector() QueueSelector {
    s := &roundRobinQueueSelector{
        Locker:  new(sync.Mutex),
        indexer: map[string]*int32{},
    }
    return s
}

func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
    t := message.Topic
    if _, exist := r.indexer[t]; !exist {
        r.Lock()
        if _, exist := r.indexer[t]; !exist {
            var v = int32(0)
            r.indexer[t] = &v
        }
        r.Unlock()
    }
    index := r.indexer[t]

    i := atomic.AddInt32(index, 1)
    if i < 0 {
        i = -i
        atomic.StoreInt32(index, 0)
    }
    qIndex := int(i) % len(queues)
    return queues[qIndex]
}
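  • roundRobinQueueSelector keeps one counter per topic in indexer; Select atomically increments that counter (resetting it to 0 if it overflows to a negative value) and uses int(i) % len(queues) as qIndex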
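
The per-topic counter can be sketched in isolation as follows. This is a simplified illustration rather than the library type: roundRobin and next are hypothetical names, every map access is done under the lock (Go maps do not support a read concurrent with a write), and on overflow the index simply restarts at 0 instead of being negated.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// roundRobin is a simplified sketch of a per-topic round-robin counter.
type roundRobin struct {
	mu      sync.Mutex
	indexer map[string]*int32
}

func newRoundRobin() *roundRobin {
	return &roundRobin{indexer: map[string]*int32{}}
}

// next returns the queue index to use for topic, given n queues (n > 0).
func (r *roundRobin) next(topic string, n int) int {
	// Look up (or lazily create) the counter for this topic under the lock.
	r.mu.Lock()
	counter, ok := r.indexer[topic]
	if !ok {
		counter = new(int32)
		r.indexer[topic] = counter
	}
	r.mu.Unlock()

	i := atomic.AddInt32(counter, 1)
	if i < 0 { // the counter wrapped around; restart from zero
		atomic.StoreInt32(counter, 0)
		i = 0
	}
	return int(i) % n
}

func main() {
	r := newRoundRobin()
	for k := 0; k < 5; k++ {
		fmt.Print(r.next("topicA", 3), " ") // prints 1 2 0 1 2
	}
	fmt.Println()
	fmt.Println(r.next("topicB", 3)) // prints 1: each topic has its own counter
}
```

Because the counter is incremented before it is used, the first selection for a topic lands on index 1 rather than index 0.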

hashQueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

type hashQueueSelector struct {
    random QueueSelector
}

func NewHashQueueSelector() QueueSelector {
    return &hashQueueSelector{
        random: NewRandomQueueSelector(),
    }
}

// hashQueueSelector choose the queue by hash if message having sharding key, otherwise choose queue by random instead.
func (h *hashQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
    key := message.GetShardingKey()
    if len(key) == 0 {
        return h.random.Select(message, queues)
    }

    hasher := fnv.New32a()
    _, err := hasher.Write([]byte(key))
    if err != nil {
        return nil
    }
    queueId := int(hasher.Sum32()) % len(queues)
    if queueId < 0 {
        queueId = -queueId
    }
    return queues[queueId]
}
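  • hashQueueSelector hashes the message's sharding key with FNV-1a (fnv.New32a) and computes the queue index as int(hasher.Sum32()) % len(queues); when the message has no sharding key, it falls back to the random selector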
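
The key-to-index mapping can be tried out on its own with the standard hash/fnv package. In this sketch queueIndex is a hypothetical helper and the keys are made up. It takes the modulo in uint32, which gives the same result as the quoted expression wherever int is 64 bits and never goes negative; on platforms where int is 32 bits the int conversion of a large hash can be negative, which is presumably why the quoted code checks queueId < 0.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// queueIndex maps a sharding key onto one of n queues (n > 0) using the
// 32-bit FNV-1a hash. It is an illustrative helper, not a library function.
func queueIndex(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(n))
}

func main() {
	// The same key always lands on the same queue, so messages that share
	// a sharding key keep their relative order within that queue.
	for _, key := range []string{"order-1001", "order-1002", "order-1001"} {
		fmt.Printf("%s -> queue %d\n", key, queueIndex(key, 4))
	}
}
```

Note that the mapping depends on the number of queues, so a key may move to a different queue if the queue count for the topic changes.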

Summary

selector.go in rocketmq-client-go defines four QueueSelector implementations: manualQueueSelector, randomQueueSelector, roundRobinQueueSelector, and hashQueueSelector.

doc

  • selector.go