共计 3320 个字符,预计需要花费 9 分钟才能阅读完成。
前言
golang 的 sync
包下有种锁,一种是 sync.RWMutex
, 另一种是sync.Mutex
, 本文将解说下sync.RWMutex
是如何实现的?实用于什么场景?如何防止 读 / 写 饥饿 问题?就让咱们带着这些问题来看源码是如何实现的
例子
package main
import (
"fmt"
"math/rand"
"sync"
)
type Content struct {
rw sync.RWMutex
val int
}
func (c *Content) Read() int {c.rw.RLock()
defer c.rw.RUnlock()
return c.val
}
func (c *Content) Write(v int) {c.rw.Lock()
defer c.rw.Unlock()
c.val = v
}
func main() {
const (
readerNum = 100
writerNum = 3
)
content := new(Content)
var wg sync.WaitGroup
for i := 0; i < writerNum; i++ {wg.Add(1)
go func() {defer wg.Done()
content.Write(rand.Intn(10))
}()}
for i := 0; i < readerNum; i++ {wg.Add(1)
go func() {defer wg.Done()
fmt.Println(content.Read())
}()}
}
互斥性
- 读读不互斥
- 读写互斥
- 写写互斥
源码
type RWMutex struct {
w Mutex // held if there are pending writers // 当要获取写锁时,须要对 w 加锁
writerSem uint32 // semaphore for writers to wait for completing readers //writers 应用的信号量,用于期待 readers 实现读操作
readerSem uint32 // semaphore for readers to wait for completing writers //readers 应用的信号量,用于期待 writers 实现写申请
readerCount int32 // number of pending readers // 以后正在读的 readers 数量,也即曾经获取读锁胜利的数量
readerWait int32 // number of departing readers // 期待 readers 实现读操作的数量,从 readerCount 拷贝过去,用于写锁申请时,示意还剩多少读锁未开释
}
获取读锁
func (rw *RWMutex) RLock() {
...
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
...
}
若 readerCount
大于 0 时,阐明曾经有 reader 获取读锁,那么间接返回胜利,示意获取读锁胜利,若 atomic.AddInt32(&rw.readerCount, 1)<0
示意曾经有写锁再排队,此时写锁会将 readerCount
置为一个很小的正数 (下文源码会解释),那么这个时候有 reader 来获取读锁时,只能在 readerSem
中排队,这样就不会导致写锁饥饿.
获取写锁
func (rw *RWMutex) Lock() {
...
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // 注:rwmutexMaxReaders = 1 << 30
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
...
}
writer 获取写锁是,首先 w
进行加锁,这样就能够防止其余的 writer 也来获取写锁。
atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders)
将 readerCount
置为一个很小的正数,这样就能够阻止 reader 间接获取读锁, 从而在 readerSem
中排队。
曾经阻止了起初的 writer 和 reader,那么须要期待曾经胜利获取读锁的 reader 开释读锁,这里能力获取写锁,这里将 readerCount
拷贝到 readerWait
, 而后本次 writer 进入 writerSem
中排队,期待曾经获取读锁的 reader 开释读锁,并告诉这个 writer.
开释读锁
func (rw *RWMutex) RUnlock() {
...
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
...
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
由下面获取读锁可知,每次获取一个读锁,readerCount
加一,所以这里须要减一,如果减一之后小于 0,阐明有 writer 正在获取锁。那么,须要调用 rUnlockSlow
进行后续操作。
- 判断
readerWait
是否等于 0,也即是否还有 reader 还没有开释读锁。 - 若等于 0,则示意在 writer 获取写锁开始,全副的 reader 曾经开释读锁,这时就须要告诉唤醒之前那个还阻塞在获取写锁的 writer
开释写锁
func (rw *RWMutex) Unlock() {
...
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
...
}
这里次要通过 atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
复原readerCount
,复原后的值就是以后阻塞在获取读锁的 reader 数量, 这时就须要
runtime_Semrelease(&rw.readerSem, false, 0)
将这些 reader 全副唤醒,示意他们获取到读锁。
性能比拟
以下数据来自参考文献 [1] 中作者 benchmark 的数据,这里应用 sync.Lock
和sync.RWMutex
来比展现应用读写锁性能劣势,其中 writeRadio 示意 reader:writer 的比值,耗时减低绝对 sync.Lock
而言。阐明在读多写少的场景中,读写锁能大幅晋升性能。
writeRatio | 3 | 10 | 20 | 50 | 100 | 1000 |
---|---|---|---|---|---|---|
耗时升高 | 24% | 71.3% | 83.7% | 90.9% | 93.5% | 95.7% |
参考文献
- https://segmentfault.com/a/11…