共计 4879 个字符,预计需要花费 13 分钟才能阅读完成。
有时候在 Go 代码中可能会存在多个 goroutine 同时操作一个资源(临界区),这种状况会产生竞态问题(数据竞态)。Sync 包次要实现了并发工作同步 WaitGroup 的几种办法和并发平安的互斥锁和读写锁办法,还实现了比拟非凡的两个办法,一个是放弃只执行一次的 Once 办法和线程平安的 Map。
sync.WaitGroup(同步期待)
sync.WaitGroup 外部保护着一个计数器 Add(),计数器的值能够减少和缩小。例如当咱们启动了 N 个并发工作时,就将计数器值减少 N。每个工作实现时通过调用 Done() 办法将计数器减 1,底层为 Add(-1)。通过调用 Wait() 来期待并发工作执行完,当计数器值为 0 时,示意所有并发工作曾经实现。
var x int64 | |
var wg sync.WaitGroup | |
func add() { | |
for i := 0; i < 5000; i++ {x = x + 1 // 数据竞争} | |
wg.Done()} | |
func main() {wg.Add(2) | |
go add() | |
go add() | |
wg.Wait() | |
fmt.Println(x) | |
} |
下面的代码中咱们开启了两个 goroutine 去累加变量 x 的值,这两个 goroutine 在拜访和批改 x 变量的时候就会存在数据竞争,导致最初的后果与期待的不符。
sync.Mutex(互斥锁)
互斥锁是一种罕用的管制共享资源拜访的办法,它可能保障同时只有一个 goroutine 能够访问共享资源。Go 语言中应用 sync 包的 Mutex 类型来实现互斥锁。应用互斥锁来修复下面代码的问题:
var x int64 | |
var wg sync.WaitGroup | |
var lock sync.Mutex | |
func add() { | |
for i := 0; i < 5000; i++ {lock.Lock() // 加锁 | |
x = x + 1 | |
lock.Unlock() // 解锁} | |
wg.Done()} | |
func main() {wg.Add(2) | |
go add() | |
go add() | |
wg.Wait() | |
fmt.Println(x) | |
} |
应用互斥锁可能保障同一时间有且只有一个 goroutine 进入临界区,其余的 goroutine 则在期待锁;当互斥锁开释后,期待的 goroutine 才能够获取锁进入临界区,多个 goroutine 同时期待一个锁时,唤醒的策略是随机的。
sync.RWMutex(读写互斥锁)
互斥锁是齐全互斥的,然而有很多理论的场景下是读多写少的,当咱们并发的去读取一个资源不波及资源批改的时候是没有必要加锁的,这种场景下应用读写锁是更好的一种抉择。读写锁在 Go 语言中应用 sync 包中的 RWMutex 类型。
读写锁分为两种:读锁和写锁。当一个 goroutine 获取读锁之后,其余的 goroutine 如果是获取读锁会持续取得锁,如果是获取写锁就会期待;当一个 goroutine 获取写锁之后,其余的 goroutine 无论是获取读锁还是写锁都会期待。
读写锁示例:
var ( | |
x int64 | |
wg sync.WaitGroup | |
lock sync.Mutex | |
rwlock sync.RWMutex | |
) | |
func write() {// lock.Lock() // 加互斥锁 | |
rwlock.Lock() // 加写锁 | |
x = x + 1 | |
time.Sleep(10 * time.Millisecond) // 假如读操作耗时 10 毫秒 | |
rwlock.Unlock() // 解写锁 | |
// lock.Unlock() // 解互斥锁 | |
wg.Done()} | |
func read() {// lock.Lock() // 加互斥锁 | |
rwlock.RLock() // 加读锁 | |
time.Sleep(time.Millisecond) // 假如读操作耗时 1 毫秒 | |
rwlock.RUnlock() // 解读锁 | |
// lock.Unlock() // 解互斥锁 | |
wg.Done()} | |
func main() {start := time.Now() | |
for i := 0; i < 10; i++ {wg.Add(1) | |
go write()} | |
for i := 0; i < 1000; i++ {wg.Add(1) | |
go read()} | |
wg.Wait() | |
end := time.Now() | |
fmt.Println(end.Sub(start)) | |
} |
须要留神的是读写锁非常适合读多写少的场景,如果读和写的操作差异不大,读写锁的劣势就施展不进去。
sync.Once(单例)
说在后面的话:这是一个进阶知识点。
在编程的很多场景下咱们须要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、只敞开一次通道等。
Go 语言中的 sync 包中提供了一个针对只执行一次场景的解决方案–sync.Once。
sync.Once 只有一个 Do 办法,其签名如下:
func (o *Once) Do(f func()) {}
留神:如果要执行的函数 f 须要传递参数就须要搭配闭包来应用。
sync.Map(线程平安 map)
Go 语言中内置的 map 不是并发平安的。请看上面的示例:
var m = make(map[string]int) | |
func get(key string) int {return m[key] | |
} | |
func set(key string, value int) {m[key] = value | |
} | |
func main() {wg := sync.WaitGroup{} | |
for i := 0; i < 20; i++ {wg.Add(1) | |
go func(n int) {key := strconv.Itoa(n) | |
set(key, n) | |
fmt.Printf("k=:%v,v:=%v\n", key, get(key)) | |
wg.Done()}(i) | |
} | |
wg.Wait()} |
下面的代码开启大量几个 goroutine 的时候可能没什么问题,当并发多了之后执行下面的代码就会报 fatal error: concurrent map writes 谬误。
像这种场景下就须要为 map 加锁来保障并发的安全性了,Go 语言的 sync 包中提供了一个开箱即用的并发平安版 map–sync.Map。开箱即用示意不必像内置的 map 一样应用 make 函数初始化就能间接应用。同时 sync.Map 内置了诸如 Store、Load、LoadOrStore、Delete、Range 等操作方法。
var m = sync.Map{} | |
func main() {wg := sync.WaitGroup{} | |
for i := 0; i < 20; i++ {wg.Add(1) | |
go func(n int) {key := strconv.Itoa(n) | |
m.Store(key, n) | |
value, _ := m.Load(key) | |
fmt.Printf("k=:%v,v:=%v\n", key, value) | |
wg.Done()}(i) | |
} | |
wg.Wait()} |
sync/atomic(原子操作)
代码中的加锁操作因为波及内核态的上下文切换会比拟耗时、代价比拟高。针对根本数据类型咱们还能够应用原子操作来保障并发平安,因为原子操作是 Go 语言提供的办法它在用户态就能够实现,因而性能比加锁操作更好。Go 语言中原子操作由内置的规范库 sync/atomic 提供。
办法 | 操作类型 |
---|---|
func LoadInt32(addr *int32) (val int32) | 读取操作 |
func LoadInt64(addr *int64) (val int64) | 读取操作 |
func LoadUint32(addr *uint32) (val uint32) | 读取操作 |
func LoadUint64(addr *uint64) (val uint64) | 读取操作 |
func LoadUintptr(addr *uintptr) (val uintptr) | 读取操作 |
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer) | 读取操作 |
func StoreInt32(addr *int32, val int32) | 写入操作 |
func StoreInt64(addr *int64, val int64) | 写入操作 |
func StoreUint32(addr *uint32, val uint32) | 写入操作 |
func StoreUint64(addr *uint64, val uint64) | 写入操作 |
func StoreUintptr(addr *uintptr, val uintptr) | 写入操作 |
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) | 写入操作 |
func AddInt32(addr *int32, delta int32) (new int32) | 批改操作 |
func AddInt64(addr *int64, delta int64) (new int64) | 批改操作 |
func AddUint32(addr *uint32, delta uint32) (new uint32) | 批改操作 |
func AddUint64(addr *uint64, delta uint64) (new uint64) | 批改操作 |
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) | 批改操作 |
func SwapInt32(addr *int32, new int32) (old int32) | 替换操作 |
func SwapInt64(addr *int64, new int64) (old int64) | 替换操作 |
func SwapUint32(addr *uint32, new uint32) (old uint32) | 替换操作 |
func SwapUint64(addr *uint64, new uint64) (old uint64) | 替换操作 |
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) | 替换操作 |
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) | 替换操作 |
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) | 比拟并替换操作 |
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) | 比拟并替换操作 |
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) | 比拟并替换操作 |
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) | 比拟并替换操作 |
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) | 比拟并替换操作 |
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) | 比拟并替换操作 |
示例:
package main | |
import ( | |
"fmt" | |
"sync" | |
"sync/atomic" | |
"time" | |
) | |
var x int64 | |
var l sync.Mutex | |
var wg sync.WaitGroup | |
// 一般版加函数 | |
func add() { | |
// x = x + 1 | |
x++ // 等价于下面的操作 | |
wg.Done()} | |
// 互斥锁版加函数 | |
func mutexAdd() {l.Lock() | |
x++ | |
l.Unlock() | |
wg.Done()} | |
// 原子操作版加函数 | |
func atomicAdd() {atomic.AddInt64(&x, 1) | |
wg.Done()} | |
func main() {start := time.Now() | |
for i := 0; i < 10000; i++ {wg.Add(1) | |
//go add() // 一般版 add 函数 不是并发平安的 | |
//go mutexAdd() // 加锁版 add 函数 是并发平安的,然而加锁性能开销大 | |
go atomicAdd() // 原子操作版 add 函数 是并发平安,性能优于加锁版} | |
wg.Wait() | |
end := time.Now() | |
fmt.Println(x) | |
fmt.Println(end.Sub(start)) | |
} |