Preface

Go's sync package provides two kinds of locks: sync.RWMutex and sync.Mutex. This article walks through how sync.RWMutex is implemented: what scenarios it suits, and how it prevents reader/writer starvation. Let's take these questions to the source code.

Example

package main

import (
	"fmt"
	"math/rand"
	"sync"
)

type Content struct {
	rw  sync.RWMutex
	val int
}

func (c *Content) Read() int {
	c.rw.RLock()
	defer c.rw.RUnlock()
	return c.val
}

func (c *Content) Write(v int) {
	c.rw.Lock()
	defer c.rw.Unlock()
	c.val = v
}

func main() {
	const (
		readerNum = 100
		writerNum = 3
	)
	content := new(Content)
	var wg sync.WaitGroup
	for i := 0; i < writerNum; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			content.Write(rand.Intn(10))
		}()
	}
	for i := 0; i < readerNum; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(content.Read())
		}()
	}
	wg.Wait() // without this, main may exit before the goroutines run
}

Mutual exclusion

  • read/read: not mutually exclusive
  • read/write: mutually exclusive
  • write/write: mutually exclusive

Source code

type RWMutex struct {
	w           Mutex  // held if there are pending writers: a writer must acquire w first
	writerSem   uint32 // semaphore for writers to wait on for active readers to finish reading
	readerSem   uint32 // semaphore for readers to wait on for the pending writer to finish writing
	readerCount int32  // number of readers that currently hold the read lock
	readerWait  int32  // number of departing readers: copied from readerCount when a writer arrives, i.e. how many read locks are still unreleased
}

Acquiring the read lock

func (rw *RWMutex) RLock() {
	...
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// A writer is pending, wait for it.
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
	...
}

When readerCount is greater than 0, readers already hold the read lock, and the increment succeeds immediately: the read lock is acquired. If atomic.AddInt32(&rw.readerCount, 1) < 0, a writer is already queued; the writer has turned readerCount into a large negative number (the source below explains how), so any reader arriving after that point must queue on readerSem instead of acquiring the lock directly. This is what prevents writer starvation.

Acquiring the write lock

func (rw *RWMutex) Lock() {
	...
	// First, resolve competition with other writers.
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // note: rwmutexMaxReaders = 1 << 30
	// Wait for active readers.
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
	...
}

When a writer acquires the write lock, it first locks w, which prevents any other writer from acquiring the write lock at the same time.

atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) turns readerCount into a large negative number, which stops readers from acquiring the read lock directly and forces them to queue on readerSem instead.

With later writers and readers blocked, the writer still has to wait for the readers that already hold the read lock to release it before it can acquire the write lock. It copies the active reader count (r) into readerWait, then sleeps on writerSem until the last of those readers releases the read lock and wakes it.

Releasing the read lock

func (rw *RWMutex) RUnlock() {
	...
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// Outlined slow-path to allow the fast-path to be inlined
		rw.rUnlockSlow(r)
	}
	...
}

func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		throw("sync: RUnlock of unlocked RWMutex")
	}
	// A writer is pending.
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// The last reader unblocks the writer.
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

As the RLock code above shows, each read-lock acquisition increments readerCount, so releasing one decrements it. If the result is negative, a writer is in the middle of acquiring the lock, and rUnlockSlow takes over.

  1. Decrement readerWait, i.e. check whether any readers that predate the writer still hold the read lock.
  2. If the result is 0, every reader that held the read lock when the writer started acquiring has now released it, so wake the writer that is still blocked on writerSem.

Releasing the write lock

func (rw *RWMutex) Unlock() {
	...
	// Announce to readers there is no active writer.
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		throw("sync: Unlock of unlocked RWMutex")
	}
	// Unblock blocked readers, if any.
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// Allow other writers to proceed.
	rw.w.Unlock()
	...
}

Here atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) restores readerCount, and the restored value r is the number of readers currently blocked waiting for the read lock. The loop then calls runtime_Semrelease(&rw.readerSem, false, 0) once per blocked reader, waking them all so that they acquire the read lock; finally w is unlocked so that other writers can proceed.

Performance comparison

The numbers below come from the benchmark in reference [1] and compare sync.Mutex against sync.RWMutex to show the advantage of a read/write lock. writeRatio is the reader:writer ratio, and the latency reduction is relative to sync.Mutex. In read-heavy, write-light workloads, the read/write lock improves performance dramatically.

writeRatio         3      10      20      50      100     1000
latency reduction  24%    71.3%   83.7%   90.9%   93.5%   95.7%

References

  1. https://segmentfault.com/a/11...