关于golang:Go语言sync包控制并发详解

8次阅读

共计 5920 个字符,预计需要花费 15 分钟才能阅读完成。

除了上一节咱们介绍的 channel 通道,还有 sync.Mutex、sync.WaitGroup 这些原始的同步机制来,更加灵便的实现数据同步和管制并发。

资源竞争

所谓资源竞争,就是在程序中,同一块内存同时被多个 goroutine 拜访。对于这个共享的资源(内存)每个 goroutine 都有不同的操作,就有可能造成数据错乱。

示例:

package main

import (
    "fmt"
  "time"
)

var sum = 0
func main() {
    // 开启 100 个协程来让 sum + 1
    for i := 1; i <= 100; i++ {go add()
    }
    // 睡眠两秒避免程序提前退出
  time.Sleep(2 * time.Second)
    fmt.Println("sum:",sum)
}
func add(){sum += 1}
// 运行后果:sum:98 或 sum:99 或 ...
  1. 屡次运行下面的程序,发现打印的后果可能存在不同,因为咱们用多个协程来操作 sum,而 sum 不是并发平安的,存在竞争。
  2. 咱们应用 go build、go run、go test 命令时,增加 -race 标识能够查看代码中是否存在资源竞争。

解决这个问题,咱们能够给资源进行加锁,让其在同一时刻只能被一个协程来操作。

sync.Mutex

  1. 互斥锁,使同一时刻只能有一个协程执行某段程序,其余协程期待该协程执行完再顺次执行。
  2. 互斥锁只有两个办法 Lock(加锁)和 Unlock(解锁),当一个协程对资源上锁后,只有等该协程解锁,其余协程能力再次上锁。
  3. Lock 和 Unlock 是成对呈现,为了避免上锁后遗记开释锁,咱们能够应用 defer 语句来开释锁。

示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

var sum = 0
var mutex = sync.Mutex{}
func main() {
    // 开启 100 个协程来让 sum + 1
    for i := 1; i <= 100; i++ {go add()
    }
    // 睡眠两秒避免程序提前退出
    time.Sleep(2 * time.Second)
    fmt.Println("sum:",sum)
}
func add(){mutex.Lock()
    defer mutex.Unlock() // 应用 defer 语句,确保锁肯定会被开释
    sum += 1
}

symc.RWMutex

  1. 下面咱们应用互斥锁,来避免多个协程同时对 sum 做加法操作的时候产生数据错乱。RWMutex为读写锁,当读取竞争资源的时候,因为数据不会扭转,所以不论多少个 goroutine 读都是并发平安的。
  2. 因为能够多个协程同时读,不再互相期待,所以在性能上比互斥锁会有很大的晋升。

示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

var sum = 0
var mutex = sync.Mutex{}
var rwmutex = sync.RWMutex{}
func main() {
    // 开启 100 个协程来让 sum + 1
    for i := 1; i <= 100; i++ {go add()
    }
    for i := 1; i<= 10; i++ {go fmt.Println("sum:",getSum())
    }
    // 睡眠两秒避免程序提前退出
    time.Sleep(2 * time.Second)
    fmt.Println("sum:", sum)
}
func add(){mutex.Lock()
    defer mutex.Unlock() // 应用 defer 语句,确保锁肯定会被开释
    sum += 1
}
func getSum() int {rwmutex.RLock() // 应用读写锁
    defer  rwmutex.RUnlock()
    return sum
}

sync.WaitGroup

  1. 下面的示例中,咱们都是要了 time.Sleep(2 * time.Second),来避免:主函数 mian 返回,提前退出程序。然而咱们并不知道程序真正什么时候执行完,所以只能设置个长点的工夫防止程序提前退出,这样会产生性能问题。
  2. 这时候咱们就用到了 sync.WaitGroup,它能够监听程序的执行,一旦全副执行结束,程序就能马上退出。

示例:

package main

import (
    "fmt"
    "sync"
)

var sum = 0
var mutex = sync.Mutex{}
var rwmutex = sync.RWMutex{}

func run() {
    var wg sync.WaitGroup
    // 因为要监控 110 个协程,所以设置计数器为 110
    wg.Add(110)
    for i := 1; i <= 100; i++ {go func() {
            // 计数器值减 1
            defer wg.Done()
            add()}()}
    for i := 1; i <= 10; i++ {go func() {
            // 计数器值减 1
            defer wg.Done()
            fmt.Println("sum:", getSum())
        }()}
    // 始终期待,只有计数器值为 0
    wg.Wait()}
func main() {run()
}
func add() {mutex.Lock()
    defer mutex.Unlock() // 应用 defer 语句,确保锁肯定会被开释
    sum += 1
}
func getSum() int {rwmutex.RLock() // 应用读写锁
    defer rwmutex.RUnlock()
    return sum
}
  • 示例中咱们先申明了 sync.WaitGroup,而后通过 Add() 办法设置计数器的值,也就是说有多少个协程监听。
  • 在每个协程执行结束后,调用 Done 办法来使计算器减 1。
  • 最初调用 Wait 办法始终期待,直到计数器为 0,所以协程全副执行结束。

sync.Once

有时候咱们只心愿代码执行一次,即便是在高并发的场景下,比方创立一个单例。这种状况能够应用 sync.Once 来保障代码只执行一次。

示例:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var once sync.Once
    onceBody := func() {fmt.Println("Only once")
    }
  // 用于期待协程执行结束
    done := make(chan bool)
  // 启动 10 个协程执行 once.Do(onceBody)
    for i := 0; i < 10; i++ {go func() {// 把要执行的函数 (办法) 作为参数传给 once.Do 办法即可
            once.Do(onceBody)
            done <- true
        }()}
    for i := 0; i < 10; i++ {<-done}
}
// 运行后果:Only once
  • 下面这个是 Go 语言自带的示例,尽管启动了 10 个协程来执行 onceBody 函数,然而 once.DO 办法保障 onceBody 函数只会执行一次。
  • sync.Once 适宜用于创立单例、只加载一次资源等只须要执行一次的场景。

