在浏览 go-zero 源码之前咱们先来看看罕用的负载平衡算法,看看其原理,以及是如何实现,而后咱们在用这些负载平衡算法来和 go-zero 的比照下,看看各自的优缺点是啥。

轮询

proxy 服务与 ndoe 服务配置文件

{    "proxy": {        "url": "127.0.0.1:8080"    },    "nodes": [        {            "url": "127.0.0.1:8081"        },        {            "url": "127.0.0.1:8082"        },        {            "url": "127.0.0.1:8083"        }    ]}

proxy 服务、 ndoe 服务、轮询算法代码

// 配置type Config struct {    Proxy Proxy   `json:"proxy"`    Nodes []*Node `json:"nodes"`}// proxy 服务器配置type Proxy struct {    Url string `json:"url"`}// node 服务器配置type Node struct {    URL      string `json:"url"`    IsDead   bool    useCount int    mu       sync.RWMutex}var cfg Configfunc init() {    // 加载配置文件    data, err := ioutil.ReadFile("./config.json")    if err != nil {        log.Fatal(err.Error())    }    json.Unmarshal(data, &cfg)}// 设置 node 服务器宕机状态func (node *Node) SetDead(b bool) {    node.mu.Lock()    node.IsDead = b    node.mu.Unlock()}// 获取 node 服务器是否宕机func (node *Node) GetIsDead() bool {    node.mu.RLock()    isAlive := node.IsDead    node.mu.RUnlock()    return isAlive}var (    mu  sync.Mutex    idx int = 0)// 轮询算法func rrlbbHandler(w http.ResponseWriter, r *http.Request) {    maxLen := len(cfg.Nodes)    // Round Robin    mu.Lock()    currentNode := cfg.Nodes[idx%maxLen] // 循环数组    if currentNode.GetIsDead() {        idx++ // 如果 node 宕机,则轮询到下一个 node        currentNode = cfg.Nodes[idx%maxLen]    }    currentNode.useCount++    targetURL, err := url.Parse("http://" + currentNode.URL)    log.Println(targetURL.Host)    if err != nil {        log.Fatal(err.Error())    }    idx++    mu.Unlock()    reverseProxy := httputil.NewSingleHostReverseProxy(targetURL)    reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) {        log.Printf("%v is dead.", targetURL)        currentNode.SetDead(true)        rrlbbHandler(w, r) // 节点宕机 递归调用本人    }    reverseProxy.ServeHTTP(w, r)}// node是否存活func isAlive(url *url.URL) bool {    conn, err := net.DialTimeout("tcp", url.Host, time.Minute*1)    if err != nil {        log.Printf("Unreachable to %v, error %s:", url.Host, err.Error())        return false    }    defer conn.Close()    return true}// node探活func healthCheck() {    t := time.NewTicker(time.Minute * 1)    for {        select {        case <-t.C:            for _, node := range cfg.Nodes {                pingURL, err := url.Parse(node.URL)                if err != nil {                    log.Fatal(err.Error())                }                isAlive := isAlive(pingURL)                node.SetDead(!isAlive)                msg := "ok"                if !isAlive {                    msg = "dead"                }                log.Printf("%v checked %s by healthcheck", node.URL, msg)            }        }    }}// 启动 proxy 服务func proxyServerStart() {    var err error    go healthCheck()    s := http.Server{        Addr:    cfg.Proxy.Url,        Handler: http.HandlerFunc(rrlbbHandler),    }    if err = s.ListenAndServe(); err != nil {        log.Fatal(err.Error())    }}// 启动所有 node 服务func nodeServerStart() {    http.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {        w.Write([]byte("pong"))    })    wg := new(sync.WaitGroup)    wg.Add(len(cfg.Nodes))    for i, node := range cfg.Nodes {        go func() {            if i > 0 {                  // 模仿一个node宕机                 log.Fatal(http.ListenAndServe(node.URL, nil))            }            wg.Done()        }()        time.Sleep(time.Millisecond * 100)    }    wg.Wait()}

