golang 并发安全Map以及分段锁的实现

38次阅读

共计 6624 个字符,预计需要花费 17 分钟才能阅读完成。

涉及概念

并发安全 Map
分段锁
sync.Map
CAS (Compare And Swap)
双检查

分断锁
type SimpleCache struct {
mu sync.RWMutex
items map[interface{}]*simpleItem
}
    在日常开发中,上述这种数据结构肯定不少见,因为 golang 的原生 map 是非并发安全的,所以为了保证 map 的并发安全,最简单的方式就是给 map 加锁。之前使用过两个本地内存缓存的开源库,gcache, cache2go,其中存储缓存对象的结构都是这样,对于轻量级的缓存库,为了设计简洁(包含清理过期对象等 ) 再加上当需要缓存大量数据时有 redis,memcache 等明星项目解决。但是如果抛开这些因素遇到真正数量巨大的数据量时,直接对一个 map 加锁,当 map 中的值越来越多,访问 map 的请求越来越多,大家都竞争这一把锁显得并发访问控制变重。在 go1.9 引入 sync.Map 之前,比较流行的做法就是使用分段锁,顾名思义就是将锁分段,将锁的粒度变小,将存储的对象分散到各个分片中,每个分片由一把锁控制,这样使得当需要对在 A 分片上的数据进行读写时不会影响 B 分片的读写。
分段锁的实现
// Map 分片
type ConcurrentMap []*ConcurrentMapShared

// 每一个 Map 是一个加锁的并发安全 Map
type ConcurrentMapShared struct {
items map[string]interface{}
sync.RWMutex // 各个分片 Map 各自的锁
}
    主流的分段锁,即通过 hash 取模的方式找到当前访问的 key 处于哪一个分片之上,再对该分片进行加锁之后再读写。分片定位时,常用有 BKDR, FNV32 等 hash 算法得到 key 的 hash 值。
func New() ConcurrentMap {
// SHARD_COUNT 默认 32 个分片
m := make(ConcurrentMap, SHARD_COUNT)
for i := 0; i < SHARD_COUNT; i++ {
m[i] = &ConcurrentMapShared{
items: make(map[string]interface{}),
}
}
return m
}
    在初始化好分片后,对分片上的数据进行读写时就需要用 hash 取模进行分段定位来确认即将要读写的分片。
获取段定位
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
return m[uint(fnv32(key))%uint(SHARD_COUNT)]
}

// FNV hash
func fnv32(key string) uint32 {
hash := uint32(2166136261)
const prime32 = uint32(16777619)
for i := 0; i < len(key); i++ {
hash *= prime32
hash ^= uint32(key[i])
}
return hash
}
之后对于 map 的 GET SET 就简单顺利成章的完成
Set And Get
func (m ConcurrentMap) Set(key string, value interface{}) {
shard := m.GetShard(key) // 段定位找到分片
shard.Lock() // 分片上锁
shard.items[key] = value // 分片操作
shard.Unlock() // 分片解锁
}

func (m ConcurrentMap) Get(key string) (interface{}, bool) {
shard := m.GetShard(key)
shard.RLock()
val, ok := shard.items[key]
shard.RUnlock()
return val, ok
}
    由此一个分段锁 Map 就实现了,但是比起普通的 Map, 常用到的方法比如获取所有 key, 获取所有 Val 操作是要比原生 Map 复杂的,因为要遍历每一个分片的每一个数据,好在 golang 的并发特性使得解决这类问题变得非常简单
Keys
// 统计当前分段 map 中 item 的个数
func (m ConcurrentMap) Count() int {
count := 0
for i := 0; i < SHARD_COUNT; i++ {
shard := m[i]
shard.RLock()
count += len(shard.items)
shard.RUnlock()
}
return count
}

// 获取所有的 key
func (m ConcurrentMap) Keys() []string {
count := m.Count()
ch := make(chan string, count)

// 每一个分片启动一个协程 遍历 key
go func() {
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
for _, shard := range m {

go func(shard *ConcurrentMapShared) {
defer wg.Done()

shard.RLock()

// 每个分片中的 key 遍历后都写入统计用的 channel
for key := range shard.items {
ch <- key
}

shard.RUnlock()
}(shard)
}
wg.Wait()
close(ch)
}()

keys := make([]string, count)
// 统计各个协程并发读取 Map 分片的 key
for k := range ch {
keys = append(keys, k)
}
return keys
}
    这里写了一个 benchMark 来对该分段锁 Map 和原生的 Map 加锁方式进行压测,场景为将一万个不重复的键值对同时以 100 万次写和 100 万次读,分别进行 5 次压测,如下压测代码
func BenchmarkMapShared(b *testing.B) {
num := 10000
testCase := genNoRepetTestCase(num) // 10000 个不重复的键值对
m := New()
for _, v := range testCase {
m.Set(v.Key, v.Val)
}
b.ResetTimer()

for i := 0; i < 5; i++ {
b.Run(strconv.Itoa(i), func(b *testing.B) {

b.N = 1000000

wg := sync.WaitGroup{}
wg.Add(b.N * 2)
for i := 0; i < b.N; i++ {
e := testCase[rand.Intn(num)]

go func(key string, val interface{}) {
m.Set(key, val)
wg.Done()
}(e.Key, e.Val)

go func(key string) {
_, _ = m.Get(key)
wg.Done()
}(e.Key)

}
wg.Wait()
})
}
}

