Go语言并发2——Channel

34次阅读

共计 7798 个字符,预计需要花费 20 分钟才能阅读完成。

Go 语言并发 2——Channel
1、什么是 channel
channel 是一种架设在 goroutine 之间进行 通信的管道,类似队列。channel 是引用类型,类型为 chan,可以通过 make 关键字进行创建指定类型的 channel。channel 存在的意义是让 goroutine 通过通信来共享内存,一个往通道发送数据,一个从通道获取数据,来实现数据同步。
2、channel 的创建和传递
声明通道时,需要指定将要被共享的数据的类型。可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或者指针。
2.1 make 关键字创建
ch:=make(chan int) // 创建一个 int 类型的 channel, 所以这个 channel 只能发送接收 int 型的数据
ch1:=make(chan int 10)// 这个是有缓冲的 buffer,这个下面会解释
2.2 <- 运算符 读和取
ch <- 2 // 发送数值 2 给这个通道
x:=<-ch // 从通道里读取值,并把读取的值赋值给 x 变量
<-ch // 从通道里读取值,然后忽略
2.3 close 函数关闭 channel
close(ch) // 可以使用内置函数 close 关闭通道
如果一个通道被关闭了,我们就不能往这个通道里发送数据了,如果发送的话,会引起 painc 异常。但是,我们还可以接收通道里的数据,如果通道里没有数据的话,接收的数据是 nil。
package main
import “fmt”
func main() {
ch:=make(chan int)// 创建 int 型的无缓冲的 channel
go func() {
sum:=0
for i:=0;i<100;i++{
sum+=i
}
ch<-sum // 将 goroutine 算出的数放进通道
}()
fmt.Println(<-ch) // 从通道获取数据,退出 main 函数,如果 channel 还没有存入数据,就会阻塞等待
}
上面这个简单的例子,执行顺序是先创建一个 channel,开启一个 goroutine 进行计算,然后打印从 channel 取出的数。会先执行 fmt.Println(<-ch),这时候 goroutine 还没有往里面写数据,此时 main 函数进入等待。一直到 goroutine 运算完将 sum 发送给 channel,这时候 main 函数马上收到数据,打印完退出。
3、无缓冲的 channel
无缓冲的通道指的是通道的大小为 0,也就是说,这种类型的通道在接收前没有能力保存任何值,它要求发送 goroutine 和接收 goroutine 同时准备好,才可以完成发送和接收操作。
接下来看一个小案例,channel 好比是乒乓球的球桌,球好比数据,数据在 channel 通信就好比乒乓球在球桌上来回弹,而两个 goroutine 就是两位选手,这个就是无缓冲的 channel 的例子
package main

import (
“fmt”
“math/rand”
“sync”
“time”
)
var wg sync.WaitGroup
func main() {
ch := make(chan int) // 乒乓球台看做通道
wg.Add(2) // 2 个 goroutine
// 相当于两个选手对打
go player(“ 张继科 ”, ch)
go player(“ 马龙 ”, ch)
ch <- 1 // 发球
wg.Wait() // 等待比赛结束

}
func player(name string, ch chan int) {
defer wg.Done()// 一方输了就告诉 main 函数,裁判不要等了
for {
ball, ok := <-ch
if !ok {// 如果通道关闭
fmt.Printf(“%s 赢了!!\n”, name)
return
}
n := rand.Intn(100)
if n%13 == 0 {// 随机数来决定自己是否失误
fmt.Printf(“%s 输了 \n”, name)
close(ch) // 输了就得关闭通道
return
}
fmt.Printf(“%s 击球第 %d 次 \n”, name, ball)
ball++
ch <- ball

}
}