最外围的算法就是这一段,非常简单,轮询的实质其实是循环数组

    currentNode := cfg.Nodes[idx%maxLen] // 数组循环    if currentNode.GetIsDead() {        idx++ // 如果 node 宕机,则轮询到下一个 node        currentNode = cfg.Nodes[idx%maxLen]    }

咱们来编写测试代码来测试下吧

func Test_RoundRobinBalancer(t *testing.T) {    go nodeServerStart()    time.Sleep(time.Millisecond * 200)    go proxyServerStart()    time.Sleep(time.Millisecond * 100)    for _, tt := range [...]struct {        name, method, uri string        body              io.Reader        want              *http.Request        wantBody          string    }{        {            name:     "GET with ping url",            method:   "GET",            uri:      "http://127.0.0.1:8080/ping",            body:     nil,            wantBody: "pong",        },    } {        t.Run(tt.name, func(t *testing.T) {            for i := 1; i <= 10; i++ {                body, err := utils.HttpRequest(tt.method, tt.uri, tt.body)                if err != nil {                    t.Errorf("ReadAll: %v", err)                }                if string(body) != tt.wantBody {                    t.Errorf("Body = %q; want %q", body, tt.wantBody)                }            }            for _, node := range cfg.Nodes {                log.Printf("node: %s useCount: %d", node.URL, node.useCount)            }        })    }}

测试后果如下:

-------- node 调度程序--------2022/04/06 19:50:24 127.0.0.1:80812022/04/06 19:50:24 http://127.0.0.1:8081 is dead.2022/04/06 19:50:24 127.0.0.1:80822022/04/06 19:50:24 127.0.0.1:80832022/04/06 19:50:24 127.0.0.1:80822022/04/06 19:50:24 127.0.0.1:80832022/04/06 19:50:24 127.0.0.1:80822022/04/06 19:50:24 127.0.0.1:80832022/04/06 19:50:24 127.0.0.1:80822022/04/06 19:50:24 127.0.0.1:80832022/04/06 19:50:24 127.0.0.1:80822022/04/06 19:50:24 127.0.0.1:8083-------- node 调用次数 --------2022/04/06 19:50:24 node: 127.0.0.1:8081 useCount: 12022/04/06 19:50:24 node: 127.0.0.1:8082 useCount: 52022/04/06 19:50:24 node: 127.0.0.1:8083 useCount: 5

第一个 node 宕机,这一段输入了宕机状态

2022/04/06 19:28:48 127.0.0.1:80812022/04/06 19:28:48 http://127.0.0.1:8081 is dead.

从这一段能够看出节点服务是被交替调用

2022/04/06 19:28:48 127.0.0.1:80822022/04/06 19:28:48 127.0.0.1:80832022/04/06 19:28:48 127.0.0.1:80822022/04/06 19:28:48 127.0.0.1:80832022/04/06 19:28:48 127.0.0.1:80822022/04/06 19:28:48 127.0.0.1:80832022/04/06 19:28:48 127.0.0.1:80822022/04/06 19:28:48 127.0.0.1:80832022/04/06 19:28:48 127.0.0.1:80822022/04/06 19:28:48 127.0.0.1:8083

在这一段能够看出 node 1 被调用了一次,而后递归调用本人,申请别离被 node2 和 node3 各调用 5 次。

阐明咱们的轮询调度算法是胜利的,大家能够复制代码,本人尝试运行下

2022/04/06 19:28:48 node: 127.0.0.1:8081 useCount: 12022/04/06 19:28:48 node: 127.0.0.1:8082 useCount: 52022/04/06 19:28:48 node: 127.0.0.1:8083 useCount: 5

随机轮询

随机轮询算法也十分的 easy

咱们在 rrlbHandle 函数上面增加如下函数

