一致性哈希
一致性哈希主要针对的是缓存服务做负载均衡,以避免缓存节点变更后缓存失效过多,导致缓存穿透,从而把数据库打死。
一致性哈希原理可以参考《图解一致性哈希算法》这篇文章,细节分析本文不再赘述。
我们来看看其核心算法:
// ServiceNode describes one physical backend, identified by an ip/port pair,
// that virtual nodes on the hash ring point to.
type ServiceNode struct {
	Ip    string // host part of the backend address
	Port  string // port part of the backend address, kept as text
	Index int    // caller-assigned position; stays zero until SetIndex is called
}

// NewServiceNode returns a ServiceNode for the given ip and port.
// Index is left at its zero value.
func NewServiceNode(ip, port string) *ServiceNode {
	node := new(ServiceNode)
	node.Ip, node.Port = ip, port
	return node
}

// SetIndex stores index on the node.
func (sn *ServiceNode) SetIndex(index int) {
	sn.Index = index
}
// UInt32Slice is a slice of uint32 that implements sort.Interface in
// ascending order.
type UInt32Slice []uint32

// Len reports the number of elements.
func (s UInt32Slice) Len() int {
	count := len(s)
	return count
}

// Less reports whether element i sorts before element j.
func (s UInt32Slice) Less(i, j int) bool {
	left, right := s[i], s[j]
	return left < right
}

// Swap exchanges elements i and j in place.
func (s UInt32Slice) Swap(i, j int) {
	held := s[i]
	s[i] = s[j]
	s[j] = held
}
// VirtualNode is the hash ring: it maps virtual-node hash codes to the
// physical ServiceNode that owns them and keeps the codes in NodeKeys.
// The embedded RWMutex is what the methods of this type lock around access
// to both fields.
type VirtualNode struct {
	VirtualNodes map[uint32]*ServiceNode // hash code -> owning backend
	NodeKeys     UInt32Slice             // hash codes, rebuilt and sorted by sortHash
	sync.RWMutex
}

