共计 3522 个字符,预计需要花费 9 分钟才能阅读完成。
除了在 goroutine 之间安全的传递数据之外,在看了《Concurrency in Go》之后,感慨 channel 还有那么多模式可供使用,在个人的学习中总结了以下几种常用的模式
pipeline
概念
我们以爬虫为例,一般爬虫分为如下步骤:
抓取页面 -> 解析页面 -> 整合数据分析 -> 分析结果入库
如果你把上面所有的步骤都放在一个函数里面处理,那会是多难看,多难以维护,从解耦角度考虑,我们可以起四个进程,分别承担不同的角色,例如,进程 1 负责抓取页面,进程 2 负责解析页面,等等,各个进程拿到一个数据后,交给下一个进程来处理,这就是 pipeline 的基本思想,每个角色只负责关心自己的东西
示例
给定一个数 n,执行 (n2 + 1) 2 的操作
func pipeline() {generator := func(done chan interface{}, intergers ...int) <-chan int {inStream := make(chan int)
go func() {defer close(inStream)
for _, i := range intergers {
select {
case <-done:
return
case inStream <- i:
}
}
}()
return inStream
}
add := func(done <-chan interface{}, inStream <-chan int, increment int) <-chan int {addInStream := make(chan int)
go func() {defer close(addInStream)
for i := range inStream {
select {
case <-done:
return
case addInStream <- i + increment:
}
}
}()
return addInStream
}
multiply := func(done <-chan interface{}, inStream <-chan int, increment int) <-chan int {multiplyInStream := make(chan int)
go func() {defer close(multiplyInStream)
for i := range inStream {
select {
case <-done:
return
case multiplyInStream <- i * increment:
}
}
}()
return multiplyInStream
}
done := make(chan interface{})
defer close(done)
inStream := generator(done, []int{1, 2, 3, 4, 5, 6, 7}...)
pipeline := multiply(done, add(done, multiply(done, inStream, 2), 1), 2)
for v := range pipeline {fmt.Println(v)
}
}
扇入扇出
在 pipeline 模型中,是一种高效的流式处理,但是假如 pipeline 中有 a,b,c 三个环节,b 环节处理的特别慢,这时候就会影响到 c 环节的处理,如果增加 b 环节进程处理的数量,也就可以减弱 b 环节的慢处理对整个 pipeline 的影响,那么 a -> 多个 b 的过程就是 扇入 ,多个 b 环节输出数据到 c 环节,就是 扇出
示例
func FanInFanOut() {producer := func(intergers ...int) <-chan interface{} {inStream := make(chan interface{})
go func() {defer close(inStream)
for _, v := range intergers {time.Sleep(5 * time.Second)
inStream <- v
}
}()
return inStream
}
fanIn := func(channels ...<-chan interface{},
) <-chan interface{} {
var wg sync.WaitGroup
multiplexStream := make(chan interface{})
multiplex := func(c <-chan interface{}) {defer wg.Done()
for i := range c {multiplexStream <- i}
}
wg.Add(len(channels))
for _, c := range channels {go multiplex(c)
}
go func() {wg.Wait()
close(multiplexStream)
}()
return multiplexStream
}
consumer := func(inStream <-chan interface{}) {
for v := range inStream {fmt.Println(v)
}
}
nums := runtime.NumCPU()
producerStreams := make([]<-chan interface{}, nums)
for i := 0; i < nums; i++ {producerStreams[i] = producer(i)
}
consumer(fanIn(producerStreams...))
}
tee- channel
概念
假如你从 channel 中拿到了一条 sql 语句,这时候,你想对这条 sql 记录,分析并执行,那你就需要将这条 sql 分别转发给这三个任务对应的 channel,tee-channel 就是做这个事情的
示例
func teeChannel() {producer := func(intergers ...int) <-chan interface{} {inStream := make(chan interface{})
go func() {defer close(inStream)
for _, v := range intergers {inStream <- v}
}()
return inStream
}
tee := func(in <-chan interface{}) (_, _ <-chan interface{}) {out1 := make(chan interface{})
out2 := make(chan interface{})
go func() {defer close(out1)
defer close(out2)
for val := range in {
out1, out2 := out1, out2
for i := 0; i < 2; i++ {
select {
case out1 <- val:
out1 = nil
case out2 <- val:
out2 = nil
}
}
}
}()
return out1, out2
}
out1, out2 := tee(producer(1, 2, 3, 4, 5))
for val1 := range out1 {fmt.Printf("out1: %v, out2: %v", val1, <-out2)
}
}
桥接 channel
概念
无论是前面提到的 pipeline 还是 扇入扇出 ,每个 goroutine 都是对一个 channel 进行消费,但是实际场景中,可能会有多个 channel 来供给我们消费,而作为消费者,我们不关心这些值是来自于哪个 channel,这种情况下,处理一个充满 channel 的 channel 可能会很多。如果我们定义一个功能,可以将充满 channel 的 channel 拆解为一个简单的 channel,这将使消费者更专注于手头的工作,这就是 桥接 channel的思想
示例
func bridge() {gen := func() <-chan <-chan interface{} {in := make(chan (<-chan interface{}))
go func() {defer close(in)
for i := 0; i < 10; i++ {stream := make(chan interface{}, 1)
stream <- i
close(stream)
in <- stream
}
}()
return in
}
bridge := func(in <-chan (<-chan interface{})) <-chan interface{} {valStream := make(chan interface{})
go func() {defer close(valStream)
for {stream := make(<-chan interface{})
select {
case maybeStream, ok := <-in:
if ok == false {return}
stream = maybeStream
}
for val := range stream {valStream <- val}
}
}()
return valStream
}
for val := range bridge(gen()) {fmt.Println(val)
}
}