一致性哈希

一致性哈希主要针对的是缓存服务的负载均衡,用来避免缓存节点变更后出现大量缓存失效,导致缓存穿透,从而把数据库打死。

一致性哈希的原理可以参考《图解一致性哈希算法》这篇文章,细节分析本文不再赘述。

我们来看看其核心算法

// ServiceNode describes one physical backend that can be placed on the hash ring.
type ServiceNode struct {
	Ip    string
	Port  string
	Index int
}

// NewServiceNode returns a ServiceNode for the given ip and port.
// Index is left at its zero value; use SetIndex to assign it.
func NewServiceNode(ip, port string) *ServiceNode {
	return &ServiceNode{
		Ip:   ip,
		Port: port,
	}
}

// SetIndex stores a caller-defined index on the node.
func (sn *ServiceNode) SetIndex(index int) {
	sn.Index = index
}

// UInt32Slice implements sort.Interface for a slice of uint32 in ascending order.
type UInt32Slice []uint32

// Len returns the number of elements.
func (s UInt32Slice) Len() int {
	return len(s)
}

// Less reports whether element i sorts before element j.
func (s UInt32Slice) Less(i, j int) bool {
	return s[i] < s[j]
}

// Swap exchanges elements i and j.
func (s UInt32Slice) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

// VirtualNode is a consistent-hash ring. VirtualNodes maps each virtual node
// hash to the physical node it represents, and NodeKeys holds the same hashes
// in ascending order for binary search. The embedded RWMutex guards both.
type VirtualNode struct {
	VirtualNodes map[uint32]*ServiceNode
	NodeKeys     UInt32Slice
	sync.RWMutex
}

// NewVirtualNode returns an empty ring.
func NewVirtualNode() *VirtualNode {
	return &VirtualNode{
		VirtualNodes: map[uint32]*ServiceNode{},
	}
}

// AddVirtualNode places virtualNum virtual nodes for serviceNode on the ring
// and re-sorts the key list. A hash that is already present is overwritten.
func (v *VirtualNode) AddVirtualNode(serviceNode *ServiceNode, virtualNum uint) {
	// The map is shared between readers and writers, so take the write lock.
	v.Lock()
	defer v.Unlock()

	for i := uint(0); i < virtualNum; i++ {
		v.VirtualNodes[v.getHashCode(virtualNodeKey(serviceNode, i))] = serviceNode
	}
	v.sortHash()
}

// RemoveVirtualNode deletes the virtualNum virtual nodes derived from
// serviceNode's Ip and Port and re-sorts the key list.
func (v *VirtualNode) RemoveVirtualNode(serviceNode *ServiceNode, virtualNum uint) {
	v.Lock()
	defer v.Unlock()

	for i := uint(0); i < virtualNum; i++ {
		delete(v.VirtualNodes, v.getHashCode(virtualNodeKey(serviceNode, i)))
	}
	v.sortHash()
}

// GetVirtualNodel returns the node responsible for routeKey: the first virtual
// node whose hash is greater than the key's hash, wrapping around to the
// smallest hash when the key is past the end of the ring. It returns nil when
// the ring is empty.
func (v *VirtualNode) GetVirtualNodel(routeKey string) *ServiceNode {
	// Lookups only read, so a read lock lets them run concurrently.
	v.RLock()
	defer v.RUnlock()

	if len(v.NodeKeys) == 0 {
		return nil
	}

	hashCode := v.getHashCode(routeKey)
	i := sort.Search(len(v.NodeKeys), func(i int) bool { return v.NodeKeys[i] > hashCode })
	if i == len(v.NodeKeys) {
		// No larger hash exists: wrap around to the start of the ring.
		i = 0
	}
	return v.VirtualNodes[v.NodeKeys[i]]
}

// virtualNodeKey builds the string that is hashed for the i-th virtual node of
// serviceNode, in the form "ip:port:i".
func virtualNodeKey(serviceNode *ServiceNode, i uint) string {
	return serviceNode.Ip + ":" + serviceNode.Port + ":" + strconv.Itoa(int(i))
}

