序
本文主要研究一下rocketmq-client-go的strategy
AllocateStrategy
rocketmq-client-go-v2.0.0/consumer/strategy.go
// AllocateStrategy is the function signature shared by the queue-allocation
// strategies in this file. The arguments are, in order: the consumer group,
// the ID of the current consumer, all candidate message queues, and the IDs of
// all consumers. The return value is the set of queues chosen for the current
// consumer.
type AllocateStrategy func(
	consumerGroup, currentCID string,
	mqAll []*primitive.MessageQueue,
	cidAll []string,
) []*primitive.MessageQueue
- AllocateStrategy定义了一个func
AllocateByAveragely
rocketmq-client-go-v2.0.0/consumer/strategy.go
func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue { if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 { return nil } var ( find bool index int ) for idx := range cidAll { if cidAll[idx] == currentCID { find = true index = idx break } } if !find { rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{ rlog.LogKeyConsumerGroup: consumerGroup, "consumerId": currentCID, "cidAll": cidAll, }) return nil } mqSize := len(mqAll) cidSize := len(cidAll) mod := mqSize % cidSize var averageSize int if mqSize <= cidSize { averageSize = 1 } else { if mod > 0 && index < mod { averageSize = mqSize/cidSize + 1 } else { averageSize = mqSize / cidSize } } var startIndex int if mod > 0 && index < mod { startIndex = index * averageSize } else { startIndex = index*averageSize + mod } num := utils.MinInt(averageSize, mqSize-startIndex) result := make([]*primitive.MessageQueue, 0) for i := 0; i < num; i++ { result = append(result, mqAll[(startIndex+i)%mqSize]) } return result}
- AllocateByAveragely方法会计算averageSize,然后再根据averageSize计算startIndex,最后取
mqAll[(startIndex+i)%mqSize]
AllocateByAveragelyCircle
rocketmq-client-go-v2.0.0/consumer/strategy.go
func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue { if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 { return nil } var ( find bool index int ) for idx := range cidAll { if cidAll[idx] == currentCID { find = true index = idx break } } if !find { rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{ rlog.LogKeyConsumerGroup: consumerGroup, "consumerId": currentCID, "cidAll": cidAll, }) return nil } result := make([]*primitive.MessageQueue, 0) for i := index; i < len(mqAll); i++ { if i%len(cidAll) == index { result = append(result, mqAll[i]) } } return result}
- AllocateByAveragelyCircle方法取
i%len(cidAll) == index
的下标
AllocateByConfig
rocketmq-client-go-v2.0.0/consumer/strategy.go
// AllocateByConfig builds a strategy that ignores all of its arguments and
// always returns the given list unchanged (the same slice, not a copy).
func AllocateByConfig(list []*primitive.MessageQueue) AllocateStrategy {
	fixed := func(_, _ string, _ []*primitive.MessageQueue, _ []string) []*primitive.MessageQueue {
		return list
	}
	return fixed
}
- AllocateByConfig直接返回配置的list
AllocateByMachineRoom
rocketmq-client-go-v2.0.0/consumer/strategy.go
func AllocateByMachineRoom(consumeridcs []string) AllocateStrategy { return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue { if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 { return nil } var ( find bool index int ) for idx := range cidAll { if cidAll[idx] == currentCID { find = true index = idx break } } if !find { rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{ rlog.LogKeyConsumerGroup: consumerGroup, "consumerId": currentCID, "cidAll": cidAll, }) return nil } var premqAll []*primitive.MessageQueue for _, mq := range mqAll { temp := strings.Split(mq.BrokerName, "@") if len(temp) == 2 { for _, idc := range consumeridcs { if idc == temp[0] { premqAll = append(premqAll, mq) } } } } mod := len(premqAll) / len(cidAll) rem := len(premqAll) % len(cidAll) startIndex := mod * index endIndex := startIndex + mod result := make([]*primitive.MessageQueue, 0) for i := startIndex; i < endIndex; i++ { result = append(result, mqAll[i]) } if rem > index { result = append(result, premqAll[index+mod*len(cidAll)]) } return result }}
- AllocateByMachineRoom方法先按consumeridcs筛选出属于指定机房的队列premqAll,再对startIndex与endIndex之间的下标取对应的队列;若rem大于index,则再取premqAll[index+mod*len(cidAll)]
AllocateByConsistentHash
rocketmq-client-go-v2.0.0/consumer/strategy.go
func AllocateByConsistentHash(virtualNodeCnt int) AllocateStrategy { return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue { if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 { return nil } var ( find bool ) for idx := range cidAll { if cidAll[idx] == currentCID { find = true break } } if !find { rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{ rlog.LogKeyConsumerGroup: consumerGroup, "consumerId": currentCID, "cidAll": cidAll, }) return nil } c := consistent.New() c.NumberOfReplicas = virtualNodeCnt for _, cid := range cidAll { c.Add(cid) } result := make([]*primitive.MessageQueue, 0) for _, mq := range mqAll { clientNode, err := c.Get(mq.String()) if err != nil { rlog.Warning("[BUG] AllocateByConsistentHash err: %s", map[string]interface{}{ rlog.LogKeyUnderlayError: err, }) } if currentCID == clientNode { result = append(result, mq) } } return result }}
- AllocateByConsistentHash方法会使用consistent.New()来创建Consistent,然后根据virtualNodeCnt设置其NumberOfReplicas属性,之后通过c.Get(mq.String())获取clientNode,并将clientNode等于currentCID的mq加入结果
小结
AllocateStrategy定义了一个func;strategy.go提供了AllocateByAveragely、AllocateByAveragelyCircle、AllocateByConfig、AllocateByMachineRoom、AllocateByConsistentHash等方法
doc
- strategy