关于go:29-GolangGo并发编程并发编程

34次阅读

共计 6121 个字符,预计需要花费 16 分钟才能阅读完成。

  Go 语言为咱们提供了基于消息传递 CSP 并发模型,基于管道 + 协程能够很不便的编写高并发服务,然而在某些场景下,或多或少还是须要应用到锁,本篇文章次要介绍除了管道 chan 之外的常见并发编程模式。

原子操作 atomic

  古代计算机都是多核 CPU,每个 CPU 还有本人的高速缓存,主存中局部数据会被缓存在高速缓存中,CPU 拜访数据时会先从高速缓存中查找。那如果同一块内存地址同时被缓存在核 0 与核 1 的 L2 级高速缓存呢?此时如果核 0 与核 1 同时批改该地址内容,则会造成抵触。(参考深刻了解计算机系统第六章,以 Intel Core i7 处理器为例,其有四个核,且每个核都有本人的 L1 和 L2 高速缓存)。

  平时咱们认为的一些原子操作(不会有并发问题的操作),如赋值操作,取值操作等,在多核 CPU 架构下都有可能产生并发问题;另外还有一些常见语句,如 a += b 等也有并发问题。所以在某些场景,咱们须要想方法防止并发问题,怎么办呢?Go 语言 sync/atomic 包为咱们提供了一些常见的原子操作,应用这些办法不必放心并发问题。

// 数据加载
func LoadInt32(addr *int32) (val int32)

// 数据保留
func StoreInt32(addr *int32, val int32)

// 比拟替换操作,如果 addr 地址的数据等于 old,则赋值为 new
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

//addr 地址的数据累加加 delta
func AddInt32(addr *int32, delta int32) (new int32)

  这些办法底层是怎么实现的呢?数据加载与保留还好了解一些,比拟替换以及数据累加,编译成汇编指令后,显著须要好几步操作能力实现,怎么做到原子的呢?其实还有一些咱们不晓得的指令,语义上尽管比较复杂,但却是一条指令:

/*
 * 比拟替换指令
 * "cmpxchgl  r, [m]":
 *
 *     if (eax == [m]) {
 *         zf = 1;
 *         [m] = r;
 *     } else {
 *         zf = 0;
 *         eax = [m];
 *     }
 */

/* 累加指令
 * "xaddl  r, [m]":
 *
 *     temp = [m];
 *     [m] += r;
 *     r = temp;
 */

  那下面提到的高速缓存的问题怎么解决呢?目前处理器都提供有 lock 指令;其能够锁住总线,其余 CPU 对内存的读写申请都会被阻塞,直到锁开释;不过目前处理器都采纳锁缓存代替锁总线(锁总线的开销比拟大),即 lock 指令会锁定一个缓存行。当某个 CPU 收回 lock 信号锁定某个缓存行时,其余 CPU 会使它们的高速缓存该缓存行生效,同时检测是对该缓存行中数据进行了批改,如果是则会写所有已批改的数据;当某个高速缓存行被锁定时,其余 CPU 都无奈读写该缓存行;lock 后的写操作会及时会写到内存中。

  联合这些常识,实现这些原子操作就非常简单了,参考 Go 语言 CompareAndSwapInt32 函数的汇编代码:

//runtime/internal/atomic/atomic_amd64.s
TEXT ·Cas(SB),NOSPLIT,$0-17
    MOVQ    ptr+0(FP), BX
    MOVL    old+8(FP), AX
    MOVL    new+12(FP), CX
    // 锁缓存行
    LOCK
    // 比拟替换
    CMPXCHGL    CX, 0(BX)
    SETEQ    ret+16(FP)
    RET

锁 sync.Mutex

  为什么须要锁呢?当然是为了解决并发问题,如多个协程同时操作同一个变量,这就不得不提一个经典的例子,多个协程累加公共变量,初始值为零,累加 1w 次,最终后果是什么呢?

package main

import (
    "fmt"
    "time"
)
var value = 0

