关于golang:gozero源码阅读负载均衡下第六期

51次阅读

共计 11887 个字符,预计需要花费 30 分钟才能阅读完成。

一致性哈希

一致性哈希次要针对的是缓存服务做负载平衡,以保障缓存节点变更后缓存生效过多,导致缓存穿透,从而把数据库打死。

一致性哈希原理能够参考这篇文章图解一致性哈希算法,细节分析本文不再赘述。

咱们来看看其外围算法

// service node 构造体定义
type ServiceNode struct {
    Ip    string
    Port  string
    Index int
}

// 返回 service node 实例
func NewServiceNode(ip, port string) *ServiceNode {
    return &ServiceNode{
        Ip:   ip,
        Port: port,
    }
}

func (sn *ServiceNode) SetIndex(index int) {sn.Index = index}

type UInt32Slice []uint32

// Len()
func (s UInt32Slice) Len() int {return len(s)
}

// Less()
func (s UInt32Slice) Less(i, j int) bool {return s[i] < s[j]
}

// Swap()
func (s UInt32Slice) Swap(i, j int) {s[i], s[j] = s[j], s[i]
}

// 虚构节点构造定义
type VirtualNode struct {VirtualNodes map[uint32]*ServiceNode
    NodeKeys     UInt32Slice
    sync.RWMutex
}

// 实例化虚构节点对象
func NewVirtualNode() *VirtualNode {
    return &VirtualNode{VirtualNodes: map[uint32]*ServiceNode{},}
}

// 增加虚构节点
func (v *VirtualNode) AddVirtualNode(serviceNode *ServiceNode, virtualNum uint) {
    // 并发读写 map- 加锁
    v.Lock()
    defer v.Unlock()
    for i := uint(0); i < virtualNum; i++ {hashStr := serviceNode.Ip + ":" + serviceNode.Port + ":" + strconv.Itoa(int(i))
        v.VirtualNodes[v.getHashCode(hashStr)] = serviceNode
    }
    // 虚构节点 hash 值排序
    v.sortHash()}

// 移除虚构节点
func (v *VirtualNode) RemoveVirtualNode(serviceNode *ServiceNode, virtualNum uint) {
    // 并发读写 map- 加锁
    v.Lock()
    defer v.Unlock()
    for i := uint(0); i < virtualNum; i++ {hashStr := serviceNode.Ip + ":" + serviceNode.Port + ":" + strconv.Itoa(int(i))
        delete(v.VirtualNodes, v.getHashCode(hashStr))
    }
    v.sortHash()}

// 获取虚构节点 (二分查找)
func (v *VirtualNode) GetVirtualNodel(routeKey string) *ServiceNode {
    // 并发读写 map- 加读锁, 可并发读不可同时写
    v.RLock()
    defer v.RUnlock()
    index := 0
    hashCode := v.getHashCode(routeKey)
    i := sort.Search(len(v.NodeKeys), func(i int) bool {return v.NodeKeys[i] > hashCode })
    // 当 i 大于下标最大值时, 证实没找到, 给到第 0 个虚构节点, 当 i 小于 node 节点数时, index 为以后节点
    if i < len(v.NodeKeys) {index = i} else {index = 0}
    // 返回具体节点
    return v.VirtualNodes[v.NodeKeys[index]]
}

// hash 数值排序
func (v *VirtualNode) sortHash() {
    v.NodeKeys = nil
    for k := range v.VirtualNodes {v.NodeKeys = append(v.NodeKeys, k)
    }
    sort.Sort(v.NodeKeys)
}

// 获取 hash code(采纳 md5 字符串后计算)
func (v *VirtualNode) getHashCode(nodeHash string) uint32 {
    // crc32 形式 hash code
    // return crc32.ChecksumIEEE([]byte(nodeHash))
    md5 := md5.New()
    md5.Write([]byte(nodeHash))
    md5Str := hex.EncodeToString(md5.Sum(nil))
    h := 0
    byteHash := []byte(md5Str)
    for i := 0; i < 32; i++ {
        h <<= 8
        h |= int(byteHash[i]) & 0xFF
    }
    return uint32(h)
}

