关于golang:Go进阶并发编程RWMutex

6次阅读

共计 3409 个字符,预计需要花费 9 分钟才能阅读完成。

规范库中的 RWMutex 是一个 reader/writer 互斥锁。RWMutex 在某一时刻只能由任意数量的 reader 持有,或者是只被单个的 writer 持有。RWMutex 的办法也很少,总共有 5 个:

  • Lock/Unlock:写操作时调用的办法。如果锁曾经被 reader 或者 writer 持有,那么,Lock 办法会始终阻塞,直到能获取到锁;Unlock 则是配对的开释锁的办法。
  • RLock/RUnlock:读操作时调用的办法。如果锁曾经被 writer 持有的话,RLock 办法会始终阻塞,直到能获取到锁,否则就间接返回;而 RUnlock 是 reader 开释锁的办法。
  • RLocker:这个办法的作用是为读操作返回一个 Locker 接口的对象。它的 Lock 办法会调用 RWMutex 的 RLock 办法,它的 Unlock 办法会调用 RWMutex 的 RUnlock 办法。

如果你遇到能够明确辨别 reader 和 writer goroutine 的场景,且有大量的并发读、大量的并发写,并且有强烈的性能需求,就能够思考应用读写锁 RWMutex 替换 Mutex。

实现原理

读写锁设计方案

基于对读和写操作的优先级,读写锁的设计和实现能够分成三类:

  • Read-preferring:读优先的设计能够提供很高的并发性,然而在竞争强烈的状况下可能会导致写饥饿。
  • Write-preferring:写优先的设计意味着,如果曾经有一个 writer 在期待申请锁的话,它会阻止新来的申请锁的 reader 获取到锁,所以优先保障 writer。
  • 不指定优先级:这种设计比较简单,不辨别 reader 和 writer 优先级,某些场景下这种不指定优先级的设计反而更无效。

Go 规范库中的 RWMutex 设计是 Write-preferring 计划,一个正在阻塞的 Lock 调用会排除新的 reader 申请到锁。

数据结构

RWMutex 蕴含一个 Mutex,以及四个辅助字段 writerSem、readerSem、readerCount 和 readerWait:

type RWMutex struct {
  w           Mutex   // 互斥锁解决多个 writer 的竞争
  writerSem   uint32  // writer 信号量
  readerSem   uint32  // reader 信号量
  readerCount int32   // reader 的数量
  readerWait  int32   // writer 期待实现的 reader 的数量
}

const rwmutexMaxReaders = 1 << 30
  • 字段 w:为 writer 的竞争而应用的互斥锁;
  • 字段 readerCount:记录以后 reader 的数量(以及是否有 writer 竞争锁);
  • readerWait:记录 writer 申请锁时须要期待 read 实现的 reader 的数量;
  • writerSem 和 readerSem:都是为了阻塞设计的信号量。
  • 常量 rwmutexMaxReaders:定义了最大的 reader 数量。

RLock/RUnlock

咱们看一下精简后的 RLock 和 RUnlock 办法:

func (rw *RWMutex) RLock() {if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // rw.readerCount 是负值的时候,意味着此时有 writer 期待申请锁,因为 writer 优先级高,所以把起初的 reader 阻塞休眠
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
}

func (rw *RWMutex) RUnlock() {if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {rw.rUnlockSlow(r) // 有期待的 writer
    }
}

func (rw *RWMutex) rUnlockSlow(r int32) {if atomic.AddInt32(&rw.readerWait, -1) == 0 {
        // 最初一个 reader 了,writer 终于有机会取得锁了
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

第 2 行是对 reader 计数加 1。你可能比拟困惑的是,readerCount 怎么还可能为正数呢?其实,这是因为 readerCount 这个字段有双重含意:

  • 没有 writer 竞争或持有锁时,readerCount 和咱们失常了解的 reader 的计数是一样的;
  • 然而,如果有 writer 竞争锁或者持有锁时,那么,readerCount 不仅仅承当着 reader 的计数性能,还可能标识以后是否有 writer 竞争或持有锁,在这种状况下,申请锁的 reader 的解决进入第 4 行,阻塞期待锁的开释。

调用 RUnlock 的时候,咱们须要将 Reader 的计数减去 1(第 9 行),因为 reader 的数量缩小了一个。然而,第 9 行的 AddInt32 的返回值还有另外一个含意。如果它是负值,就示意以后有 writer 竞争锁,在这种状况下,还会调用 rUnlockSlow 办法,查看是不是 reader 都开释读锁了,如果读锁都开释了,那么能够唤醒申请写锁的 writer 了。

rUnlockSlow 将持有锁的 reader 计数缩小 1 的时候,会查看既有的 reader 是不是都曾经开释了锁,如果都开释了锁,就会唤醒 writer,让 writer 持有锁。

Lock

RWMutex 是一个多 writer 多 reader 的读写锁,所以同时可能有多个 writer 和 reader。那么,为了防止 writer 之间的竞争,RWMutex 就会应用一个 Mutex 来保障 writer 的互斥。

一旦一个 writer 取得了外部的互斥锁,就会反转 readerCount 字段,把它从原来的正整数 readerCount(>=0) 批改为正数(readerCount-rwmutexMaxReaders),让这个字段放弃两个含意(既保留了 reader 的数量,又示意以后有 writer)。

咱们来看下上面的代码。第 5 行,还会记录以后沉闷的 reader 数量,所谓沉闷的 reader,就是指持有读锁还没有开释的那些 reader。

func (rw *RWMutex) Lock() {
    // 首先解决其余 writer 竞争问题
    rw.w.Lock()
    // 反转 readerCount,通知 reader 有 writer 竞争锁
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // 如果以后有 reader 持有锁,那么须要期待
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
}

如果 readerCount 不是 0,就阐明以后有持有读锁的 reader,RWMutex 须要把这个以后 readerCount 赋值给 readerWait 字段保留下来(第 7 行),同时,这个 writer 进入阻塞期待状态(第 8 行)。

每当一个 reader 开释读锁的时候(调用 RUnlock 办法时),readerWait 字段就减 1,直到所有的沉闷的 reader 都开释了读锁,才会唤醒这个 writer。

Unlock

当一个 writer 开释锁的时候,它会再次反转 readerCount 字段。能够必定的是,因为以后锁由 writer 持有,所以,readerCount 字段是反转过的,并且减去了 rwmutexMaxReaders 这个常数,变成了正数。所以,这里的反转办法就是给它减少 rwmutexMaxReaders 这个常数值。

既然 writer 要开释锁了,那么就须要唤醒之后新来的 reader,不用再阻塞它们了,让它们继续执行就好了。

在 RWMutex 的 Unlock 返回之前,须要把外部的互斥锁开释。开释结束后,其余的 writer 才能够持续竞争这把锁。

func (rw *RWMutex) Unlock() {
    // 通知 reader 没有沉闷的 writer 了
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    
    // 唤醒阻塞的 reader 们
    for i := 0; i < int(r); i++ {runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // 开释外部的互斥锁
    rw.w.Unlock()}
正文完
 0