从上面的例子看出,通道是球桌,球在球桌上来回传递,统计次数。两个选手用 for 循环持续的在接收和发送数据,也就是要么接球要么发球。之所以他们在相互等待对方是因为这个 channel 是一个无缓冲的 channel,也就是球不能放在球桌上,球桌只管传递,不能存储。再看一个例子:
package main
import (
“fmt”
“sync”
“time”
)
var wg sync.WaitGroup
func main() {//4×100 米接力比赛
ch:=make(chan int) // 接力棒
wg.Add(1) // 需要等待的是最后一棒
go runing(ch)
ch<-1
fmt.Println(“ 比赛开始 ”)
wg.Wait()
}
func runing(ch chan int){
var newRunner int
runner:=<-ch // 接棒
fmt.Printf(“ 第 %d 棒正在跑 \n”,runner)

time.Sleep(time.Second)// 跑步中
if runner==4 {// 第四棒
fmt.Printf(“ 跑完了 \n”)
wg.Done()
return
}else{// 没跑完,创建下一棒
newRunner=runner+1
fmt.Printf(“ 第 %d 棒准备就绪 \n”,newRunner)
go runing(ch)// 等待接棒
fmt.Printf(“ 接力棒传递给第 %d 棒 \n”,newRunner)
ch<-newRunner // 接力
}
}
上面的例子很有趣,模拟 4 ×100 米接力,我们创建一个无缓冲的 channel,比作接力棒,只有双方都准备好接收和发送,接力才会发生,不然一方就会处于等待期。传递给 channel 一个数字 1,表示比赛开始,第一棒取出 runner,只要不是第 4 棒就需要往下一棒传递,所以,就创建了第二棒,让他准备继续接力。知道 runner==4,比赛结束。
总结下:无缓冲 channel,可存储的大小为 0,它保证进行发送和接收的 goroutine 会在同一时间进行数据交换。如果发送方没有准备好发送,接收方会进入阻塞,等待发送。
4、有缓冲的 channel
有缓冲通道,其实是一个队列,这个队列的最大容量就是我们使用 make 函数创建通道时,通过第二个参数指定的。
ch := make(chan int, 5)
这里创建容量为 5 的,有缓冲的通道。对于有缓冲的通道,向其发送操作就是向队列的尾部插入元素,接收操作则是从队列的头部删除元素,并返回这个刚刚删除的元素。
当队列满的时候,发送操作会阻塞;当队列空的时候,接收操作会阻塞。有缓冲的通道,不要求发送和接收操作时同步的,相反可以解耦发送和接收操作。
// cap 和 len 函数同样对于有缓冲的 channel 可用,
cap(ch) //channel 容量
len(ch) // 当前 channel 内的数量
看代码:
func mirroredQuery() string {
responses := make(chan string, 3)
go func() { responses <- request(“asia.gopl.io”) }()
go func() { responses <- request(“europe.gopl.io”) }()
go func() { responses <- request(“americas.gopl.io”) }()
return <-responses // return the quickest response
}
func request(hostname string) (response string) {/* … */}
我们定义了一个容量为 3 的通道 responses,然后同时发起 3 个并发 goroutine 向这三个镜像获取数据,获取到的数据发送到通道 responses 中,最后我们使用 return <-responses 返回获取到的第一个数据,也就是最快返回的那个镜像的数据。
5、单项通道
为了避免 channel 混乱使用,还可以在定义的时候定义这个 channel 是单项的,即只能发送数据,或者只能接受数据。比如:
var send chan<- int // 只能发送——只能往 channel 里发数据
var receive <-chan int // 只能接收——只能从 channel 中取

// 我们定义函数的时候,可以明确声明接受的参数
func test(ch chan<- int){
// 接受的是一个只能发送数据的 channel,
}
区别主要在于 <- 符号的位置,在后面,往里发,,在前面,从里面取。好像一列车穿过隧道。
注意:不能把单项 channel 转换为普通 channel
d := (chan int)(send) // Error: cannot convert type chan<- int to type chan int
d := (chan int)(receive) // Error: cannot convert type <-chan int to type chan int
6、forange 迭代
package main

import (
“fmt”
)

