关于go:读写锁RWMutex

1次阅读

共计 3592 个字符,预计需要花费 9 分钟才能阅读完成。

读写锁 RWMutex

type RWMutex struct {
    w           Mutex  // 互斥锁解决多个 writer 的竞争
    writerSem   uint32 // writer 信号量
    readerSem   uint32 // reader 信号量
    readerCount int32  // reader 的数量
    readerWait  int32  // writer 期待实现的 reader 的数量
}

// 最大的 reader 数量。const rwmutexMaxReaders = 1 << 30

RLock/RUnlock

当 writer 申请锁的时候,是无奈扭转既有的 reader 持有锁的事实的,也不会强制这些 reader 开释锁,它的优先权只是限定起初的 reader 不要和它抢。所以,rUnlockSlow 将持有锁的 reader 计数缩小 1 的时候,会查看既有的 reader 是不是都曾经开释了锁,如果都开释了锁,就会唤醒 writer,让 writer 持有锁。

func (rw *RWMutex) RLock() {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()}
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // A writer is pending, wait for it.
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
    if race.Enabled {race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
    }
}

func (rw *RWMutex) RUnlock() {
    if race.Enabled {
        _ = rw.w.state
        race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
        race.Disable()}
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        // Outlined slow-path to allow the fast-path to be inlined
        rw.rUnlockSlow(r)
    }
    if race.Enabled {race.Enable()
    }
}

func (rw *RWMutex) rUnlockSlow(r int32) {
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {race.Enable()
        fatal("sync: RUnlock of unlocked RWMutex")
    }
    // A writer is pending.
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
        // The last reader unblocks the writer.
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

Lock

一旦一个 writer 取得了外部的互斥锁,就会反转 readerCount 字段,把它从原来的正整
数 readerCount(>=0) 批改为正数(readerCount-rwmutexMaxReaders),让这个字段
放弃两个含意(既保留了 reader 的数量,又示意以后有 writer)。

func (rw *RWMutex) Lock() {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()}
    // First, resolve competition with other writers.
    rw.w.Lock()
    // Announce to readers there is a pending writer.
    // 反转 readerCount,通知 reader 有 writer 竞争锁
    // 记录以后沉闷的 reader 数量(持有读锁还没有开释的那些 reader)
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // Wait for active readers.
    // 如果以后有 reader 持有锁,那么须要期待
    // 如果 readerCount 不是 0,就阐明以后有持有读锁的 reader,RWMutex 须要把这个以后 readerCount 赋值给 readerWait 字段保留下来,同时,这个 writer 进入阻塞期待状态。每当一个 reader 开释读锁的时候(调用 RUnlock 办法时),readerWait 字段就减 1,直到所有的沉闷的 reader 都开释了读锁,才会唤醒这个 writer
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
    if race.Enabled {race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
        race.Acquire(unsafe.Pointer(&rw.writerSem))
    }
}

Unlock

func (rw *RWMutex) Unlock() {
    if race.Enabled {
        _ = rw.w.state
        race.Release(unsafe.Pointer(&rw.readerSem))
        race.Disable()}

    // Announce to readers there is no active writer.
    // 反转 readerCount 字段
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {race.Enable()
        fatal("sync: Unlock of unlocked RWMutex")
    }
    // Unblock blocked readers, if any.
    // 唤醒阻塞的 reader 们
    for i := 0; i < int(r); i++ {runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // Allow other writers to proceed.
    // 开释外部的互斥锁
    rw.w.Unlock()
    if race.Enabled {race.Enable()
    }
}

在 Lock 办法中,是先获取外部互斥锁,才会批改的其余字段;而在 Unlock 办法中,是先批改的其余字段,才会开释外部互斥锁,这样能力保障字段的批改也受到互斥锁的爱护。

TryRLock /TryLock

TryRLock 尝试锁定 rw 以进行读取并报告它是否胜利

func (rw *RWMutex) TryRLock() bool {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()}
    for {c := atomic.LoadInt32(&rw.readerCount)
        if c < 0 {
            if race.Enabled {race.Enable()
            }
            return false
        }
        if atomic.CompareAndSwapInt32(&rw.readerCount, c, c+1) {
            if race.Enabled {race.Enable()
                race.Acquire(unsafe.Pointer(&rw.readerSem))
            }
            return true
        }
    }
}

TryLock 尝试锁定 rw 以进行写入并报告它是否胜利。

func (rw *RWMutex) TryLock() bool {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()}
    if !rw.w.TryLock() {
        if race.Enabled {race.Enable()
        }
        return false
    }
    if !atomic.CompareAndSwapInt32(&rw.readerCount, 0, -rwmutexMaxReaders) {rw.w.Unlock()
        if race.Enabled {race.Enable()
        }
        return false
    }
    if race.Enabled {race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
        race.Acquire(unsafe.Pointer(&rw.writerSem))
    }
    return true
}

RLocker

RLocker 返回一个 Locker 接口,该接口通过调用 rw.RLock 和 rw.RUnlock 来实现 Lock 和 Unlock 办法。

func (rw *RWMutex) RLocker() Locker {return (*rlocker)(rw)
}

type rlocker RWMutex

func (r *rlocker) Lock()   { (*RWMutex)(r).RLock()}
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock()}
正文完
 0