Channel
golang CSP 模型中的
C
, 主要用于 goroutine 之间消息的传递,我们知道在写代码的过程中,解偶是非常重要的一环,而使用 channel 则可以很好的隔离 goroutine,使得 goroutne 之间的交互,只需要将重心关注在如何从 channel 中消费或者生产消息。
- 声明和使用
- 阻塞场景
- 关闭 Channel
- select & range
- 使用 channel 模拟生产消费模型
声明和使用
使用 make 声明一个 channel
ch := make(chan int)
ch <- 1 // write ch 位于 <- 的左侧(代表数据流入
<- ch // read ch 位于 <- 的右侧(代表数据流出
阻塞场景
在真正使用 channel 前,我们需要了解 channel 可能会产生 阻塞 场景的所有可能,以防止在代码中编写出不符合我们预期的代码。
下面我们罗列出可能的四种情形
无缓冲
channel 中无数据,但是执行 <- channel (读
ch := make(chan interface{})
<-ch
fmt.Println("read buf succ")
channel 中无数据,往 channel <- (写 , 但是没有 goroutine 读取。
ch := make(chan interface{})
ch <- 1
fmt.Println("read buf succ")
有缓冲
channel 中无数据,但是执行 <- channel
ch := make(chan interface{}, 1)
<-ch
fmt.Println("read buf succ")
channel 中已满, 继续执行 channel <- 动作,但是没有 goroutine 读取。
ch := make(chan interface{}, 1)
ch <- 1
ch <- 2
fmt.Println("read buf succ")
关闭 Channel
使用 close 关闭 channel
ch := make(chan interface{})
close(ch)
关闭 channel 需要注意
- 重复关闭会 panic
- 向关闭的 channel 发送数据会 panic
- 从关闭的 channel 读取数据,会读取到值的初始值,比如 interface 类型,读取到的就是 nil
select & range
range 字段会阻塞监听 channel,直到 channel 被 close。
func recv(ch chan int) {
for msg := range ch { // 使用 range 可以自动等待 ch 的行为,直到 ch 被 close。fmt.Println(msg)
}
fmt.Println("channel closed")
}
func send(ch chan int, msg int) {ch <- msg}
func main() {ch := make(chan int, 2)
go recv(ch)
ch <- 1
ch <- 2
ch <- 3
time.AfterFunc(time.Second*2, func() {close(ch)
})
}
select 的大致工作原理
- 检查所有的
case
- 当检查的
case
已经可以发送|接收,则执行当前代码块- 当有多个
case
可以执行,则随机
选择一个执行- 当没有
case
可以执行,则阻塞- 如果存在
default
,当没有可执行代码块时,则执行default
代码块使用 select 来管理 channel 的读取, 通过 default 防止阻塞.
func readCh(ch chan interface{}) error {
select {
case v := <-ch:
fmt.Println(v)
default:
return errors.New("no data")
}
return nil
}
使用 timer 或者 context 来进行到期退出判定. 另外我们也可以使用 sync.Once()这种形式设定一个开关,
来控制 select 的退出逻辑,可参照 grpc/internal/grpcsync/event.go
func readCh(ch chan interface{}) error {
select {
case v := <-ch:
fmt.Println(v)
case <-time.After(time.Second):
return errors.New("time arrived")
}
return nil
}
使用 channel 模拟生产消费模型
下面代码的 Unbounded 实现摘自 grpc/internal/buffer/unbounded.go,
它没有选择使用带容量的 channel,而是另外使用了一个 list 来备份积压的消息,这里我猜有两个原因
- 使用这种方式 channel 变成了一个任意长度的 channel,不用考虑 channel 被写满导致的问题。
- 这里为什么不直接使用 list + mutex,因为需要 channel 的特性来隔离 goroutine。
type Unbounded struct {c chan interface{}
backlog []interface{}
sync.Mutex
}
func NewUnbounded() *Unbounded {return &Unbounded{c: make(chan interface{}, 1)}
}
func (b *Unbounded) Put(t interface{}) {b.Lock()
if len(b.backlog) == 0 {
select {
case b.c <- t:
b.Unlock()
return
default:
}
}
b.backlog = append(b.backlog, t)
b.Unlock()}
func (b *Unbounded) Load() {b.Lock()
if len(b.backlog) > 0 {
select {case b.c <- b.backlog[0]:
b.backlog[0] = nil
b.backlog = b.backlog[1:]
default:
}
}
b.Unlock()}
func (b *Unbounded) Get() <-chan interface{} {return b.c}
var q *Queue
type Queue struct {buf *Unbounded}
type QueueInterface interface {consume()
produce(info int)
}
func (q *Queue) consume() {
for {
select {case t := <-q.buf.Get():
q.buf.Load()
fmt.Println(t)
case <-time.After(time.Second * 10):
fmt.Println(errors.New("the end"))
}
}
}
func (q *Queue) produce(info int) {q.buf.Put(info)
}
func main() {
q := &Queue{buf: NewUnbounded(),
}
go q.consume()
q.produce(1)
q.produce(3)
time.AfterFunc(time.Second*2, func() {
for i := 0; i < 3; i++ {q.produce(4)
}
//q.produce(4)
})
select {}}
注: 这里的实现使用了 interface 作为 channel 的消息体,凡是在有性能瓶颈的地方应该使用具体的类型独立实现一版,类似 grpc/internal/transport.go 中的 recvBuffer