func main() {
data := make(chan int) // 数据交换队列
exit := make(chan bool) // 退出通知
go func() {
for d := range data {// 从队列迭代接收数据,直到 close。
fmt.Println(d)
}
fmt.Println(“recv over.”)
exit <- true // 发出退出通知。
}()
data <- 1 // 发送数据。
data <- 2
data <- 3
close(data) // 关闭队列。
fmt.Println(“send over.”)
<-exit // 等待退出通知。
}
forange 用于 channel 有一个特点,就是一直进行迭代,不管 channel 有没有数据,直到 channel(close)关闭。这样既安全又便利,当 channel 关闭时,for 循环会自动退出,无需主动监测 channel 是否关闭,可以防止读取已经关闭的 channel,造成读到数据为通道所存储的数据类型的零值。
7、select 关键字
之前的例子都是使用 1 个 channel 进行通信,当我们使用多个 channel 进行通信时,就需要用到 select 关键字来进行管理。

可处理一个或多个 channel 的发送和接收
同时又多个可用的 channel 的按随机顺序处理
可用空的 select 来阻塞 main 函数
可设置超时

当通道为 nil 时,对应的 case 永远为阻塞,无论读写。特殊关注:而普通情况下,对 nil 的通道写操作是要 panic 的。

7.1 处理多个 channel 发送和接收
package main

import (
“fmt”
“os”
)
func main() {
a, b := make(chan int, 3), make(chan int)
go func() {
v, ok, s := 0, false, “”
for {
select {// 随机选择可用 channel,接收数据。
case v, ok = <-a:
s = “a”
case v, ok = <-b:
s = “b”
}
if ok {
fmt.Println(s, v)
} else {
os.Exit(0)
}
}
}()
for i := 0; i < 5; i++ {
select {// 随机选择可用 channel,发送数据。
case a <- i:
case b <- i:
}
}
close(a)
select {} // 没有可用 channel,阻塞 main goroutine。
}

我们的例子中,使用 select 有点像 switch,它可以管理多个 channel,随机的发送也可以随机的获取数据。最后我们用 select{} 很巧妙的阻塞了 main goroutine , 因为没有可用的 channel,它进入阻塞直到 channel 关闭,执行了 os.Exit(0) main 函数才推出。
7.2 设置超时
package main

import (
“fmt”
“time”
)
func main() {
exit := make(chan bool)
c1 := make(chan int, 2)
c2 := make(chan string, 2)

go func() {
select {
case vi := <-c1:
fmt.Println(vi)
case vs := <-c2:
fmt.Println(vs)
case <-time.After(time.Second * 3):
fmt.Println(“timeout.”)
}

exit <- true
}()
// 我们先把发送数据代码注释掉。
// 这里我们并没有向 c1 和 c2 发送任何数据,select 超时后就会打印 timeout
//c1<-10
//c2<-“ 加油 ”
<-exit
}
当然 select 还有 default,但是在循环中使用 default 一定要小心,小心,小心。
8、channel 总结
channel 存在 3 种状态

nil,未初始化的状态,只进行了声明,或者手动赋值为 nil

active,正常的 channel,可读或者可写
closed, 已关闭的 channel。关闭的 channel 存储的是类型零值。

操作
nil 通道
closed 关闭的通道
active 正常通道

close
panic
panic
成功

ch<-
死锁
panic
阻塞或成功

<-ch
死锁
零值
阻塞或成功

对于 nil 通道的情况,也并非完全遵循上表,有 1 个特殊场景:当 nil 的通道在 select 的某个 case 中时,这个 case 会阻塞,但不会造成死锁。

channel 分为有缓冲和无缓冲
channel 有单项 channel
可以使用 forange 迭代,直到 channel 关闭
可以用 select 管理多个 channel,随机处理读和写
可以用 select{} 阻塞 main 函数
select 可以设置超时

9、channel 应用场景小结
9.1 forange 迭代,无需关注 channel 是否关闭
场景:在需要不断从 channel 取数据时,而不用关心 channel 是否关闭
for x := range ch{
fmt.Println(x)
}
// 会一直迭代,直到 channel 关闭
9.2 使用_,ok 判断 channel 是否关闭
场景:在不确定 channel 是否关闭时,使用
if v, ok := <- ch; ok {
fmt.Println(v)
}
ok 的含义:

true:读到数据,并且通道没有关闭。

false:通道关闭,无数据读到。