// NewVirtualNode returns a ring with an initialised, empty VirtualNodes map.
func NewVirtualNode() *VirtualNode {
	ring := new(VirtualNode)
	ring.VirtualNodes = make(map[uint32]*ServiceNode)
	return ring
}
// AddVirtualNode registers virtualNum virtual nodes for serviceNode.
//
// Each virtual node is keyed by the hash of "ip:port:n" for n in
// [0, virtualNum). A slot whose hash is already present is overwritten.
// The sorted key list is rebuilt once all slots are written.
func (v *VirtualNode) AddVirtualNode(serviceNode *ServiceNode, virtualNum uint) {
	// Exclusive lock: the map and the key slice are both rewritten.
	v.Lock()
	defer v.Unlock()

	var replica uint
	for replica < virtualNum {
		label := serviceNode.Ip + ":" + serviceNode.Port + ":" + strconv.Itoa(int(replica))
		slot := v.getHashCode(label)
		v.VirtualNodes[slot] = serviceNode
		replica++
	}

	v.sortHash()
}
// RemoveVirtualNode removes the virtual nodes that were registered for
// serviceNode with the same virtualNum.
//
// The slots are located by re-deriving the "ip:port:n" hashes. A slot is only
// deleted when the backend stored there has the same Ip and Port as
// serviceNode: if two backends hashed to the same code, the later
// registration owns the slot and must not be dropped when the earlier backend
// is removed. Ownership is compared by address rather than by pointer so a
// freshly constructed ServiceNode can be used for removal.
func (v *VirtualNode) RemoveVirtualNode(serviceNode *ServiceNode, virtualNum uint) {
	// Exclusive lock: the map and the key slice are both rewritten.
	v.Lock()
	defer v.Unlock()

	for i := uint(0); i < virtualNum; i++ {
		hashStr := serviceNode.Ip + ":" + serviceNode.Port + ":" + strconv.Itoa(int(i))
		code := v.getHashCode(hashStr)
		owner, ok := v.VirtualNodes[code]
		if !ok {
			continue
		}
		if owner != nil && (owner.Ip != serviceNode.Ip || owner.Port != serviceNode.Port) {
			// Slot was taken over by another backend through a hash collision.
			continue
		}
		delete(v.VirtualNodes, code)
	}

	v.sortHash()
}
// GetVirtualNodel returns the backend responsible for routeKey.
//
// It hashes routeKey and binary-searches the sorted key list for the first
// virtual node whose hash is strictly greater; when no such node exists the
// lookup wraps around to the first key. It returns nil when the ring holds no
// virtual nodes (previously this case panicked with an index out of range).
func (v *VirtualNode) GetVirtualNodel(routeKey string) *ServiceNode {
	// Read lock: lookups may run concurrently, but not alongside Add/Remove.
	v.RLock()
	defer v.RUnlock()

	if len(v.NodeKeys) == 0 {
		return nil
	}

	hashCode := v.getHashCode(routeKey)
	i := sort.Search(len(v.NodeKeys), func(i int) bool { return v.NodeKeys[i] > hashCode })
	if i == len(v.NodeKeys) {
		// Past the largest key: wrap around to the start of the ring.
		i = 0
	}
	return v.VirtualNodes[v.NodeKeys[i]]
}
// sortHash rebuilds NodeKeys from the keys of VirtualNodes and sorts them in
// ascending order. It takes no lock itself; AddVirtualNode and
// RemoveVirtualNode call it while holding the write lock.
func (v *VirtualNode) sortHash() {
	var keys UInt32Slice
	for hash := range v.VirtualNodes {
		keys = append(keys, hash)
	}
	sort.Sort(keys)
	v.NodeKeys = keys
}
// getHashCode derives the 32-bit ring position for nodeHash from the
// hex-encoded MD5 digest of the string.
//
// The result is the last four characters of the 32-character hex string,
// packed big-endian as their ASCII byte values.
//
// NOTE(review): because only four hex characters contribute, at most 16^4
// distinct codes are possible. Changing this would move every key on the
// ring, so the values are kept as they are; crc32.ChecksumIEEE was the
// alternative the original author left commented out.
func (v *VirtualNode) getHashCode(nodeHash string) uint32 {
	digest := md5.Sum([]byte(nodeHash))
	hexDigest := hex.EncodeToString(digest[:])

	var code uint32
	for _, c := range []byte(hexDigest[len(hexDigest)-4:]) {
		code = code<<8 | uint32(c)
	}
	return code
}
我们来写测试代码,测试一下:
// Test_HashConsistency routes the ten cache keys user:id:0..9 twice: once
// with all ten backends on the ring, and once after the backend on port 3301
// has been removed. It prints a diff of the two routing tables; it makes no
// assertions.
func Test_HashConsistency(t *testing.T) {
	// Ten physical backends on 127.0.0.1, ports 3300 through 3309.
	var serverNodes []*hashconsistency.ServiceNode
	for port := 3300; port <= 3309; port++ {
		serverNodes = append(serverNodes, hashconsistency.NewServiceNode("127.0.0.1", fmt.Sprintf("%d", port)))
	}
	serverNodesLen := uint(len(serverNodes))

	// Every backend gets as many virtual nodes as there are backends.
	virtualNodeService := hashconsistency.NewVirtualNode()
	for idx := range serverNodes {
		virtualNodeService.AddVirtualNode(serverNodes[idx], serverNodesLen)
	}

	// route resolves the key for round i and formats one table line.
	route := func(i int) string {
		cacheKey := fmt.Sprintf("user:id:%d", i%10)
		serviceNode := virtualNodeService.GetVirtualNodel(cacheKey)
		return fmt.Sprintf("node: %s cachekey: %s", serviceNode.Ip+":"+serviceNode.Port, cacheKey)
	}

	var nodes1, nodes2 []string
	fmt.Println("-------- node 调度程序 --------")
	for i := 1; i <= 10; i++ {
		nodes1 = append(nodes1, route(i))
	}
	// Take the second backend (port 3301) off the ring before the second pass.
	virtualNodeService.RemoveVirtualNode(serverNodes[1], serverNodesLen)
	for i := 11; i <= 20; i++ {
		nodes2 = append(nodes2, route(i))
	}

	utils.PrintDiff(strings.Join(nodes1, "\n"), strings.Join(nodes2, "\n"))
}
测试结果如下:
-------- node 调度程序 --------
-node: 127.0.0.1:3301 cachekey: user:id:1 // node1 宕机
+node: 127.0.0.1:3300 cachekey: user:id:1 // 原 node1 的缓存路由到此 node0
node: 127.0.0.1:3309 cachekey: user:id:2
node: 127.0.0.1:3309 cachekey: user:id:3
node: 127.0.0.1:3309 cachekey: user:id:4
node: 127.0.0.1:3300 cachekey: user:id:5
node: 127.0.0.1:3307 cachekey: user:id:6
-node: 127.0.0.1:3301 cachekey: user:id:7 // node1 宕机
+node: 127.0.0.1:3302 cachekey: user:id:7 // 原 node1 的缓存路由到此 node2
node: 127.0.0.1:3305 cachekey: user:id:8
-node: 127.0.0.1:3301 cachekey: user:id:9 // node1 宕机
+node: 127.0.0.1:3300 cachekey: user:id:9 // 原 node1 的缓存路由到此 node0
node: 127.0.0.1:3309 cachekey: user:id:0
从测试中可以看出,宕机 node 上的缓存 key 都被自动路由到最近的 node,而没有宕机的 node 继续承接原有的缓存 key。这说明通过一致性哈希算法,可以保证我们的缓存不会因为服务宕机而出现大面积缓存失效的问题。
我们再把一致性哈希算法带入到服务中,来看看效果如何:
// Config is the load balancer configuration. Proxy and Nodes are filled from
// JSON; the two HashConsistency fields carry no JSON tag and are set up by
// this package's init function.
type Config struct {
// Proxy holds the listen settings of the proxy itself.
Proxy Proxy `json:"proxy"`
// Nodes lists the backends that requests are forwarded to.
Nodes []*Node `json:"nodes"`
// HashConsistency is the consistent-hash ring built over Nodes.
HashConsistency *VirtualNode
// HashConsistencyVirtualNum is the number of virtual nodes per backend;
// init replaces a zero value with 10.
HashConsistencyVirtualNum uint
}
// Proxy holds the settings of the reverse proxy (the load balancer itself).
// Url is used as the listen address of the proxy's http.Server.
type Proxy struct {Url string `json:"url"`}
// Node is one backend server that the load balancer forwards requests to.
type Node struct {
// URL is the backend address; the code in this package splits it on ":"
// into host and port.
URL string `json:"url"`
// IsDead marks the backend as unavailable; SetDead and GetIsDead access it
// under mu.
IsDead bool
// UseCount counts how many times the request handler selected this node.
UseCount int
// mu guards IsDead.
mu sync.RWMutex
}
// cfg is the package-wide configuration, populated once by init.
var cfg Config