// sortHash rebuilds NodeKeys from the map keys in ascending order.
// Callers must hold the write lock.
func (v *VirtualNode) sortHash() {
	keys := make(UInt32Slice, 0, len(v.VirtualNodes))
	for k := range v.VirtualNodes {
		keys = append(keys, k)
	}
	sort.Sort(keys)
	v.NodeKeys = keys
}

// getHashCode maps nodeHash onto the ring using the first four bytes of its
// MD5 digest, read as a big-endian uint32, so the full 32-bit space is used.
func (v *VirtualNode) getHashCode(nodeHash string) uint32 {
	sum := md5.Sum([]byte(nodeHash))
	return uint32(sum[0])<<24 | uint32(sum[1])<<16 | uint32(sum[2])<<8 | uint32(sum[3])
}

咱们来写测试代码,测试下

func Test_HashConsistency(t *testing.T) {    // 实例化10个实体节点    var serverNodes []*hashconsistency.ServiceNode    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3300"))    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3301"))    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3302"))    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3303"))    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3304"))    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3305"))    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3306"))    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3307"))    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3308"))    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3309"))    serverNodesLen := uint(len(serverNodes))    virtualNodeService := hashconsistency.NewVirtualNode()    // 增加对应的虚拟化节点数    for _, sn := range serverNodes {        virtualNodeService.AddVirtualNode(sn, serverNodesLen)    }    // 打印节点列表    var nodes1, nodes2 []string    fmt.Println("-------- node 调度程序--------")    for i := 1; i <= 20; i++ {        // 移除node2节点        if i == 11 {            virtualNodeService.RemoveVirtualNode(serverNodes[1], serverNodesLen)        }        cacheKey := fmt.Sprintf("user:id:%d", i%10)        // 获取对应节点地址        serviceNode := virtualNodeService.GetVirtualNodel(cacheKey)        str := fmt.Sprintf("node: %s cachekey: %s", serviceNode.Ip+":"+serviceNode.Port, cacheKey)        if i <= 10 {            nodes1 = append(nodes1, str)        } else {            nodes2 = append(nodes2, str)        }    }    utils.PrintDiff(strings.Join(nodes1, "\n"), strings.Join(nodes2, "\n"))}

测试结果如下:

-------- node 调度程序---------node: 127.0.0.1:3301 cachekey: user:id:1 // node1宕机+node: 127.0.0.1:3300 cachekey: user:id:1 // 原node1的缓路由到此node0 node: 127.0.0.1:3309 cachekey: user:id:2 node: 127.0.0.1:3309 cachekey: user:id:3 node: 127.0.0.1:3309 cachekey: user:id:4 node: 127.0.0.1:3300 cachekey: user:id:5 node: 127.0.0.1:3307 cachekey: user:id:6-node: 127.0.0.1:3301 cachekey: user:id:7 // node1宕机+node: 127.0.0.1:3302 cachekey: user:id:7 // 原node1的缓路由到此node2 node: 127.0.0.1:3305 cachekey: user:id:8-node: 127.0.0.1:3301 cachekey: user:id:9 // node1宕机+node: 127.0.0.1:3300 cachekey: user:id:9 // 原node1的缓路由到此node0 node: 127.0.0.1:3309 cachekey: user:id:0

从测试中可以看出,宕机 node 上的 key 都被自动路由到了最近的 node,而没有宕机的 node 继续承接原有的缓存 key。这说明通过一致性哈希算法,可以保证我们的缓存不会因为服务宕机而出现大面积缓存失效的问题。

我们再把一致性哈希算法带入到服务中,来看看效果如何

// Config is a configuration.type Config struct {    Proxy                     Proxy   `json:"proxy"`    Nodes                     []*Node `json:"nodes"`    HashConsistency           *VirtualNode    HashConsistencyVirtualNum uint}// Proxy is a reverse proxy, and means load balancer.type Proxy struct {    Url string `json:"url"`}// Node is servers which load balancer is transferred.type Node struct {    URL      string `json:"url"`    IsDead   bool    UseCount int    mu       sync.RWMutex}var cfg Configfunc init() {    data, err := ioutil.ReadFile("./config.json")    if err != nil {        log.Fatal(err.Error())    }    json.Unmarshal(data, &cfg)    if cfg.HashConsistencyVirtualNum == 0 {        cfg.HashConsistencyVirtualNum = 10    }    cfg.HashConsistency = NewVirtualNode()    for i, node := range cfg.Nodes {        addr := strings.Split(node.URL, ":")        serviceNode := NewServiceNode(addr[0], addr[1])        serviceNode.SetIndex(i)        cfg.HashConsistency.AddVirtualNode(serviceNode, cfg.HashConsistencyVirtualNum)    }}func GetCfg() Config {    return cfg}// SetDead updates the value of IsDead in node.func (node *Node) SetDead(b bool) {    node.mu.Lock()    node.IsDead = b    addr := strings.Split(node.URL, ":")    serviceNode := NewServiceNode(addr[0], addr[1])    cfg.HashConsistency.RemoveVirtualNode(serviceNode, cfg.HashConsistencyVirtualNum)    node.mu.Unlock()}// GetIsDead returns the value of IsDead in node.func (node *Node) GetIsDead() bool {    node.mu.RLock()    isAlive := node.IsDead    node.mu.RUnlock()    return isAlive}var mu sync.Mutex// rrlbbHandler is a handler for round robin load balancingfunc rrlbbHandler(w http.ResponseWriter, r *http.Request) {    // Round Robin    mu.Lock()    cacheKey := r.Header.Get("cache-key")    virtualNodel := cfg.HashConsistency.GetVirtualNodel(cacheKey)    targetURL, err := url.Parse(fmt.Sprintf("http://%s:%s", virtualNodel.Ip, virtualNodel.Port))    if err != nil {        log.Fatal(err.Error())    }    
currentNode := cfg.Nodes[virtualNodel.Index]    currentNode.UseCount++    if currentNode.GetIsDead() {        rrlbbHandler(w, r)        return    }    mu.Unlock()    reverseProxy := httputil.NewSingleHostReverseProxy(targetURL)    reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) {        // NOTE: It is better to implement retry.        log.Printf("%v is dead.", targetURL)        currentNode.SetDead(true)        rrlbbHandler(w, r)    }    w.Header().Add("balancer-node", virtualNodel.Ip+virtualNodel.Port)    reverseProxy.ServeHTTP(w, r)}// pingNode checks if the node is alive.func isAlive(url *url.URL) bool {    conn, err := net.DialTimeout("tcp", url.Host, time.Minute*1)    if err != nil {        log.Printf("Unreachable to %v, error %s:", url.Host, err.Error())        return false    }    defer conn.Close()    return true}// healthCheck is a function for healthcheckfunc healthCheck() {    t := time.NewTicker(time.Minute * 1)    for {        select {        case <-t.C:            for _, node := range cfg.Nodes {                pingURL, err := url.Parse(node.URL)                if err != nil {                    log.Fatal(err.Error())                }                isAlive := isAlive(pingURL)                node.SetDead(!isAlive)                msg := "ok"                if !isAlive {                    msg = "dead"                }                log.Printf("%v checked %s by healthcheck", node.URL, msg)            }        }    }}// ProxyServerStart serves a proxyfunc ProxyServerStart() {    var err error    go healthCheck()    s := http.Server{        Addr:    cfg.Proxy.Url,        Handler: http.HandlerFunc(rrlbbHandler),    }    if err = s.ListenAndServe(); err != nil {        log.Fatal(err.Error())    }}// ProxyServerStart serves a nodefunc NodeServerStart() {    http.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {        w.Write([]byte("pong"))    })    wg := new(sync.WaitGroup)    wg.Add(len(cfg.Nodes))    for 
i, node := range cfg.Nodes {        go func() {            if i != 0 {                log.Fatal(http.ListenAndServe(node.URL, nil))            }            // log.Fatal(http.ListenAndServe(node.URL, nil))            wg.Done()        }()        time.Sleep(time.Millisecond * 100)    }    wg.Wait()}

编写测试代码测试下:

func Test_HashConsistencyWithServer(t *testing.T) {    go hashconsistency.NodeServerStart()    time.Sleep(time.Millisecond * 200)    go hashconsistency.ProxyServerStart()    time.Sleep(time.Millisecond * 100)    for _, tt := range [...]struct {        name, method, uri string        body              io.Reader        want              *http.Request        wantBody          string    }{        {            name:     "GET with ping url",            method:   "GET",            uri:      "http://127.0.0.1:8080/ping",            body:     nil,            wantBody: "pong",        },    } {        t.Run(tt.name, func(t *testing.T) {            fmt.Println("-------- node 调度程序--------")            var nodes1, nodes2 []string            for i := 1; i <= 20; i++ {                cacheKey := fmt.Sprintf("user:id:%d", i%10)                cli := utils.NewHttpClient().                    SetHeader(map[string]string{                        "cache-key": cacheKey,                    }).SetMethod(tt.method).SetUrl(tt.uri).SetBody(tt.body)                err := cli.Request(nil)                if err != nil {                    t.Errorf("ReadAll: %v", err)                }                str := fmt.Sprintf("node: %s cachekey: %s", cli.GetRspHeader().Get("balancer-node"), cacheKey)                if err != nil {                    t.Errorf("ReadAll: %v", err)                }                if string(cli.GetRspBody()) != tt.wantBody {                    t.Errorf("Body = %q; want %q", cli.GetRspBody(), tt.wantBody)                }                if i <= 10 {                    nodes1 = append(nodes1, str)                } else {                    nodes2 = append(nodes2, str)                }            }            utils.PrintDiff(strings.Join(nodes1, "\n"), strings.Join(nodes2, "\n"))            fmt.Println("-------- node 调用次数 --------")            for _, node := range hashconsistency.GetCfg().Nodes {                log.Printf("node: %s useCount: %d", node.URL, node.UseCount)         
   }        })    }}

测试结果如下:

-------- node 调度程序--------2022/04/08 15:14:55 http://127.0.0.1:8081 is dead. node: 127.0.0.18082 cachekey: user:id:1-node: 127.0.0.18081 cachekey: user:id:2+node: 127.0.0.18083 cachekey: user:id:2 node: 127.0.0.18083 cachekey: user:id:3 node: 127.0.0.18082 cachekey: user:id:4 node: 127.0.0.18082 cachekey: user:id:5 node: 127.0.0.18082 cachekey: user:id:6 node: 127.0.0.18083 cachekey: user:id:7 node: 127.0.0.18083 cachekey: user:id:8 node: 127.0.0.18082 cachekey: user:id:9 node: 127.0.0.18083 cachekey: user:id:0-------- node 调用次数 --------2022/04/08 15:14:55 node: 127.0.0.1:8081 useCount: 12022/04/08 15:14:55 node: 127.0.0.1:8082 useCount: 102022/04/08 15:14:55 node: 127.0.0.1:8083 useCount: 10

测试结果符合预期,nice :)

go-zero

go-zero 的负载均衡算法通过替换 grpc 默认的负载均衡算法来实现负载均衡

详细的注释代码请参阅 https://github.com/TTSimple/g...

我们看看其中核心的两个算法

  • 一、牛顿冷却

原理请参阅 https://www.ruanyifeng.com/bl...

const (
	decayTime = int64(time.Second * 1) // decay time constant, in nanoseconds
)

// NLOC computes a Newton's-law-of-cooling style heat value.
type NLOC struct{}

// NewNLOC returns a new NLOC.
func NewNLOC() *NLOC {
	return &NLOC{}
}

// Hot returns exp(-elapsed/decayTime), where elapsed is the time since timex.
// The result is 1 for a timex that is now or in the future and falls towards 0
// as timex gets older.
func (n *NLOC) Hot(timex time.Time) float64 {
	// time.Since yields nanoseconds, the same unit decayTime is expressed in.
	td := time.Since(timex)
	if td < 0 {
		td = 0
	}
	return math.Exp(-float64(td) / float64(decayTime))
}

咱们来测试下:

func Test_NLOC(t *testing.T) {    timer := time.NewTimer(time.Second * 10)    quit := make(chan struct{})    defer timer.Stop()    go func() {        <-timer.C        close(quit)    }()    timex := time.Now()    go func() {        n := NewNLOC()        ticker := time.NewTicker(time.Second * 1)        for {            <-ticker.C            fmt.Println(n.Hot(timex))        }    }()    for {        <-quit        return    }}

测试结果如下:

0.9999999000000050.999999800000020.9999997000000450.999999600000080.9999995000001250.999999400000180.9999993000002450.999999200000320.9999991000004050.9999990000005

从上面的结果中可以看出,热度是随时间逐渐衰减的

  • 二、EWMA 滑动平均

原理请参阅 https://blog.csdn.net/mzpmzk/...

const (
	// AVG_METRIC_AGE is the age parameter the decay factor is derived from.
	AVG_METRIC_AGE float64 = 30.0
	// DECAY is the weight given to each new sample: 2 / (AVG_METRIC_AGE + 1).
	DECAY float64 = 2 / (float64(AVG_METRIC_AGE) + 1)
)

// SimpleEWMA is an exponentially weighted moving average.
type SimpleEWMA struct {
	// value is the current average; Add folds each new sample into it.
	value float64
}

// Add folds value into the moving average. While the stored average is zero
// it is treated as uninitialized and simply takes the sample.
func (e *SimpleEWMA) Add(value float64) {
	if e.value != 0 {
		e.value = (value * DECAY) + (e.value * (1 - DECAY))
		return
	}
	// Zero is a proxy for "uninitialized".
	e.value = value
}

// Value returns the current moving average.
func (e *SimpleEWMA) Value() float64 {
	return e.value
}

// Set overwrites the moving average with value.
func (e *SimpleEWMA) Set(value float64) {
	e.value = value
}

编写测试代码测试下:

const testMargin = 0.00000001var samples = [100]float64{    4599, 5711, 4746, 4621, 5037, 4218, 4925, 4281, 5207, 5203, 5594, 5149,    4948, 4994, 6056, 4417, 4973, 4714, 4964, 5280, 5074, 4913, 4119, 4522,    4631, 4341, 4909, 4750, 4663, 5167, 3683, 4964, 5151, 4892, 4171, 5097,    3546, 4144, 4551, 6557, 4234, 5026, 5220, 4144, 5547, 4747, 4732, 5327,    5442, 4176, 4907, 3570, 4684, 4161, 5206, 4952, 4317, 4819, 4668, 4603,    4885, 4645, 4401, 4362, 5035, 3954, 4738, 4545, 5433, 6326, 5927, 4983,    5364, 4598, 5071, 5231, 5250, 4621, 4269, 3953, 3308, 3623, 5264, 5322,    5395, 4753, 4936, 5315, 5243, 5060, 4989, 4921, 4480, 3426, 3687, 4220,    3197, 5139, 6101, 5279,}func withinMargin(a, b float64) bool {    return math.Abs(a-b) <= testMargin}func TestSimpleEWMA(t *testing.T) {    var e SimpleEWMA    for _, f := range samples {        e.Add(f)    }    fmt.Println(e.Value())    if !withinMargin(e.Value(), 4734.500946466118) {        t.Errorf("e.Value() is %v, wanted %v", e.Value(), 4734.500946466118)    }    e.Set(1.0)    if e.Value() != 1.0 {        t.Errorf("e.Value() is %v", e.Value())    }}

测试胜利,加油!!!

引用文章:

  • 自适应负载均衡算法原理与实现
  • 基于gRPC的注册发现与负载均衡的原理和实战
  • 负载均衡-P2C算法
  • Kratos 源码分析:Warden 负载均衡算法之 P2C
  • Golang 实现加权轮询负载均衡
  • 指数加权移动平均(Exponential Weighted Moving Average)