9.3 使用 select 处理多个 channel
场景:需要对多个通道进行同时处理,但只处理最先发生的 channel 时,见上面的例子
注意:当通道为 nil 时,对应的 case 永远为阻塞,无论读写。特殊关注:普通情况下,对 nil 的通道写操作是要 panic 的。
9.4 使用 channel 传递结构体时,用指针
场景:channel 传递的数据是结构体时,最好用指针。
channel 本质上传递的是数据的拷贝,拷贝的数据越小传输效率越高,传递结构体指针,比传递结构体更高效
9.5 简单⼯⼚模式打包并发任务和 channel
package main
import (
“math/rand”
“time”
)
func NewTest() chan int { // 简单工厂方法返回一个 channel
c := make(chan int)
rand.Seed(time.Now().UnixNano())
go func() {
time.Sleep(time.Second)
c <- rand.Int()
}()
// 并且返回的 channel 是已经准备好发送的,阻塞中,只要接收方一准备好,立马数据就传递出去了
return c
}
func main() {
t := NewTest()
println(<-t) // 等待 goroutine 结束返回。
}
9.6 ⽤ channel 实现信号量 (semaphore)
简单解释下信号量,也叫信号灯,是可以用来保证两个或多个关键代码段不被并发调用。信号量有四种操作,1、初始化,2、等信号 3、发信号 4、清理
package main
import (
“fmt”
“sync”
)
func main() {
wg := sync.WaitGroup{}
wg.Add(3)
sem := make(chan int,1)
for i := 0; i < 3; i++ {
go func(id int) {
defer wg.Done()
sem <- 1 // 向 sem 发送数据,阻塞或者成功。
fmt.Printf(“ 第 %d 个 \n”,id)
for x := 0; x < 3; x++ {
fmt.Println(id, x)
}
<-sem // 接收数据,使得其他阻塞 goroutine 可以发送数据。
}(i)
}
wg.Wait()
}
// 输出
第 2 个
2 0
2 1
2 2
第 0 个
0 0
0 1
0 2
第 1 个
1 0
1 1
1 2

这里的 channel 是一个容量为 1 的有缓冲的通道。也就是说,它只能存一个信号,比如开了 3 个 goroutine,只有一个能发送进去,其他的都会阻塞,等到这个 goroutine 处理完自己的事情,将数据取出 <-,那么第二个 goroutine 就会发送,执行,然后取出。接着是第三个 goroutine。这样就实现了信号量,保证 goroutine 一个个的执行。
9.7 利用从 closed channel 取值,发出退出的通知
应用场景:关闭所有下游的 goroutine
nil 的 channel 在 select 中是永久阻塞的,case 是不会走的,但是关闭了的 channel,就会走。
从关闭了的 channel 中取值 <- 是不会引发 panic,会取出零值。
实现思路就是:在 channel 取值,是阻塞的,只要一关闭 channel,取值就是零值,然后执行退出就可以了。
通过将 nil channel 关闭,使 select 的 阻塞 channel 变为取出零值,case 退出代码执行,所有读取这个 channel 的 goroutine 就会执行关闭代码。
package main
import (
“sync”
“time”
)
func main() {
var wg sync.WaitGroup
quit := make(chan bool)
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for {
select {
case <-quit: // closed channel 不会阻塞,会取出零值,因此可用作退出通知。
return
default: // 执行正常任务。
func() {
println(id, time.Now().Nanosecond())
time.Sleep(time.Second)
}()
}
}
}(i)
}
time.Sleep(time.Second * 5) // 让测试 goroutine 运⾏⼀会。
close(quit) // 发出退出通知。
wg.Wait()
}

9.8 channel 作为结构成员
channel 可以做未结构体的成员,可以封装的更好。
package main
import (
“fmt”
)

type Request struct {
data []int
ret chan int
}
func NewRequest(data …int) *Request {
return &Request{data, make(chan int, 1)}
}
// 使用结构体指针,效率更高。
func Process(req *Request) {
x := 0
for _, i := range req.data {
x += i
}
req.ret <- x
}
func main() {
req := NewRequest(10, 20, 30)
Process(req)
fmt.Println(<-req.ret)
}

正文完
 0