// 随机轮询算法func rrrlbHandler(w http.ResponseWriter, r *http.Request) {    maxLen := len(cfg.Nodes)    // Rand Round Robin    mu.Lock()    idx, _ := rand.Int(rand.Reader, big.NewInt(int64(maxLen))) // 获取随机数    currentNode := cfg.Nodes[int(idx.Int64())%maxLen] // 获取随机节点    if currentNode.GetIsDead() {        idx, _ = rand.Int(rand.Reader, big.NewInt(int64(maxLen)))        currentNode = cfg.Nodes[int(idx.Int64())%maxLen]    }    currentNode.useCount++    targetURL, err := url.Parse("http://" + cfg.Nodes[int(idx.Int64())%maxLen].URL)    log.Println(targetURL.Host)    if err != nil {        log.Fatal(err.Error())    }    mu.Unlock()    reverseProxy := httputil.NewSingleHostReverseProxy(targetURL)    reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) {        // NOTE: It is better to implement retry.        log.Printf("%v is dead.", targetURL)        currentNode.SetDead(true)        rrrlbHandler(w, r)    }    reverseProxy.ServeHTTP(w, r)}

锁机轮询的外围算法如下

    idx, _ := rand.Int(rand.Reader, big.NewInt(int64(maxLen))) // 获取随机数    currentNode := cfg.Nodes[int(idx.Int64())%maxLen] // 获取随机节点    if currentNode.GetIsDead() {        idx, _ = rand.Int(rand.Reader, big.NewInt(int64(maxLen)))        currentNode = cfg.Nodes[int(idx.Int64())%maxLen]    }

编写测试代码来测试下

首先批改proxyServerStart服务函数