咱们来写测试代码,测试下

func Test_HashConsistency(t *testing.T) {
    // 实例化 10 个实体节点
    var serverNodes []*hashconsistency.ServiceNode
    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3300"))
    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3301"))
    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3302"))
    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3303"))
    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3304"))
    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3305"))
    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3306"))
    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3307"))
    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3308"))
    serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", "3309"))
    serverNodesLen := uint(len(serverNodes))
    virtualNodeService := hashconsistency.NewVirtualNode()
    // 增加对应的虚拟化节点数
    for _, sn := range serverNodes {virtualNodeService.AddVirtualNode(sn, serverNodesLen)
    }
    // 打印节点列表
    var nodes1, nodes2 []string
    fmt.Println("-------- node 调度程序 --------")
    for i := 1; i <= 20; i++ {
        // 移除 node2 节点
        if i == 11 {virtualNodeService.RemoveVirtualNode(serverNodes[1], serverNodesLen)
        }
        cacheKey := fmt.Sprintf("user:id:%d", i%10)
        // 获取对应节点地址
        serviceNode := virtualNodeService.GetVirtualNodel(cacheKey)
        str := fmt.Sprintf("node: %s cachekey: %s", serviceNode.Ip+":"+serviceNode.Port, cacheKey)
        if i <= 10 {nodes1 = append(nodes1, str)
        } else {nodes2 = append(nodes2, str)
        }
    }
    utils.PrintDiff(strings.Join(nodes1, "\n"), strings.Join(nodes2, "\n"))
}

测试后果如下:

-------- node 调度程序 --------
-node: 127.0.0.1:3301 cachekey: user:id:1 // node1 宕机
+node: 127.0.0.1:3300 cachekey: user:id:1 // 原 node1 的缓路由到此 node0
 node: 127.0.0.1:3309 cachekey: user:id:2
 node: 127.0.0.1:3309 cachekey: user:id:3
 node: 127.0.0.1:3309 cachekey: user:id:4
 node: 127.0.0.1:3300 cachekey: user:id:5
 node: 127.0.0.1:3307 cachekey: user:id:6
-node: 127.0.0.1:3301 cachekey: user:id:7 // node1 宕机
+node: 127.0.0.1:3302 cachekey: user:id:7 // 原 node1 的缓路由到此 node2
 node: 127.0.0.1:3305 cachekey: user:id:8
-node: 127.0.0.1:3301 cachekey: user:id:9 // node1 宕机
+node: 127.0.0.1:3300 cachekey: user:id:9 // 原 node1 的缓路由到此 node0
 node: 127.0.0.1:3309 cachekey: user:id:0

从测试中能够看出宕机的 node 都被主动路由到最近的 node,而没有宕机的 node 持续承接旧的缓存 key,阐明通过一致性哈希算法,能够保障咱们的缓存不会因为服务宕机操作大面积缓存生效的问题

咱们再把一致性哈希算法带入到服务中,来看看成果如何

// Config is a configuration.
type Config struct {
    Proxy                     Proxy   `json:"proxy"`
    Nodes                     []*Node `json:"nodes"`
    HashConsistency           *VirtualNode
    HashConsistencyVirtualNum uint
}

// Proxy is a reverse proxy, and means load balancer.
type Proxy struct {Url string `json:"url"`}

// Node is servers which load balancer is transferred.
type Node struct {
    URL      string `json:"url"`
    IsDead   bool
    UseCount int
    mu       sync.RWMutex
}

var cfg Config

