本文次要钻研一下rocketmq-client-go的strategy

AllocateStrategy

rocketmq-client-go-v2.0.0/consumer/strategy.go

type AllocateStrategy func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue
  • AllocateStrategy定义了一个func

AllocateByAveragely

rocketmq-client-go-v2.0.0/consumer/strategy.go

func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,    cidAll []string) []*primitive.MessageQueue {    if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {        return nil    }    var (        find  bool        index int    )    for idx := range cidAll {        if cidAll[idx] == currentCID {            find = true            index = idx            break        }    }    if !find {        rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{            rlog.LogKeyConsumerGroup: consumerGroup,            "consumerId":             currentCID,            "cidAll":                 cidAll,        })        return nil    }    mqSize := len(mqAll)    cidSize := len(cidAll)    mod := mqSize % cidSize    var averageSize int    if mqSize <= cidSize {        averageSize = 1    } else {        if mod > 0 && index < mod {            averageSize = mqSize/cidSize + 1        } else {            averageSize = mqSize / cidSize        }    }    var startIndex int    if mod > 0 && index < mod {        startIndex = index * averageSize    } else {        startIndex = index*averageSize + mod    }    num := utils.MinInt(averageSize, mqSize-startIndex)    result := make([]*primitive.MessageQueue, 0)    for i := 0; i < num; i++ {        result = append(result, mqAll[(startIndex+i)%mqSize])    }    return result}
  • AllocateByAveragely办法会计算averageSize,而后再依据averageSize计算startIndex,最初取mqAll[(startIndex+i)%mqSize]

AllocateByAveragelyCircle

rocketmq-client-go-v2.0.0/consumer/strategy.go

func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,    cidAll []string) []*primitive.MessageQueue {    if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {        return nil    }    var (        find  bool        index int    )    for idx := range cidAll {        if cidAll[idx] == currentCID {            find = true            index = idx            break        }    }    if !find {        rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{            rlog.LogKeyConsumerGroup: consumerGroup,            "consumerId":             currentCID,            "cidAll":                 cidAll,        })        return nil    }    result := make([]*primitive.MessageQueue, 0)    for i := index; i < len(mqAll); i++ {        if i%len(cidAll) == index {            result = append(result, mqAll[i])        }    }    return result}
  • AllocateByAveragelyCircle办法取i%len(cidAll) == index的下标

AllocateByConfig

rocketmq-client-go-v2.0.0/consumer/strategy.go

func AllocateByConfig(list []*primitive.MessageQueue) AllocateStrategy {    return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {        return list    }}
  • AllocateByConfig间接返回配置的list

AllocateByMachineRoom

rocketmq-client-go-v2.0.0/consumer/strategy.go

func AllocateByMachineRoom(consumeridcs []string) AllocateStrategy {    return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {        if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {            return nil        }        var (            find  bool            index int        )        for idx := range cidAll {            if cidAll[idx] == currentCID {                find = true                index = idx                break            }        }        if !find {            rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{                rlog.LogKeyConsumerGroup: consumerGroup,                "consumerId":             currentCID,                "cidAll":                 cidAll,            })            return nil        }        var premqAll []*primitive.MessageQueue        for _, mq := range mqAll {            temp := strings.Split(mq.BrokerName, "@")            if len(temp) == 2 {                for _, idc := range consumeridcs {                    if idc == temp[0] {                        premqAll = append(premqAll, mq)                    }                }            }        }        mod := len(premqAll) / len(cidAll)        rem := len(premqAll) % len(cidAll)        startIndex := mod * index        endIndex := startIndex + mod        result := make([]*primitive.MessageQueue, 0)        for i := startIndex; i < endIndex; i++ {            result = append(result, mqAll[i])        }        if rem > index {            result = append(result, premqAll[index+mod*len(cidAll)])        }        return result    }}
  • AllocateByMachineRoom办法对于startIndex与endIndex之间的取对应的mqAll[i],若rem大于index,则取premqAll[index+mod*len(cidAll)]

AllocateByConsistentHash

rocketmq-client-go-v2.0.0/consumer/strategy.go

func AllocateByConsistentHash(virtualNodeCnt int) AllocateStrategy {    return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {        if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {            return nil        }        var (            find bool        )        for idx := range cidAll {            if cidAll[idx] == currentCID {                find = true                break            }        }        if !find {            rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{                rlog.LogKeyConsumerGroup: consumerGroup,                "consumerId":             currentCID,                "cidAll":                 cidAll,            })            return nil        }        c := consistent.New()        c.NumberOfReplicas = virtualNodeCnt        for _, cid := range cidAll {            c.Add(cid)        }        result := make([]*primitive.MessageQueue, 0)        for _, mq := range mqAll {            clientNode, err := c.Get(mq.String())            if err != nil {                rlog.Warning("[BUG] AllocateByConsistentHash err: %s", map[string]interface{}{                    rlog.LogKeyUnderlayError: err,                })            }            if currentCID == clientNode {                result = append(result, mq)            }        }        return result    }}
  • AllocateByConsistentHash办法会应用consistent.New()来创立Consistent,而后依据virtualNodeCnt设置其NumberOfReplicas属性,而后通过c.Get(mq.String())获取clientNode

小结

AllocateStrategy定义了一个func;strategy.go提供了AllocateByAveragely、AllocateByAveragelyCircle、AllocateByConfig、AllocateByMachineRoom、AllocateByConsistentHash等办法

doc

  • strategy