Balancer是一个由Golang开发的反向代理7层负载平衡,是一个适宜初学者学习的Golang我的项目,明天咱们就来看看这个我的项目是如何实现的。
前言
在开始理解具体的我的项目前须要理解一些根底的概念。
反向代理
反向代理指的是当用户拜访接口或者是服务器资源时并不间接拜访具体服务器,而是通过拜访代理服务器,而后代理服务器依据具体的用户申请去具体的内网中的服务器获取所需的数据。
反向代理在互联网中被大量利用,通常反向代理由web服务器来实现,例如nginx
,openresty
等。
负载平衡
单点利用会面临可用性和性能的问题,因而往往须要进行多台服务部署。此时为了使得流量均匀分布到不同的服务器上,须要启用负载平衡机制。
一般来说负载平衡的能力是反向代理服务器自带的能力,负载平衡会有不少的算法,轮询加权等等,这个后续会介绍。
代码实现
Balancer
作为一个反向代理的负载均衡器,其蕴含了不同负载平衡算法实现,以及一些心跳放弃,健康检查的根底能力。
我的项目构造
Balancer
的我的项目构造极为简略,次要就是服务器的根本代理实现proxy
和负载平衡算法实现balancer
服务器根本构造
Balancer
基于"net/http/httputil"
包来实现本身根底能力,通过其ReverseProxy
来实现本身的反向代理能力。
服务所需配置样例如下:
schema: httpport: 8089ssl_certificate:ssl_certificate_key:tcp_health_check: truehealth_check_interval: 3 max_allowed: 100location: - pattern: / proxy_pass: - "http://192.168.1.1" - "http://192.168.1.2:1015" - "https://192.168.1.2" - "http://my-server.com" balance_mode: round-robin
服务器启动的入口在main
中,启动办法很简略:
func main() { config, err := ReadConfig("config.yaml") if err != nil { log.Fatalf("read config error: %s", err) } err = config.Validation() if err != nil { log.Fatalf("verify config error: %s", err) } router := mux.NewRouter() for _, l := range config.Location { httpProxy, err := proxy.NewHTTPProxy(l.ProxyPass, l.BalanceMode) if err != nil { log.Fatalf("create proxy error: %s", err) } // start health check if config.HealthCheck { httpProxy.HealthCheck(config.HealthCheckInterval) } router.Handle(l.Pattern, httpProxy) } if config.MaxAllowed > 0 { router.Use(maxAllowedMiddleware(config.MaxAllowed)) } svr := http.Server{ Addr: ":" + strconv.Itoa(config.Port), Handler: router, } // print config detail config.Print() // listen and serve if config.Schema == "http" { err := svr.ListenAndServe() if err != nil { log.Fatalf("listen and serve error: %s", err) } } else if config.Schema == "https" { err := svr.ListenAndServeTLS(config.SSLCertificate, config.SSLCertificateKey) if err != nil { log.Fatalf("listen and serve error: %s", err) } }}
在上述的启动办法中做了如下几件事:
- 获取到服务器的配置并进行解析
- 应用配置中配置的反向代理地址和负载平衡算法来初始化服务器并开启健康检查
- 启动服务
上述流程很简略,其重点在于服务器的构建:
type HTTPProxy struct { hostMap map[string]*httputil.ReverseProxy lb balancer.Balancer sync.RWMutex // protect alive alive map[string]bool}
下面是Balancer
服务器的根本构造体,其中蕴含了负载均衡器lb
,hostMap
用来记录主机和反向代理之间的映射关系,alive
用来记录反向代理服务器的衰弱状态。
func NewHTTPProxy(targetHosts []string, algorithm string) ( *HTTPProxy, error) { hosts := make([]string, 0) hostMap := make(map[string]*httputil.ReverseProxy) alive := make(map[string]bool) for _, targetHost := range targetHosts { url, err := url.Parse(targetHost) if err != nil { return nil, err } proxy := httputil.NewSingleHostReverseProxy(url) originDirector := proxy.Director proxy.Director = func(req *http.Request) { originDirector(req) req.Header.Set(XProxy, ReverseProxy) req.Header.Set(XRealIP, GetIP(req)) } host := GetHost(url) alive[host] = true // initial mark alive hostMap[host] = proxy hosts = append(hosts, host) } lb, err := balancer.Build(algorithm, hosts) if err != nil { return nil, err } return &HTTPProxy{ hostMap: hostMap, lb: lb, alive: alive, }, nil}
NewHTTPProxy
实用于构建服务器反向代理的。他接管指标host的数组和负载平衡算法,之后将数据进行整合以此构建残缺的hostMap
和alive
中的map数据。在上述的处理过程中反向代理额定增加了两个申请头将之传递到上游。
负载平衡算法
Balancer
整个我的项目的外围是他实现的多种不同负载平衡算法,包含:bounded,random,consistent-hash,ip-hash,p2c,least-load,round-robin
咱们先来看下负载均衡器是如何设计的:
首先对负载均衡器进行形象,形象出Balancer
接口:
type Balancer interface { Add(string) Remove(string) Balance(string) (string, error) Inc(string) Done(string)}
其中Add
和Remove
用于增删负载平衡集群中的具体机器,Inc
和Done
则用于管制申请数目(如果有配置最大申请数),Balance
用于抉择最初负载平衡算法计算出的指标服务器。
并且其提供了各个算法的map映射以及创立的工厂办法:
type Factory func([]string) Balancervar factories = make(map[string]Factory)func Build(algorithm string, hosts []string) (Balancer, error) { factory, ok := factories[algorithm] if !ok { return nil, AlgorithmNotSupportedError } return factory(hosts), nil}
因为大部分算法的Add
和Remove
逻辑雷同,因而构建了BaseBalancer
将这部分代码形象进去:
type BaseBalancer struct { sync.RWMutex hosts []string}// Add new host to the balancerfunc (b *BaseBalancer) Add(host string) { b.Lock() defer b.Unlock() for _, h := range b.hosts { if h == host { return } } b.hosts = append(b.hosts, host)}// Remove new host from the balancerfunc (b *BaseBalancer) Remove(host string) { b.Lock() defer b.Unlock() for i, h := range b.hosts { if h == host { b.hosts = append(b.hosts[:i], b.hosts[i+1:]...) return } }}// Balance selects a suitable host accordingfunc (b *BaseBalancer) Balance(key string) (string, error) { return "", nil}// Inc .func (b *BaseBalancer) Inc(_ string) {}// Done .func (b *BaseBalancer) Done(_ string) {}
大体逻辑是通过数组存储host信息,Add
和Remove
办法就是在数组中进行增删具体的host信息。
round robin
round robin
个别称之为轮询,是最经典,用处最宽泛的负载平衡算法之一。
type RoundRobin struct { BaseBalancer i uint64}func (r *RoundRobin) Balance(_ string) (string, error) { r.RLock() defer r.RUnlock() if len(r.hosts) == 0 { return "", NoHostError } host := r.hosts[r.i%uint64(len(r.hosts))] r.i++ return host, nil}
它的代码实现原理是在RoundRobin
的构造体中定义好一个uint
值,每次申请时应用这个值和服务器数据进行取模操作以决定最初的指标服务器,并且在申请之后对于这个值进行累加,以确保下次申请应用不同服务器。
ip hash
ip hash
是在服务器场合上应用较多的一种负载平衡策略,其策略主旨是雷同用户ip的申请总是会落在雷同的服务器上。
func (r *IPHash) Balance(key string) (string, error) { r.RLock() defer r.RUnlock() if len(r.hosts) == 0 { return "", NoHostError } value := crc32.ChecksumIEEE([]byte(key)) % uint32(len(r.hosts)) return r.hosts[value], nil}
代码中的实现是将用户ip利用ChecksumIEEE
转换成uint之后和服务器数目进行取模操作。
random
顾名思义random
是随机选取算法。
代码很简略:
func (r *Random) Balance(_ string) (string, error) { r.RLock() defer r.RUnlock() if len(r.hosts) == 0 { return "", NoHostError } return r.hosts[r.rnd.Intn(len(r.hosts))], nil}
consistent hash
consistent hash
即为一致性哈希,如果不晓得什么是一致性hash能够去网上找材料,或者参考我之前的文章分布式系统中的哈希算法
func (c *Consistent) Add(host string) { c.ch.Add(host)}func (c *Consistent) Remove(host string) { c.ch.Remove(host)}func (c *Consistent) Balance(key string) (string, error) { if len(c.ch.Hosts()) == 0 { return "", NoHostError } return c.ch.Get(key)}
代码中的一致性哈希并未本人实现,借助的是开源库"github.com/lafikl/consistent"
bounded
bounded
指的是Consistent Hashing with Bounded Loads
,即有界负载的一致性哈希。此处就不多赘述,大家能够参考此篇文章。后续有人有趣味的话我可能独自开文讲一讲。
实现形式和一般一致性哈希一样,都是借助的开源库:
func (b *Bounded) Add(host string) { b.ch.Add(host)}func (b *Bounded) Remove(host string) { b.ch.Remove(host)}func (b *Bounded) Balance(key string) (string, error) { if len(b.ch.Hosts()) == 0 { return "", NoHostError } return b.ch.GetLeast(key)}func (b *Bounded) Inc(host string) { b.ch.Inc(host)}func (b *Bounded) Done(host string) { b.ch.Done(host)}
least load
least load
是最小负载算法,应用此算法,申请会抉择集群中负载最小的机器。
func (h *host) Tag() interface{} { return h.name }func (h *host) Key() float64 { return float64(h.load) }type LeastLoad struct { sync.RWMutex heap *fibHeap.FibHeap}func NewLeastLoad(hosts []string) Balancer { ll := &LeastLoad{heap: fibHeap.NewFibHeap()} for _, h := range hosts { ll.Add(h) } return ll}func (l *LeastLoad) Add(hostName string) { l.Lock() defer l.Unlock() if ok := l.heap.GetValue(hostName); ok != nil { return } _ = l.heap.InsertValue(&host{hostName, 0})}func (l *LeastLoad) Remove(hostName string) { l.Lock() defer l.Unlock() if ok := l.heap.GetValue(hostName); ok == nil { return } _ = l.heap.Delete(hostName)}func (l *LeastLoad) Balance(_ string) (string, error) { l.RLock() defer l.RUnlock() if l.heap.Num() == 0 { return "", NoHostError } return l.heap.MinimumValue().Tag().(string), nil}func (l *LeastLoad) Inc(hostName string) { l.Lock() defer l.Unlock() if ok := l.heap.GetValue(hostName); ok == nil { return } h := l.heap.GetValue(hostName) h.(*host).load++ _ = l.heap.IncreaseKeyValue(h)}func (l *LeastLoad) Done(hostName string) { l.Lock() defer l.Unlock() if ok := l.heap.GetValue(hostName); ok == nil { return } h := l.heap.GetValue(hostName) h.(*host).load-- _ = l.heap.DecreaseKeyValue(h)}
代码中比较简单的应用了以后正在解决的申请数目来作为服务器的负载程度,并且借助了开源库"github.com/starwander/GoFibonacciHeap"
来保护集群中服务器的负载值。
p2c
p2c全称 Power of Two Random Choices
,个别翻译为两次随机抉择算法,出自论文[The Power of Two Random Choices: A Survey of
Techniques and Results](http://www.eecs.harvard.edu/~michaelm/postscripts/handbook200...)
大题的思路是从服务器列表中进行两次随机抉择获取两个节点,而后进行比拟选出最终的指标服务器节点。
const Salt = "%#!"type host struct { name string load uint64}type P2C struct { sync.RWMutex hosts []*host rnd *rand.Rand loadMap map[string]*host}func (p *P2C) Balance(key string) (string, error) { p.RLock() defer p.RUnlock() if len(p.hosts) == 0 { return "", NoHostError } n1, n2 := p.hash(key) host := n2 if p.loadMap[n1].load <= p.loadMap[n2].load { host = n1 } return host, nil}func (p *P2C) hash(key string) (string, string) { var n1, n2 string if len(key) > 0 { saltKey := key + Salt n1 = p.hosts[crc32.ChecksumIEEE([]byte(key))%uint32(len(p.hosts))].name n2 = p.hosts[crc32.ChecksumIEEE([]byte(saltKey))%uint32(len(p.hosts))].name return n1, n2 } n1 = p.hosts[p.rnd.Intn(len(p.hosts))].name n2 = p.hosts[p.rnd.Intn(len(p.hosts))].name return n1, n2}func (p *P2C) Inc(host string) { p.Lock() defer p.Unlock() h, ok := p.loadMap[host] if !ok { return } h.load++}func (p *P2C) Done(host string) { p.Lock() defer p.Unlock() h, ok := p.loadMap[host] if !ok { return } if h.load > 0 { h.load-- }}
代码的实现思路是这样的,依据用户的ip作为key来进行hash操作,如果ip为空随机选取两个服务器,如果不为空,则别离应用ip和ip加盐后ChecksumIEEE
计算出的值来选取服务器,选出两个服务器后比拟两者的负载情况,抉择负载更小的那个。负载的计算形式和least load
保持一致。
总结
咱们能够看到Balancer
实际上实现了简略的方向代理能力以及实现了多种的负载平衡算法,不仅能够作为服务器独自运行,还能够作为sdk提供负载平衡算法的应用。其代码实现较为简单,然而非常清晰,非常适合刚接触golang的开发者。