// init loads ./config.json into cfg and builds the consistent-hash ring from
// the configured nodes.
//
// The process exits via log.Fatal when the file cannot be read, when it is
// not valid JSON for Config, or when a node is missing or its URL does not
// contain a ":" separating host and port. A HashConsistencyVirtualNum of zero
// is replaced by 10. Each node's position in cfg.Nodes is stored as the
// ServiceNode index so the request handler can map a ring hit back to its
// Node.
func init() {
	data, err := ioutil.ReadFile("./config.json")
	if err != nil {
		log.Fatal(err.Error())
	}
	// A config that fails to parse would otherwise start the proxy with an
	// empty ring, so treat it like an unreadable file.
	if err := json.Unmarshal(data, &cfg); err != nil {
		log.Fatalf("parse ./config.json: %v", err)
	}
	if cfg.HashConsistencyVirtualNum == 0 {
		cfg.HashConsistencyVirtualNum = 10
	}
	cfg.HashConsistency = NewVirtualNode()
	for i, node := range cfg.Nodes {
		if node == nil {
			log.Fatalf("config node %d is null", i)
		}
		addr := strings.Split(node.URL, ":")
		if len(addr) < 2 {
			log.Fatalf("config node %d: url %q has no host:port separator", i, node.URL)
		}
		serviceNode := NewServiceNode(addr[0], addr[1])
		serviceNode.SetIndex(i)
		cfg.HashConsistency.AddVirtualNode(serviceNode, cfg.HashConsistencyVirtualNum)
	}
}
// GetCfg returns the package configuration by value. The copy is shallow:
// its Nodes elements and HashConsistency are pointers shared with the
// package state.
func GetCfg() Config {
	current := cfg
	return current
}
// SetDead records whether the node is dead and keeps the hash ring in step
// with that state.
//
// When the state changes to dead, the node's virtual nodes are removed from
// the ring; when it changes back to alive they are added again, using the
// node's position in cfg.Nodes as the ServiceNode index. Calls that do not
// change the state are no-ops. Previously every call removed the virtual
// nodes, including SetDead(false) from the periodic health check, which
// emptied the ring while the backends were healthy.
//
// Nothing is done to the ring when the URL has no ":" separator or the ring
// has not been created.
func (node *Node) SetDead(b bool) {
	node.mu.Lock()
	defer node.mu.Unlock()

	if node.IsDead == b {
		return
	}
	node.IsDead = b

	addr := strings.Split(node.URL, ":")
	if len(addr) < 2 || cfg.HashConsistency == nil {
		return
	}
	serviceNode := NewServiceNode(addr[0], addr[1])

	if b {
		cfg.HashConsistency.RemoveVirtualNode(serviceNode, cfg.HashConsistencyVirtualNum)
		return
	}

	// Restore the index the handler uses to look the node up in cfg.Nodes.
	for i, candidate := range cfg.Nodes {
		if candidate == node {
			serviceNode.SetIndex(i)
			break
		}
	}
	cfg.HashConsistency.AddVirtualNode(serviceNode, cfg.HashConsistencyVirtualNum)
}
// GetIsDead reports whether the node is currently marked dead. The flag is
// read under the node's read lock.
func (node *Node) GetIsDead() bool {
	node.mu.RLock()
	defer node.mu.RUnlock()
	return node.IsDead
}
var mu sync.Mutex
// rrlbbHandler is a handler for round robin load balancing
func rrlbbHandler(w http.ResponseWriter, r *http.Request) {
// Round Robin
mu.Lock()
cacheKey := r.Header.Get("cache-key")
virtualNodel := cfg.HashConsistency.GetVirtualNodel(cacheKey)
targetURL, err := url.Parse(fmt.Sprintf("http://%s:%s", virtualNodel.Ip, virtualNodel.Port))
if err != nil {log.Fatal(err.Error())
}
currentNode := cfg.Nodes[virtualNodel.Index]
currentNode.UseCount++
if currentNode.GetIsDead() {rrlbbHandler(w, r)
return
}
mu.Unlock()
reverseProxy := httputil.NewSingleHostReverseProxy(targetURL)
reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) {
// NOTE: It is better to implement retry.
log.Printf("%v is dead.", targetURL)
currentNode.SetDead(true)
rrlbbHandler(w, r)
}
w.Header().Add("balancer-node", virtualNodel.Ip+virtualNodel.Port)
reverseProxy.ServeHTTP(w, r)
}
// isAlive reports whether a TCP connection to url.Host can be opened within
// one minute. A failed dial is logged and reported as false; a successful
// connection is closed again before returning.
func isAlive(url *url.URL) bool {
	conn, err := net.DialTimeout("tcp", url.Host, time.Minute*1)
	if err == nil {
		conn.Close()
		return true
	}
	log.Printf("Unreachable to %v, error %s:", url.Host, err.Error())
	return false
}
// healthCheck probes every configured node once a minute and records the
// result through SetDead. It never returns and is meant to run in its own
// goroutine.
//
// Node URLs that url.Parse rejects, or parses without a Host, are treated as
// a bare host:port address and dialled as such. Earlier versions either
// terminated the whole process through log.Fatal or dialled an empty host.
func healthCheck() {
	t := time.NewTicker(time.Minute * 1)
	for range t.C {
		for _, node := range cfg.Nodes {
			pingURL, err := url.Parse(node.URL)
			if err != nil || pingURL.Host == "" {
				pingURL = &url.URL{Host: node.URL}
			}
			alive := isAlive(pingURL)
			node.SetDead(!alive)
			msg := "ok"
			if !alive {
				msg = "dead"
			}
			log.Printf("%v checked %s by healthcheck", node.URL, msg)
		}
	}
}
// ProxyServerStart launches the background health check and then serves the
// proxy on cfg.Proxy.Url with rrlbbHandler. It blocks while the server runs
// and ends the process through log.Fatal when the server returns an error.
func ProxyServerStart() {
	go healthCheck()

	s := http.Server{
		Addr:    cfg.Proxy.Url,
		Handler: http.HandlerFunc(rrlbbHandler),
	}
	err := s.ListenAndServe()
	if err != nil {
		log.Fatal(err.Error())
	}
}
// NodeServerStart registers a /ping handler on the default mux and starts one
// HTTP server per configured node, each in its own goroutine.
//
// The first node (index 0) is deliberately not started; the commented-out
// line in the original and the "8081 is dead" line in the sample output
// suggest this simulates a dead backend. The function blocks for as long as
// any server is running, because a serving goroutine only finishes through
// log.Fatal.
//
// The loop variables are passed to the goroutine as arguments so that each
// goroutine sees its own node regardless of Go version; previously this
// relied on the sleep between iterations.
func NodeServerStart() {
	http.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("pong"))
	})

	wg := new(sync.WaitGroup)
	wg.Add(len(cfg.Nodes))
	for i, node := range cfg.Nodes {
		go func(i int, node *Node) {
			defer wg.Done()
			if i != 0 {
				log.Fatal(http.ListenAndServe(node.URL, nil))
			}
		}(i, node)
		// Stagger the start of the servers, as before.
		time.Sleep(time.Millisecond * 100)
	}
	wg.Wait()
}
编写测试代码测试一下:
// Test_HashConsistencyWithServer starts the node servers and the proxy, then
// sends twenty /ping requests through the proxy, cycling the "cache-key"
// header over user:id:0..9 twice. It checks each response body, prints a diff
// of the backends chosen in the first and second pass, and logs every node's
// use count.
//
// A failed request is reported once and the rest of that iteration is
// skipped; previously the same error was reported twice and the response of
// the failed request was still inspected.
func Test_HashConsistencyWithServer(t *testing.T) {
	go hashconsistency.NodeServerStart()
	time.Sleep(time.Millisecond * 200)
	go hashconsistency.ProxyServerStart()
	time.Sleep(time.Millisecond * 100)

	for _, tt := range [...]struct {
		name, method, uri string
		body              io.Reader
		want              *http.Request
		wantBody          string
	}{
		{
			name:     "GET with ping url",
			method:   "GET",
			uri:      "http://127.0.0.1:8080/ping",
			body:     nil,
			wantBody: "pong",
		},
	} {
		t.Run(tt.name, func(t *testing.T) {
			fmt.Println("-------- node 调度程序 --------")
			var nodes1, nodes2 []string
			for i := 1; i <= 20; i++ {
				cacheKey := fmt.Sprintf("user:id:%d", i%10)
				cli := utils.NewHttpClient().
					SetHeader(map[string]string{"cache-key": cacheKey}).
					SetMethod(tt.method).
					SetUrl(tt.uri).
					SetBody(tt.body)
				if err := cli.Request(nil); err != nil {
					t.Errorf("ReadAll: %v", err)
					continue
				}
				str := fmt.Sprintf("node: %s cachekey: %s", cli.GetRspHeader().Get("balancer-node"), cacheKey)
				if string(cli.GetRspBody()) != tt.wantBody {
					t.Errorf("Body = %q; want %q", cli.GetRspBody(), tt.wantBody)
				}
				if i <= 10 {
					nodes1 = append(nodes1, str)
				} else {
					nodes2 = append(nodes2, str)
				}
			}
			utils.PrintDiff(strings.Join(nodes1, "\n"), strings.Join(nodes2, "\n"))
			fmt.Println("-------- node 调用次数 --------")
			for _, node := range hashconsistency.GetCfg().Nodes {
				log.Printf("node: %s useCount: %d", node.URL, node.UseCount)
			}
		})
	}
}
测试结果如下:
-------- node 调度程序 --------
2022/04/08 15:14:55 http://127.0.0.1:8081 is dead.
node: 127.0.0.18082 cachekey: user:id:1
-node: 127.0.0.18081 cachekey: user:id:2
+node: 127.0.0.18083 cachekey: user:id:2
node: 127.0.0.18083 cachekey: user:id:3
node: 127.0.0.18082 cachekey: user:id:4
node: 127.0.0.18082 cachekey: user:id:5
node: 127.0.0.18082 cachekey: user:id:6
node: 127.0.0.18083 cachekey: user:id:7
node: 127.0.0.18083 cachekey: user:id:8
node: 127.0.0.18082 cachekey: user:id:9
node: 127.0.0.18083 cachekey: user:id:0
-------- node 调用次数 --------
2022/04/08 15:14:55 node: 127.0.0.1:8081 useCount: 1
2022/04/08 15:14:55 node: 127.0.0.1:8082 useCount: 10
2022/04/08 15:14:55 node: 127.0.0.1:8083 useCount: 10
测试结果符合预期,nice :)
go-zero
go-zero 的负载均衡算法通过替换 grpc 默认负载均衡算法来实现负载均衡
详细注释代码请参阅 https://github.com/TTSimple/g…
我们看看其中核心的两个算法
- 一、牛顿冷却
原理请参阅 https://www.ruanyifeng.com/bl…
const (
	// decayTime is the time constant of the exponential decay, expressed in
	// nanoseconds (one second).
	decayTime = int64(time.Second * 1)
)

