共计 4621 个字符,预计需要花费 12 分钟才能阅读完成。
序
本文主要研究一下 rocketmq-client-go 的 strategy
AllocateStrategy
rocketmq-client-go-v2.0.0/consumer/strategy.go
// AllocateStrategy is the function signature shared by the queue allocation
// strategies. The parameter names below are documentation only and mirror the
// names used by the implementations: the consumer group, the ID of the
// consumer asking for its share, the full list of message queues, and the
// full list of consumer IDs. The return value is the list of queues selected
// for currentCID.
type AllocateStrategy func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue
- AllocateStrategy 定义了一个 func
AllocateByAveragely
rocketmq-client-go-v2.0.0/consumer/strategy.go
// AllocateByAveragely hands currentCID one contiguous run of mqAll, sized so
// that the queues are spread as evenly as possible over cidAll.
//
// It returns nil when currentCID is empty, when mqAll or cidAll is empty, or
// when currentCID does not appear in cidAll (the latter is also logged).
// Otherwise it returns a non-nil slice, which is empty when there are fewer
// queues than consumers and currentCID sits past the last queue.
func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue {
	if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
		return nil
	}

	// Position of the current consumer inside cidAll; -1 means "not found".
	pos := -1
	for i, cid := range cidAll {
		if cid == currentCID {
			pos = i
			break
		}
	}
	if pos < 0 {
		rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
			rlog.LogKeyConsumerGroup: consumerGroup,
			"consumerId":             currentCID,
			"cidAll":                 cidAll,
		})
		return nil
	}

	queueCount := len(mqAll)
	consumerCount := len(cidAll)
	remainder := queueCount % consumerCount

	// The first `remainder` consumers each absorb one of the leftover queues.
	takesExtra := remainder > 0 && pos < remainder

	share := 1
	if queueCount > consumerCount {
		share = queueCount / consumerCount
		if takesExtra {
			share++
		}
	}

	// Consumers without an extra queue start after all the leftover queues
	// that were handed to the consumers in front of them.
	first := pos * share
	if !takesExtra {
		first += remainder
	}

	// count can be negative when first lies beyond the end of mqAll; the
	// loop below then simply does not run.
	count := utils.MinInt(share, queueCount-first)

	assigned := make([]*primitive.MessageQueue, 0)
	for offset := 0; offset < count; offset++ {
		assigned = append(assigned, mqAll[(first+offset)%queueCount])
	}
	return assigned
}
- AllocateByAveragely 方法会先计算 averageSize,然后再根据 averageSize 计算 startIndex,最后取
mqAll[(startIndex+i)%mqSize]
AllocateByAveragelyCircle
rocketmq-client-go-v2.0.0/consumer/strategy.go
// AllocateByAveragelyCircle deals mqAll out round-robin: the consumer at
// position p in cidAll receives the queues at indices p, p+len(cidAll),
// p+2*len(cidAll), and so on.
//
// It returns nil when currentCID is empty, when mqAll or cidAll is empty, or
// when currentCID does not appear in cidAll (the latter is also logged).
// Otherwise it returns a non-nil, possibly empty, slice.
func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue {
	if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
		return nil
	}

	// Position of the current consumer inside cidAll; -1 means "not found".
	pos := -1
	for i, cid := range cidAll {
		if cid == currentCID {
			pos = i
			break
		}
	}
	if pos < 0 {
		rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
			rlog.LogKeyConsumerGroup: consumerGroup,
			"consumerId":             currentCID,
			"cidAll":                 cidAll,
		})
		return nil
	}

	// pos < len(cidAll), so stepping by len(cidAll) from pos visits exactly
	// the indices i with i % len(cidAll) == pos.
	stride := len(cidAll)
	assigned := make([]*primitive.MessageQueue, 0)
	for i := pos; i < len(mqAll); i += stride {
		assigned = append(assigned, mqAll[i])
	}
	return assigned
}
- AllocateByAveragelyCircle 方法取满足
i%len(cidAll) == index
的下标 i 所对应的 mqAll[i]
AllocateByConfig
rocketmq-client-go-v2.0.0/consumer/strategy.go
// AllocateByConfig builds a strategy that ignores every argument it is
// called with and always returns list itself (the same slice, not a copy).
func AllocateByConfig(list []*primitive.MessageQueue) AllocateStrategy {
	fixed := func(_, _ string, _ []*primitive.MessageQueue, _ []string) []*primitive.MessageQueue {
		return list
	}
	return fixed
}
- AllocateByConfig 直接返回配置的 list
AllocateByMachineRoom
rocketmq-client-go-v2.0.0/consumer/strategy.go
// AllocateByMachineRoom builds a strategy that only hands out queues whose
// BrokerName has the form "<idc>@<rest>" with <idc> listed in consumeridcs.
// The queues that pass this filter are then divided among cidAll: every
// consumer gets an equal contiguous run, and the first `rem` consumers each
// get one additional queue taken from the tail of the filtered list.
//
// The returned strategy yields nil when currentCID is empty, when mqAll or
// cidAll is empty, or when currentCID does not appear in cidAll (the latter
// is also logged). Otherwise it yields a non-nil, possibly empty, slice.
func AllocateByMachineRoom(consumeridcs []string) AllocateStrategy {
	return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
		if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
			return nil
		}

		var (
			find  bool
			index int
		)
		for idx := range cidAll {
			if cidAll[idx] == currentCID {
				find = true
				index = idx
				break
			}
		}
		if !find {
			rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
				rlog.LogKeyConsumerGroup: consumerGroup,
				"consumerId":             currentCID,
				"cidAll":                 cidAll,
			})
			return nil
		}

		// Keep only the queues that live in one of the requested machine rooms.
		var premqAll []*primitive.MessageQueue
		for _, mq := range mqAll {
			temp := strings.Split(mq.BrokerName, "@")
			if len(temp) != 2 {
				continue
			}
			for _, idc := range consumeridcs {
				if idc == temp[0] {
					premqAll = append(premqAll, mq)
					// Stop at the first match so that a duplicated entry in
					// consumeridcs cannot add the same queue twice.
					break
				}
			}
		}

		mod := len(premqAll) / len(cidAll)
		rem := len(premqAll) % len(cidAll)
		startIndex := mod * index
		endIndex := startIndex + mod

		// startIndex and endIndex are positions in the filtered list, so the
		// queues must be read from premqAll; indexing mqAll here would hand
		// out queues from machine rooms that were filtered away.
		result := make([]*primitive.MessageQueue, 0, mod+1)
		result = append(result, premqAll[startIndex:endIndex]...)

		// The leftover queues sit after the evenly divided part; the consumer
		// at position index < rem takes the index-th of them.
		if rem > index {
			result = append(result, premqAll[index+mod*len(cidAll)])
		}
		return result
	}
}
- AllocateByMachineRoom 方法先按机房(BrokerName 中 @ 之前的部分)过滤出 premqAll,再对 startIndex 与 endIndex 之间的下标取对应的队列;若 rem 大于 index,则再取 premqAll[index+mod*len(cidAll)]
AllocateByConsistentHash
rocketmq-client-go-v2.0.0/consumer/strategy.go
// AllocateByConsistentHash builds a strategy that places every ID in cidAll
// on a consistent-hash ring (with NumberOfReplicas set to virtualNodeCnt),
// looks each queue up on that ring by mq.String(), and keeps the queues whose
// lookup result equals currentCID.
//
// The returned strategy yields nil when currentCID is empty, when mqAll or
// cidAll is empty, or when currentCID does not appear in cidAll (the latter
// is also logged). Otherwise it yields a non-nil, possibly empty, slice.
func AllocateByConsistentHash(virtualNodeCnt int) AllocateStrategy {
	return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
		if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
			return nil
		}

		var find bool
		for _, cid := range cidAll {
			if cid == currentCID {
				find = true
				break
			}
		}
		if !find {
			rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
				rlog.LogKeyConsumerGroup: consumerGroup,
				"consumerId":             currentCID,
				"cidAll":                 cidAll,
			})
			return nil
		}

		c := consistent.New()
		c.NumberOfReplicas = virtualNodeCnt
		for _, cid := range cidAll {
			c.Add(cid)
		}

		result := make([]*primitive.MessageQueue, 0)
		for _, mq := range mqAll {
			clientNode, err := c.Get(mq.String())
			if err != nil {
				// The error travels in the fields map, so the message
				// carries no format verb.
				rlog.Warning("[BUG] AllocateByConsistentHash err", map[string]interface{}{
					rlog.LogKeyUnderlayError: err,
				})
				// A failed lookup has no meaningful node to compare against.
				continue
			}
			if currentCID == clientNode {
				result = append(result, mq)
			}
		}
		return result
	}
}
- AllocateByConsistentHash 方法会使用 consistent.New() 来创建 Consistent,然后根据 virtualNodeCnt 设置其 NumberOfReplicas 属性,再把 cidAll 中的每个 cid 加入其中,最后通过 c.Get(mq.String()) 获取 clientNode,并保留 clientNode 等于 currentCID 的 mq
小结
AllocateStrategy 定义了一个 func;strategy.go 提供了 AllocateByAveragely、AllocateByAveragelyCircle、AllocateByConfig、AllocateByMachineRoom、AllocateByConsistentHash 等方法
doc
- strategy
正文完