共计 9383 个字符,预计需要花费 24 分钟才能阅读完成。
在浏览 go-zero 源码之前,我们先来看看常用的负载均衡算法,看看其原理以及是如何实现的,然后我们再用这些负载均衡算法和 go-zero 的实现对比一下,看看各自的优缺点是什么。
轮询
proxy 服务与 node 服务配置文件
{
"proxy": {"url": "127.0.0.1:8080"},
"nodes": [
{"url": "127.0.0.1:8081"},
{"url": "127.0.0.1:8082"},
{"url": "127.0.0.1:8083"}
]
}
proxy 服务、node 服务、轮询算法代码
// Config is the top-level layout of config.json: the proxy's listen
// address plus the list of backend nodes.
type Config struct {
	Proxy Proxy   `json:"proxy"`
	Nodes []*Node `json:"nodes"`
}

// Proxy holds the proxy server's own configuration.
type Proxy struct {
	Url string `json:"url"`
}

// Node is one backend server. URL is read from config.json; IsDead and
// useCount are runtime state. mu guards IsDead (see SetDead/GetIsDead).
type Node struct {
	URL      string `json:"url"`
	IsDead   bool
	useCount int
	mu       sync.RWMutex
}
// cfg is the process-wide configuration loaded from ./config.json.
var cfg Config