// NLOC computes a Newton-cooling style "heat" value that decays
// exponentially with elapsed time.
type NLOC struct{}

// NewNLOC returns a ready-to-use NLOC.
func NewNLOC() *NLOC {
	return &NLOC{}
}

// Hot returns exp(-elapsed/decayTime), where elapsed is the time passed since
// timex. The result is 1 for timex now or in the future and falls towards 0
// as timex recedes into the past; it is about 0.368 after one decayTime.
//
// The elapsed time is measured in nanoseconds to match the unit of decayTime.
// Earlier versions divided a difference in whole seconds by the nanosecond
// constant, so the value barely moved away from 1.
func (n *NLOC) Hot(timex time.Time) float64 {
	td := time.Since(timex)
	if td < 0 {
		td = 0
	}
	w := math.Exp(-float64(td) / float64(decayTime))
	return w
}
我们来测试一下:
// Test_NLOC prints the heat of a fixed start time once per second for ten
// seconds. It makes no assertions; the output is meant for inspection.
//
// The ticker and the deadline timer are both handled in this goroutine and
// stopped on return, so nothing keeps running after the test. Previously the
// printing goroutine and its ticker were never stopped.
func Test_NLOC(t *testing.T) {
	timer := time.NewTimer(time.Second * 10)
	defer timer.Stop()
	ticker := time.NewTicker(time.Second * 1)
	defer ticker.Stop()

	timex := time.Now()
	n := NewNLOC()
	for {
		select {
		case <-ticker.C:
			fmt.Println(n.Hot(timex))
		case <-timer.C:
			return
		}
	}
}
测试结果如下:
0.999999900000005
0.99999980000002
0.999999700000045
0.99999960000008
0.999999500000125
0.99999940000018
0.999999300000245
0.99999920000032
0.999999100000405
0.9999990000005
从上面结果中可以看出,热度是随时间逐渐衰减的
- 二、EWMA 滑动平均
原理请参阅 https://blog.csdn.net/mzpmzk/…
const (
	// AVG_METRIC_AGE is the averaging window, in samples, from which DECAY
	// is derived.
	AVG_METRIC_AGE float64 = 30.0
	// DECAY is the weight given to each new sample: 2 / (AVG_METRIC_AGE + 1).
	DECAY float64 = 2 / (float64(AVG_METRIC_AGE) + 1)
)

