在阅读 go-zero 源码之前,我们先来看看常用的负载均衡算法,了解其原理以及实现方式,然后再用这些负载均衡算法与 go-zero 的实现做对比,看看各自的优缺点是什么。
轮询
proxy 服务与 node 服务配置文件
{ "proxy": { "url": "127.0.0.1:8080" }, "nodes": [ { "url": "127.0.0.1:8081" }, { "url": "127.0.0.1:8082" }, { "url": "127.0.0.1:8083" } ]}
proxy 服务、node 服务、轮询算法代码
// Config is the root configuration: one proxy front-end plus the
// backend nodes it balances across.
type Config struct {
	Proxy Proxy   `json:"proxy"`
	Nodes []*Node `json:"nodes"`
}

// Proxy holds the listen address of the proxy server.
type Proxy struct {
	Url string `json:"url"`
}

// Node is one backend server together with its runtime state.
type Node struct {
	URL      string `json:"url"`
	IsDead   bool // set once a health check or a proxy error marks it down
	useCount int  // number of requests routed to this node (read by the tests)
	mu       sync.RWMutex
}

var cfg Config

func init() {
	// Load the JSON configuration; the demo cannot run without it.
	data, err := ioutil.ReadFile("./config.json")
	if err != nil {
		log.Fatal(err.Error())
	}
	// FIX: the original ignored the Unmarshal error and would silently
	// run with an empty node list on malformed JSON.
	if err := json.Unmarshal(data, &cfg); err != nil {
		log.Fatal(err.Error())
	}
}

// SetDead marks the node up or down under the write lock.
func (node *Node) SetDead(b bool) {
	node.mu.Lock()
	node.IsDead = b
	node.mu.Unlock()
}

// GetIsDead reports whether the node is currently marked down.
func (node *Node) GetIsDead() bool {
	node.mu.RLock()
	isDead := node.IsDead
	node.mu.RUnlock()
	return isDead
}

var (
	mu  sync.Mutex
	idx int = 0
)

// rrlbbHandler is the round-robin load-balancing handler: requests are
// spread over cfg.Nodes by a shared, mutex-guarded index.
func rrlbbHandler(w http.ResponseWriter, r *http.Request) {
	maxLen := len(cfg.Nodes)

	// Round Robin
	mu.Lock()
	currentNode := cfg.Nodes[idx%maxLen] // cycle through the node list
	// FIX: the original skipped at most ONE dead node, so with two
	// adjacent dead nodes it could still pick a dead one. Scan at most
	// maxLen entries so every node gets a chance before we give up.
	for i := 0; currentNode.GetIsDead() && i < maxLen; i++ {
		idx++
		currentNode = cfg.Nodes[idx%maxLen]
	}
	currentNode.useCount++
	targetURL, err := url.Parse("http://" + currentNode.URL)
	log.Println(targetURL.Host)
	if err != nil {
		log.Fatal(err.Error())
	}
	idx++
	mu.Unlock()

	reverseProxy := httputil.NewSingleHostReverseProxy(targetURL)
	reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) {
		log.Printf("%v is dead.", targetURL)
		currentNode.SetDead(true)
		rrlbbHandler(w, r) // node failed mid-request: re-dispatch to the next node
	}
	reverseProxy.ServeHTTP(w, r)
}

// isAlive dials the node's host:port to see whether it still accepts
// TCP connections.
func isAlive(url *url.URL) bool {
	conn, err := net.DialTimeout("tcp", url.Host, time.Minute*1)
	if err != nil {
		log.Printf("Unreachable to %v, error %s:", url.Host, err.Error())
		return false
	}
	defer conn.Close()
	return true
}

// healthCheck probes every node once a minute and records the result.
func healthCheck() {
	t := time.NewTicker(time.Minute * 1)
	for range t.C {
		for _, node := range cfg.Nodes {
			// FIX: the original parsed node.URL without a scheme;
			// url.Parse("127.0.0.1:8081") fails with "first path
			// segment in URL cannot contain colon", which log.Fatal
			// turned into a process exit on the first probe.
			pingURL, err := url.Parse("http://" + node.URL)
			if err != nil {
				log.Fatal(err.Error())
			}
			isAlive := isAlive(pingURL)
			node.SetDead(!isAlive)
			msg := "ok"
			if !isAlive {
				msg = "dead"
			}
			log.Printf("%v checked %s by healthcheck", node.URL, msg)
		}
	}
}