func main() {

    for i := 0; i <1000; i ++ {go func() {value ++}()}
    // 为了期待所有的子协程执行结束
    time.Sleep(time.Second * 10)

    fmt.Println(value)   //951,输入后果随机
}

  明明启动了 1000 个协程,每个协程都对全局变量 value+1,最终 value 的后果为什么是随机的呢?因为这 1000 个协程大概率是调配到多个 CPU 调度执行的,多个 CPU 并行拜访内存变量,value ++ 操作还不是原子的(理论等于 value = value + 1,编译成的汇编指令更是由好几条指令组成),而且每个 CPU 还有高速缓存的存在(CPU 拜访到的其实是内存变量的正本),所以当多个协程操作同一个变量时其后果是不确定的。

  那如果的确须要多个协程拜访同一个变量怎么办?这时候就须要加锁了,操作变量之前加锁,操作变量之后开释锁,锁保障了同一时刻只能有一个协程拜访到这个变量:

package main

import (
    "fmt"
    "sync"
    "time"
)
var value = 0

func main() {lock := sync.Mutex{}
    for i := 0; i <1000; i ++ {go func() {lock.Lock()
            value ++
            lock.Unlock()}()}
    time.Sleep(time.Second * 10)

    fmt.Println(value)
}

  并发相干的一些工具根本都在 sync 包,sync.Mutex 是一种罕用的排他锁,罕用来解决并发问题,其只有两个办法且应用非常简单,操作之前加锁 lock.Lock(),操作时候开释锁 lock.Unlock()。sync.Mutex 继承自接口 sync.Locker,咱们简略理解下 sync.Mutex.Lock 的实现:

// 父接口
type Locker interface {Lock()
    Unlock()}

type Mutex struct {
    // 锁的状态
    state int32
    // 信号量
    sema  uint32
}

func (m *Mutex) Lock() {
    // 疾速加锁:原子操作 cas
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {return}
    // 加锁失败,再次缓缓加锁
    m.lockSlow()}

func (m *Mutex) Unlock() {
    // 疾速开释锁
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // 开释失败,再次缓缓开释锁
        m.unlockSlow(new)
    }
}

  留神到,加锁和开释锁的时候,都是先基于原子操作尝试一次(可能失败),失败后才会走到 lockSlow 函数,该函数外围逻辑是一个 for 循环,在锁已被其余协程抢占时尝试自旋(防止协程切换),自旋完结后再次尝试获取锁(基于 cas),如果还是获取失败,则通过信号量 m.sema 抢占锁(相似于 Semacquire)。

并发 map sync.Map

  map 之前的章节咱们曾经介绍过,多个协程并发操作 map 时,可能会导致 panic(fatal error: concurrent map writes),这是因为当咱们并发写 map 时,可能导致意料之外的状况产生。那怎么解决呢?Go 语言为咱们提供了并发 sync.Map,应用形式如下:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {var m = sync.Map{}
    // 创立 10 个协程
    for i := 0; i <= 10; i ++ {go func() {
            // 协程内,循环操作 map
            for j := 0; j <= 100; j ++ {
                // 数据读取
                v, ok := m.Load(fmt.Sprintf("test_%v", j))
                if ok {
                    // 数据写入
                    m.Store(fmt.Sprintf("test_%v", j), v.(int) + 1)
                } else {m.Store(fmt.Sprintf("test_%v", j), 0)
                }

            }

        }()}
    // 主协程休眠 3 秒,否则主协程完结了,子协程没有机会执行
    time.Sleep(time.Second * 3)
    fmt.Println(m.Load("test_0"))
}

  那是不是每次操作之前都须要加锁呢?这样的话性能是不是会有所升高?这是必定的,不过 Go 语言也通过 ” 读写拆散 ” 计划(适宜读多写少的场景),尽可能的缩小锁的开销,如下是 sync.Map 的定义:

type Map struct {
    // 锁
    mu Mutex

    // 只读的数据,相当于缓存
    read atomic.Value // readOnly

    // 可写数据,拜访须要加锁
    dirty map[any]*entry

    misses int
}

// readOnly 构造定义,也是一个 map
type readOnly struct {m       map[any]*entry
    //dirty 是否存在局部数据,在 readonly 不存在
    amended bool // true if the dirty map contains some key not in m.
}

  留神 read 是只读数据,相当于一份缓存数据,map 的增删改依赖于 dirty。这样辨别之后,map 的读写流程当然也须要扭转了。

func (m *Map) Load(key any) (value any, ok bool) {
    // 加载只读数据
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    // 没有查到数据,dirty 还有一些数据时 readonly 没有的
    if !ok && read.amended {
        // 加锁
        m.mu.Lock()
        // 再次尝试读取 readonly
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]

        // 没有查到数据,dirty 还有一些数据时 readonly 没有的
        if !ok && read.amended {e, ok = m.dirty[key]
            m.missLocked()}
        m.mu.Unlock()}
    if !ok {return nil, false}
    return e.load()}

