读写锁RWMutex
type RWMutex struct { w Mutex // 互斥锁解决多个writer的竞争 writerSem uint32 // writer信号量 readerSem uint32 // reader信号量 readerCount int32 // reader的数量 readerWait int32 // writer期待实现的reader的数量}// 最大的 reader 数量。const rwmutexMaxReaders = 1 << 30
RLock/RUnlock
当 writer 申请锁的时候,是无奈扭转既有的reader 持有锁的事实的,也不会强制这些 reader 开释锁,它的优先权只是限定起初的reader 不要和它抢。所以,rUnlockSlow 将持有锁的 reader 计数缩小 1 的时候,会查看既有的 reader 是不是都曾经开释了锁,如果都开释了锁,就会唤醒 writer,让 writer 持有锁。
func (rw *RWMutex) RLock() { if race.Enabled { _ = rw.w.state race.Disable() } if atomic.AddInt32(&rw.readerCount, 1) < 0 { // A writer is pending, wait for it. runtime_SemacquireMutex(&rw.readerSem, false, 0) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) }}func (rw *RWMutex) RUnlock() { if race.Enabled { _ = rw.w.state race.ReleaseMerge(unsafe.Pointer(&rw.writerSem)) race.Disable() } if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // Outlined slow-path to allow the fast-path to be inlined rw.rUnlockSlow(r) } if race.Enabled { race.Enable() }}func (rw *RWMutex) rUnlockSlow(r int32) { if r+1 == 0 || r+1 == -rwmutexMaxReaders { race.Enable() fatal("sync: RUnlock of unlocked RWMutex") } // A writer is pending. if atomic.AddInt32(&rw.readerWait, -1) == 0 { // The last reader unblocks the writer. runtime_Semrelease(&rw.writerSem, false, 1) }}
Lock
一旦一个 writer 取得了外部的互斥锁,就会反转 readerCount 字段,把它从原来的正整
数 readerCount(>=0) 批改为正数(readerCount-rwmutexMaxReaders),让这个字段
放弃两个含意(既保留了 reader 的数量,又示意以后有 writer)。
func (rw *RWMutex) Lock() { if race.Enabled { _ = rw.w.state race.Disable() } // First, resolve competition with other writers. rw.w.Lock() // Announce to readers there is a pending writer. // 反转readerCount,通知reader有writer竞争锁 // 记录以后沉闷的 reader 数量(持有读锁还没有开释的那些 reader) r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. // 如果以后有reader持有锁,那么须要期待 // 如果 readerCount 不是 0,就阐明以后有持有读锁的 reader,RWMutex 须要把这个以后 readerCount 赋值给 readerWait 字段保留下来, 同时,这个 writer 进入阻塞期待状态。每当一个 reader 开释读锁的时候(调用 RUnlock 办法时),readerWait 字段就减 1,直到所有的沉闷的 reader 都开释了读锁,才会唤醒这个 writer if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false, 0) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) race.Acquire(unsafe.Pointer(&rw.writerSem)) }}
Unlock
func (rw *RWMutex) Unlock() { if race.Enabled { _ = rw.w.state race.Release(unsafe.Pointer(&rw.readerSem)) race.Disable() } // Announce to readers there is no active writer. // 反转 readerCount 字段 r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) if r >= rwmutexMaxReaders { race.Enable() fatal("sync: Unlock of unlocked RWMutex") } // Unblock blocked readers, if any. // 唤醒阻塞的reader们 for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } // Allow other writers to proceed. // 开释外部的互斥锁 rw.w.Unlock() if race.Enabled { race.Enable() }}
在 Lock 办法中,是先获取外部互斥锁,才会批改的其余字段;而在 Unlock 办法中,是先批改的其余字段,才会开释外部互斥锁,这样能力保障字段的批改也受到互斥锁的爱护。
TryRLock /TryLock
TryRLock 尝试锁定 rw 以进行读取并报告它是否胜利
func (rw *RWMutex) TryRLock() bool { if race.Enabled { _ = rw.w.state race.Disable() } for { c := atomic.LoadInt32(&rw.readerCount) if c < 0 { if race.Enabled { race.Enable() } return false } if atomic.CompareAndSwapInt32(&rw.readerCount, c, c+1) { if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) } return true } }}
TryLock 尝试锁定 rw 以进行写入并报告它是否胜利。
func (rw *RWMutex) TryLock() bool { if race.Enabled { _ = rw.w.state race.Disable() } if !rw.w.TryLock() { if race.Enabled { race.Enable() } return false } if !atomic.CompareAndSwapInt32(&rw.readerCount, 0, -rwmutexMaxReaders) { rw.w.Unlock() if race.Enabled { race.Enable() } return false } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) race.Acquire(unsafe.Pointer(&rw.writerSem)) } return true}
RLocker
RLocker 返回一个 Locker 接口,该接口通过调用 rw.RLock 和 rw.RUnlock 来实现 Lock 和 Unlock 办法。
func (rw *RWMutex) RLocker() Locker { return (*rlocker)(rw)}type rlocker RWMutexfunc (r *rlocker) Lock() { (*RWMutex)(r).RLock() }func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }