关于golang:go-更为安全的使用-syncMap-组件

39次阅读

共计 3960 个字符,预计需要花费 10 分钟才能阅读完成。

go 内置了协程平安的 sync 包来不便咱们同步各协程之间的执行状态,应用起来也十分不便。

最近在排查解决一个线下服务的数据同步问题,review 外围代码后,发现这么一段流程控制代码。

谬误示例

package main

import (
    "log"
    "runtime"
    "sync"
)

func main() {
    // 可并行也是重点,生产场景没几个单核的吧?? 
    runtime.GOMAXPROCS(runtime.NumCPU())
    waitGrp := &sync.WaitGroup{}
    waitGrp.Add(1)

    syncTaskProcessMap := &sync.Map{}
    for i := 0; i < 100; i++ {syncTaskProcessMap.Store(i, i)
    }

    for j := 0; j < 100; j++ {go func(j int) {
            // 协程可能并行抢占一轮开始
            syncTaskProcessMap.Delete(j)
            // 协程可能并行抢占一轮完结
            // 在以后协程 Delete 后 Range 前 又被其余协程 Delete 操作了
            
            syncTaskProcessCount := 0
            syncTaskProcessMap.Range(func(key, value interface{}) bool {
                syncTaskProcessCount++
                return true
            })
            
            if syncTaskProcessCount == 0 {log.Println(GetGoroutineID(), "syncTaskProcessMap empty, start syncOnline", syncTaskProcessCount)
            }
        }(j)
    }
    
    waitGrp.Wait()}

func GetGoroutineID() uint64 {b := make([]byte, 64)
    runtime.Stack(b, false)
    b = bytes.TrimPrefix(b, []byte("goroutine"))
    b = b[:bytes.IndexByte(b, ' ')]
    n, _ := strconv.ParseUint(string(b), 10, 64)
    return n
}

代码的本意,是在 i 个协程并发的执行实现后,启动一次 nextProcess 工作,代码应用了 sync.Map 来保护和同步 i 个协程的执行进度,避免多协程并发造成的 map 不平安读写。当最初一个协程执行结束,sync.Map 为空,启动一次 nextProcess。但能读到状态值 syncTaskProcessCount0 的协程,只会是 最初一个 执行实现的协程吗?

sync.Map::Store\Load\Delete\Range 都是协程平安的操作,在调用期间只会被以后 协程 抢占拜访,但它们的组合操作并不是 独占 的,下面的代码认为,Delete && Range 两项操作期间 不会 夹带其余协程对 sync.Map 读写操作,导致能读到 syncTaskProcessCount0 的协程可能不止最初一个执行结束的。

多执行几次,可能失去一下输入:

sqrtcat:demo$ go run test.go 
2021/04/20 14:30:27 114 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt
sqrtcat:demo$ go run test.go 
2021/04/20 14:30:30 117 syncTaskProcessMap empty, start syncOnline 0
2021/04/20 14:30:30 116 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt
sqrtcat:demo$ go run test.go 
2021/04/20 14:30:33 117 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt
sqrtcat:demo$ go run test.go 
2021/04/20 14:30:35 117 syncTaskProcessMap empty, start syncOnline 0
2021/04/20 14:30:35 118 syncTaskProcessMap empty, start syncOnline 0
2021/04/20 14:30:35 115 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt
sqrtcat:demo$ go run test.go 
2021/04/20 14:30:38 131 syncTaskProcessMap empty, start syncOnline 0
2021/04/20 14:30:38 132 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt

能够看到,syncTaskProcessMap empty 的状态被多个协程读到了。
G117,G118,G115 在多核场景下肯能 并行 执行。

  1. SyncMapG117 抢占,Delete 后 2,SyncMap 被开释。
  2. SyncMapG118 抢占,Delete 后 1,SyncMap 被开释。
  3. SyncMapG115 抢占,Delete 后 0,SyncMap 被开释。
  4. 这时的 syncMap 未然为空,G117、G118、G115 持续 Range 失去的 syncTaskProcessCount 都为 0,这样就导致了代码执行与冀望不同了。

所以,尽管 sync.Map 的繁多操作是主动加锁的排他操作,但组合在一起就不是了,咱们要自行在 code section 上加锁。

正确示例

package main

import (
    "log"
    "runtime"
    "sync"
)

// 错误代码示例
func main() {runtime.GOMAXPROCS(runtime.NumCPU())
    
    syncMutex := &sync.Mutex{}
    
    waitGrp := &sync.WaitGroup{}
    waitGrp.Add(1)

    syncTaskProcessMap := &sync.Map{}
    for i := 0; i < 100; i++ {syncTaskProcessMap.Store(i, i)
    }

    for j := 0; j < 100; j++ {go func(j int) {
            // 保障协程对 syncMap 的组合操作也是独占的
            // 将可能的并行操作程序化
            syncMutex.Lock()
            defer syncMutex.Unlock()
            
            syncTaskProcessMap.Delete(j)
            
            syncTaskProcessCount := 0
            syncTaskProcessMap.Range(func(key, value interface{}) bool {
                syncTaskProcessCount++
                return true
            })
            
            if syncTaskProcessCount == 0 {log.Println(GetGoroutineID(), "syncTaskProcessMap empty, start syncOnline", syncTaskProcessCount)
            }
        }(j)
    }
    
    waitGrp.Wait()}

func GetGoroutineID() uint64 {b := make([]byte, 64)
    runtime.Stack(b, false)
    b = bytes.TrimPrefix(b, []byte("goroutine"))
    b = b[:bytes.IndexByte(b, ' ')]
    n, _ := strconv.ParseUint(string(b), 10, 64)
    return n
}

协程并行

多核 的平台上,调配在不同 工夫片队列 上的协程是能够 并行 执行的,雷同 工夫片队列 上的协程是 并发 执行的

func main() {
    // 这行代码将会影响子协程里的日志输出量
    runtime.GOMAXPROCS(runtime.NumCPU())
    waitChan := make(chan int)

    go func() {defer func() {log.Println(GetGoroutineID(), "sub defer")
        }()
        log.Println(GetGoroutineID(), "sub start")
        waitChan <- 1
        log.Println(GetGoroutineID(), "sub finish")
    }()

    log.Println(GetGoroutineID(), "main start")
    log.Println(<-waitChan)
    log.Println(GetGoroutineID(), "main finish")
}
  1. 如果 mainsub 调配在了同一个 cpu 上 或只有一个 cpumain startwaitChan 读阻塞了 mainsub 开始执行,sub start,写入 waitChan,后续也没有触发协程切换的代码段,继续执行 sub finish sub defer 退出,交出 工夫片main 继续执行 main finish
  2. 如果 mainsub 调配在了不同 cpu 上,当 waitChan 阻塞了 cpu1 上的 main,而 subcpu2 执行了 写入 waitChan 后,main 可能会被 cpu1 立刻继续执行, 主协程 main 退出,sub 也会被终止执行,前面的日志打印可能就执行不到了。
sqrtcat:demo$ go run test.go 
2021/04/20 15:26:42 5 sub start
2021/04/20 15:26:42 1 main start
2021/04/20 15:26:42 1
2021/04/20 15:26:42 1 main finish
2021/04/20 15:26:42 5 sub finish

正文完
 0