聊聊rocketmqclientgo的strategy
序本文次要钻研一下rocketmq-client-go的strategy AllocateStrategyrocketmq-client-go-v2.0.0/consumer/strategy.go type AllocateStrategy func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueueAllocateStrategy定义了一个funcAllocateByAveragelyrocketmq-client-go-v2.0.0/consumer/strategy.go func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue { if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 { return nil } var ( find bool index int ) for idx := range cidAll { if cidAll[idx] == currentCID { find = true index = idx break } } if !find { rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{ rlog.LogKeyConsumerGroup: consumerGroup, "consumerId": currentCID, "cidAll": cidAll, }) return nil } mqSize := len(mqAll) cidSize := len(cidAll) mod := mqSize % cidSize var averageSize int if mqSize <= cidSize { averageSize = 1 } else { if mod > 0 && index < mod { averageSize = mqSize/cidSize + 1 } else { averageSize = mqSize / cidSize } } var startIndex int if mod > 0 && index < mod { startIndex = index * averageSize } else { startIndex = index*averageSize + mod } num := utils.MinInt(averageSize, mqSize-startIndex) result := make([]*primitive.MessageQueue, 0) for i := 0; i < num; i++ { result = append(result, mqAll[(startIndex+i)%mqSize]) } return result}AllocateByAveragely办法会计算averageSize,而后再依据averageSize计算startIndex,最初取mqAll[(startIndex+i)%mqSize]AllocateByAveragelyCirclerocketmq-client-go-v2.0.0/consumer/strategy.go ...