// 如果 misses 次数过多,将 dirty 数据加载到 readonly
func (m *Map) missLocked() {
    m.misses++
    if m.misses < len(m.dirty) {return}
    m.read.Store(readOnly{m: m.dirty})
    m.dirty = nil
    m.misses = 0
}

  能够看到,读取操作优先查问 readonly 数据,因为不须要加锁,当然在屡次操作之后 readonly 数据可能和 dirty 数据不一样,如果 misses 次数过多,会将 dirty 数据加载到 readonly;另外在写数据时,如果 readonly 存在 key,则尝试写 readonly(基于 cas,不须要加锁);如果写失败,再加锁,有趣味的能够本人学习下 sync.Map.Store 函数的实现逻辑。

并发管制 sync.Waitgroup

  构想有这么一个需要:业务须要从三个数据接口查问数据,而且这三个接口互不依赖,传统的编程形式可能就是顺序调用了,这样总的耗时是这三个接口耗时之和,在 Go 语言中,提供了并发管制 sync.Waitgroup,使得咱们能够并发申请三个接口,这时候总的耗时等于三个接口耗时的最大值。

package main

import (
    "fmt"
    "sync"
)

func main() {
    //WaitGroup 用于协程并发管制
    wg := sync.WaitGroup{}
    // 启动 10 个协程并发执行工作
    for i := 0; i < 10; i ++ {
        // 标记工作开始
        wg.Add(1)
        go func(a int) {fmt.Println(fmt.Sprintf("work %d exec", a))
            // 标记工作完结
            wg.Done()}(i)
    }
    // 主协程期待工作完结
    wg.Wait()
    fmt.Println("main end")
}

  如下面程序所示,主协程启动了 10 个协程,然而必须等到 10 个协程都完结(相当于期待 10 个子协程申请接口返回响应后果)。sync.WaitGroup 应用非常简单,只有三个 API,Add 办法标识子工作开启,Done 办法标识子工作完结,主协程中应用 Wait 办法,期待所有子工作完结,否则主协程会始终阻塞在这里。

type WaitGroup struct {
    // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
    // 64-bit atomic operations require 64-bit alignment, but 32-bit
    // compilers only guarantee that 64-bit fields are 32-bit aligned.
    // For this reason on 32 bit architectures we need to check in state()
    // if state1 is aligned or not, and dynamically "swap" the field order if
    // needed.
    state1 uint64
    state2 uint32
}

  sync.WaitGroup 构造的定义非常简单,只有两个字段:一个示意并发工作数,一个示意期待者数量,别离用 32bit 整数示意,只不过在 32-bit/64-bit 架构下,因为内存对齐形式不太一样,两个字段的拜访形式也不太一样。

  sync.WaitGroup 构造的 API 办法这里就不做过多介绍了,底层是基于后面介绍的原子操作(LoadUint64、AddUint64、CompareAndSwapUint64)实现的,有趣味的能够本人学习钻研(参考文件 sync/waitgroup.go)。

并发检测 race

  Go 程序日常开发中,如果放心可能存在并发问题,能够应用 -race 检测潜在的并发问题,以下面的程序为例:

package main

import (
    "fmt"
    "time"
)
var value = 0

func main() {

    for i := 0; i <1000; i ++ {go func() {value ++}()}
    time.Sleep(time.Second * 10)

    fmt.Println(value)  
}

go run -race test.go
==================
WARNING: DATA RACE
Read at 0x0000011f48a8 by goroutine 8:
  main.main.func1()
      /test.go:13 +0x29

Previous write at 0x0000011f48a8 by goroutine 7:
  main.main.func1()
      /test.go:13 +0x44

Goroutine 8 (running) created at:
  main.main()
      /test.go:12 +0x39

Goroutine 7 (finished) created at:
  main.main()
      /test.go:12 +0x39

总结

  本篇文章重点介绍了 Go 语言原子操作,互斥锁,并发 map 以及并发管制的根本应用,当然还有局部并发包没有介绍,如条件变量(cyn.Cond),读写锁(sync.RWMutex)等等,这些有趣味的能够本人学习钻研。

正文完
 0