背景
最近因为性能问题,后端服务始终在做python到golang的迁徙和重构。go语言精简优雅,既有编译型语言的谨严和高性能,又有解释型语言的开发效率,杰出的并发性能也是go区别于其余语言的一大特色。go的并发编程代码尽管简略,但重在其并发模型和流程的设计。所以这里总结下golang协程并发罕用的流水线模型。
简略的流水线思维
流水线模式并不是什么离奇的概念,然而它能极大地提高生产效率。比方理论生存中的汽车生产流水线,流水线上的每一个流程负责不同的工作,比方第一个流程是拼装车身,第二个流程是装置发动机,第三个流程是装轮胎...,这些步骤咱们能够类比成go并发流程中的协程,每一个协程就是一个工作。流水线下面传递的车身、发动机、轮胎,这些咱们能够类比成协程间须要传递的数据,而在这些流程(协程)间传递这些配件(数据),天然就要通过传送带(channel)。在流水线上,咱们装四个轮胎必定不是一个一个来装的,必定是有四个机械臂同时来装。因而装轮胎这个步骤咱们有4个协程在并发工作来提高效率。这么一来,流水线模型的基本要素就形成了。
Golang的并发模型灵感其实都来自咱们生存,对程序而言,高的生产效率就是高的性能。在Golang中,流水线由多个流程节点组成,流程之间通过channel连贯,每个流程节点能够由多个同时运行的goroutine组成。
如何结构流水线
有了流水线模式的思维,接下来就是如何结构流水线了。简略来说,其实就是通过channel将工作流程连接起来,两个相邻的流程互为生产者和消费者,通过channel进行通信。耗时的流程能够将工作扩散到多个协程来执行。
咱们先来看一个最简略的流水线,如下图,A是生产者流程,B是它的生产流程,同时又是C的生产者流程。A,B,C三个协程间接,通过读写channel进行通信。
那如果此时B流程能够将a channel中的工作并发执行呢,很简略,咱们只须要起多个B协程就能够了。如下图。
总之,咱们结构流水线并发的思路是关注数据的流动,数据流动的过程交给channel,channel两端数据处理的每个环节都交给goroutine,这个流程连起来,就形成了流水线模型。
对于channel
为什么咱们能够抉择channel来进行协程间的通信呢,协程之间又是怎么放弃同步程序呢,当然这都要归功于channel。channel是go提供的过程内协程间的通信形式,它是协程/线程平安的,channe的读写阻塞会导致协程的切换。
channel的操作和状态组合能够有以下几种状况:
**有1个非凡场景**:当`nil`的通道在`select`的某个`case`中时,这个case会阻塞,但不会造成死锁。
channel不仅能够保障协程平安的数据流动,还能够保障协程的同步。当有并发问题时,channel也是咱们首先应该想到的数据结构。不过,当应用有缓冲区的channel时,能力达到协程并发的成果,并且生产者和消费者的协程间是绝对同步的。应用无缓冲区的channel时,是没有并发成果的,协程间是相对同步的,生产者和消费者必须同时写和读协程能力运行。
channel关注的是数据的流动,这种场景下都能够思考应用channel。比方:消息传递、信号播送、工作散发、后果汇总、同步与异步、并发管制... 更多的不在这里赘述了,总之,Share memory by communicating, don't communicate by sharing memory.
流水线模型实例
举个简略栗子,计算80000以内的质数并输入。
这个例子如果咱们采纳非并发的形式,就是for循环80000,挨个判断是不是素数再输入。不过如果咱们采纳流水线的并发模型会更高效。
从数据流动的角度来剖析,须要遍历生成1-80000的数字到一个channel中,数字判断是否为素数,输入后果到一个channel中。因而咱们须要两个channel,channel的两端就设计成协程即可。
1、遍历生成原始80000个数据(生产者)
2、计算这80000个数据中的素数(生产者+消费者)
3、取后果输入(消费者)
package gen_channelimport "fmt"import "time"func generate_source(data_source_chan chan int) { for i := 1; i <= 80000; i++ { data_source_chan <- i } fmt.Println("写入协程完结") close(data_source_chan)}func generate_sushu(data_source_chan chan int, data_result_chan chan int, gen_chan chan bool) { for num:= range data_source_chan { falg := true for i := 2; i < num; i++ { if num%i == 0 { falg = false break } } if falg == true { data_result_chan <- num } } fmt.Println("该协程完结") gen_chan <- true}func workpool(data_source_chan chan int, data_result_chan chan int, gen_chan chan bool, gen_num int){ // 开启8个协程 for i := 0; i < gen_num; i++ { go generate_sushu(data_source_chan, data_result_chan, gen_chan) }}func Channel_main() { data_source_chan := make(chan int, 2000) data_result_chan := make(chan int, 2000) gen_chan := make(chan bool, 8) time1 := time.Now().Unix() go generate_source(data_source_chan) // 协程池,工作散发 workpool(data_source_chan, data_result_chan, gen_chan, 8) go func() { for i := 0; i < 8; i++ { <-gen_chan } close(data_result_chan) fmt.Println("spend timeis ", time.Now().Unix()-time1) }() for date_result := range data_result_chan { fmt.Println(date_result) }}