原生 Map 加锁压测结果
分段锁压测结果
可以看出在将锁的粒度细化后再面对大量需要控制并发安全的访问时,分段锁 Map 的耗时比原生 Map 加锁要快 3 倍有余
Sync.Map
    go1.9 之后加入了支持并发安全的 Map sync.Map, sync.Map 通过一份只使用原子操作的数据和一份冗余了只读数据的加锁数据实现一定程度上的读写分离,使得大多数读操作和更新操作是原子操作,写入新数据才加锁的方式来提升性能。以下是 sync.Map 源码剖析, 结构体中的注释都会在具体实现代码中提示相呼应
type Map struct {
// 保护 dirty 的锁
mu Mutex
// 只读数据(修改采用原子操作 )
read atomic.Value
// 包含只读中所有数据(冗余),写入新数据时也在 dirty 中操作
dirty map[interface{}]*entry
// 当原子操作访问只读 read 时找不到数据时会去 dirty 中寻找,此时 misses+1,dirty 及作为存储新写入的数据,又冗余了只读结构中的数据,所以当 misses > dirty 的长度时,会将 dirty 升级为 read,同时将老的 dirty 置 nil
misses int
}

// Map struct 中的 read 就是 readOnly 的指针
type readOnly struct {
// 基础 Map
m map[interface{}]*entry
// 用于表示当前 dirty 中是否有 read 中不存在的数据,在写入数据时,如果发现 dirty 中没有新数据且 dirty 为 nil 时,会将 read 中未被删除的数据拷贝一份冗余到 dirty 中,过程与 Map struct 中的 misses 相呼应
amended bool
}

// 数据项
type entry struct {
p unsafe.Pointer
}

// 用于标记数据项已被删除(主要保证数据冗余时的并发安全)
// 上述 Map 结构中说到有一个将 read 数据拷贝冗余至 dirty 的过程,因为删除数据项是将 *entry 置 nil, 为了避免冗余过程中因并发问题导致 *entry 改变而影响到拷贝后的 dirty 正确性,所以 sync.Map 使用 expunged 来标记 entry 是否被删除
var expunged = unsafe.Pointer(new(interface{}))

    在下面 sync.Map 具体实现中将会看到很多“双检查”代码,因为通过原子操作获取的值可能在进行其他非原子操作过程中已改变,所以再非原子操作后需要使用之前原子操作获取的值需要再次进行原子操作获取。
    compareAndSwap 交换并比较,用于在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某一数据时导致数据不一致问题。
sync.Map Write
func (m *Map) Store(key, value interface{}) {
// 先不上锁,而是从只读数据中按 key 读取,如果已存在以 compareAndSwap 操作进行覆盖 (update)
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}

m.mu.Lock()
// 双检查获取 read
read, _ = m.read.Load().(readOnly)
// 如果 data 在 read 中,更新 entry
if e, ok := read.m[key]; ok {
// 如果原子操作读到的数据是被标记删除的,则视为新数据写入 dirty
if e.unexpungeLocked() {
m.dirty[key] = e
}
// 原子操作写新数据
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// 原子操作写新数据
e.storeLocked(&value)
} else {
// 新数据
// 当 dirty 中没有新数据时,将 read 中数据冗余到 dirty
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}

m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}

func (e *entry) tryStore(i *interface{}) bool {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
for {
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
}
}

// 在 dirty 中没有比 read 多出的新数据时触发冗余
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}

read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
// 检查 entry 是否被删除,被删除的数据不冗余
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}

func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
// 将被删除(置 nil)的数据以 cas 原子操作标记为 expunged(防止因并发情况下其他操作导致冗余进 dirty 的数据不正确)
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}

sync.Map Read
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]

// 只读数据中没有,并且 dirty 有比 read 多的数据,加锁在 dirty 中找
if !ok && read.amended {
m.mu.Lock()
// 双检查,因为上锁之前的语句是非原子性的
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
// 只读中没有读取到的次数 +1
e, ok = m.dirty[key]
// 检查是否达到触发 dirty 升级 read 的条件
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
// atomic.Load 但被标记为删除的会返回 nil
return e.load()
}

func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}

sync.Map DELETE
func (m *Map) Delete(key interface{}) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// 只读中不存在需要到 dirty 中去删除
if !ok && read.amended {
m.mu.Lock()
// 双检查,因为上锁之前的语句是非原子性的
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok {
e.delete()
}
}

func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return true
}
}
}

同样以刚刚压测原生加锁 Map 和分段锁的方式来压测 sync.Map
    压测平均下来 sync.Map 和分段锁差别不大,但是比起分段锁,sync.Map 则将锁的粒度更加的细小到对数据的状态上,使得大多数据可以无锁化操作,同时比分段锁拥有更好的拓展性,因为分段锁使用前总是要定一个分片数量,在做扩容或者缩小时很麻烦, 但要达到 sync.Map 这种性能既好又能动态扩容的程度,代码就相对复杂很多。
    还有注意在使用 sync.Map 时切忌不要将其拷贝,go 源码中有对 sync.Map 注释到”A Map must not be copied after first use.”因为当 sync.Map 被拷贝之后,Map 类型的 dirty 还是那个 map 但是 read 和 锁却不是之前的 read 和锁 (都不在一个世界你拿什么保护我), 所以必然导致并发不安全(为了写博我把 sync.Map 代码复制出来一份把私有成员改成可外部访问的打印指针)

正文完
 0