// init loads ./config.json into cfg. The process exits if the file cannot
// be read or does not contain valid JSON.
func init() {
	data, err := ioutil.ReadFile("./config.json")
	if err != nil {
		log.Fatal(err.Error())
	}
	// Fail fast on malformed JSON: otherwise cfg silently stays empty and the
	// balancer later divides by a zero node count.
	if err := json.Unmarshal(data, &cfg); err != nil {
		log.Fatal(err.Error())
	}
}
// SetDead records whether this node is currently considered down.
// The write happens under the node's own mutex.
func (node *Node) SetDead(b bool) {
	node.mu.Lock()
	defer node.mu.Unlock()
	node.IsDead = b
}
// GetIsDead reports whether this node is currently marked as down.
// The read happens under the node's read lock.
func (node *Node) GetIsDead() bool {
	node.mu.RLock()
	defer node.mu.RUnlock()
	return node.IsDead
}
// Shared balancer state: mu serializes node selection (and the useCount
// updates made during it) across concurrent requests; idx is the
// round-robin cursor into cfg.Nodes and starts at zero.
var (
	mu  sync.Mutex
	idx int
)
// rrlbbHandler is the round-robin load-balancing handler. It forwards the
// request to the next live node in cfg.Nodes, advancing the shared cursor
// idx under mu. Dead nodes are skipped (at most one full lap of the ring is
// scanned); if no node is live — or none are configured — the client gets
// 502 Bad Gateway instead of a panic or endless recursion.
func rrlbbHandler(w http.ResponseWriter, r *http.Request) {
	maxLen := len(cfg.Nodes)

	// Round Robin: treat cfg.Nodes as a ring and take the first live node.
	mu.Lock()
	var currentNode *Node
	for tried := 0; tried < maxLen; tried++ {
		candidate := cfg.Nodes[idx%maxLen]
		idx++
		if !candidate.GetIsDead() {
			currentNode = candidate
			break
		}
	}
	if currentNode == nil {
		mu.Unlock()
		http.Error(w, "no live backend node", http.StatusBadGateway)
		return
	}
	currentNode.useCount++
	mu.Unlock()

	targetURL, err := url.Parse("http://" + currentNode.URL)
	if err != nil {
		log.Fatal(err.Error())
	}
	// Log only after the error check: targetURL is nil when parsing fails.
	log.Println(targetURL.Host)

	reverseProxy := httputil.NewSingleHostReverseProxy(targetURL)
	reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) {
		log.Printf("%v is dead.", targetURL)
		currentNode.SetDead(true)
		// Re-dispatch to another node. This terminates: each failure marks one
		// more node dead, and with none left the handler answers 502.
		// NOTE(review): a request body consumed by the failed attempt may not
		// be replayed on the retry.
		rrlbbHandler(w, r)
	}
	reverseProxy.ServeHTTP(w, r)
}
// isAlive reports whether a TCP connection to url.Host can be opened within
// one minute. A successful connection is closed straight away; a failure is
// logged and reported as false.
func isAlive(url *url.URL) bool {
	const dialTimeout = time.Minute
	conn, err := net.DialTimeout("tcp", url.Host, dialTimeout)
	if err == nil {
		conn.Close()
		return true
	}
	log.Printf("Unreachable to %v, error %s:", url.Host, err.Error())
	return false
}
// healthCheck probes every configured node once a minute with a TCP dial and
// updates its dead/alive flag accordingly, logging the outcome per node.
// It runs forever and is meant to be started in its own goroutine.
func healthCheck() {
	t := time.NewTicker(time.Minute * 1)
	for range t.C {
		for _, node := range cfg.Nodes {
			// node.URL is a bare "host:port". url.Parse rejects that form
			// ("first path segment in URL cannot contain colon"), which would
			// hit log.Fatal on the first tick. Prefix the scheme the same way
			// the proxy handlers do so Host is populated.
			pingURL, err := url.Parse("http://" + node.URL)
			if err != nil {
				log.Fatal(err.Error())
			}
			alive := isAlive(pingURL)
			node.SetDead(!alive)
			msg := "ok"
			if !alive {
				msg = "dead"
			}
			log.Printf("%v checked %s by healthcheck", node.URL, msg)
		}
	}
}
// proxyServerStart launches the background health checker and then serves
// the reverse proxy on cfg.Proxy.Url with the round-robin handler. Any
// listen/serve error terminates the process.
func proxyServerStart() {
	go healthCheck()

	server := http.Server{
		Addr:    cfg.Proxy.Url,
		Handler: http.HandlerFunc(rrlbbHandler),
	}
	if err := server.ListenAndServe(); err != nil {
		log.Fatal(err.Error())
	}
}
// nodeServerStart registers a /ping handler (replying "pong") on the default
// mux and starts one HTTP server per configured node, except node 0, which is
// deliberately never started to simulate a node that is down.
func nodeServerStart() {
	http.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("pong"))
	})
	wg := new(sync.WaitGroup)
	wg.Add(len(cfg.Nodes))
	for i, node := range cfg.Nodes {
		// Pass i and node as arguments: before Go 1.22 every closure shared the
		// single loop variable, so goroutines could see a later node (the old
		// 100ms sleep only masked this race).
		go func(i int, node *Node) {
			if i > 0 {
				// Nodes 1..n-1 serve; node 0 is skipped to simulate an outage.
				log.Fatal(http.ListenAndServe(node.URL, nil))
			}
			wg.Done()
		}(i, node)
	}
	// Only the skipped node calls Done; serving goroutines block inside
	// ListenAndServe. With more than one node this Wait therefore never
	// returns, which keeps the caller parked while the servers run.
	wg.Wait()
}
最核心的算法就是这一段,非常简单,轮询的本质其实是循环数组:
// Excerpt from rrlbbHandler: idx is reduced modulo the node count so the
// slice is walked as a ring; a dead node is skipped by advancing one slot.
// Only one slot is skipped — if the next node is also dead it is still used.
currentNode := cfg.Nodes[idx%maxLen] // index the node slice as a ring
if currentNode.GetIsDead() {
idx++ // this node is down, move on to the next one
currentNode = cfg.Nodes[idx%maxLen]
}
我们来编写测试代码测试一下吧:
// Test_RoundRobinBalancer starts the backend nodes and the proxy, sends ten
// GET /ping requests through the proxy, checks each reply body, and logs how
// many times each node was selected.
func Test_RoundRobinBalancer(t *testing.T) {
	// Backends first, then the proxy in front of them; give each a moment
	// to start listening.
	go nodeServerStart()
	time.Sleep(time.Millisecond * 200)
	go proxyServerStart()
	time.Sleep(time.Millisecond * 100)

	cases := [...]struct {
		name, method, uri string
		body              io.Reader
		want              *http.Request
		wantBody          string
	}{
		{
			name:     "GET with ping url",
			method:   "GET",
			uri:      "http://127.0.0.1:8080/ping",
			wantBody: "pong",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			const rounds = 10
			for n := 0; n < rounds; n++ {
				got, err := utils.HttpRequest(tc.method, tc.uri, tc.body)
				if err != nil {
					t.Errorf("ReadAll: %v", err)
				}
				if string(got) != tc.wantBody {
					t.Errorf("Body = %q; want %q", got, tc.wantBody)
				}
			}
			for _, node := range cfg.Nodes {
				log.Printf("node: %s useCount: %d", node.URL, node.useCount)
			}
		})
	}
}
测试结果如下:
-------- node 调度程序 --------
2022/04/06 19:50:24 127.0.0.1:8081
2022/04/06 19:50:24 http://127.0.0.1:8081 is dead.
2022/04/06 19:50:24 127.0.0.1:8082
2022/04/06 19:50:24 127.0.0.1:8083
2022/04/06 19:50:24 127.0.0.1:8082
2022/04/06 19:50:24 127.0.0.1:8083
2022/04/06 19:50:24 127.0.0.1:8082
2022/04/06 19:50:24 127.0.0.1:8083
2022/04/06 19:50:24 127.0.0.1:8082
2022/04/06 19:50:24 127.0.0.1:8083
2022/04/06 19:50:24 127.0.0.1:8082
2022/04/06 19:50:24 127.0.0.1:8083
-------- node 调用次数 --------
2022/04/06 19:50:24 node: 127.0.0.1:8081 useCount: 1
2022/04/06 19:50:24 node: 127.0.0.1:8082 useCount: 5
2022/04/06 19:50:24 node: 127.0.0.1:8083 useCount: 5
第一个 node 宕机,这一段输出了宕机状态:
2022/04/06 19:28:48 127.0.0.1:8081
2022/04/06 19:28:48 http://127.0.0.1:8081 is dead.
从这一段可以看出节点服务是被交替调用的:
2022/04/06 19:28:48 127.0.0.1:8082
2022/04/06 19:28:48 127.0.0.1:8083
2022/04/06 19:28:48 127.0.0.1:8082
2022/04/06 19:28:48 127.0.0.1:8083
2022/04/06 19:28:48 127.0.0.1:8082
2022/04/06 19:28:48 127.0.0.1:8083
2022/04/06 19:28:48 127.0.0.1:8082
2022/04/06 19:28:48 127.0.0.1:8083
2022/04/06 19:28:48 127.0.0.1:8082
2022/04/06 19:28:48 127.0.0.1:8083
从这一段可以看出 node1 被调用了一次,失败后处理函数递归调用自身重新调度,之后请求被分别转发到 node2 和 node3,各 5 次。
说明我们的轮询调度算法是成功的,大家可以复制代码,自己尝试运行一下。
2022/04/06 19:28:48 node: 127.0.0.1:8081 useCount: 1
2022/04/06 19:28:48 node: 127.0.0.1:8082 useCount: 5
2022/04/06 19:28:48 node: 127.0.0.1:8083 useCount: 5
随机轮询
随机轮询算法也非常简单。
我们在 rrlbbHandler 函数下面添加如下函数:
// 随机轮询算法
func rrrlbHandler(w http.ResponseWriter, r *http.Request) {maxLen := len(cfg.Nodes)
// Rand Round Robin
mu.Lock()
idx, _ := rand.Int(rand.Reader, big.NewInt(int64(maxLen))) // 获取随机数
currentNode := cfg.Nodes[int(idx.Int64())%maxLen] // 获取随机节点
if currentNode.GetIsDead() {idx, _ = rand.Int(rand.Reader, big.NewInt(int64(maxLen)))
currentNode = cfg.Nodes[int(idx.Int64())%maxLen]
}
currentNode.useCount++
targetURL, err := url.Parse("http://" + cfg.Nodes[int(idx.Int64())%maxLen].URL)
log.Println(targetURL.Host)
if err != nil {log.Fatal(err.Error())
}
mu.Unlock()
reverseProxy := httputil.NewSingleHostReverseProxy(targetURL)
reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) {
// NOTE: It is better to implement retry.
log.Printf("%v is dead.", targetURL)
currentNode.SetDead(true)
rrrlbHandler(w, r)
}
reverseProxy.ServeHTTP(w, r)
}
随机轮询的核心算法如下:
// Excerpt from rrrlbHandler: draw a random index in [0, maxLen) with
// rand.Int(rand.Reader, ...); if that node is dead, draw once more. The
// second draw can also land on a dead node, and the rand.Int errors are
// discarded here.
idx, _ := rand.Int(rand.Reader, big.NewInt(int64(maxLen))) // get a random number
currentNode := cfg.Nodes[int(idx.Int64())%maxLen] // pick the random node
if currentNode.GetIsDead() {idx, _ = rand.Int(rand.Reader, big.NewInt(int64(maxLen)))
currentNode = cfg.Nodes[int(idx.Int64())%maxLen]
}
编写测试代码来测试一下。
首先修改 proxyServerStart 服务函数:
// proxyServerStart launches the background health checker and then serves
// the reverse proxy on cfg.Proxy.Url using the random balancer
// (rrrlbHandler). Switch the handler to rrlbbHandler for plain round-robin.
// Any listen/serve error terminates the process.
func proxyServerStart() {
	go healthCheck()
	handler := http.HandlerFunc(rrrlbHandler)
	server := http.Server{Addr: cfg.Proxy.Url, Handler: handler}
	if err := server.ListenAndServe(); err != nil {
		log.Fatal(err.Error())
	}
}
测试代码与轮询算法的测试代码保持不变。
测试结果如下:
-------- node 调度程序 --------
2022/04/06 19:49:51 127.0.0.1:8081
2022/04/06 19:49:51 http://127.0.0.1:8081 is dead.
2022/04/06 19:49:51 127.0.0.1:8082
2022/04/06 19:49:51 127.0.0.1:8081
2022/04/06 19:49:51 http://127.0.0.1:8081 is dead.
2022/04/06 19:49:51 127.0.0.1:8082
2022/04/06 19:49:51 127.0.0.1:8083
2022/04/06 19:49:51 127.0.0.1:8083
2022/04/06 19:49:51 127.0.0.1:8082
2022/04/06 19:49:51 127.0.0.1:8083
2022/04/06 19:49:51 127.0.0.1:8083
2022/04/06 19:49:51 127.0.0.1:8081
2022/04/06 19:49:51 http://127.0.0.1:8081 is dead.
2022/04/06 19:49:51 127.0.0.1:8083
2022/04/06 19:49:51 127.0.0.1:8081
2022/04/06 19:49:51 http://127.0.0.1:8081 is dead.
2022/04/06 19:49:51 127.0.0.1:8082
2022/04/06 19:49:51 127.0.0.1:8082
-------- node 调用次数 --------
2022/04/06 19:49:51 node: 127.0.0.1:8081 useCount: 4
2022/04/06 19:49:51 node: 127.0.0.1:8082 useCount: 5
2022/04/06 19:49:51 node: 127.0.0.1:8083 useCount: 5
从测试结果中可以看出,node 的调用顺序是随机的,请求最终被负载到未宕机的 node2、node3 上,总计成功处理 10 次(宕机的 node1 被随机选中了 4 次,每次失败后都会重新调度)。
说明我们的算法也是成功的。
加权轮询
加权轮询我们也基于轮询的代码来修改。
修改配置文件:
{
"proxy": {"url": "127.0.0.1:8080"},
"nodes": [
{
"url": "127.0.0.1:8081",
"weight": 2
},
{
"url": "127.0.0.1:8082",
"weight": 3
},
{
"url": "127.0.0.1:8083",
"weight": 5
}
]
}
我们再给 Node 的结构体添加三个属性:
- currentWeight:node 临时权重
- effectiveWeight:node 有效权重
- Weight:node 权重(从配置文件读取)
// Node is one backend server in the weighted round-robin setup.
type Node struct {
	URL    string `json:"url"`
	Weight int    `json:"weight"` // configured weight from config.json

	// Smooth weighted round-robin bookkeeping, updated by Config.Next.
	currentWeight   int // running weight: raised every round, lowered when picked
	effectiveWeight int // weight actually used for selection; moves toward Weight

	IsDead   bool
	useCount int
	mu       sync.RWMutex
}
修改 init 函数为如下代码:
// init loads ./config.json into cfg and seeds the weighted round-robin state
// of every node. The process exits if the file cannot be read or is not
// valid JSON.
func init() {
	data, err := ioutil.ReadFile("./config.json")
	if err != nil {
		log.Fatal(err.Error())
	}
	if err := json.Unmarshal(data, &cfg); err != nil {
		log.Fatal(err.Error())
	}
	// Standard smooth weighted round-robin start state (as in nginx's upstream
	// round-robin): effective weight equals the configured weight and the
	// running weight starts at zero. Leaving effectiveWeight at zero (and
	// seeding currentWeight with Weight) makes every node ramp up from nothing
	// and skews the first picks toward the heaviest node.
	for _, node := range cfg.Nodes {
		node.effectiveWeight = node.Weight
		node.currentWeight = 0
	}
}
修改 rrlbHandler 函数为如下代码:
// rrlbHandler proxies the request to the node chosen by cfg.Next (weighted
// round-robin, selected under mu). If no node is available it answers
// 502 Bad Gateway instead of dereferencing a nil node. When the proxied call
// fails, the node is marked dead and the request is re-dispatched.
func rrlbHandler(w http.ResponseWriter, r *http.Request) {
	mu.Lock()
	currentNode := cfg.Next()
	mu.Unlock()
	if currentNode == nil {
		http.Error(w, "no live backend node", http.StatusBadGateway)
		return
	}

	targetURL, err := url.Parse("http://" + currentNode.URL)
	if err != nil {
		log.Fatal(err.Error())
	}
	log.Println(targetURL.Host)

	reverseProxy := httputil.NewSingleHostReverseProxy(targetURL)
	reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) {
		// NOTE: It is better to implement retry.
		log.Printf("%v is dead.", targetURL)
		currentNode.SetDead(true)
		rrlbHandler(w, r)
	}
	reverseProxy.ServeHTTP(w, r)
}
添加 Next 函数,代码如下,此函数即为加权轮询的核心算法。
注意:在获取最大临时权重 node 的过程中,我们要保证最大临时权重 node 的临时权重持续递减,而且各个 node 的临时权重持续递增,以保证调度的平滑性。
// Next picks the backend for the next request using smooth weighted
// round-robin and returns it, or nil when no live node exists.
//
// Each call adds every live node's effectiveWeight to its currentWeight; the
// node with the largest currentWeight wins and is pushed back by the sum of
// all live effective weights. Once effective weights have settled, each node
// is chosen in proportion to its Weight over a full cycle, interleaved rather
// than in long runs.
//
// Dead nodes are never returned. Their effectiveWeight drops by one per call
// (not below zero), and live nodes regain one step per call up to Weight, so a
// recovered node ramps back up gradually. Skipping dead nodes also removes the
// unbounded recursion that occurred when every node was down.
//
// Next mutates node state without locking; callers must serialize access
// (rrlbHandler holds mu around it).
func (c *Config) Next() *Node {
	totalEffectiveWeight := 0
	var maxWeightNode *Node
	for _, node := range c.Nodes {
		if node.GetIsDead() {
			if node.effectiveWeight > 0 {
				node.effectiveWeight--
			}
			continue
		}
		// 1. Sum the effective weights of the live nodes.
		totalEffectiveWeight += node.effectiveWeight
		// 2. Every live node's running weight grows each round.
		node.currentWeight += node.effectiveWeight
		// 3. Recover effective weight one step at a time toward Weight.
		if node.effectiveWeight < node.Weight {
			node.effectiveWeight++
		}
		// 4. Track the node with the largest running weight (first wins ties).
		if maxWeightNode == nil || node.currentWeight > maxWeightNode.currentWeight {
			maxWeightNode = node
		}
	}
	if maxWeightNode == nil {
		return nil
	}
	// 5. Push the winner back so the others catch up: this is what keeps the
	// schedule smooth.
	maxWeightNode.currentWeight -= totalEffectiveWeight
	maxWeightNode.useCount++
	return maxWeightNode
}
测试结果如下:
-------- node 调度程序 --------
2022/04/06 21:50:00 127.0.0.1:8083
2022/04/06 21:50:00 127.0.0.1:8083
2022/04/06 21:50:00 127.0.0.1:8083
2022/04/06 21:50:00 127.0.0.1:8082
2022/04/06 21:50:00 127.0.0.1:8083
2022/04/06 21:50:00 127.0.0.1:8082
2022/04/06 21:50:00 127.0.0.1:8083
2022/04/06 21:50:00 127.0.0.1:8083
2022/04/06 21:50:00 127.0.0.1:8081
2022/04/06 21:50:00 http://127.0.0.1:8081 is dead.
2022/04/06 21:50:00 127.0.0.1:8083
2022/04/06 21:50:00 127.0.0.1:8082
-------- node 调用次数 --------
2022/04/06 21:50:00 node: 127.0.0.1:8081 useCount: 1
2022/04/06 21:50:00 node: 127.0.0.1:8082 useCount: 3
2022/04/06 21:50:00 node: 127.0.0.1:8083 useCount: 7
从结果中可以看出,调度还是比较平滑的,而且各 node 在调度中的调用次数与其权重也比较相符。