对于并发操作,后面咱们曾经理解到了 channel 通道、同步原语 sync 包对共享资源加锁、Context 跟踪协程/传参等,这些都是并发编程比拟根底的元素,置信你曾经有了很好的把握。明天咱们介绍下如何应用这些根底元素组成并发模式,更好的编写并发程序。
for select 有限循环模式
这个模式比拟常见,之前文章中的示例也应用过,它个别是和 channel 组合实现工作,格局为:
for { //for 有限循环,或者应用 for range 循环 select { //通过 channel 管制 case <-done: return default: //执行具体的工作 }}
- 这种是 for + select 多路复用的并发模式,哪个 case 满足条件就执行对应的分支,直到有满足退出的条件,才会退出循环。
- 没有退出条件满足时,则会始终执行 default 分支
for range select 无限循环模式
for _,s:=range []int{}{ select { case <-done: return case resultCh <- s: }
- 个别把迭代的内容发送到 channel 上
- done channel 用于退出 for 循环
- resultCh channel 用来接管循环的值,这些值能够通过 resultCh 传递给其余调用者
select timeout 模式
如果一个申请须要拜访服务器获取数据,然而可能因为网络问题而迟迟获取不到响应,这时候就须要设置一个超时工夫:
package mainimport ( "fmt" "time")func main() { result := make(chan string) timeout := time.After(3 * time.Second) // go func() { //模仿网络拜访 time.Sleep(5 * time.Second) result <- "服务端后果" }() for { select { case v := <-result: fmt.Println(v) case <-timeout: fmt.Println("网络拜访超时了") return default: fmt.Println("期待...") time.Sleep(1 * time.Second) } }}
运行后果:
期待...期待...期待...网络拜访超时了
- select timeout 模式外围是通过 time.After 函数设置的超时工夫,避免因为异样造成 select 语句有限期待
留神:
不要写成这样for { select { case v := <-result: fmt.Println(v) case <-time.After(3 * time.Second): //不要写在 select 外面 fmt.Println("网络拜访超时了") return default: fmt.Println("期待...") time.Sleep(1 * time.Second) } }
case <- time.After(time.Second) 是本次监听动作的超时工夫,意思就说,只有在本次 select 操作中会无效,再次 select 又会从新开始计时,然而有default ,那case 超时操作,必定执行不到了。
Context 的 WithTimeout 函数超时勾销
package mainimport ( "context" "fmt" "time")func main() { // 创立一个子节点的context,3秒后主动超时 //ctx, stop := context.WithCancel(context.Background()) ctx, stop := context.WithTimeout(context.Background(), 3*time.Second) go func() { worker(ctx, "打工人1") }() go func() { worker(ctx, "打工人2") }() time.Sleep(5*time.Second) //工作5秒后劳动 stop() //5秒后收回进行指令 fmt.Println("???")}func worker(ctx context.Context, name string){ for { select { case <- ctx.Done(): fmt.Println("上班咯~~~") return default: fmt.Println(name, "认真摸鱼中,请勿打扰...") } time.Sleep(1 * time.Second) }}
运行后果:
打工人2 认真摸鱼中,请勿打扰...打工人1 认真摸鱼中,请勿打扰...打工人1 认真摸鱼中,请勿打扰...打工人2 认真摸鱼中,请勿打扰...打工人2 认真摸鱼中,请勿打扰...打工人1 认真摸鱼中,请勿打扰...上班咯~~~上班咯~~~//两秒后???
- 下面示例咱们应用了 WithTimeout 函数超时勾销,这是比拟举荐的一种应用形式
Pipeline 模式
Pipeline 模式也成为流水线模式,模仿事实中的流水线生成。咱们以组装手机为例,假如只有三道工序:整机洽购、组装、打包成品:
整机洽购(工序1)-》组装(工序2)-》打包(工序3)
package mainimport ( "fmt")func main() { coms := buy(10) //洽购10套整机 phones := build(coms) //组装10部手机 packs := pack(phones) //打包它们以便售卖 //输入测试,看看成果 for p := range packs { fmt.Println(p) }}//工序1洽购func buy(n int) <-chan string { out := make(chan string) go func() { defer close(out) for i := 1; i <= n; i++ { out <- fmt.Sprint("整机", i) } }() return out}//工序2组装func build(in <-chan string) <-chan string { out := make(chan string) go func() { defer close(out) for c := range in { out <- "组装(" + c + ")" } }() return out}//工序3打包func pack(in <-chan string) <-chan string { out := make(chan string) go func() { defer close(out) for c := range in { out <- "打包(" + c + ")" } }() return out}
运行后果:
打包(组装(整机1))打包(组装(整机2))打包(组装(整机3))打包(组装(整机4))打包(组装(整机5))打包(组装(整机6))打包(组装(整机7))打包(组装(整机8))打包(组装(整机9))打包(组装(整机10))
扇入扇出模式
手机流水线运行后,发现配件组装工序比拟消耗工夫,导致工序1和工序3也相应的慢了下来,为了晋升性能,工序2减少了两班人手:
- 依据示意图能看到,红色局部为扇出,蓝色为扇入
改良后的流水线:
package mainimport ( "fmt" "sync")func main() { coms := buy(10) //洽购10套配件 //三班人同时组装100部手机 phones1 := build(coms) phones2 := build(coms) phones3 := build(coms) //汇聚三个channel成一个 phones := merge(phones1,phones2,phones3) packs := pack(phones) //打包它们以便售卖 //输入测试,看看成果 for p := range packs { fmt.Println(p) }}//工序1洽购func buy(n int) <-chan string { out := make(chan string) go func() { defer close(out) for i := 1; i <= n; i++ { out <- fmt.Sprint("整机", i) } }() return out}//工序2组装func build(in <-chan string) <-chan string { out := make(chan string) go func() { defer close(out) for c := range in { out <- "组装(" + c + ")" } }() return out}//工序3打包func pack(in <-chan string) <-chan string { out := make(chan string) go func() { defer close(out) for c := range in { out <- "打包(" + c + ")" } }() return out}//扇入函数(组件),把多个chanel中的数据发送到一个channel中func merge(ins ...<-chan string) <-chan string { var wg sync.WaitGroup out := make(chan string) //把一个channel中的数据发送到out中 p:=func(in <-chan string) { defer wg.Done() for c := range in { out <- c } } wg.Add(len(ins)) //扇入,须要启动多个goroutine用于处于多个channel中的数据 for _,cs:=range ins{ go p(cs) } //期待所有输出的数据ins解决完,再敞开输入out go func() { wg.Wait() close(out) }() return out}
运行后果:
打包(组装(整机2))打包(组装(整机3))打包(组装(整机1))打包(组装(整机5))打包(组装(整机7))打包(组装(整机4))打包(组装(整机6))打包(组装(整机8))打包(组装(整机9))打包(组装(整机10))
- merge 和业务无关,不能当做一道工序,咱们应该把它叫做 组件
- 组件是能够复用的,相似这种扇入工序,都能够应用 merge 组件
Futures 模式
Pipeline 流水线模式中的工序是相互依赖的,只有上一道工序实现,下一道工序能力开始。然而有的工作之间并不需要相互依赖,所以为了进步性能,这些独立的工作就能够并发执行。
Futures 模式能够了解为将来模式,主协程不必期待子协程返回的后果,能够先去做其余事件,等将来须要子协程后果的时候再来取,如果子协程还没有返回后果,就始终期待。
咱们以火锅为例,洗菜、烧水这两个步骤之间没有依赖关系,能够同时做,最初
示例:
package mainimport ( "fmt" "time")func main() { vegetablesCh := washVegetables() //洗菜 waterCh := boilWater() //烧水 fmt.Println("曾经安顿好洗菜和烧水了,我先开一局") time.Sleep(2 * time.Second) fmt.Println("要做火锅了,看看菜和水好了吗") vegetables := <-vegetablesCh water := <-waterCh fmt.Println("筹备好了,能够做火锅了:",vegetables,water)}//洗菜func washVegetables() <-chan string { vegetables := make(chan string) go func() { time.Sleep(5 * time.Second) vegetables <- "洗好的菜" }() return vegetables}//烧水func boilWater() <-chan string { water := make(chan string) go func() { time.Sleep(5 * time.Second) water <- "烧开的水" }() return water}
运行后果:
曾经安顿好洗菜和烧水了,我先开一局要做火锅了,看看菜和水好了吗筹备好了,能够做火锅了: 洗好的菜 烧开的水
- Futures 模式下的协程和一般协程最大的区别是能够返回后果,而这个后果会在将来的某个工夫点应用。所以在将来获取这个后果的操作必须是一个阻塞的操作,要始终等到获取后果为止。
- 如果你的大工作能够拆解为一个个独立并发执行的小工作,并且能够通过这些小工作的后果得出最终大工作的后果,就能够应用 Futures 模式。