聊聊 rocketmq-client-go 的 strategy

31次阅读

共计 4621 个字符,预计需要花费 12 分钟才能阅读完成。

本文主要研究一下 rocketmq-client-go 的 strategy

AllocateStrategy

rocketmq-client-go-v2.0.0/consumer/strategy.go

// AllocateStrategy is the signature shared by the queue allocation strategies
// in this file. The strategies below name the arguments, in order,
// consumerGroup, currentCID, mqAll and cidAll, and return the subset of
// message queues assigned to currentCID.
type AllocateStrategy func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue
  • AllocateStrategy 定义了一个 func

AllocateByAveragely

rocketmq-client-go-v2.0.0/consumer/strategy.go

// AllocateByAveragely assigns currentCID one contiguous run of mqAll so that
// the queues are spread as evenly as possible over cidAll.
//
// With r = len(mqAll) % len(cidAll), the first r consumers (by position in
// cidAll) receive one queue more than the others. When there are no more
// queues than consumers, each consumer is offered a share of one queue and
// consumers positioned past the end of mqAll get an empty result.
//
// It returns nil when currentCID is empty, when mqAll or cidAll is empty, or
// when currentCID does not appear in cidAll (a warning is logged in that case).
func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue {
	if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
		return nil
	}

	// Position of this consumer in cidAll; -1 means "not present".
	pos := -1
	for i, cid := range cidAll {
		if cid == currentCID {
			pos = i
			break
		}
	}
	if pos < 0 {
		rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
			rlog.LogKeyConsumerGroup: consumerGroup,
			"consumerId":             currentCID,
			"cidAll":                 cidAll,
		})
		return nil
	}

	queueCount, consumerCount := len(mqAll), len(cidAll)
	remainder := queueCount % consumerCount
	// Consumers in the first `remainder` positions absorb the leftover queues.
	getsExtra := remainder > 0 && pos < remainder

	share := 1
	if queueCount > consumerCount {
		share = queueCount / consumerCount
		if getsExtra {
			share++
		}
	}

	first := pos * share
	if !getsExtra {
		// Skip over the extra queues handed to the earlier consumers.
		first += remainder
	}

	// count can be negative when first lies beyond mqAll; the loop then does
	// not run and an empty slice is returned.
	count := utils.MinInt(share, queueCount-first)
	assigned := make([]*primitive.MessageQueue, 0)
	for offset := 0; offset < count; offset++ {
		assigned = append(assigned, mqAll[(first+offset)%queueCount])
	}
	return assigned
}
  • AllocateByAveragely 方法会先计算 averageSize，然后再根据 averageSize 计算 startIndex，最后取 mqAll[(startIndex+i)%mqSize]

AllocateByAveragelyCircle

rocketmq-client-go-v2.0.0/consumer/strategy.go

// AllocateByAveragelyCircle deals the queues in mqAll out to the consumers in
// round-robin order: the consumer at position p in cidAll receives the queues
// at indices p, p+len(cidAll), p+2*len(cidAll), and so on.
//
// It returns nil when currentCID is empty, when mqAll or cidAll is empty, or
// when currentCID does not appear in cidAll (a warning is logged in that case).
func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue {
	if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
		return nil
	}

	// Position of this consumer in cidAll; -1 means "not present".
	pos := -1
	for i, cid := range cidAll {
		if cid == currentCID {
			pos = i
			break
		}
	}
	if pos < 0 {
		rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
			rlog.LogKeyConsumerGroup: consumerGroup,
			"consumerId":             currentCID,
			"cidAll":                 cidAll,
		})
		return nil
	}

	// pos < len(cidAll), so stepping by len(cidAll) from pos visits exactly
	// the indices i with i%len(cidAll) == pos.
	stride := len(cidAll)
	assigned := make([]*primitive.MessageQueue, 0)
	for i := pos; i < len(mqAll); i += stride {
		assigned = append(assigned, mqAll[i])
	}
	return assigned
}
  • AllocateByAveragelyCircle 方法取 mqAll 中满足 i%len(cidAll) == index 的下标 i 对应的队列

AllocateByConfig

rocketmq-client-go-v2.0.0/consumer/strategy.go

// AllocateByConfig returns an AllocateStrategy that ignores all of its
// arguments and always yields the given list unchanged.
func AllocateByConfig(list []*primitive.MessageQueue) AllocateStrategy {
	fixed := func(_, _ string, _ []*primitive.MessageQueue, _ []string) []*primitive.MessageQueue {
		return list
	}
	return fixed
}
  • AllocateByConfig 直接返回配置的 list

AllocateByMachineRoom

rocketmq-client-go-v2.0.0/consumer/strategy.go