// proxyServerStart runs the health checker and serves the round-robin
// handler on the configured proxy address.
func proxyServerStart() {
	go healthCheck()
	s := http.Server{
		Addr:    cfg.Proxy.Url,
		Handler: http.HandlerFunc(rrlbbHandler),
	}
	if err := s.ListenAndServe(); err != nil {
		log.Fatal(err.Error())
	}
}

// nodeServerStart starts an HTTP server for every node except the
// first, which is deliberately left down to simulate a dead backend.
func nodeServerStart() {
	http.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("pong"))
	})
	wg := new(sync.WaitGroup)
	wg.Add(len(cfg.Nodes))
	for i, node := range cfg.Nodes {
		// FIX: the original closure captured the shared loop variables
		// i and node; before Go 1.22 every goroutine could observe the
		// last iteration's values. Shadow them per iteration.
		i, node := i, node
		go func() {
			if i > 0 { // node 0 intentionally not started (simulated outage)
				log.Fatal(http.ListenAndServe(node.URL, nil))
			}
			wg.Done()
		}()
		time.Sleep(time.Millisecond * 100)
	}
	wg.Wait()
}
最核心的算法就是这一段,非常简单,轮询的本质其实是循环数组
// Cycle through the node list.
currentNode := cfg.Nodes[idx%maxLen]
if currentNode.GetIsDead() {
	// The chosen node is down: advance the index and take the next one.
	idx++
	currentNode = cfg.Nodes[idx%maxLen]
}
我们来编写测试代码测试一下吧
func Test_RoundRobinBalancer(t *testing.T) { go nodeServerStart() time.Sleep(time.Millisecond * 200) go proxyServerStart() time.Sleep(time.Millisecond * 100) for _, tt := range [...]struct { name, method, uri string body io.Reader want *http.Request wantBody string }{ { name: "GET with ping url", method: "GET", uri: "http://127.0.0.1:8080/ping", body: nil, wantBody: "pong", }, } { t.Run(tt.name, func(t *testing.T) { for i := 1; i <= 10; i++ { body, err := utils.HttpRequest(tt.method, tt.uri, tt.body) if err != nil { t.Errorf("ReadAll: %v", err) } if string(body) != tt.wantBody { t.Errorf("Body = %q; want %q", body, tt.wantBody) } } for _, node := range cfg.Nodes { log.Printf("node: %s useCount: %d", node.URL, node.useCount) } }) }}
测试结果如下:
-------- node 调度程序--------2022/04/06 19:50:24 127.0.0.1:80812022/04/06 19:50:24 http://127.0.0.1:8081 is dead.2022/04/06 19:50:24 127.0.0.1:80822022/04/06 19:50:24 127.0.0.1:80832022/04/06 19:50:24 127.0.0.1:80822022/04/06 19:50:24 127.0.0.1:80832022/04/06 19:50:24 127.0.0.1:80822022/04/06 19:50:24 127.0.0.1:80832022/04/06 19:50:24 127.0.0.1:80822022/04/06 19:50:24 127.0.0.1:80832022/04/06 19:50:24 127.0.0.1:80822022/04/06 19:50:24 127.0.0.1:8083-------- node 调用次数 --------2022/04/06 19:50:24 node: 127.0.0.1:8081 useCount: 12022/04/06 19:50:24 node: 127.0.0.1:8082 useCount: 52022/04/06 19:50:24 node: 127.0.0.1:8083 useCount: 5
第一个 node 宕机,这一段输出了宕机状态
2022/04/06 19:28:48 127.0.0.1:80812022/04/06 19:28:48 http://127.0.0.1:8081 is dead.
从这一段可以看出节点服务是被交替调用的
2022/04/06 19:28:48 127.0.0.1:80822022/04/06 19:28:48 127.0.0.1:80832022/04/06 19:28:48 127.0.0.1:80822022/04/06 19:28:48 127.0.0.1:80832022/04/06 19:28:48 127.0.0.1:80822022/04/06 19:28:48 127.0.0.1:80832022/04/06 19:28:48 127.0.0.1:80822022/04/06 19:28:48 127.0.0.1:80832022/04/06 19:28:48 127.0.0.1:80822022/04/06 19:28:48 127.0.0.1:8083
从这一段可以看出 node1 被调用了一次,然后递归调用自己,请求分别被 node2 和 node3 各处理 5 次。
说明我们的轮询调度算法是成功的,大家可以复制代码,自己尝试运行一下
2022/04/06 19:28:48 node: 127.0.0.1:8081 useCount: 12022/04/06 19:28:48 node: 127.0.0.1:8082 useCount: 52022/04/06 19:28:48 node: 127.0.0.1:8083 useCount: 5
随机轮询
随机轮询算法也非常简单
我们在 rrlbbHandler
函数下面添加如下函数
// 随机轮询算法func rrrlbHandler(w http.ResponseWriter, r *http.Request) { maxLen := len(cfg.Nodes) // Rand Round Robin mu.Lock() idx, _ := rand.Int(rand.Reader, big.NewInt(int64(maxLen))) // 获取随机数 currentNode := cfg.Nodes[int(idx.Int64())%maxLen] // 获取随机节点 if currentNode.GetIsDead() { idx, _ = rand.Int(rand.Reader, big.NewInt(int64(maxLen))) currentNode = cfg.Nodes[int(idx.Int64())%maxLen] } currentNode.useCount++ targetURL, err := url.Parse("http://" + cfg.Nodes[int(idx.Int64())%maxLen].URL) log.Println(targetURL.Host) if err != nil { log.Fatal(err.Error()) } mu.Unlock() reverseProxy := httputil.NewSingleHostReverseProxy(targetURL) reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) { // NOTE: It is better to implement retry. log.Printf("%v is dead.", targetURL) currentNode.SetDead(true) rrrlbHandler(w, r) } reverseProxy.ServeHTTP(w, r)}
随机轮询的核心算法如下
idx, _ := rand.Int(rand.Reader, big.NewInt(int64(maxLen))) // 获取随机数 currentNode := cfg.Nodes[int(idx.Int64())%maxLen] // 获取随机节点 if currentNode.GetIsDead() { idx, _ = rand.Int(rand.Reader, big.NewInt(int64(maxLen))) currentNode = cfg.Nodes[int(idx.Int64())%maxLen] }
编写测试代码来测试下
首先修改 proxyServerStart
服务函数
func proxyServerStart() { var err error go healthCheck() s := http.Server{ Addr: cfg.Proxy.Url, // Handler: http.HandlerFunc(rrlbbHandler), // 敞开轮询调度算法 Handler: http.HandlerFunc(rrrlbHandler), // 开启随机轮询调度算法 } if err = s.ListenAndServe(); err != nil { log.Fatal(err.Error()) }}
测试代码与轮询算法的测试代码保持不变
测试结果如下:
-------- node 调度程序--------2022/04/06 19:49:51 127.0.0.1:80812022/04/06 19:49:51 http://127.0.0.1:8081 is dead.2022/04/06 19:49:51 127.0.0.1:80822022/04/06 19:49:51 127.0.0.1:80812022/04/06 19:49:51 http://127.0.0.1:8081 is dead.2022/04/06 19:49:51 127.0.0.1:80822022/04/06 19:49:51 127.0.0.1:80832022/04/06 19:49:51 127.0.0.1:80832022/04/06 19:49:51 127.0.0.1:80822022/04/06 19:49:51 127.0.0.1:80832022/04/06 19:49:51 127.0.0.1:80832022/04/06 19:49:51 127.0.0.1:80812022/04/06 19:49:51 http://127.0.0.1:8081 is dead.2022/04/06 19:49:51 127.0.0.1:80832022/04/06 19:49:51 127.0.0.1:80812022/04/06 19:49:51 http://127.0.0.1:8081 is dead.2022/04/06 19:49:51 127.0.0.1:80822022/04/06 19:49:51 127.0.0.1:8082-------- node 调用次数 --------2022/04/06 19:49:51 node: 127.0.0.1:8081 useCount: 42022/04/06 19:49:51 node: 127.0.0.1:8082 useCount: 52022/04/06 19:49:51 node: 127.0.0.1:8083 useCount: 5
从测试结果中可以看出,node 调用顺序是随机的,请求被负载到未宕机的 node2、node3 上,总计被调用 10 次
说明我们的算法也是成功的
加权轮询
加权轮询我们也基于轮询的代码来修改
修改配置文件
{ "proxy": { "url": "127.0.0.1:8080" }, "nodes": [ { "url": "127.0.0.1:8081", "weight": 2 }, { "url": "127.0.0.1:8082", "weight": 3 }, { "url": "127.0.0.1:8083", "weight": 5 } ]}
我们再给 Node
结构体加三个属性:
currentWeight
(node 临时权重)、effectiveWeight
(node 有效权重)、Weight
(node 权重)
// Node is one backend server plus the state used by the smooth
// weighted round-robin scheduler.
type Node struct {
	currentWeight   int // transient weight, adjusted on every pick
	effectiveWeight int // effective weight, degraded/restored by health
	Weight          int `json:"weight"` // configured weight
	IsDead          bool
	useCount        int
	URL             string `json:"url"`
	mu              sync.RWMutex
}
修改 init
函数为如下代码
func init() { data, err := ioutil.ReadFile("./config.json") if err != nil { log.Fatal(err.Error()) } json.Unmarshal(data, &cfg) for _, node := range cfg.Nodes { node.currentWeight = node.Weight }}
修改 rrlbHandler
函数为如下代码
func rrlbHandler(w http.ResponseWriter, r *http.Request) { mu.Lock() currentNode := cfg.Next() targetURL, err := url.Parse("http://" + currentNode.URL) if err != nil { log.Fatal(err.Error()) } log.Println(targetURL.Host) mu.Unlock() reverseProxy := httputil.NewSingleHostReverseProxy(targetURL) reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) { // NOTE: It is better to implement retry. log.Printf("%v is dead.", targetURL) currentNode.SetDead(true) rrlbHandler(w, r) } reverseProxy.ServeHTTP(w, r)}
添加 Next
函数,代码如下,此函数即为加权轮询的核心算法
注意:在获取最大临时权重 node 的过程中,我们要保证被选中 node 的临时权重持续递减,而其余各个 node 的临时权重持续递增,以保证调度的平滑性
func (c *Config) Next() *Node { totalEffectiveWeight := 0 var maxWeightNode *Node for _, node := range c.Nodes { // 1.统计所有node无效权证之和 totalEffectiveWeight += node.effectiveWeight // 2.变更node长期权重=node长期权重+node无效权重 // node长期权重递增,交替加权,以保障调度平滑性 node.currentWeight += node.effectiveWeight // 3.node无效权重默认与node权长期重雷同,通信异样时-1,通信胜利+1,直到复原到 weight 大小 if node.effectiveWeight < node.Weight { if node.GetIsDead() { node.effectiveWeight-- } else { node.effectiveWeight++ } } // 4.抉择最大长期权重node if maxWeightNode == nil || node.currentWeight > maxWeightNode.currentWeight { maxWeightNode = node } } if maxWeightNode == nil { return nil } // 5.变更 node长期权重=node长期权重-node无效权重之和 // 最大权重node长期权重递加,交替减权,以保障调度平滑性 maxWeightNode.currentWeight -= totalEffectiveWeight if maxWeightNode.GetIsDead() { maxWeightNode = c.Next() } maxWeightNode.useCount++ return maxWeightNode}
测试结果如下:
-------- node 调度程序--------2022/04/06 21:50:00 127.0.0.1:80832022/04/06 21:50:00 127.0.0.1:80832022/04/06 21:50:00 127.0.0.1:80832022/04/06 21:50:00 127.0.0.1:80822022/04/06 21:50:00 127.0.0.1:80832022/04/06 21:50:00 127.0.0.1:80822022/04/06 21:50:00 127.0.0.1:80832022/04/06 21:50:00 127.0.0.1:80832022/04/06 21:50:00 127.0.0.1:80812022/04/06 21:50:00 http://127.0.0.1:8081 is dead.2022/04/06 21:50:00 127.0.0.1:80832022/04/06 21:50:00 127.0.0.1:8082-------- node 调用次数 --------2022/04/06 21:50:00 node: 127.0.0.1:8081 useCount: 12022/04/06 21:50:00 node: 127.0.0.1:8082 useCount: 32022/04/06 21:50:00 node: 127.0.0.1:8083 useCount: 7
从结果中可以看出,调度还是比较平滑的,而且对应权重的 node 在调度中的调用次数也比较合理