func init() {data, err := ioutil.ReadFile("./config.json")
    if err != nil {log.Fatal(err.Error())
    }
    json.Unmarshal(data, &cfg)
    if cfg.HashConsistencyVirtualNum == 0 {cfg.HashConsistencyVirtualNum = 10}
    cfg.HashConsistency = NewVirtualNode()
    for i, node := range cfg.Nodes {addr := strings.Split(node.URL, ":")
        serviceNode := NewServiceNode(addr[0], addr[1])
        serviceNode.SetIndex(i)
        cfg.HashConsistency.AddVirtualNode(serviceNode, cfg.HashConsistencyVirtualNum)
    }
}

func GetCfg() Config {return cfg}

// SetDead updates the value of IsDead in node.
func (node *Node) SetDead(b bool) {node.mu.Lock()
    node.IsDead = b
    addr := strings.Split(node.URL, ":")
    serviceNode := NewServiceNode(addr[0], addr[1])
    cfg.HashConsistency.RemoveVirtualNode(serviceNode, cfg.HashConsistencyVirtualNum)
    node.mu.Unlock()}

// GetIsDead returns the value of IsDead in node.
func (node *Node) GetIsDead() bool {node.mu.RLock()
    isAlive := node.IsDead
    node.mu.RUnlock()
    return isAlive
}

var mu sync.Mutex

// rrlbbHandler is a handler for round robin load balancing
func rrlbbHandler(w http.ResponseWriter, r *http.Request) {
    // Round Robin
    mu.Lock()
    cacheKey := r.Header.Get("cache-key")
    virtualNodel := cfg.HashConsistency.GetVirtualNodel(cacheKey)
    targetURL, err := url.Parse(fmt.Sprintf("http://%s:%s", virtualNodel.Ip, virtualNodel.Port))
    if err != nil {log.Fatal(err.Error())
    }
    currentNode := cfg.Nodes[virtualNodel.Index]
    currentNode.UseCount++
    if currentNode.GetIsDead() {rrlbbHandler(w, r)
        return
    }
    mu.Unlock()
    reverseProxy := httputil.NewSingleHostReverseProxy(targetURL)
    reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) {
        // NOTE: It is better to implement retry.
        log.Printf("%v is dead.", targetURL)
        currentNode.SetDead(true)
        rrlbbHandler(w, r)
    }
    w.Header().Add("balancer-node", virtualNodel.Ip+virtualNodel.Port)
    reverseProxy.ServeHTTP(w, r)
}

// pingNode checks if the node is alive.
func isAlive(url *url.URL) bool {conn, err := net.DialTimeout("tcp", url.Host, time.Minute*1)
    if err != nil {log.Printf("Unreachable to %v, error %s:", url.Host, err.Error())
        return false
    }
    defer conn.Close()
    return true
}

// healthCheck is a function for healthcheck
func healthCheck() {t := time.NewTicker(time.Minute * 1)
    for {
        select {
        case <-t.C:
            for _, node := range cfg.Nodes {pingURL, err := url.Parse(node.URL)
                if err != nil {log.Fatal(err.Error())
                }
                isAlive := isAlive(pingURL)
                node.SetDead(!isAlive)
                msg := "ok"
                if !isAlive {msg = "dead"}
                log.Printf("%v checked %s by healthcheck", node.URL, msg)
            }
        }
    }
}

// ProxyServerStart serves a proxy
func ProxyServerStart() {
    var err error
    go healthCheck()
    s := http.Server{
        Addr:    cfg.Proxy.Url,
        Handler: http.HandlerFunc(rrlbbHandler),
    }
    if err = s.ListenAndServe(); err != nil {log.Fatal(err.Error())
    }
}

// ProxyServerStart serves a node
func NodeServerStart() {http.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {w.Write([]byte("pong"))
    })
    wg := new(sync.WaitGroup)
    wg.Add(len(cfg.Nodes))
    for i, node := range cfg.Nodes {go func() {
            if i != 0 {log.Fatal(http.ListenAndServe(node.URL, nil))
            }
            // log.Fatal(http.ListenAndServe(node.URL, nil))
            wg.Done()}()
        time.Sleep(time.Millisecond * 100)
    }
    wg.Wait()}