// AllocateByMachineRoom returns an AllocateStrategy that only hands out queues
// whose broker belongs to one of the machine rooms listed in consumeridcs.
//
// A queue is eligible when its BrokerName has the form "<idc>@<rest>" (exactly
// one "@") and <idc> equals an entry of consumeridcs. The eligible queues are
// split evenly across cidAll: each consumer gets a contiguous run of
// len(eligible)/len(cidAll) queues, and the first len(eligible)%len(cidAll)
// consumers each receive one additional queue from the tail.
//
// The returned strategy yields nil when currentCID is empty, when mqAll or
// cidAll is empty, or when currentCID does not appear in cidAll (a warning is
// logged in that case).
func AllocateByMachineRoom(consumeridcs []string) AllocateStrategy {
	return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
		if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
			return nil
		}

		var (
			find  bool
			index int
		)
		for idx := range cidAll {
			if cidAll[idx] == currentCID {
				find = true
				index = idx
				break
			}
		}
		if !find {
			rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
				rlog.LogKeyConsumerGroup: consumerGroup,
				"consumerId":             currentCID,
				"cidAll":                 cidAll,
			})
			return nil
		}

		// Keep only the queues hosted in one of the configured machine rooms.
		var premqAll []*primitive.MessageQueue
		for _, mq := range mqAll {
			temp := strings.Split(mq.BrokerName, "@")
			if len(temp) != 2 {
				continue
			}
			for _, idc := range consumeridcs {
				if idc == temp[0] {
					premqAll = append(premqAll, mq)
					// Stop at the first match so a repeated entry in
					// consumeridcs cannot add the same queue twice.
					break
				}
			}
		}

		mod := len(premqAll) / len(cidAll)
		rem := len(premqAll) % len(cidAll)
		startIndex := mod * index
		endIndex := startIndex + mod

		// startIndex/endIndex are positions in the filtered list, so they must
		// index premqAll; indexing mqAll here would hand out queues from
		// machine rooms that were filtered away.
		result := make([]*primitive.MessageQueue, 0)
		result = append(result, premqAll[startIndex:endIndex]...)
		if rem > index {
			// Leftover queues sit after the mod*len(cidAll) evenly split ones.
			result = append(result, premqAll[index+mod*len(cidAll)])
		}
		return result
	}
}
  • AllocateByMachineRoom 方法先按机房（BrokerName 中 "@" 之前的部分）过滤出 premqAll，再取下标位于 startIndex 与 endIndex 之间的队列；若 rem 大于 index，则再取 premqAll[index+mod*len(cidAll)]

AllocateByConsistentHash

rocketmq-client-go-v2.0.0/consumer/strategy.go

// AllocateByConsistentHash returns an AllocateStrategy that places every
// consumer ID from cidAll on a consistent-hash ring, using virtualNodeCnt as
// the ring's NumberOfReplicas, and assigns to currentCID each queue whose
// String() form the ring maps to currentCID.
//
// The returned strategy yields nil when currentCID is empty, when mqAll or
// cidAll is empty, or when currentCID does not appear in cidAll (a warning is
// logged in that case).
func AllocateByConsistentHash(virtualNodeCnt int) AllocateStrategy {
	return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
		if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
			return nil
		}

		var find bool
		for _, cid := range cidAll {
			if cid == currentCID {
				find = true
				break
			}
		}
		if !find {
			rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
				rlog.LogKeyConsumerGroup: consumerGroup,
				"consumerId":             currentCID,
				"cidAll":                 cidAll,
			})
			return nil
		}

		c := consistent.New()
		c.NumberOfReplicas = virtualNodeCnt
		for _, cid := range cidAll {
			c.Add(cid)
		}

		result := make([]*primitive.MessageQueue, 0)
		for _, mq := range mqAll {
			clientNode, err := c.Get(mq.String())
			if err != nil {
				// The error travels in the structured fields; the message is
				// passed without format arguments, so it carries no verb.
				rlog.Warning("[BUG] AllocateByConsistentHash err", map[string]interface{}{
					rlog.LogKeyUnderlayError: err,
				})
				// clientNode is meaningless when the lookup failed.
				continue
			}
			if currentCID == clientNode {
				result = append(result, mq)
			}
		}
		return result
	}
}
  • AllocateByConsistentHash 方法会使用 consistent.New() 来创建 Consistent，然后根据 virtualNodeCnt 设置其 NumberOfReplicas 属性，再通过 c.Get(mq.String()) 获取 clientNode

小结

AllocateStrategy 定义了一个 func；strategy.go 提供了 AllocateByAveragely、AllocateByAveragelyCircle、AllocateByConfig、AllocateByMachineRoom、AllocateByConsistentHash 等方法

doc

  • strategy

正文完
 0