转载自:go channel原理及应用场景
源码解析
type hchan struct { qcount uint // Channel 中的元素个数 dataqsiz uint // Channel 中的循环队列的长度 buf unsafe.Pointer // Channel 的缓冲区数据指针 elemsize uint16 // 以后 Channel 可能收发的元素大小 closed uint32 elemtype *_type // 以后 Channel 可能收发的元素类型 sendx uint // Channel 的发送操作解决到的地位 recvx uint // Channel 的接管操作解决到的地位 recvq waitq // 以后 Channel 因为缓冲区空间有余而阻塞的 Goroutine 列表,双向链表(sugog) sendq waitq // 以后 Channel 因为缓冲区空间有余而阻塞的 Goroutine 列表,双向链表(sugog) // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex}
创立channel
channel的初始化有2种,一种是没有缓冲区的channel,一种是有缓冲区的channel。对应的初始化之后hchan也是有区别的。
无缓冲区的channel,初始化的时候只为channel分配内存,缓冲区dataqsiz的长度为0
有缓冲的channel,初始化时会为channel和缓冲区分配内存,dataqsiz长度大于0
同时channel的元素大小和缓冲区的长度都是有大小限度的
func makechan(t *chantype, size int) *hchan { elem := t.elem // compiler checks this but be safe. if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } // 如果内存超了,或者调配的内存大于channel最大分配内存,或者调配的size小于0,间接Panic mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. var c *hchan switch { case mem == 0: // 如果没有缓冲区,调配一段内存 c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // 有缓冲时,如果元素不蕴含指针类型,会为以后的 Channel 和底层的数组调配一块间断的内存空间 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // 有缓冲区,且元素蕴含指针类型,channel和buf数组各自分配内存 c = new(hchan) c.buf = mallocgc(mem, elem, true) } // 元素大小,元素类型,循环数组长度,更新到channel c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") } return c}
发送数据(ch <- i)
发送数据前会加锁,避免多个线程并发批改数据。如果channel曾经敞开,间接Panic
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) }
当存在期待的接收者时,通过
runtime.send
间接将数据发送给阻塞的接收者当channel的recvq队列不为空,而且channel是没有数据数据写入的。这个时候如果有数据写入,会间接把数据拷贝到接收者变量所在的内存地址上。即便这是一个有缓冲的channel,当有期待的接收者时,也是间接给接收者,不会先保留到循环队列
// 如果指标 Channel 没有被敞开并且曾经有处于读期待的 Goroutine,那么 runtime.chansend 会从接管队列 recvq 中取出最先陷入期待的 Goroutine 并间接向它发送数据if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }// func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if sg.elem != nil { // 调用 runtime.sendDirect 将发送的数据间接拷贝到 x = <-c 表达式中变量 x 所在的内存地址上 sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) // 调用 runtime.goready 将期待接收数据的 Goroutine 标记成可运行状态 Grunnable 并把该 Goroutine 放到发送方所在的处理器的 runnext 上期待执行,该处理器在下一次调度时会立即唤醒数据的接管方; // 须要留神的是,发送数据的过程只是将接管方的 Goroutine 放到了处理器的 runnext 中,程序没有立即执行该 Goroutine goready(gp, skip+1)}
当缓冲区存在空余空间时,将发送的数据写入 Channel 的缓冲区
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... // 如果以后元素数小于循环队列的长度 if c.qcount < c.dataqsiz { // 应用 runtime.chanbuf 计算出下一个能够存储数据的地位 qp := chanbuf(c, c.sendx) // 将发送的数据拷贝到缓冲区中 typedmemmove(c.elemtype, qp, ep) // 发送的地位索引+1 c.sendx++ // 如果循环队列满了就从0开始 // 因为这里的 buf 是一个循环数组,所以当 sendx 等于 dataqsiz 时会从新回到数组开始的地位 if c.sendx == c.dataqsiz { c.sendx = 0 } // 减少以后元素数 c.qcount++ unlock(&c.lock) return true } ...}
当不存在缓冲区或者缓冲区已满时,期待其余 Goroutine 从 Channel 接收数据
当因为不存在缓冲区或者缓冲区已满无奈写入时,会结构sudog期待执行的gorutine构造,放到hchan的期待队列中,直到被唤醒,把数据放到缓冲区或者间接拷贝给接收者
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... // 应用 select 关键字能够向 Channel 非阻塞地发送音讯 if !block { unlock(&c.lock) return false } // 获取发送数据应用的 Goroutine gp := getg() // 获取 runtime.sudog 构造 mysg := acquireSudog() // 设置待发送数据的内存地址 mysg.elem = ep // 设置发送数据的goroutine mysg.g = gp mysg.isSelect = false // 设置发送的channel mysg.c = c // 设置到goroutine的waiting上 gp.waiting = mysg // 退出到发送期待队列 c.sendq.enqueue(mysg) // 阻塞期待唤醒 atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) KeepAlive(ep) // someone woke us up. if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } return true}
接收数据(<- ch)
从一个空 Channel 接收数据
goroutine会让出使用权,并阻塞期待
if c == nil { if !block { return } // 让出使用权 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } // 不获取锁的状况下,查看失败的非阻塞操作 if !block && empty(c) { // 显示未敞开,持续返回false,因为channel不会从新关上 if atomic.Load(&c.closed) == 0 { return } if empty(c) { // The channel is irreversibly closed and empty. if raceenabled { raceacquire(c.raceaddr()) } // Channel 曾经被敞开并且缓冲区中不存在任何数据,那么会革除 ep 指针中的数据并立即返回 if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) // Channel 曾经被敞开并且缓冲区中不存在任何数据,那么会革除 ep 指针中的数据并立即返回 if ep != nil { typedmemclr(c.elemtype, ep) } return true, false }
当存在期待的发送者时,通过
runtime.recv
从阻塞的发送者或者缓冲区中获取数据如果是无缓冲的channel,当有接收者进来时,会间接从阻塞的发送者拷贝数据
如果是有缓冲的channel,当有接收者进来时,会先从缓冲区拿数据,接着期待的发送者会把数据拷贝到缓冲区
留神这个时候并没有间接去唤醒发送者,而是放到下次p的执行队列中中,下次调度时会唤醒发送者,发送者会做一些开释资源的操作
if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true }func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { if raceenabled { racesync(c, sg) } if ep != nil { // 如果无缓存,间接从发送者拷贝数据 recvDirect(c.elemtype, sg, ep) } } else { // 因为队列已满,接收数据的索引和发送数据的索引统一 qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) } // 数据从队列拷贝到指标内存地址 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 数据从发送者拷贝到缓冲区 typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 无论产生哪种状况,运行时都会调用 runtime.goready 将以后处理器的 runnext 设置成发送数据的 Goroutine,在调度器下一次调度时将阻塞的发送方唤醒。 goready(gp, skip+1)}
当缓冲区存在数据时,从 Channel 的缓冲区中接收数据
if c.qcount > 0 { // 间接从队列取数据 qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } // 放到指标内存 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 清空队列中对应的元素 typedmemclr(c.elemtype, qp) // 接管索引+1 c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } // 队列元素-1 c.qcount-- unlock(&c.lock) return true, true }
当缓冲区中不存在数据时,期待其余 Goroutine 向 Channel 发送数据
if !block { unlock(&c.lock) return false, false } // no sender available: block on this channel. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) // 阻塞期待,让出使用权 atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 唤醒之后清空sudog if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true, success
敞开channel
当 Channel 是一个空指针或者曾经被敞开时,Go 语言运行时都会间接解体并抛出异样
func closechan(c *hchan) { if c == nil { panic(plainError("close of nil channel")) } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) }
将
recvq
和sendq
两个队列中的数据退出到 Goroutine 列表gList
中,与此同时该函数会革除所有runtime.sudog
上未被解决的元素c.closed = 1 var glist gList // release all readers for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } // release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock) // 为所有被阻塞的 Goroutine 调用 runtime.goready 触发调度。 for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) }
应用场景
报错情景
- 往一个敞开的channel发送数据会报错:panic: send on closed channel
- 敞开一个nil的chan会报错:panic: close of nil channel
- 敞开一个曾经敞开的channel报错:panic: close of closed channel
1、一个经典的算法题
有4个goroutine,编号为1、2、3、4。每秒钟会有一个goroutine打印出本人的编号,要求写一个程序,让输入的编号总是依照1、2、3、4、1、2、3、4...的程序打印进去
package mainimport ( "fmt" "time")func main() { // 4个channel chs := make([]chan int, 4) for i, _ := range chs { chs[i] = make(chan int) // 开4个协程 go func(i int) { for { // 获取以后channel值并打印 v := <-chs[i] fmt.Println(v + 1) time.Sleep(time.Second) // 把下一个值写入下一个channel,期待下一次生产 chs[(i+1)%4] <- (v + 1) % 4 } }(i) } // 往第一个塞入0 chs[0] <- 0 select {}}
2、限流器
package mainimport ( "fmt" "time")func main() { // 每次解决3个申请 chLimit := make(chan struct{}, 3) for i := 0; i < 20; i++ { chLimit <- struct{}{} go func(i int) { fmt.Println("上游服务解决逻辑...", i) time.Sleep(time.Second * 3) <-chLimit }(i) } time.Sleep(30 * time.Second)}
如果感觉sleep太丑太暴力,能够用waitGroup管制完结机会
package mainimport ( "fmt" "sync" "time")var wg sync.WaitGroupfunc main() { // 每次解决3个申请 chLimit := make(chan struct{}, 3) for i := 0; i < 20; i++ { chLimit <- struct{}{} wg.Add(1) go func(i int) { fmt.Println("上游服务解决逻辑...", i) time.Sleep(time.Second * 3) <-chLimit wg.Done() }(i) } wg.Wait()}
3、优雅退出
package mainimport ( "fmt" "log" "os" "os/signal" "syscall" "time")func main() { var closing = make(chan struct{}) var closed = make(chan struct{}) go func() { for { select { case <-closing: return default: fmt.Println("业务逻辑...") time.Sleep(1 * time.Second) } } }() termChan := make(chan os.Signal) // 监听退出信号 signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) <-termChan // 退出中 close(closing) // 退出之前清理一下 go doCleanup(closed) select { case <-closed: case <-time.After(time.Second): log.Println("清理超时不等了") } log.Println("优雅退出")}func doCleanup(closed chan struct{}) { time.Sleep(time.Minute) // 清理完后退出 close(closed)}
4、实现互斥锁
初始化一个缓冲区为1的channel,放入元素代表一把锁,谁获取到这个元素就代表获取了这把锁,开释锁的时候再把这个元素放回channel
package mainimport ( "log" "time")type Mutex struct { ch chan struct{}}// 初始化锁func NewMutex() *Mutex { mu := &Mutex{make(chan struct{}, 1)} mu.ch <- struct{}{} return mu}// 加锁,阻塞获取func (m *Mutex) Lock() { <- m.ch}// 开释锁func (m *Mutex) Unlock() { select { // 胜利写入channel代表开释胜利 case m.ch <- struct{}{}: default: panic("unlock of unlocked mutex") }}// 尝试获取锁func (m *Mutex) TryLock() bool { select { case <-m.ch: return true default: } return false}func (m *Mutex) LockTimeout(timeout time.Duration) bool { timer := time.NewTimer(timeout) select { case <-m.ch: // 胜利获取锁敞开定时器 timer.Stop() return true case <-timer.C: } // 获取锁超时 return false}// 是否上锁func (m *Mutex) IsLocked() bool { return len(m.ch) == 0}func main() { m := NewMutex() ok := m.TryLock() log.Printf("locked v %v\n", ok) ok = m.TryLock() log.Printf("locked v %v\n", ok) go func() { time.Sleep(5*time.Second) m.Unlock() }() ok = m.LockTimeout(10*time.Second) log.Printf("LockTimeout v %v\n", ok)}
参考:
极刻工夫《go 并发编程实战》