本文主要研究一下dapr的consistent hash

consistent_hash

dapr/pkg/placement/hashing/consistent_hash.go

// Package-level state used by the consistent hash ring.
var (
	// replicationFactor is the number of virtual nodes that Add places on
	// the ring for every host.
	replicationFactor int

	// ErrNoHosts is returned when a lookup is attempted on a ring that has
	// no hosts in it.
	ErrNoHosts = errors.New("no hosts added")
)

// ConsistentHashTables groups consistent hash rings, stored under string
// keys, together with a version string.
type ConsistentHashTables struct {
	Version string
	Entries map[string]*Consistent
}

// Host describes a single host on the ring: its name, port, current load
// and the ID of the app it belongs to.
type Host struct {
	Name  string
	Port  int64
	Load  int64
	AppID string
}

// Consistent is a consistent hash ring.
type Consistent struct {
	// hosts maps a virtual-node hash to the name of the host that owns it.
	hosts map[uint64]string
	// sortedSet holds the virtual-node hashes in ascending order.
	sortedSet []uint64
	// loadMap indexes the Host records by host name.
	loadMap map[string]*Host
	// totalLoad is the ring-wide load counter.
	// NOTE(review): nothing in this excerpt updates it - confirm where it is maintained.
	totalLoad int64

	// The embedded lock is taken by the methods that read or modify the
	// fields above.
	sync.RWMutex
}
ConsistentHashTables定义了Version、Entries属性,Entries是个map,value为Consistent;Consistent定义了hosts、sortedSet、loadMap、totalLoad、sync.RWMutex属性

GetInternals

dapr/pkg/placement/hashing/consistent_hash.go

// GetInternals returns the internal data structure of the consistent hashfunc (c *Consistent) GetInternals() (map[uint64]string, []uint64, map[string]*Host, int64) {    c.RLock()    defer c.RUnlock()    return c.hosts, c.sortedSet, c.loadMap, c.totalLoad}
GetInternals方法返回hosts、sortedSet、loadMap、totalLoad属性

Add

dapr/pkg/placement/hashing/consistent_hash.go

// Add adds a host with port to the tablefunc (c *Consistent) Add(host, id string, port int64) bool {    c.Lock()    defer c.Unlock()    if _, ok := c.loadMap[host]; ok {        return true    }    c.loadMap[host] = &Host{Name: host, AppID: id, Load: 0, Port: port}    for i := 0; i < replicationFactor; i++ {        h := c.hash(fmt.Sprintf("%s%d", host, i))        c.hosts[h] = host        c.sortedSet = append(c.sortedSet, h)    }    // sort hashes ascendingly    sort.Slice(c.sortedSet, func(i int, j int) bool {        return c.sortedSet[i] < c.sortedSet[j]    })    return false}
Add方法创建Host并添加到loadMap中,之后根据replicationFactor次数对host进行hash并添加到hosts及sortedSet中,最后对sortedSet进行排序

Get

dapr/pkg/placement/hashing/consistent_hash.go

// Get returns the host that owns `key`.//// As described in https://en.wikipedia.org/wiki/Consistent_hashing//// It returns ErrNoHosts if the ring has no hosts in it.func (c *Consistent) Get(key string) (string, error) {    c.RLock()    defer c.RUnlock()    if len(c.hosts) == 0 {        return "", ErrNoHosts    }    h := c.hash(key)    idx := c.search(h)    return c.hosts[c.sortedSet[idx]], nil}
Get方法先对key进行hash,然后通过search查找idx,接着取出idx在sortedSet中对应的hash值,最后从hosts中返回该hash值对应的host

GetHost

dapr/pkg/placement/hashing/consistent_hash.go

// GetHost gets a hostfunc (c *Consistent) GetHost(key string) (*Host, error) {    h, err := c.Get(key)    if err != nil {        return nil, err    }    return c.loadMap[h], nil}
GetHost方法先通过Get获取key对应的host名称,之后再去loadMap获取对应的Host

GetLeast

dapr/pkg/placement/hashing/consistent_hash.go

// GetLeast uses Consistent Hashing With Bounded loads//// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html//// to pick the least loaded host that can serve the key//// It returns ErrNoHosts if the ring has no hosts in it.//func (c *Consistent) GetLeast(key string) (string, error) {    c.RLock()    defer c.RUnlock()    if len(c.hosts) == 0 {        return "", ErrNoHosts    }    h := c.hash(key)    idx := c.search(h)    i := idx    for {        host := c.hosts[c.sortedSet[i]]        if c.loadOK(host) {            return host, nil        }        i++        if i >= len(c.hosts) {            i = 0        }    }}
GetLeast方法先对key进行hash,然后通过search获取idx,之后从idx开始沿环依次通过loadOK判断,返回第一个满足负载条件的host

search

dapr/pkg/placement/hashing/consistent_hash.go

func (c *Consistent) search(key uint64) int {    idx := sort.Search(len(c.sortedSet), func(i int) bool {        return c.sortedSet[i] >= key    })    if idx >= len(c.sortedSet) {        idx = 0    }    return idx}
search方法通过sort.Search根据key获取idx,若idx超出sortedSet范围则回绕为0

小结

dapr的Consistent定义了hosts、sortedSet、loadMap、totalLoad、sync.RWMutex属性;它定义了GetInternals、Add、Get、GetHost、GetLeast、search等方法。

doc

  • dapr