WaitGroup:协同等待,任务编排利器
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
WaitGroup 用于编排这样一类任务:需要启动多个 goroutine 执行任务,主 goroutine 需要等待这些子 goroutine 全部完成后才能继续执行。
// WaitGroup holds the state shared by Add, Done and Wait.
//
// state1 and state2 together form 12 contiguous bytes. They are not used at
// fixed positions: the state method decides at run time, from the alignment of
// state1, which 8 of those bytes act as the 64-bit state word and which 4 act
// as the semaphore word. Because of that reinterpretation, the order and types
// of state1 and state2 must not change.
//
// The state word packs two values: the high 32 bits are the counter and the
// low 32 bits are the number of waiters (see the shifts in Add and Wait).
type WaitGroup struct {
	// NOTE(review): noCopy is declared outside this view; presumably a marker
	// that lets `go vet` flag copies of a WaitGroup — confirm.
	noCopy noCopy

	state1 uint64 // state word when 64-bit aligned; otherwise reinterpreted by state()
	state2 uint32 // semaphore word when state1 is 64-bit aligned
}
state
// state 返回指向存储在 wg.state*中的 state 和 sema 字段的指针。func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { // state1 is 64-bit aligned: nothing to do. return &wg.state1, &wg.state2 } else { // state1 is 32-bit aligned but not 64-bit aligned: this means that // (&state1)+4 is 64-bit aligned. state := (*[3]uint32)(unsafe.Pointer(&wg.state1)) return (*uint64)(unsafe.Pointer(&state[1])), &state[0] }}
Add
// Add adds delta, which may be negative, to the WaitGroup counter.
//
// The counter lives in the high 32 bits of the state word and the waiter
// count in the low 32 bits. If the counter becomes negative, Add panics.
// If the counter becomes zero while waiters are registered, the state is
// reset to zero and the semaphore is released once per waiter.
// Add also panics when it detects that it raced with Wait.
func (wg *WaitGroup) Add(delta int) {
	statep, semap := wg.state()
	if race.Enabled {
		_ = *statep // trigger nil deref early
		if delta < 0 {
			// Synchronize decrements with Wait.
			race.ReleaseMerge(unsafe.Pointer(wg))
		}
		race.Disable()
		defer race.Enable()
	}
	// Add delta to the counter, which occupies the high 32 bits of the state.
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	v := int32(state >> 32) // current counter value
	w := uint32(state)      // number of waiters
	if race.Enabled && delta > 0 && v == int32(delta) {
		// The first increment must be synchronized with Wait.
		// Need to model this as a read, because there can be
		// several concurrent wg.counter transitions from 0.
		race.Read(unsafe.Pointer(semap))
	}
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	// Waiters exist and this Add moved the counter up from zero:
	// Add was called while a Wait was in progress.
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Nothing to wake: either work is still outstanding or nobody is waiting.
	if v > 0 || w == 0 {
		return
	}
	// This goroutine has set counter to 0 when waiters > 0.
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Reset waiters count to 0.
	// At this point the counter is 0 and w != 0, so the state word holds
	// only the waiter count; storing 0 clears it.
	*statep = 0
	// Release the semaphore once per registered waiter.
	// NOTE(review): runtime_Semrelease is declared outside this view;
	// presumably each call wakes one goroutine blocked in Wait — confirm.
	for ; w != 0; w-- {
		runtime_Semrelease(semap, false, 0)
	}
}
Wait
// Wait returns once the WaitGroup counter is zero.
//
// If the counter is already zero it returns immediately. Otherwise it
// registers itself as a waiter by incrementing the low 32 bits of the state
// word with a compare-and-swap (retrying if the state changed in between),
// then blocks on the semaphore. After being released it panics if the state
// word is not zero, which indicates the WaitGroup was reused before this
// Wait returned.
func (wg *WaitGroup) Wait() {
	statep, semap := wg.state()
	if race.Enabled {
		_ = *statep // trigger nil deref early
		race.Disable()
	}
	for {
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32) // current counter value
		w := uint32(state)      // number of waiters
		if v == 0 {
			// Counter is 0, no need to wait.
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
		// Increment waiters count.
		// If the CAS fails the state changed since it was loaded; loop and retry.
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			if race.Enabled && w == 0 {
				// Wait must be synchronized with the first Add.
				// Need to model this as a write to race with the read in Add.
				// As a consequence, can do the write only for the first waiter,
				// otherwise concurrent Waits will race with each other.
				race.Write(unsafe.Pointer(semap))
			}
			// Block until Add releases the semaphore.
			runtime_Semacquire(semap)
			// Add stores 0 into the state before releasing waiters, so a
			// non-zero value here means the WaitGroup was modified again.
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
	}
}
Done
// Done decrements the WaitGroup counter by one. It is shorthand for Add(-1),
// so it panics under the same conditions as Add.
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}