编写测试代码测试下:

func Test_HashConsistencyWithServer(t *testing.T) {go hashconsistency.NodeServerStart()
    time.Sleep(time.Millisecond * 200)
    go hashconsistency.ProxyServerStart()
    time.Sleep(time.Millisecond * 100)
    for _, tt := range [...]struct {
        name, method, uri string
        body              io.Reader
        want              *http.Request
        wantBody          string
    }{
        {
            name:     "GET with ping url",
            method:   "GET",
            uri:      "http://127.0.0.1:8080/ping",
            body:     nil,
            wantBody: "pong",
        },
    } {t.Run(tt.name, func(t *testing.T) {fmt.Println("-------- node 调度程序 --------")
            var nodes1, nodes2 []string
            for i := 1; i <= 20; i++ {cacheKey := fmt.Sprintf("user:id:%d", i%10)
                cli := utils.NewHttpClient().
                    SetHeader(map[string]string{"cache-key": cacheKey,}).SetMethod(tt.method).SetUrl(tt.uri).SetBody(tt.body)
                err := cli.Request(nil)
                if err != nil {t.Errorf("ReadAll: %v", err)
                }
                str := fmt.Sprintf("node: %s cachekey: %s", cli.GetRspHeader().Get("balancer-node"), cacheKey)
                if err != nil {t.Errorf("ReadAll: %v", err)
                }
                if string(cli.GetRspBody()) != tt.wantBody {t.Errorf("Body = %q; want %q", cli.GetRspBody(), tt.wantBody)
                }
                if i <= 10 {nodes1 = append(nodes1, str)
                } else {nodes2 = append(nodes2, str)
                }
            }
            utils.PrintDiff(strings.Join(nodes1, "\n"), strings.Join(nodes2, "\n"))
            fmt.Println("-------- node 调用次数 --------")
            for _, node := range hashconsistency.GetCfg().Nodes {log.Printf("node: %s useCount: %d", node.URL, node.UseCount)
            }
        })
    }
}

测试后果如下:

-------- node 调度程序 --------
2022/04/08 15:14:55 http://127.0.0.1:8081 is dead.
 node: 127.0.0.18082 cachekey: user:id:1
-node: 127.0.0.18081 cachekey: user:id:2
+node: 127.0.0.18083 cachekey: user:id:2
 node: 127.0.0.18083 cachekey: user:id:3
 node: 127.0.0.18082 cachekey: user:id:4
 node: 127.0.0.18082 cachekey: user:id:5
 node: 127.0.0.18082 cachekey: user:id:6
 node: 127.0.0.18083 cachekey: user:id:7
 node: 127.0.0.18083 cachekey: user:id:8
 node: 127.0.0.18082 cachekey: user:id:9
 node: 127.0.0.18083 cachekey: user:id:0
-------- node 调用次数 --------
2022/04/08 15:14:55 node: 127.0.0.1:8081 useCount: 1
2022/04/08 15:14:55 node: 127.0.0.1:8082 useCount: 10
2022/04/08 15:14:55 node: 127.0.0.1:8083 useCount: 10

测试后果合乎预期,nice :)

go-zero

go-zero 的负载平衡算法通过替换 grpc 默认负载平衡算法来实现负载平衡

具体正文代码请参阅 https://github.com/TTSimple/g…

咱们看看其中外围的两个算法

  • 一、牛顿冷却

原理请参阅 https://www.ruanyifeng.com/bl…

const (decayTime = int64(time.Second * 1) // 消退工夫
)

type NLOC struct{}

func NewNLOC() *NLOC {return &NLOC{}
}

func (n *NLOC) Hot(timex time.Time) float64 {td := time.Now().Unix() - timex.Unix()
    if td < 0 {td = 0}
    w := math.Exp(float64(-td) / float64(decayTime))
    // w, _ = utils.MathRound(w, 9)
    return w
}

