前言
哈喽,大家好,我是
asong
。在上一文中:面试官:哥们 Go 语言互斥锁理解到什么水平了?咱们一起学习了 Go 语言中互斥锁是如何实现的,本文咱们就来一起学习 Go 语言中读写锁是如何设计的,互斥锁能够保障多线程在拜访同一片内存时不会呈现竞争来保障并发平安,因为互斥锁锁定代码临界区,所以当并发量较高的场景下会加剧锁竞争,执行效率就会越来越差;因而就引申出更细粒度的锁:读写锁,实用于读多写少的情景,接下来咱们就具体看看读写锁。
Golang 版本:1.118
读写锁简介
互斥锁咱们都晓得会锁定代码临界区,当有一个 goroutine
获取了互斥锁后,任何 goroutine
都不能够获取互斥锁,只能期待这个 goroutine
将互斥锁开释,无论读写操作都会加上一把大锁,在读多写少场景效率会很低,所以大佬们就设计出了读写锁,读写锁顾名思义是一把锁分为两局部:读锁和写锁,读锁容许多个线程同时取得,因为读操作自身是线程平安的,而写锁则是互斥锁,不容许多个线程同时取得写锁,并且写操作和读操作也是互斥的,总结来说:读读不互斥,读写互斥,写写互斥;
为什么要有读锁
有些敌人可能会有纳闷,为什么要有读锁,读操作又不会批改数据,多线程同时读取雷同的资源就是平安的,为什么还要加一个读锁呢?
举个例子阐明,在 Golang
中变量的赋值不是并发平安的,比方对一个 int
型变量执行 count++
操作,在并发下执行就会呈现预期之外的后果,因为 count++
操作分为三局部:读取 count
的值、将 count
的值加 1
,而后再将后果赋值给count
,这不是一个原子性操作,未加锁时在多个线程同时对该变量执行count++
操作会造成数据不统一,通过加上写锁能够解决这个问题,然而在读取的时候咱们不加读锁会怎么样呢?写个例子来看一下,只加写锁,不加读锁:
package main
import "sync"
const maxValue = 3
type test struct {
rw sync.RWMutex
index int
}
func (t *test) Get() int {return t.index}
func (t *test)Set() {t.rw.Lock()
t.index++
if t.index >= maxValue{t.index =0}
t.rw.Unlock()}
func main() {t := test{}
sw := sync.WaitGroup{}
for i:=0; i < 100000; i++{sw.Add(2)
go func() {t.Set()
sw.Done()}()
go func() {val := t.Get()
if val >= maxValue{print("get value error| value=", val, "\n")
}
sw.Done()}()}
sw.Wait()}
运行后果:
get value error| value=3
get value error| value=3
get value error| value=3
get value error| value=3
get value error| value=3
.....
每次运行后果都是不固定的,因为咱们没有加读锁,如果容许同时读和写,读取到的数据有可能就是中间状态,所以咱们能够总结进去读锁是很有必要的,读锁能够避免读到写两头的值。
读写锁的插队策略
多个读操作同时进行时也是线程平安的,一个线程获取读锁后,另外一个线程同样能够获取读锁,因为读锁是共享的,如果始终都有线程加读锁,前面再有线程加写锁就会始终获取不到锁造成阻塞,这时就须要一些策略来保障锁的公平性,避免出现锁饥饿,那么 Go
语言中读写锁采纳的是什么插队策略来防止饥饿问题呢?
这里咱们用一个例子来阐明一下 Go
语言的插队策略:
假如当初有 5 个 goroutine
别离是 G1
、G2
、G3
、G4
、G5
,当初G1
、G2
获取读锁胜利,还没开释读锁,G3
要执行写操作,获取写锁失败就会阻塞期待,以后阻塞写锁的读锁 goroutine
数量为 2:
后续 G4
进来想要获取读锁,这时她就会判断如果以后有写锁的 goroutine
正在阻塞期待,为了防止写锁饥饿,那这个 G4
也会进入阻塞期待,后续 G5
进来想要获取写锁,因为 G3
在占用互斥锁,所以 G5
会进入自旋 / 休眠 阻塞期待;
当初 G1
、G2
开释了读锁,当开释读锁是判断如果阻塞写锁 goroutine 的读锁 goroutine 数量为 0 了并且有写锁期待就会唤醒正在阻塞期待的写锁 G3
,G3
失去了唤醒:
G3
解决完写操作后会开释写锁,这一步会同时唤醒期待的读锁 / 写锁的 goroutine
,至于G4
、G5
谁能先获取锁就看谁比拟快了,就像抢媳妇一样,先下手的先得呀。
读写锁的实现
接下来咱们就深刻源码剖析一下,先看一下 RWMutex
构造都有啥:
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
w
:复用互斥锁提供的能力;writerSem
:写操作goroutine
阻塞期待信号量,当阻塞写操作的读操作goroutine
开释读锁时,通过该信号量告诉阻塞的写操作的goroutine
;readerSem
:读操作goroutine
阻塞期待信号量,当写操作goroutine
开释写锁时,通过该信号量告诉阻塞的读操作的goroutine
;redaerCount
:以后正在执行的读操作goroutine
数量;readerWait
:当写操作被阻塞时期待的读操作goroutine
个数;
读锁
读锁的对应办法如下:
func (rw *RWMutex) RLock() {
// 原子操作 readerCount 只有值不是正数就示意获取读锁胜利
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 有一个正在期待的写锁,为了防止饥饿前面进来的读锁进行阻塞期待
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
精简了竞态检测的办法,读锁办法就只有两行代码了,逻辑如下:
应用原子操作更新 readerCount
,将readercount
值加 1
,只有原子操作后值不为正数就示意加读锁胜利,如果值为正数示意曾经有写锁获取互斥锁胜利,写锁goroutine
正在期待或运行,所以为了防止饥饿前面进来的读锁要进行阻塞期待,调用 runtime_SemacquireMutex
阻塞期待。
非阻塞加读锁
Go
语言在 1.18
中引入了非阻塞加读锁的办法:
func (rw *RWMutex) TryRLock() bool {
for {
// 读取 readerCount 值能晓得以后是否有写锁在阻塞期待,如果值为正数,那么前面的读锁就会被阻塞住
c := atomic.LoadInt32(&rw.readerCount)
if c < 0 {
if race.Enabled {race.Enable()
}
return false
}
// 尝试获取读锁,for 循环不断尝试
if atomic.CompareAndSwapInt32(&rw.readerCount, c, c+1) {
if race.Enabled {race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
return true
}
}
}
因为读锁是共享的,在没有写锁阻塞期待时多个线程能够同时获取,所以原子性操作可能会失败,这里采纳 for
循环来减少尝试次数,很是奇妙。
开释读锁
开释读锁代码次要分为两局部,第一局部:
func (rw *RWMutex) RUnlock() {
// 将 readerCount 的值减 1,如果值等于等于 0 间接退出即可;否则进入 rUnlockSlow 解决
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
}
咱们都晓得 readerCount 的值代表以后正在执行的读操作 goroutine
数量,执行递加操作后的值大于等于 0
示意以后没有异样场景或写锁阻塞期待,所以间接退出即可,否则须要解决这两个逻辑:
rUnlockSlow
逻辑如下:
func (rw *RWMutex) rUnlockSlow(r int32) {
// r+ 1 等于 0 示意没有加读锁就开释读锁,异样场景要抛出异样
// r+1 == -rwmutexMaxReaders 也示意没有加读锁就是开释读锁
// 因为写锁加锁胜利后会将 readerCout 的值减去 rwmutexMaxReaders
if r+1 == 0 || r+1 == -rwmutexMaxReaders {race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// 如果有写锁正在期待读锁时会更新 readerWait 的值,所以一步递加 rw.readerWait 值
// 如果 readerWait 在原子操作后的值等于 0 了阐明以后阻塞写锁的读锁都曾经开释了,须要唤醒期待的写锁
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
解读一下这段代码:
r+1
等于0
阐明以后goroutine
没有加读锁就进行开释读锁操作,属于非法操作r+1 == -rwmutexMaxReaders
阐明写锁加锁胜利了会将readerCount
的减去rwmutexMaxReaders
变成正数,如果此前没有加读锁,那么间接开释读锁就会造成这个等式成立,也属于没有加读锁就进行开释读锁操作,属于非法操作;readerWait
代表写操作被阻塞时读操作的goroutine
数量,如果有写锁正在期待时就会更新readerWait
的值,读锁开释锁时须要readerWait
进行递加,如果递加后等于0
阐明以后阻塞写锁的读锁都曾经开释了,须要唤醒期待的写锁。(看下文写锁的代码就响应上了)
写锁
写锁对应的办法如下:
const rwmutexMaxReaders = 1 << 30
func (rw *RWMutex) Lock() {
// First, resolve competition with other writers.
// 写锁也就是互斥锁,复用互斥锁的能力来解决与其余写锁的竞争
// 如果写锁曾经被获取了,其余 goroutine 在获取写锁时会进入自旋或者休眠
rw.w.Lock()
// 将 readerCount 设置为负值,通知读锁当初有一个正在期待运行的写锁(获取互斥锁胜利)r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 获取互斥锁胜利并不代表 goroutine 获取写锁胜利,咱们默认最大有 2^30 的读操作数目,减去这个最大数目
// 后依然不为 0 则示意后面还有读锁,须要期待读锁开释并更新写操作被阻塞时期待的读操作 goroutine 个数;if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
代码量不是很大,然而了解起来还有一点简单,我尝试用文字来解析一下,次要分为两局部:
- 获取互斥锁,写锁也就是互斥锁,这里咱们复用互斥锁
mutex
的加锁能力,当互斥锁加锁胜利后,其余写锁goroutine
再次尝试获取锁时就会进入自旋休眠期待; - 判断获取写锁是否胜利,这里有一个变量
rwmutexMaxReaders = 1 << 30
示意最大反对2^30
个并发读,互斥锁加锁胜利后,假如2^30
个读操作都曾经开释了读锁,通过原子操作将readerCount
设置为正数在加上2^30
,如果此时r
依然不为0
说面还有读操作正在进行,则写锁须要期待,同时通过原子操作更新readerWait
字段,也就是更新写操作被阻塞时期待的读操作goroutine
个数;readerWait
在上文的读锁开释锁时会进行判断,进行递加,以后readerWait
递加到0
时就会唤醒写锁。
非阻塞加写锁
Go 语言
在1.18
中引入了非阻塞加锁的办法:
func (rw *RWMutex) TryLock() bool {
// 先判断获取互斥锁是否胜利,没有胜利则间接返回 false
if !rw.w.TryLock() {
if race.Enabled {race.Enable()
}
return false
}
// 互斥锁获取胜利了,接下来就判断是否是否有读锁正在阻塞该写锁,如果没有间接更新 readerCount 为
// 正数获取写锁胜利;if !atomic.CompareAndSwapInt32(&rw.readerCount, 0, -rwmutexMaxReaders) {rw.w.Unlock()
if race.Enabled {race.Enable()
}
return false
}
return true
}
开释写锁
func (rw *RWMutex) Unlock() {
// Announce to readers there is no active writer.
// 将 readerCount 的复原为负数,也就是解除对读锁的互斥
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// 如果前面还有读操作的 goroutine 则须要唤醒他们
for i := 0; i < int(r); i++ {runtime_Semrelease(&rw.readerSem, false, 0)
}
// 开释互斥锁,写操作的 goroutine 和读操作的 goroutine 同时竞争
rw.w.Unlock()}
开释写锁的逻辑比较简单,开释写锁会将会面的读操作和写操作的 goroutine
都唤醒,而后他们在进行竞争;
总结
因为咱们上文曾经分享了互斥锁的实现形式,再来看读写锁就轻松许多了,文末咱们再来总结一下读写锁:
- 读写锁提供四种操作:读上锁,读解锁,写上锁,写解锁;加锁规定是读读共享,写写互斥,读写互斥,写读互斥;
- 读写锁中的读锁是肯定要存在的,其目标是也是为了躲避原子性问题,只有写锁没有读锁的状况下会导致咱们读取到两头值;
- Go 语言的读写锁在设计上也防止了写锁饥饿的问题,通过字段
readerCount
、readerWait
进行管制,当写锁的goroutine
被阻塞时,前面进来想要获取读锁的goroutine
也都会被阻塞住,当写锁开释时,会将前面的读操作goroutine
、写操作的goroutine
都唤醒,剩下的交给他们竞争吧; -
读锁获取锁流程:
- 锁闲暇时,读锁能够立马被获取
- 如果以后有写锁正在阻塞,那么想要获取读锁的
goroutine
就会被休眠
-
开释读锁流程:
- 以后没有异样场景或写锁阻塞期待呈现的话,则间接开释读锁胜利
- 若没有加读锁就开释读锁则抛出异样;
- 写锁被读锁阻塞期待的场景下,会将
readerWait
的值进行递加,readerWait
示意阻塞写操作 goroutine 的读操作 goroutine 数量,当readerWait
减到0
时则能够唤醒被阻塞写操作的goroutine
了;
-
写锁获取锁流程
- 写锁复用了
mutex
互斥锁的能力,首先尝试获取互斥锁,获取互斥锁失败就会进入自旋 / 休眠; - 获取互斥锁胜利并不代表写锁加锁胜利,此时如果还有占用读锁的
goroutine
,那么就会阻塞住,否则就会加写锁胜利
- 写锁复用了
-
开释写锁流程
- 开释写锁会将负值的
readerCount
变成正值,解除对读锁的互斥 - 唤醒以后阻塞住的所有读锁
- 开释互斥锁
- 开释写锁会将负值的
读写锁的代码量不多,因为其复用了互斥锁的设计,针对读写锁的性能多做了一些工作,了解起来比互斥锁要容易很多,你学会了吗?宝贝~。
好啦,本文到这里就完结了,我是asong,咱们下期见。
欢送关注公众号:Golang 梦工厂