共计 8287 个字符,预计需要花费 21 分钟才能阅读完成。
Balancer 是一个由 Golang 开发的反向代理 7 层负载平衡,是一个适宜初学者学习的 Golang 我的项目,明天咱们就来看看这个我的项目是如何实现的。
前言
在开始理解具体的我的项目前须要理解一些根底的概念。
反向代理
反向代理指的是当用户拜访接口或者是服务器资源时并不间接拜访具体服务器,而是通过拜访代理服务器,而后代理服务器依据具体的用户申请去具体的内网中的服务器获取所需的数据。
反向代理在互联网中被大量利用,通常反向代理由 web 服务器来实现,例如 nginx
,openresty
等。
负载平衡
单点利用会面临可用性和性能的问题,因而往往须要进行多台服务部署。此时为了使得流量均匀分布到不同的服务器上,须要启用负载平衡机制。
一般来说负载平衡的能力是反向代理服务器自带的能力,负载平衡会有不少的算法,轮询加权等等,这个后续会介绍。
代码实现
Balancer
作为一个反向代理的负载均衡器,其蕴含了不同负载平衡算法实现,以及一些心跳放弃,健康检查的根底能力。
我的项目构造
Balancer
的我的项目构造极为简略,次要就是服务器的根本代理实现 proxy
和负载平衡算法实现balancer
服务器根本构造
Balancer
基于 "net/http/httputil"
包来实现本身根底能力,通过其 ReverseProxy
来实现本身的反向代理能力。
服务所需配置样例如下:
schema: http
port: 8089
ssl_certificate:
ssl_certificate_key:
tcp_health_check: true
health_check_interval: 3
max_allowed: 100
location:
- pattern: /
proxy_pass:
- "http://192.168.1.1"
- "http://192.168.1.2:1015"
- "https://192.168.1.2"
- "http://my-server.com"
balance_mode: round-robin
服务器启动的入口在 main
中,启动办法很简略:
func main() {config, err := ReadConfig("config.yaml")
if err != nil {log.Fatalf("read config error: %s", err)
}
err = config.Validation()
if err != nil {log.Fatalf("verify config error: %s", err)
}
router := mux.NewRouter()
for _, l := range config.Location {httpProxy, err := proxy.NewHTTPProxy(l.ProxyPass, l.BalanceMode)
if err != nil {log.Fatalf("create proxy error: %s", err)
}
// start health check
if config.HealthCheck {httpProxy.HealthCheck(config.HealthCheckInterval)
}
router.Handle(l.Pattern, httpProxy)
}
if config.MaxAllowed > 0 {router.Use(maxAllowedMiddleware(config.MaxAllowed))
}
svr := http.Server{Addr: ":" + strconv.Itoa(config.Port),
Handler: router,
}
// print config detail
config.Print()
// listen and serve
if config.Schema == "http" {err := svr.ListenAndServe()
if err != nil {log.Fatalf("listen and serve error: %s", err)
}
} else if config.Schema == "https" {err := svr.ListenAndServeTLS(config.SSLCertificate, config.SSLCertificateKey)
if err != nil {log.Fatalf("listen and serve error: %s", err)
}
}
}
在上述的启动办法中做了如下几件事:
- 获取到服务器的配置并进行解析
- 应用配置中配置的反向代理地址和负载平衡算法来初始化服务器并开启健康检查
- 启动服务
上述流程很简略,其重点在于服务器的构建:
type HTTPProxy struct {hostMap map[string]*httputil.ReverseProxy
lb balancer.Balancer
sync.RWMutex // protect alive
alive map[string]bool
}
下面是 Balancer
服务器的根本构造体,其中蕴含了负载均衡器 lb
,hostMap
用来记录主机和反向代理之间的映射关系,alive
用来记录反向代理服务器的衰弱状态。
func NewHTTPProxy(targetHosts []string, algorithm string) (*HTTPProxy, error) {hosts := make([]string, 0)
hostMap := make(map[string]*httputil.ReverseProxy)
alive := make(map[string]bool)
for _, targetHost := range targetHosts {url, err := url.Parse(targetHost)
if err != nil {return nil, err}
proxy := httputil.NewSingleHostReverseProxy(url)
originDirector := proxy.Director
proxy.Director = func(req *http.Request) {originDirector(req)
req.Header.Set(XProxy, ReverseProxy)
req.Header.Set(XRealIP, GetIP(req))
}
host := GetHost(url)
alive[host] = true // initial mark alive
hostMap[host] = proxy
hosts = append(hosts, host)
}
lb, err := balancer.Build(algorithm, hosts)
if err != nil {return nil, err}
return &HTTPProxy{
hostMap: hostMap,
lb: lb,
alive: alive,
}, nil
}
NewHTTPProxy
实用于构建服务器反向代理的。他接管指标 host 的数组和负载平衡算法,之后将数据进行整合以此构建残缺的 hostMap
和alive
中的 map 数据。在上述的处理过程中反向代理额定增加了两个申请头将之传递到上游。
负载平衡算法
Balancer
整个我的项目的外围是他实现的多种不同负载平衡算法,包含:bounded,random,consistent-hash,ip-hash,p2c,least-load,round-robin
咱们先来看下负载均衡器是如何设计的:
首先对负载均衡器进行形象,形象出 Balancer
接口:
type Balancer interface {Add(string)
Remove(string)
Balance(string) (string, error)
Inc(string)
Done(string)
}
其中 Add
和Remove
用于增删负载平衡集群中的具体机器,Inc
和 Done
则用于管制申请数目(如果有配置最大申请数),Balance
用于抉择最初负载平衡算法计算出的指标服务器。
并且其提供了各个算法的 map 映射以及创立的工厂办法:
type Factory func([]string) Balancer
var factories = make(map[string]Factory)
func Build(algorithm string, hosts []string) (Balancer, error) {factory, ok := factories[algorithm]
if !ok {return nil, AlgorithmNotSupportedError}
return factory(hosts), nil
}
因为大部分算法的 Add
和Remove
逻辑雷同,因而构建了 BaseBalancer
将这部分代码形象进去:
type BaseBalancer struct {
sync.RWMutex
hosts []string}
// Add new host to the balancer
func (b *BaseBalancer) Add(host string) {b.Lock()
defer b.Unlock()
for _, h := range b.hosts {
if h == host {return}
}
b.hosts = append(b.hosts, host)
}
// Remove new host from the balancer
func (b *BaseBalancer) Remove(host string) {b.Lock()
defer b.Unlock()
for i, h := range b.hosts {
if h == host {b.hosts = append(b.hosts[:i], b.hosts[i+1:]...)
return
}
}
}
// Balance selects a suitable host according
func (b *BaseBalancer) Balance(key string) (string, error) {return "", nil}
// Inc .
func (b *BaseBalancer) Inc(_ string) {}
// Done .
func (b *BaseBalancer) Done(_ string) {}
大体逻辑是通过数组存储 host 信息,Add
和 Remove
办法就是在数组中进行增删具体的 host 信息。
round robin
round robin
个别称之为轮询,是最经典,用处最宽泛的负载平衡算法之一。
type RoundRobin struct {
BaseBalancer
i uint64
}
func (r *RoundRobin) Balance(_ string) (string, error) {r.RLock()
defer r.RUnlock()
if len(r.hosts) == 0 {return "", NoHostError}
host := r.hosts[r.i%uint64(len(r.hosts))]
r.i++
return host, nil
}
它的代码实现原理是在 RoundRobin
的构造体中定义好一个 uint
值,每次申请时应用这个值和服务器数据进行取模操作以决定最初的指标服务器,并且在申请之后对于这个值进行累加,以确保下次申请应用不同服务器。
ip hash
ip hash
是在服务器场合上应用较多的一种负载平衡策略,其策略主旨是雷同用户 ip 的申请总是会落在雷同的服务器上。
func (r *IPHash) Balance(key string) (string, error) {r.RLock()
defer r.RUnlock()
if len(r.hosts) == 0 {return "", NoHostError}
value := crc32.ChecksumIEEE([]byte(key)) % uint32(len(r.hosts))
return r.hosts[value], nil
}
代码中的实现是将用户 ip 利用 ChecksumIEEE
转换成 uint 之后和服务器数目进行取模操作。
random
顾名思义 random
是随机选取算法。
代码很简略:
func (r *Random) Balance(_ string) (string, error) {r.RLock()
defer r.RUnlock()
if len(r.hosts) == 0 {return "", NoHostError}
return r.hosts[r.rnd.Intn(len(r.hosts))], nil
}
consistent hash
consistent hash
即为一致性哈希,如果不晓得什么是一致性 hash 能够去网上找材料,或者参考我之前的文章分布式系统中的哈希算法
func (c *Consistent) Add(host string) {c.ch.Add(host)
}
func (c *Consistent) Remove(host string) {c.ch.Remove(host)
}
func (c *Consistent) Balance(key string) (string, error) {if len(c.ch.Hosts()) == 0 {return "", NoHostError}
return c.ch.Get(key)
}
代码中的一致性哈希并未本人实现,借助的是开源库"github.com/lafikl/consistent"
bounded
bounded
指的是Consistent Hashing with Bounded Loads
,即有界负载的一致性哈希。此处就不多赘述,大家能够参考此篇文章。后续有人有趣味的话我可能独自开文讲一讲。
实现形式和一般一致性哈希一样,都是借助的开源库:
func (b *Bounded) Add(host string) {b.ch.Add(host)
}
func (b *Bounded) Remove(host string) {b.ch.Remove(host)
}
func (b *Bounded) Balance(key string) (string, error) {if len(b.ch.Hosts()) == 0 {return "", NoHostError}
return b.ch.GetLeast(key)
}
func (b *Bounded) Inc(host string) {b.ch.Inc(host)
}
func (b *Bounded) Done(host string) {b.ch.Done(host)
}
least load
least load
是最小负载算法,应用此算法,申请会抉择集群中负载最小的机器。
func (h *host) Tag() interface{} {return h.name}
func (h *host) Key() float64 { return float64(h.load) }
type LeastLoad struct {
sync.RWMutex
heap *fibHeap.FibHeap
}
func NewLeastLoad(hosts []string) Balancer {ll := &LeastLoad{heap: fibHeap.NewFibHeap()}
for _, h := range hosts {ll.Add(h)
}
return ll
}
func (l *LeastLoad) Add(hostName string) {l.Lock()
defer l.Unlock()
if ok := l.heap.GetValue(hostName); ok != nil {return}
_ = l.heap.InsertValue(&host{hostName, 0})
}
func (l *LeastLoad) Remove(hostName string) {l.Lock()
defer l.Unlock()
if ok := l.heap.GetValue(hostName); ok == nil {return}
_ = l.heap.Delete(hostName)
}
func (l *LeastLoad) Balance(_ string) (string, error) {l.RLock()
defer l.RUnlock()
if l.heap.Num() == 0 {return "", NoHostError}
return l.heap.MinimumValue().Tag().(string), nil
}
func (l *LeastLoad) Inc(hostName string) {l.Lock()
defer l.Unlock()
if ok := l.heap.GetValue(hostName); ok == nil {return}
h := l.heap.GetValue(hostName)
h.(*host).load++
_ = l.heap.IncreaseKeyValue(h)
}
func (l *LeastLoad) Done(hostName string) {l.Lock()
defer l.Unlock()
if ok := l.heap.GetValue(hostName); ok == nil {return}
h := l.heap.GetValue(hostName)
h.(*host).load--
_ = l.heap.DecreaseKeyValue(h)
}
代码中比较简单的应用了以后正在解决的申请数目来作为服务器的负载程度,并且借助了开源库 "github.com/starwander/GoFibonacciHeap"
来保护集群中服务器的负载值。
p2c
p2c 全称 Power of Two Random Choices
,个别翻译为两次随机抉择算法,出自论文[The Power of Two Random Choices: A Survey of
Techniques and Results](http://www.eecs.harvard.edu/~michaelm/postscripts/handbook200…)
大题的思路是从服务器列表中进行两次随机抉择获取两个节点,而后进行比拟选出最终的指标服务器节点。
const Salt = "%#!"
type host struct {
name string
load uint64
}
type P2C struct {
sync.RWMutex
hosts []*host
rnd *rand.Rand
loadMap map[string]*host
}
func (p *P2C) Balance(key string) (string, error) {p.RLock()
defer p.RUnlock()
if len(p.hosts) == 0 {return "", NoHostError}
n1, n2 := p.hash(key)
host := n2
if p.loadMap[n1].load <= p.loadMap[n2].load {host = n1}
return host, nil
}
func (p *P2C) hash(key string) (string, string) {
var n1, n2 string
if len(key) > 0 {
saltKey := key + Salt
n1 = p.hosts[crc32.ChecksumIEEE([]byte(key))%uint32(len(p.hosts))].name
n2 = p.hosts[crc32.ChecksumIEEE([]byte(saltKey))%uint32(len(p.hosts))].name
return n1, n2
}
n1 = p.hosts[p.rnd.Intn(len(p.hosts))].name
n2 = p.hosts[p.rnd.Intn(len(p.hosts))].name
return n1, n2
}
func (p *P2C) Inc(host string) {p.Lock()
defer p.Unlock()
h, ok := p.loadMap[host]
if !ok {return}
h.load++
}
func (p *P2C) Done(host string) {p.Lock()
defer p.Unlock()
h, ok := p.loadMap[host]
if !ok {return}
if h.load > 0 {h.load--}
}
代码的实现思路是这样的,依据用户的 ip 作为 key 来进行 hash 操作,如果 ip 为空随机选取两个服务器,如果不为空,则别离应用 ip 和 ip 加盐后 ChecksumIEEE
计算出的值来选取服务器,选出两个服务器后比拟两者的负载情况,抉择负载更小的那个。负载的计算形式和 least load
保持一致。
总结
咱们能够看到 Balancer
实际上实现了简略的方向代理能力以及实现了多种的负载平衡算法,不仅能够作为服务器独自运行,还能够作为 sdk 提供负载平衡算法的应用。其代码实现较为简单,然而非常清晰,非常适合刚接触 golang 的开发者。