条件变量 sync.Cond

  1. 咱们有一项工作,只有满足了条件状况下能力执行,否则就等着。如何获取这个条件呢?能够应用 channel 的形式,然而 channel 实用于一对一,一对多就须要用到 sync.Cond
  2. sync.Cond 是基于互斥锁的根底上,减少了一个告诉队列,协程刚开始是期待的,告诉的协程会从告诉队列中唤醒一个或多个被告诉的协程。
  3. sync.Cond 次要有以下几个办法:
  • sync.NewCond(&mutex) //sync.Cond 通过 sync.NewCond 初始化,须要传入一个 mutex,因为阻塞期待告诉的操作以及告诉解除阻塞的操作就是基于 sync.Mutex 来实现的。
  • sync.Wait() // 期待告诉
    阻塞以后协程,直到被其余协程调用 Broadcast 或者 Signal 办法唤醒,应用的时候须要加锁,应用 sync.Cond 中的锁即可
  • sync.Signal() // 单发告诉,随机唤醒一个协程
  • sync.Broadcat() // 播送告诉,唤醒所有期待的协程。

示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    // 3 集体赛跑,1 个裁判员发号施令
    cond := sync.NewCond(&sync.Mutex{})
    var wg sync.WaitGroup
    wg.Add(4) // 3 选手 + 1 裁判
    for i := 1; i <= 3; i++ {go func(num int) {defer wg.Done()
            fmt.Println(num, "号选手曾经就位")
            cond.L.Lock()
            cond.Wait() // 期待发令枪响
            fmt.Println(num, "号选手开始跑……")
            cond.L.Unlock()}(i)
    }
    // 期待所有 goroutine 都进入 wait 状态
    time.Sleep(2 * time.Second)
    go func() {defer wg.Done()
        fmt.Println("裁判:“各就各位~~ 准备~~”")
        fmt.Println("啪!!!")
        cond.Broadcast() // 发令枪响}()
    // 避免函数提前返回退出
    wg.Wait()}

运行后果:

3 号选手曾经就位
1 号选手曾经就位
2 号选手曾经就位
裁判:“各就各位~~ 准备~~”啪!!!2 号选手开始跑……
3 号选手开始跑……
1 号选手开始跑……

最初贴一下 sync.Cond 几个办法的源码:

// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
// Wait 办法开释锁,并阻塞协程执行。满足条件解除阻塞后,以后协程须要取得锁而后 Wait 办法返回。//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
// 因为解除阻塞后,以后协程不肯定能马上取得锁,因而返回后须要再次查看条件,所以通常
// 应用循环。//    c.L.Lock()
//    for !condition() {//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
//
func (c *Cond) Wait() {c.checker.check()
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock() // 开释锁
    runtime_notifyListWait(&c.notify, t) // 期待满足条件,解除阻塞
    c.L.Lock() // 获取锁}

// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

条件变量的 Wait 办法次要做了四件事:

  1. 把调用它的 goroutine(也就是以后的 goroutine)退出到以后条件变量的告诉队列中。
  2. 解锁以后的条件变量基于的那个互斥锁。
  3. 让以后的 goroutine 处于期待状态,等到告诉到来时再决定是否唤醒它。此时,这个 goroutine 就会阻塞在调用这个 Wait 办法的那行代码上。
  4. 如果告诉到来并且决定唤醒这个 goroutine,那么就在唤醒它之后从新锁定以后条件变量基于的互斥锁。自此之后,以后的 goroutine 就会继续执行前面的代码了。

注意事项

  1. 调用 wait 办法的时候肯定要加锁,否则会导致程序产生 panic.
  2. wait 调用时须要查看期待条件是否满足,也就说 goroutine 被唤醒了不等于期待条件被满足,期待者被唤醒,只是失去了一次查看的机会而已,举荐写法如下:
//    c.L.Lock()
//    for !condition() {//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
  1. Signal 和 Boardcast 两个唤醒操作不须要加锁

sync.Map

map 同时读写是线程不平安的,会产生了竞态问题。而 sync.Map 和 map 类型一样,只不过它是并发平安的。
sync.Map 的办法:

  • Store : 存储 key-value 值
  • Load: 依据 key 获取对应的 value 值,还能够判断 key 是否存在。
  • LoadOrStore: 如果 key 对应的 value 存在,则返回 value;不存在则存储 key-value 值。
  • Delete: 删除一个 key-value 键值对
  • Range:遍历 sync.Map

示例:

package main
import (
    "fmt"
    "sync"
)
func main() {
    var syMap sync.Map
    // 将键值对保留到 sync.Map
    syMap.Store("aaa", 111)
    syMap.Store("bbb", 222)
    syMap.Store("ccc", 333)
    fmt.Println(syMap.LoadOrStore("ddd", 444))
    // 从 sync.Map 中依据键取值
    fmt.Println(syMap.Load("aaa"))
    // 依据键删除对应的键值对
    syMap.Delete("aaa")
    // 遍历所有 sync.Map 中的键值对
    syMap.Range(func(k, v interface{}) bool {fmt.Println("k:", k, "=》v:", v)
        return true
    })
}

运行后果:

444 false
111 true
k: bbb =》v: 222
k: ccc =》v: 333
k: ddd =》v: 444

sync.Map 没有获取 map 数量的办法,能够在 遍历的时候自行计算数量,sync.Map 为了保障并发平安,就义了一些性能,如果没有并发场景,举荐应用内置的 map 类。

正文完
 0