乐趣区

关于golang:Go语言你必须掌握的高效并发模式

对于并发操作,后面咱们曾经理解到了 channel 通道、同步原语 sync 包对共享资源加锁、Context 跟踪协程 / 传参等,这些都是并发编程比拟根底的元素,置信你曾经有了很好的把握。明天咱们介绍下如何应用这些根底元素组成并发模式,更好的编写并发程序。

for select 有限循环模式

这个模式比拟常见,之前文章中的示例也应用过,它个别是和 channel 组合实现工作,格局为:

for { //for 有限循环,或者应用 for range 循环
  select {
    // 通过 channel 管制
    case <-done:
      return
    default:
      // 执行具体的工作
  }
}
  • 这种是 for + select 多路复用的并发模式,哪个 case 满足条件就执行对应的分支,直到有满足退出的条件,才会退出循环。
  • 没有退出条件满足时,则会始终执行 default 分支

for range select 无限循环模式

for _,s:=range []int{}{
   select {
   case <-done:
      return
   case resultCh <- s:
   }
  • 个别把迭代的内容发送到 channel 上
  • done channel 用于退出 for 循环
  • resultCh channel 用来接管循环的值,这些值能够通过 resultCh 传递给其余调用者

select timeout 模式

如果一个申请须要拜访服务器获取数据,然而可能因为网络问题而迟迟获取不到响应,这时候就须要设置一个超时工夫:

package main

import (
    "fmt"
    "time"
)

func main() {result := make(chan string)
    timeout := time.After(3 * time.Second) //
    go func() {
        // 模仿网络拜访
        time.Sleep(5 * time.Second)
        result <- "服务端后果"
    }()
    for {
        select {
        case v := <-result:
            fmt.Println(v)
        case <-timeout:
            fmt.Println("网络拜访超时了")
            return
        default:
            fmt.Println("期待...")
            time.Sleep(1 * time.Second)
        }
    }
}

运行后果:

期待...
期待...
期待...
网络拜访超时了
  • select timeout 模式外围是通过 time.After 函数设置的超时工夫,避免因为异样造成 select 语句有限期待

留神:
不要写成这样

for {
       select {
       case v := <-result:
           fmt.Println(v)
       case <-time.After(3 * time.Second): // 不要写在 select 外面
           fmt.Println("网络拜访超时了")
           return
       default:
           fmt.Println("期待...")
           time.Sleep(1 * time.Second)
       }
   }

case <- time.After(time.Second) 是本次监听动作的超时工夫,意思就说,只有在本次 select 操作中会无效,再次 select 又会从新开始计时,然而有 default,那 case 超时操作,必定执行不到了。

Context 的 WithTimeout 函数超时勾销

package main

import (
    "context"
    "fmt"
    "time"
)
func main() {
    // 创立一个子节点的 context,3 秒后主动超时
    //ctx, stop := context.WithCancel(context.Background())
    ctx, stop := context.WithTimeout(context.Background(), 3*time.Second)

    go func() {worker(ctx, "打工人 1")
    }()
    go func() {worker(ctx, "打工人 2")
    }()
    time.Sleep(5*time.Second) // 工作 5 秒后劳动
    stop() // 5 秒后收回进行指令
    fmt.Println("???")
}

func worker(ctx context.Context, name string){
    for {
        select {case <- ctx.Done():
            fmt.Println("上班咯~~~")
            return
        default:
            fmt.Println(name, "认真摸鱼中,请勿打扰...")
        }
        time.Sleep(1 * time.Second)
    }
}

运行后果:

打工人 2 认真摸鱼中,请勿打扰...
打工人 1 认真摸鱼中,请勿打扰...
打工人 1 认真摸鱼中,请勿打扰...
打工人 2 认真摸鱼中,请勿打扰...
打工人 2 认真摸鱼中,请勿打扰...
打工人 1 认真摸鱼中,请勿打扰...
上班咯~~~
上班咯~~~
// 两秒后
???
  • 下面示例咱们应用了 WithTimeout 函数超时勾销,这是比拟举荐的一种应用形式

Pipeline 模式

Pipeline 模式也成为流水线模式,模仿事实中的流水线生成。咱们以组装手机为例,假如只有三道工序:整机洽购、组装、打包成品:

整机洽购(工序 1)-》组装(工序 2)-》打包(工序 3)

package main

import ("fmt")

func main() {coms := buy(10)    // 洽购 10 套整机
    phones := build(coms) // 组装 10 部手机
    packs := pack(phones) // 打包它们以便售卖
    // 输入测试,看看成果
    for p := range packs {fmt.Println(p)
    }
}

// 工序 1 洽购
func buy(n int) <-chan string {out := make(chan string)
    go func() {defer close(out)
        for i := 1; i <= n; i++ {out <- fmt.Sprint("整机", i)
        }
    }()
    return out
}

// 工序 2 组装
func build(in <-chan string) <-chan string {out := make(chan string)
    go func() {defer close(out)
        for c := range in {out <- "组装(" + c + ")"
        }
    }()
    return out
}

// 工序 3 打包
func pack(in <-chan string) <-chan string {out := make(chan string)
    go func() {defer close(out)
        for c := range in {out <- "打包(" + c + ")"
        }
    }()
    return out
}

运行后果:

打包(组装(整机 1))
打包(组装(整机 2))
打包(组装(整机 3))
打包(组装(整机 4))
打包(组装(整机 5))
打包(组装(整机 6))
打包(组装(整机 7))
打包(组装(整机 8))
打包(组装(整机 9))
打包(组装(整机 10))

扇入扇出模式

手机流水线运行后,发现配件组装工序比拟消耗工夫,导致工序 1 和工序 3 也相应的慢了下来,为了晋升性能,工序 2 减少了两班人手:

  • 依据示意图能看到,红色局部为 扇出 ,蓝色为 扇入

改良后的流水线:

package main

import (
    "fmt"
    "sync"
)

func main() {coms := buy(10)    // 洽购 10 套配件
    // 三班人同时组装 100 部手机
    phones1 := build(coms)
    phones2 := build(coms)
    phones3 := build(coms)
    // 汇聚三个 channel 成一个
    phones := merge(phones1,phones2,phones3)
    packs := pack(phones) // 打包它们以便售卖
    // 输入测试,看看成果
    for p := range packs {fmt.Println(p)
    }
}

// 工序 1 洽购
func buy(n int) <-chan string {out := make(chan string)
    go func() {defer close(out)
        for i := 1; i <= n; i++ {out <- fmt.Sprint("整机", i)
        }
    }()
    return out
}

// 工序 2 组装
func build(in <-chan string) <-chan string {out := make(chan string)
    go func() {defer close(out)
        for c := range in {out <- "组装(" + c + ")"
        }
    }()
    return out
}

// 工序 3 打包
func pack(in <-chan string) <-chan string {out := make(chan string)
    go func() {defer close(out)
        for c := range in {out <- "打包(" + c + ")"
        }
    }()
    return out
}

// 扇入函数(组件),把多个 chanel 中的数据发送到一个 channel 中
func merge(ins ...<-chan string) <-chan string {
    var wg sync.WaitGroup
    out := make(chan string)
    // 把一个 channel 中的数据发送到 out 中
    p:=func(in <-chan string) {defer wg.Done()
        for c := range in {out <- c}
    }
    wg.Add(len(ins))
    // 扇入,须要启动多个 goroutine 用于处于多个 channel 中的数据
    for _,cs:=range ins{go p(cs)
    }
    // 期待所有输出的数据 ins 解决完,再敞开输入 out
    go func() {wg.Wait()
        close(out)
    }()
    return out
}

运行后果:

打包(组装(整机 2))
打包(组装(整机 3))
打包(组装(整机 1))
打包(组装(整机 5))
打包(组装(整机 7))
打包(组装(整机 4))
打包(组装(整机 6))
打包(组装(整机 8))
打包(组装(整机 9))
打包(组装(整机 10))
  1. merge 和业务无关,不能当做一道工序,咱们应该把它叫做 组件
  2. 组件是能够复用的,相似这种扇入工序,都能够应用 merge 组件

Futures 模式

Pipeline 流水线模式中的工序是相互依赖的,只有上一道工序实现,下一道工序能力开始。然而有的工作之间并不需要相互依赖,所以为了进步性能,这些独立的工作就能够并发执行。

Futures 模式能够了解为将来模式,主协程不必期待子协程返回的后果,能够先去做其余事件,等将来须要子协程后果的时候再来取,如果子协程还没有返回后果,就始终期待。

咱们以火锅为例,洗菜、烧水这两个步骤之间没有依赖关系,能够同时做,最初

示例:

package main

import (
    "fmt"
    "time"
)

func main() {vegetablesCh := washVegetables() // 洗菜
    waterCh := boilWater()           // 烧水
    fmt.Println("曾经安顿好洗菜和烧水了,我先开一局")
    time.Sleep(2 * time.Second)

    fmt.Println("要做火锅了,看看菜和水好了吗")
    vegetables := <-vegetablesCh
    water := <-waterCh
    fmt.Println("筹备好了,能够做火锅了:",vegetables,water)

}
// 洗菜
func washVegetables() <-chan string {vegetables := make(chan string)
    go func() {time.Sleep(5 * time.Second)
        vegetables <- "洗好的菜"
    }()
    return vegetables
}
// 烧水
func boilWater() <-chan string {water := make(chan string)
    go func() {time.Sleep(5 * time.Second)
        water <- "烧开的水"
    }()
    return water
}

运行后果:

曾经安顿好洗菜和烧水了,我先开一局
要做火锅了,看看菜和水好了吗
筹备好了,能够做火锅了: 洗好的菜 烧开的水
  1. Futures 模式下的协程和一般协程最大的区别是能够返回后果,而这个后果会在将来的某个工夫点应用。所以在将来获取这个后果的操作必须是一个阻塞的操作,要始终等到获取后果为止。
  2. 如果你的大工作能够拆解为一个个独立并发执行的小工作,并且能够通过这些小工作的后果得出最终大工作的后果,就能够应用 Futures 模式。
退出移动版