func proxyServerStart() {    var err error    go healthCheck()    s := http.Server{        Addr:    cfg.Proxy.Url,        // Handler: http.HandlerFunc(rrlbbHandler), // 敞开轮询调度算法        Handler: http.HandlerFunc(rrrlbHandler), // 开启随机轮询调度算法    }    if err = s.ListenAndServe(); err != nil {        log.Fatal(err.Error())    }}

测试代码与轮询算法测试代码放弃不变

测试后果如下:

-------- node 调度程序--------2022/04/06 19:49:51 127.0.0.1:80812022/04/06 19:49:51 http://127.0.0.1:8081 is dead.2022/04/06 19:49:51 127.0.0.1:80822022/04/06 19:49:51 127.0.0.1:80812022/04/06 19:49:51 http://127.0.0.1:8081 is dead.2022/04/06 19:49:51 127.0.0.1:80822022/04/06 19:49:51 127.0.0.1:80832022/04/06 19:49:51 127.0.0.1:80832022/04/06 19:49:51 127.0.0.1:80822022/04/06 19:49:51 127.0.0.1:80832022/04/06 19:49:51 127.0.0.1:80832022/04/06 19:49:51 127.0.0.1:80812022/04/06 19:49:51 http://127.0.0.1:8081 is dead.2022/04/06 19:49:51 127.0.0.1:80832022/04/06 19:49:51 127.0.0.1:80812022/04/06 19:49:51 http://127.0.0.1:8081 is dead.2022/04/06 19:49:51 127.0.0.1:80822022/04/06 19:49:51 127.0.0.1:8082-------- node 调用次数 --------2022/04/06 19:49:51 node: 127.0.0.1:8081 useCount: 42022/04/06 19:49:51 node: 127.0.0.1:8082 useCount: 52022/04/06 19:49:51 node: 127.0.0.1:8083 useCount: 5

从测试后果中能够看出,node 调用程序是随机的,node 调用次数负载到未宕机的 node2、node3 上总计被调用10次

阐明咱们的算法也是胜利的

加权轮询

加权轮询咱们也基于轮询的代码来批改

批改配置文件

{    "proxy": {        "url": "127.0.0.1:8080"    },    "nodes": [        {            "url": "127.0.0.1:8081",            "weight": 2        },        {            "url": "127.0.0.1:8082",            "weight": 3        },        {            "url": "127.0.0.1:8083",            "weight": 5        }    ]}

咱们再给 Node 的构造体加两个属性

  • currentWeightnode长期权重
  • effectiveWeightnode无效权重
  • Weight node权重
type Node struct {    currentWeight   int // node长期权重    effectiveWeight int // node无效权重    Weight          int    `json:"weight"` // node权重    IsDead          bool    useCount        int    URL             string `json:"url"`    mu              sync.RWMutex}

批改 init 函数如下如下代码

func init() {    data, err := ioutil.ReadFile("./config.json")    if err != nil {        log.Fatal(err.Error())    }    json.Unmarshal(data, &cfg)    for _, node := range cfg.Nodes {        node.currentWeight = node.Weight    }}

批改 rrlbHandler 函数为如下代码

func rrlbHandler(w http.ResponseWriter, r *http.Request) {    mu.Lock()    currentNode := cfg.Next()    targetURL, err := url.Parse("http://" + currentNode.URL)    if err != nil {        log.Fatal(err.Error())    }    log.Println(targetURL.Host)    mu.Unlock()    reverseProxy := httputil.NewSingleHostReverseProxy(targetURL)    reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) {        // NOTE: It is better to implement retry.        log.Printf("%v is dead.", targetURL)        currentNode.SetDead(true)        rrlbHandler(w, r)    }    reverseProxy.ServeHTTP(w, r)}

增加 Next 函数代码如下,此函数即为加权轮询外围算法

留神:在获取最大长期权重 node 的过程中咱们要保障最大长期权重node的长期权重要继续递加,而且各个node的长期权重要继续递增,以保障调度的平滑性

func (c *Config) Next() *Node {    totalEffectiveWeight := 0    var maxWeightNode *Node    for _, node := range c.Nodes {        // 1.统计所有node无效权证之和        totalEffectiveWeight += node.effectiveWeight        // 2.变更node长期权重=node长期权重+node无效权重        // node长期权重递增,交替加权,以保障调度平滑性        node.currentWeight += node.effectiveWeight        // 3.node无效权重默认与node权长期重雷同,通信异样时-1,通信胜利+1,直到复原到 weight 大小        if node.effectiveWeight < node.Weight {            if node.GetIsDead() {                node.effectiveWeight--            } else {                node.effectiveWeight++            }        }        // 4.抉择最大长期权重node        if maxWeightNode == nil || node.currentWeight > maxWeightNode.currentWeight {            maxWeightNode = node        }    }    if maxWeightNode == nil {        return nil    }    // 5.变更 node长期权重=node长期权重-node无效权重之和    // 最大权重node长期权重递加,交替减权,以保障调度平滑性    maxWeightNode.currentWeight -= totalEffectiveWeight    if maxWeightNode.GetIsDead() {        maxWeightNode = c.Next()    }    maxWeightNode.useCount++    return maxWeightNode}

测试后果如下:

-------- node 调度程序--------2022/04/06 21:50:00 127.0.0.1:80832022/04/06 21:50:00 127.0.0.1:80832022/04/06 21:50:00 127.0.0.1:80832022/04/06 21:50:00 127.0.0.1:80822022/04/06 21:50:00 127.0.0.1:80832022/04/06 21:50:00 127.0.0.1:80822022/04/06 21:50:00 127.0.0.1:80832022/04/06 21:50:00 127.0.0.1:80832022/04/06 21:50:00 127.0.0.1:80812022/04/06 21:50:00 http://127.0.0.1:8081 is dead.2022/04/06 21:50:00 127.0.0.1:80832022/04/06 21:50:00 127.0.0.1:8082-------- node 调用次数 --------2022/04/06 21:50:00 node: 127.0.0.1:8081 useCount: 12022/04/06 21:50:00 node: 127.0.0.1:8082 useCount: 32022/04/06 21:50:00 node: 127.0.0.1:8083 useCount: 7

从后果中能够看出,调度还是比拟平滑的,而且对应权重node在调度中调用次数也比拟正当