背景
最近因为性能问题,后端服务始终在做 python 到 golang 的迁徙和重构。go 语言精简优雅,既有编译型语言的谨严和高性能,又有解释型语言的开发效率,杰出的并发性能也是 go 区别于其余语言的一大特色。go 的并发编程代码尽管简略,但重在其并发模型和流程的设计。所以这里总结下 golang 协程并发罕用的流水线模型。
简略的流水线思维
流水线模式并不是什么离奇的概念,然而它能极大地提高生产效率。比方理论生存中的汽车生产流水线,流水线上的每一个流程负责不同的工作,比方第一个流程是拼装车身,第二个流程是装置发动机,第三个流程是装轮胎 …,这些步骤咱们能够类比成 go 并发流程中的协程,每一个协程就是一个工作。流水线下面传递的车身、发动机、轮胎,这些咱们能够类比成协程间须要传递的数据,而在这些流程(协程)间传递这些配件(数据),天然就要通过传送带(channel)。在流水线上,咱们装四个轮胎必定不是一个一个来装的,必定是有四个机械臂同时来装。因而装轮胎这个步骤咱们有 4 个协程在并发工作来提高效率。这么一来,流水线模型的基本要素就形成了。
Golang 的并发模型灵感其实都来自咱们生存,对程序而言,高的生产效率就是高的性能。 在 Golang 中,流水线由多个流程节点组成,流程之间通过 channel 连贯,每个流程节点能够由多个同时运行的 goroutine 组成。
如何结构流水线
有了流水线模式的思维,接下来就是如何结构流水线了。简略来说,其实就是通过 channel 将工作流程连接起来,两个相邻的流程互为生产者和消费者,通过 channel 进行通信。耗时的流程能够将工作扩散到多个协程来执行。
咱们先来看一个最简略的流水线,如下图,A 是生产者流程,B 是它的生产流程,同时又是 C 的生产者流程。A,B,C 三个协程间接,通过读写 channel 进行通信。
那如果此时 B 流程能够将 a channel 中的工作并发执行呢,很简略,咱们只须要起多个 B 协程就能够了。如下图。
总之,咱们结构流水线并发的思路是关注数据的流动,数据流动的过程交给 channel,channel 两端数据处理的每个环节都交给 goroutine,这个流程连起来,就形成了流水线模型。
对于 channel
为什么咱们能够抉择 channel 来进行协程间的通信呢,协程之间又是怎么放弃同步程序呢,当然这都要归功于 channel。channel 是 go 提供的过程内协程间的通信形式,它是协程 / 线程平安的,channe 的读写阻塞会导致协程的切换。
channel 的操作和状态组合能够有以下几种状况:
** 有 1 个非凡场景 **:当 `nil` 的通道在 `select` 的某个 `case` 中时,这个 case 会阻塞,但不会造成死锁。
channel 不仅能够保障协程平安的数据流动,还能够保障协程的同步。当有并发问题时,channel 也是咱们首先应该想到的数据结构。不过,当应用有缓冲区的 channel 时,能力达到协程并发的成果,并且生产者和消费者的协程间是绝对同步的。应用无缓冲区的 channel 时,是没有并发成果的,协程间是相对同步的,生产者和消费者必须同时写和读协程能力运行。
channel 关注的是数据的流动,这种场景下都能够思考应用 channel。比方:消息传递、信号播送、工作散发、后果汇总、同步与异步、并发管制 … 更多的不在这里赘述了,总之,Share memory by communicating, don’t communicate by sharing memory.
流水线模型实例
举个简略栗子,计算 80000 以内的质数并输入。
这个例子如果咱们采纳非并发的形式,就是 for 循环 80000,挨个判断是不是素数再输入。不过如果咱们采纳流水线的并发模型会更高效。
从数据流动的角度来剖析,须要遍历生成 1 -80000 的数字到一个 channel 中,数字判断是否为素数,输入后果到一个 channel 中。因而咱们须要两个 channel,channel 的两端就设计成协程即可。
1、遍历生成原始 80000 个数据(生产者)
2、计算这 80000 个数据中的素数(生产者 + 消费者)
3、取后果输入(消费者)
package gen_channel
import "fmt"
import "time"
func generate_source(data_source_chan chan int) {
for i := 1; i <= 80000; i++ {data_source_chan <- i}
fmt.Println("写入协程完结")
close(data_source_chan)
}
func generate_sushu(data_source_chan chan int, data_result_chan chan int, gen_chan chan bool) {
for num:= range data_source_chan {
falg := true
for i := 2; i < num; i++ {
if num%i == 0 {
falg = false
break }
}
if falg == true {data_result_chan <- num}
}
fmt.Println("该协程完结")
gen_chan <- true
}
func workpool(data_source_chan chan int, data_result_chan chan int, gen_chan chan bool, gen_num int){
// 开启 8 个协程
for i := 0; i < gen_num; i++ {go generate_sushu(data_source_chan, data_result_chan, gen_chan)
}
}
func Channel_main() {data_source_chan := make(chan int, 2000)
data_result_chan := make(chan int, 2000)
gen_chan := make(chan bool, 8)
time1 := time.Now().Unix()
go generate_source(data_source_chan)
// 协程池, 工作散发
workpool(data_source_chan, data_result_chan, gen_chan, 8)
go func() {
for i := 0; i < 8; i++ {<-gen_chan}
close(data_result_chan)
fmt.Println("spend timeis", time.Now().Unix()-time1)
}()
for date_result := range data_result_chan {fmt.Println(date_result)
}
}