背景

最近因为性能问题,后端服务始终在做python到golang的迁徙和重构。go语言精简优雅,既有编译型语言的谨严和高性能,又有解释型语言的开发效率,杰出的并发性能也是go区别于其余语言的一大特色。go的并发编程代码尽管简略,但重在其并发模型和流程的设计。所以这里总结下golang协程并发罕用的流水线模型。

简略的流水线思维

流水线模式并不是什么离奇的概念,然而它能极大地提高生产效率。比方理论生存中的汽车生产流水线,流水线上的每一个流程负责不同的工作,比方第一个流程是拼装车身,第二个流程是装置发动机,第三个流程是装轮胎...,这些步骤咱们能够类比成go并发流程中的协程,每一个协程就是一个工作。流水线下面传递的车身、发动机、轮胎,这些咱们能够类比成协程间须要传递的数据,而在这些流程(协程)间传递这些配件(数据),天然就要通过传送带(channel)。在流水线上,咱们装四个轮胎必定不是一个一个来装的,必定是有四个机械臂同时来装。因而装轮胎这个步骤咱们有4个协程在并发工作来提高效率。这么一来,流水线模型的基本要素就形成了。
Golang的并发模型灵感其实都来自咱们生存,对程序而言,高的生产效率就是高的性能。在Golang中,流水线由多个流程节点组成,流程之间通过channel连贯,每个流程节点能够由多个同时运行的goroutine组成。

如何结构流水线

有了流水线模式的思维,接下来就是如何结构流水线了。简略来说,其实就是通过channel将工作流程连接起来,两个相邻的流程互为生产者和消费者,通过channel进行通信。耗时的流程能够将工作扩散到多个协程来执行。
咱们先来看一个最简略的流水线,如下图,A是生产者流程,B是它的生产流程,同时又是C的生产者流程。A,B,C三个协程间接,通过读写channel进行通信。

那如果此时B流程能够将a channel中的工作并发执行呢,很简略,咱们只须要起多个B协程就能够了。如下图。

总之,咱们结构流水线并发的思路是关注数据的流动,数据流动的过程交给channel,channel两端数据处理的每个环节都交给goroutine,这个流程连起来,就形成了流水线模型。

对于channel

为什么咱们能够抉择channel来进行协程间的通信呢,协程之间又是怎么放弃同步程序呢,当然这都要归功于channel。channel是go提供的过程内协程间的通信形式,它是协程/线程平安的,channe的读写阻塞会导致协程的切换。
channel的操作和状态组合能够有以下几种状况:

**有1个非凡场景**:当`nil`的通道在`select`的某个`case`中时,这个case会阻塞,但不会造成死锁。

channel不仅能够保障协程平安的数据流动,还能够保障协程的同步。当有并发问题时,channel也是咱们首先应该想到的数据结构。不过,当应用有缓冲区的channel时,能力达到协程并发的成果,并且生产者和消费者的协程间是绝对同步的。应用无缓冲区的channel时,是没有并发成果的,协程间是相对同步的,生产者和消费者必须同时写和读协程能力运行。
channel关注的是数据的流动,这种场景下都能够思考应用channel。比方:消息传递、信号播送、工作散发、后果汇总、同步与异步、并发管制... 更多的不在这里赘述了,总之,Share memory by communicating, don't communicate by sharing memory.

流水线模型实例

举个简略栗子,计算80000以内的质数并输入。
这个例子如果咱们采纳非并发的形式,就是for循环80000,挨个判断是不是素数再输入。不过如果咱们采纳流水线的并发模型会更高效。
从数据流动的角度来剖析,须要遍历生成1-80000的数字到一个channel中,数字判断是否为素数,输入后果到一个channel中。因而咱们须要两个channel,channel的两端就设计成协程即可。
1、遍历生成原始80000个数据(生产者)
2、计算这80000个数据中的素数(生产者+消费者)
3、取后果输入(消费者)

package gen_channelimport "fmt"import "time"func generate_source(data_source_chan chan int) {   for i := 1; i <= 80000; i++ {      data_source_chan <- i   }   fmt.Println("写入协程完结")   close(data_source_chan)}func generate_sushu(data_source_chan chan int, data_result_chan chan int, gen_chan chan bool) {   for num:= range data_source_chan {      falg := true for i := 2; i < num; i++ {         if num%i == 0 {            falg = false break }      }      if falg == true {         data_result_chan <- num      }   }   fmt.Println("该协程完结")   gen_chan <- true}func workpool(data_source_chan chan int, data_result_chan chan int, gen_chan chan bool, gen_num int){   // 开启8个协程 for i := 0; i < gen_num; i++ {      go generate_sushu(data_source_chan, data_result_chan, gen_chan)   }}func Channel_main() {   data_source_chan := make(chan int, 2000)   data_result_chan := make(chan int, 2000)   gen_chan := make(chan bool, 8)   time1 := time.Now().Unix()   go generate_source(data_source_chan)   // 协程池,工作散发   workpool(data_source_chan, data_result_chan, gen_chan, 8)   go func() {      for i := 0; i < 8; i++ {         <-gen_chan      }      close(data_result_chan)      fmt.Println("spend timeis ", time.Now().Unix()-time1)   }()   for date_result := range data_result_chan {      fmt.Println(date_result)   }}