共计 4159 个字符,预计需要花费 11 分钟才能阅读完成。
Go 并发模型
传统的编程语言 C ++ Java Python 等,他们的并发逻辑多事基于操作系统的线程。并发执行单元(线程)之间的通信利用的就是操作系统提供的线程或过程间通信的原语。如:共享内存、信号、管道、音讯队列、套接字等。在这些通信原语中,应用最宽泛的就是共享内存。
如果你应用过这种共享内存的并发模型,其实是难用的和容易产生谬误的,特地是在大型或简单的业务场景中。
Go 语言从程序设计当初,就将解决下面传统并发模型问题作为指标,并在新并发模型设计中借鉴注明的 CSP(Communicationing Sequential Processes- 通信顺序进程)并发模型。
CSP 模型目标在于简化并发程序的编写,让并发程序的编写程序与编写顺序程序一样简略。
生产者 —》输入数据 — 输出 / 输入原语 —》输入数据
为了实现 CSP 模型,GO 语言引入了 Channel.Goroutine 能够读写 channel 中的数据,通过 channel 将 goroutine 组合连贯在一起。
Go 语言中 CSP 尽管是支流并发模型,然而还是反对共享内存并发模型。次要是在 sync 包中的互斥锁、读写锁、条件变量、原子操作等。那么咱们该如何抉择呢?
第一种:创立模式
通常会应用上面的形式:
type Worker struct {
}
func Do(f func()) chan Worker {w:= make(chan Worker)
go func() {f()
w<-Worker{}}()
return w
}
func main() {c:=Do(func() {fmt.Print("到下班时间了...")
})
<-c
}
Do 函数外部创立了一个 gorutine 并且返回了一个 channel 类型的变量。Do 函数创立的新 goroutine 与调用的 Do 函数的 goroutine 之间通过一个 channel 分割了起来,2 个 goroutine 能够通过 channel 进行通信。Do 函数的实现因为 channel 在 Go 语言中是一等公民,channel 能够像变量一样初始化、传递和赋值。下面的例子 Do 返回了一个变量,这个变量就是通道,实现了主 goroutine 和子 goroutine 的通信。
第二种:退出模式
##### a) 拆散模式
拆散模式应用最宽泛的是 goroutine 退出模式。所谓拆散模式就是创立它的 goroutine 不须要关怀她的退出,这类 goroutine 启动后与其创建者彻底拆散,其生命周期与其执行的主函数相干,函数返回即 goroutine 退出。
场景 1:一次性工作
// $GOROOT/src/net/dial.go
func (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {
... ...
if oldCancel := d.Cancel; oldCancel != nil {subCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
select {
case <-oldCancel:
cancel()
case <-subCtx.Done():}
}()
ctx = subCtx
}
... ...
}
在 DialContext 办法中创立了一个 goroutine,用来监听量个 channel 是否有数据,一旦有数据,解决后即退出。
场景 2 常驻后盾执行一些特定工作,比方罕用 for{…}或 for{select{…}}模式,还能够用定时器或事件驱动执行。上面是 Go 给每个 P 内置的 GC goroutine 就是这种场景的。
// $GOROOT/src/runtime/mgc.go
func gcBgMarkStartWorkers() {
// Background marking is performed by per-P G's. Ensure that
// each P has a background GC G.
for _, p := range allp {
if p.gcBgMarkWorker == 0 {go gcBgMarkWorker(p) // 每个 P 创立一个 goroutine,以运行 gcBgMarkWorker
notetsleepg(&work.bgMarkReady, -1)
noteclear(&work.bgMarkReady)
}
}
}
func gcBgMarkWorker(_p_ *p) {gp := getg()
... ...
for {
// 解决 GC 事宜
... ...
}
}
##### b) join 模式
在线程模型中,父线程能够通过 pthread join 来期待子线程完结并获取子线程的完结状态。在 Go 中,咱们有时候也有这种需要:goroutine 的创建者须要期待新 goroutine 的后果。
type Worker struct {
}
func Do(f func()) chan Worker {w:= make(chan Worker)
go func() {f()
w<-Worker{}}()
return w
}
func main() {c:=Do(func() {fmt.Print("到下班时间了...")
})
<-c
}
咱们还是看刚刚下面的这个例子,Do 函数应用典型的 goroutine 的创立模式创立了一个 groutine,main 的 goroutine 作为创立通过 Do 函数返回的 channel 与新 goroutine 建设关系,这个 channel 得用处就是在文革 goroutine 之间建设退出工夫的“信号”通信机制。main goroutine 在创立完新 goroutine 后就在该 channel 上阻塞期待了,晓得新的 goroutine 退出前向该 channel 发送了一个”信号”。
运行代码,后果如下:
到下班时间了 …
Process finished with exit code 0
获取 goroutine 的退出状态
如果新 goroutine 的创建者不仅仅要期待 goroutine 的退出,还要晓得完结状态,咱们能够通过自定义类型的 channel 来实现这样的需要。
func add(a,b int) int{return a+b}
func Do(f func(a,b int) int,a,b int) chan int{c:=make(chan int)
go func() {r:=f(a,b)
c<-r
}()
return c
}
func main() {c:=Do(add,1,5)
fmt.Println(<-c)
}
运行后果是 6
期待多个 goroutine 退出
func add(a,b int) int{return a+b}
func Do(f func(a,b int) int,a,b,n int) chan int{c:=make(chan int)
var wg sync.WaitGroup
for i:=0;i<n;i++{wg.Add(1)
go func() {r:=f(a,b)
fmt.Println(r)
wg.Done()}()}
go func() {wg.Wait()
c<-100
}()
go func() {}()
return c
}
func main() {c:=Do(add,1,5,5)
fmt.Println(<-c)
}
运行后果
6
6
6
6
6
100
##### c) notify-wait 模式
后面的场景中,goroutine 的创建者都是在被动地期待新 goroutine 的退出。有些场景,goroutine 的创建者须要被动告诉那些新 goroutine 退出。
告诉并期待一个 goroutine 的退出
func add(a, b int) int {return a + b}
func Do(f func(a, b int) int, a, b int) chan int {quit := make(chan int)
go func() {
var job chan string
for {
select {
case x := <-job:
f(a, b)
fmt.Println(x)
case y := <-quit:
quit <- y
}
}
}()
return quit
}
func main() {c := Do(add, 1, 5)
fmt.Println("开始干活")
time.Sleep(1 * time.Second)
c <- 0
timer := time.NewTimer(time.Second * 10)
defer timer.Stop()
select {
case status := <-c:
fmt.Println(status)
case <-timer.C:
fmt.Println("期待...")
}
}
执行代码后果如下
开始干活
0
告诉并期待多个 goroutine 退出
上面是告诉并期待多个 goroutine 退出的场景。Go 语言的 channel 有一个个性,那就是当应用 close 函数敞开 channel 时,所有阻塞到该 channel 上的 goroutine 都会失去告诉。
func worker(x int) {time.Sleep(time.Second * time.Duration(x))
}
func Do(f func(a int), n int) chan int {quit := make(chan int)
job:=make(chan int)
var wg sync.WaitGroup
for i:=0;i<n;i++ {wg.Add(1)
go func(i int) {defer wg.Done()
name := fmt.Sprintf("worker-%d",i)
for {
j,ok:=<-job
if !ok{fmt.Println(name,"done")
return
}
worker(j)
}
}(i)
}
go func() {
<-quit
close(job)
wg.Wait()
quit<-200
}()
return quit
}
func main() {quit:=Do(worker,5)
fmt.Println("func Work...")
quit<-1
timer := time.NewTimer(time.Second * 10)
defer timer.Stop()
select {
case status := <-quit:
fmt.Println(status)
case <-timer.C:
fmt.Println("期待...")
}
}
运行后果
func Work...
worker-1 done
worker-2 done
worker-3 done
worker-4 done
worker-0 done
200