咱们来测试下:

func Test_NLOC(t *testing.T) {timer := time.NewTimer(time.Second * 10)
    quit := make(chan struct{})

    defer timer.Stop()
    go func() {
        <-timer.C
        close(quit)
    }()

    timex := time.Now()
    go func() {n := NewNLOC()
        ticker := time.NewTicker(time.Second * 1)
        for {
            <-ticker.C
            fmt.Println(n.Hot(timex))
        }
    }()

    for {
        <-quit
        return
    }
}

测试后果如下:

0.999999900000005
0.99999980000002
0.999999700000045
0.99999960000008
0.999999500000125
0.99999940000018
0.999999300000245
0.99999920000032
0.999999100000405
0.9999990000005

从下面后果中能够看出,热度是随工夫逐步消退的

  • 二、EWMA 滑动均匀

原理请参阅 https://blog.csdn.net/mzpmzk/…

const (
    AVG_METRIC_AGE float64 = 30.0
    DECAY float64 = 2 / (float64(AVG_METRIC_AGE) + 1)
)

type SimpleEWMA struct {// 以后平均值。在用 Add() 增加后,这个值会更新所有数值的平均值。value float64
}

// 增加并更新滑动平均值
func (e *SimpleEWMA) Add(value float64) {
    if e.value == 0 { // this is a proxy for "uninitialized"
        e.value = value
    } else {e.value = (value * DECAY) + (e.value * (1 - DECAY))
    }
}

// 获取以后滑动平均值
func (e *SimpleEWMA) Value() float64 {return e.value}

// 设置 ewma 值
func (e *SimpleEWMA) Set(value float64) {e.value = value}

编写测试代码测试下:

const testMargin = 0.00000001

var samples = [100]float64{
    4599, 5711, 4746, 4621, 5037, 4218, 4925, 4281, 5207, 5203, 5594, 5149,
    4948, 4994, 6056, 4417, 4973, 4714, 4964, 5280, 5074, 4913, 4119, 4522,
    4631, 4341, 4909, 4750, 4663, 5167, 3683, 4964, 5151, 4892, 4171, 5097,
    3546, 4144, 4551, 6557, 4234, 5026, 5220, 4144, 5547, 4747, 4732, 5327,
    5442, 4176, 4907, 3570, 4684, 4161, 5206, 4952, 4317, 4819, 4668, 4603,
    4885, 4645, 4401, 4362, 5035, 3954, 4738, 4545, 5433, 6326, 5927, 4983,
    5364, 4598, 5071, 5231, 5250, 4621, 4269, 3953, 3308, 3623, 5264, 5322,
    5395, 4753, 4936, 5315, 5243, 5060, 4989, 4921, 4480, 3426, 3687, 4220,
    3197, 5139, 6101, 5279,
}

func withinMargin(a, b float64) bool {return math.Abs(a-b) <= testMargin
}

func TestSimpleEWMA(t *testing.T) {
    var e SimpleEWMA
    for _, f := range samples {e.Add(f)
    }
    fmt.Println(e.Value())
    if !withinMargin(e.Value(), 4734.500946466118) {t.Errorf("e.Value() is %v, wanted %v", e.Value(), 4734.500946466118)
    }
    e.Set(1.0)
    if e.Value() != 1.0 {t.Errorf("e.Value() is %v", e.Value())
    }
}

测试胜利,加油!!!

援用文章:

  • 自适应负载平衡算法原理与实现
  • 基于 gRPC 的注册发现与负载平衡的原理和实战
  • 负载平衡 -P2C 算法
  • Kratos 源码剖析:Warden 负载平衡算法之 P2C
  • Golang 实现加权轮询负载平衡
  • 指数加权挪动均匀 (Exponential Weighted Moving Average)

正文完
 0