遇到的问题
异步对于绝大多数的开发而言并不陌生,在 go 语言中异步的实现变得异常方便。只要在执行的方法前加一个 go 关键字就可以实现异步操作。但是如果需求是,按照调用的先后顺序(FIFO)来返回值我们应该怎么办。大家都知道,一系列的方法调用如果使用了异步执行那么就并不能保证返回的先后顺序,返回的先后顺序取决于每个函数耗时的长短,耗时短的则会先返回。当然解决这个问题的办法有很多,在最近看的一本书中发现了 chan 嵌套 chan 可以很巧妙的实现这个需求。
没解决之前
先看一下没有使用嵌套 chan 的情况。代码很简单,方法 operation1 内部 sleep 1 秒 方法 operation2 内部 sleep 2 秒。5 次调用都在 goroutine 中执行,结果可以看到 5 个方法大约耗时 2 秒。
package main
import (
“fmt”
“sync”
“time”
)
func main() {
resultCh := make(chan string)
// 开一个 gotoutine 接受所有返回值并打印
go replay(resultCh)
// 使用 waitgroup 等待一下所有 gorountie 执行完毕,记录时间
wg := sync.WaitGroup{}
startTime := time.Now()
//operation1 内部 sleep 1 秒
//operation2 内部 sleep 2 秒
// 如果是同步执行下列调用需要 8 秒左右
// 目前用异步调用 理论上只需要 2 秒
// 但于丹的问题是 不能实现先进先出的需求
operation2(resultCh, “aaa”, &wg)
operation2(resultCh, “bbb”, &wg)
operation1(resultCh, “ccc”, &wg)
operation1(resultCh, “ddd”, &wg)
operation2(resultCh, “eee”, &wg)
wg.Wait()
endTime := time.Now()
fmt.Printf(“Process time %s”, endTime.Sub(startTime))
}
func replay(resultCh chan string)(){
for{
fmt.Println(<-resultCh)
}
}
func operation1(resultCh chan string, str string, wg *sync.WaitGroup)(){
wg.Add(1)
go func(str string,wg *sync.WaitGroup){
time.Sleep(time.Second*1)
resultCh <- “operation1:”+str
wg.Done()
}(str,wg)
}
func operation2(resultCh chan string, str string, wg *sync.WaitGroup)(){
wg.Add(1)
go func(str string,wg *sync.WaitGroup){
time.Sleep(time.Second*2)
resultCh <- “operation2:”+str
wg.Done()
}(str,wg)
}
结果:执行结果虽然是很理想,执行 5 个方法只用了 2 秒。但是违背了需求的先进先出(FIFO)的规则。返回顺序完全是根据函数耗时长短来决定。
operation1:ddd
operation1:ccc
operation2:aaa
operation2:eee
operation2:bbb
Process time 2.002555639s
如何解决
创建一个嵌套 chan,chan 中的值也是一个 chan,在执行的时候按照先后顺序添加。在 replay 就会按照先进先出的顺序读取,利用 chan 阻塞等待第一个完成再执行下一个 chan 的值。那么这样执行的时间是否会更长?答案是并不会有太大的影响。
package main
import (
“fmt”
“sync”
“time”
)
func main() {
resultCh := make(chan chan string, 5000)
wg := sync.WaitGroup{}
go replay(resultCh)
startTime := time.Now()
operation2(resultCh, “aaa”, &wg)
operation2(resultCh, “bbb”, &wg)
operation1(resultCh, “ccc”, &wg)
operation1(resultCh, “ddd”, &wg)
operation2(resultCh, “eee”, &wg)
wg.Wait()
endTime := time.Now()
fmt.Printf(“Process time %s”, endTime.Sub(startTime))
}
func replay(resultCh chan chan string)(){
for{
// 拿到一个 chan 读取值 这个时候拿到的是先进先出 因为所有方法是按顺序加入 chan 的
c := <- resultCh
// 读取嵌套 chan 中的值,这个时候等待 3 秒 因为是 operation2 中执行了 3 秒 在这 3 绵中 其实其余的 4 个方法也已经执行完毕。之后的方法则不需要等待 sleep 的时间
r := <-c
fmt.Println(r)
}
}
func operation1(ch chan chan string, str string, wg *sync.WaitGroup)(){
// 先创建一个 chan 兵给到嵌套 chan 占据一个通道 这个通道是阻塞的
c := make(chan string)
ch <- c
wg.Add(1)
go func(str string){
time.Sleep(time.Second*1)
c <- “operation1:”+str
wg.Done()
}(str)
}
func operation2(ch chan chan string, str string, wg *sync.WaitGroup)(){
c := make(chan string)
ch <- c
wg.Add(1)
go func(str string){
time.Sleep(time.Second*2)
c <- “operation2:”+str
wg.Done()
}(str)
}
结果:运行的结果还是 2 秒,但是结果却不同,完全是按照我们调用的顺序返回的。严格按照先进先出的规则。这样整体运行的时间其实取决于执行函数中耗时最长的那个函数。如果第一个函数耗时 5 秒 其余 4 个耗时 1 秒。那么整个 main 函数耗时也就是 5 秒
operation2:aaa
operation2:bbb
operation1:ccc
operation1:ddd
operation2:eee
Process time 2.002714923s
总结
其实解决此类问题的方法不止一个,比如在请求喝返回中添加标识等等。但我认为这个方法巧妙的运用了 chan 中嵌套 chan 和 go 语言 chan 阻塞的特性来实现这个功能代码简洁。性能也兵没有消耗太多,总体执行时间也并没有加长。也是一个参考的方法,这种嵌套实现也可以用到其他需要的特殊需求中。