// SimpleEWMA is an exponentially weighted moving average with the fixed
// smoothing factor DECAY. The zero value is ready to use.
type SimpleEWMA struct {
	// value is the current average. It is updated by every Add and replaced
	// by Set. (This field declaration had been swallowed by a line comment,
	// which left the struct empty and the methods uncompilable.)
	value float64
}

// Add folds value into the moving average.
//
// While the stored average is exactly 0 it is treated as uninitialised and
// value is stored as-is; otherwise the new average is
// value*DECAY + old*(1-DECAY).
func (e *SimpleEWMA) Add(value float64) {
	if e.value == 0 { // this is a proxy for "uninitialized"
		e.value = value
	} else {
		e.value = (value * DECAY) + (e.value * (1 - DECAY))
	}
}

// Value returns the current moving average.
func (e *SimpleEWMA) Value() float64 {
	return e.value
}

// Set overwrites the moving average with value.
func (e *SimpleEWMA) Set(value float64) {
	e.value = value
}
编写测试代码测试下:
// testMargin is the largest absolute difference that withinMargin accepts
// between two floats.
const testMargin = 0.00000001
// samples is the fixed input series of 100 values fed to SimpleEWMA in
// TestSimpleEWMA.
var samples = [100]float64{
4599, 5711, 4746, 4621, 5037, 4218, 4925, 4281, 5207, 5203, 5594, 5149,
4948, 4994, 6056, 4417, 4973, 4714, 4964, 5280, 5074, 4913, 4119, 4522,
4631, 4341, 4909, 4750, 4663, 5167, 3683, 4964, 5151, 4892, 4171, 5097,
3546, 4144, 4551, 6557, 4234, 5026, 5220, 4144, 5547, 4747, 4732, 5327,
5442, 4176, 4907, 3570, 4684, 4161, 5206, 4952, 4317, 4819, 4668, 4603,
4885, 4645, 4401, 4362, 5035, 3954, 4738, 4545, 5433, 6326, 5927, 4983,
5364, 4598, 5071, 5231, 5250, 4621, 4269, 3953, 3308, 3623, 5264, 5322,
5395, 4753, 4936, 5315, 5243, 5060, 4989, 4921, 4480, 3426, 3687, 4220,
3197, 5139, 6101, 5279,
}
func withinMargin(a, b float64) bool {return math.Abs(a-b) <= testMargin
}
// TestSimpleEWMA feeds every entry of samples into a SimpleEWMA, prints the
// resulting average and checks it against the expected value within
// testMargin. It then checks that Set overrides the average.
func TestSimpleEWMA(t *testing.T) {
	const want = 4734.500946466118

	var e SimpleEWMA
	for i := range samples {
		e.Add(samples[i])
	}

	got := e.Value()
	fmt.Println(got)
	if !withinMargin(got, want) {
		t.Errorf("e.Value() is %v, wanted %v", got, want)
	}

	e.Set(1.0)
	if after := e.Value(); after != 1.0 {
		t.Errorf("e.Value() is %v", after)
	}
}
测试成功,加油!!!
引用文章:
- 自适应负载均衡算法原理与实现
- 基于 gRPC 的注册发现与负载均衡的原理和实战
- 负载均衡 -P2C 算法
- Kratos 源码分析:Warden 负载均衡算法之 P2C
- Golang 实现加权轮询负载均衡
- 指数加权移动平均 (Exponential Weighted Moving Average)