关于golang:golang-syncWaitGroup-源码分析

3次阅读

共计 1990 个字符,预计需要花费 5 分钟才能阅读完成。

最近在学习 golang 源码,学习 golang 源码是学习 golang 的十分好的路径。

先来记录一波 sync 包的学习。版本 go1.14.2 darwin/amd64

sync.WaitGroup

咱们个别应用 sync.waitGroup 做并发管制,应用形式个别如下

func main() {wg := sync.WaitGroup{}

    for i := 0; i < 10; i++ {wg.Add(1)
        go func(index int) {defer wg.Done()
            fmt.Println(index)
        }(i)
    }
    
    wg.Wait()}

所以咱们先看下 waitGroup 次要有是哪个函数,Add, Done, Wait 函数,在看函数源码前,咱们先看下 WaitGroup 构造

type WaitGroup struct {
    noCopy noCopy          // 禁止拷贝,如果有拷贝构建不会报错,能够用 go vet 或 go tool vet 检测是否有拷贝谬误,state1 [3]uint32      // 重要,存有 计数器,期待数,信号量的值
}

其中 state1 成员寄存的值在 64 位零碎中如下:

<img src=”/Users/guanjingyun/Library/Application Support/typora-user-images/image-20201226143123260.png” alt=”image-20201226143123260″ style=”zoom:33%;” />

接下来看下 Add 办法

func (wg *WaitGroup) Add(delta int) {statep, semap := wg.state()   // 获取 wg 状态,statep 地址 (高 32 位的计数器值和低 32 位的期待数量值), semap 信号量
    if race.Enabled {  // 数据竞态检测,默认是 false, 开启耗费 cpu 性能 , 先不论
        _ = *statep
        if delta < 0 {race.ReleaseMerge(unsafe.Pointer(wg))
        }
        race.Disable()
        defer race.Enable()}
    state := atomic.AddUint64(statep, uint64(delta)<<32) // 原子操作给计数器加上 delta 的值
    v := int32(state >> 32)     // 高 32 位 计数器
    w := uint32(state)   // 低 32 位 期待数
    if race.Enabled && delta > 0 && v == int32(delta) {  // 数据竞态检测, 先不论
        race.Read(unsafe.Pointer(semap))
    }
    if v < 0 {                                  // 计数器 <0 
        panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {    // 在 add 之前,曾经调用过 wait 函数??(没太看明确)panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    if v > 0 || w == 0 { // wait 数 为 0 或 计数 >0 间接返回
        return
    }

    if *statep != state {   //?实践上应该相等
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    
    *statep = 0
    for ; w != 0; w-- {  // 计数器 为 0,开释所有 wait 信号量,runtime_Semrelease(semap, false, 0)
    }
}

Done 函数, 很简略了:

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {wg.Add(-1)
}

Wait 函数, 把数据竞态检测 局部去了,为代码看起来简洁

func (wg *WaitGroup) Wait() {statep, semap := wg.state()  // 获取 wg 状态,statep 地址 (高 32 位的计数器值和低 32 位的期待数量值), semap 信号量
    for {state := atomic.LoadUint64(statep) 
        v := int32(state >> 32)  // 如上
        w := uint32(state)
        if v == 0 {  // 计数为 0,不必 wait
            return
        }

        if atomic.CompareAndSwapUint64(statep, state, state+1) { // 期待数加 1
            runtime_Semacquire(semap)    // 获取信号量
            if *statep != 0 {panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            return
        }
    }
}

runtime_Semacquire 和 runtime_Semrelease 获取和开释信号量,用于休眠和唤醒 协程,具体实现等下次深入研究下

正文完
 0