关于go:go-channel原理及使用场景

3次阅读

共计 10606 个字符,预计需要花费 27 分钟才能阅读完成。

转载自:go channel 原理及应用场景

源码解析

type hchan struct {
    qcount   uint           // Channel 中的元素个数
    dataqsiz uint           // Channel 中的循环队列的长度
    buf      unsafe.Pointer // Channel 的缓冲区数据指针
    elemsize uint16 // 以后 Channel 可能收发的元素大小
    closed   uint32
    elemtype *_type // 以后 Channel 可能收发的元素类型
    sendx    uint   // Channel 的发送操作解决到的地位
    recvx    uint   // Channel 的接管操作解决到的地位
  recvq    waitq  // 以后 Channel 因为缓冲区空间有余而阻塞的 Goroutine 列表,双向链表 (sugog)
    sendq    waitq  // 以后 Channel 因为缓冲区空间有余而阻塞的 Goroutine 列表,双向链表 (sugog)

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

创立 channel

channel 的初始化有 2 种,一种是没有缓冲区的 channel,一种是有缓冲区的 channel。对应的初始化之后 hchan 也是有区别的。

无缓冲区的 channel,初始化的时候只为 channel 分配内存,缓冲区 dataqsiz 的长度为 0

有缓冲的 channel,初始化时会为 channel 和缓冲区分配内存,dataqsiz 长度大于 0

同时 channel 的元素大小和缓冲区的长度都是有大小限度的

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")
    }

  // 如果内存超了,或者调配的内存大于 channel 最大分配内存,或者调配的 size 小于 0,间接 Panic
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0:
        // 如果没有缓冲区,调配一段内存
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // 有缓冲时,如果元素不蕴含指针类型, 会为以后的 Channel 和底层的数组调配一块间断的内存空间
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // 有缓冲区,且元素蕴含指针类型,channel 和 buf 数组各自分配内存
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

  // 元素大小,元素类型,循环数组长度,更新到 channel
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)

    if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}

发送数据 (ch <- i)

  • 发送数据前会加锁,避免多个线程并发批改数据。 如果 channel 曾经敞开,间接 Panic

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {lock(&c.lock)
    
        if c.closed != 0 {unlock(&c.lock)
            panic(plainError("send on closed channel"))
        }
  • 当存在期待的接收者时,通过 runtime.send 间接将数据发送给阻塞的接收者

    当 channel 的 recvq 队列不为空,而且 channel 是没有数据数据写入的。这个时候如果有数据写入,会间接把数据拷贝到接收者变量所在的内存地址上。即便这是一个有缓冲的 channel,当有期待的接收者时,也是间接给接收者,不会先保留到循环队列

    // 如果指标 Channel 没有被敞开并且曾经有处于读期待的 Goroutine,那么 runtime.chansend 会从接管队列 recvq 中取出最先陷入期待的 Goroutine 并间接向它发送数据
    if sg := c.recvq.dequeue(); sg != nil {send(c, sg, ep, func() {unlock(&c.lock) }, 3)
            return true
        }
    
    // 
    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        if sg.elem != nil {
        // 调用 runtime.sendDirect 将发送的数据间接拷贝到 x = <-c 表达式中变量 x 所在的内存地址上
            sendDirect(c.elemtype, sg, ep)
            sg.elem = nil
        }
        gp := sg.g
        unlockf()
        gp.param = unsafe.Pointer(sg)
      // 调用 runtime.goready 将期待接收数据的 Goroutine 标记成可运行状态 Grunnable 并把该 Goroutine 放到发送方所在的处理器的 runnext 上期待执行,该处理器在下一次调度时会立即唤醒数据的接管方;// 须要留神的是,发送数据的过程只是将接管方的 Goroutine 放到了处理器的 runnext 中,程序没有立即执行该 Goroutine
        goready(gp, skip+1)
    }
  • 当缓冲区存在空余空间时,将发送的数据写入 Channel 的缓冲区

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        ...
      // 如果以后元素数小于循环队列的长度
        if c.qcount < c.dataqsiz {
        // 应用 runtime.chanbuf 计算出下一个能够存储数据的地位
            qp := chanbuf(c, c.sendx)
        // 将发送的数据拷贝到缓冲区中
            typedmemmove(c.elemtype, qp, ep)
        // 发送的地位索引 +1
            c.sendx++
        // 如果循环队列满了就从 0 开始
        // 因为这里的 buf 是一个循环数组,所以当 sendx 等于 dataqsiz 时会从新回到数组开始的地位
            if c.sendx == c.dataqsiz {c.sendx = 0}
        // 减少以后元素数
            c.qcount++
            unlock(&c.lock)
            return true
        }
        ...
    }
  • 当不存在缓冲区或者缓冲区已满时,期待其余 Goroutine 从 Channel 接收数据

    当因为不存在缓冲区或者缓冲区已满无奈写入时,会结构 sudog 期待执行的 gorutine 构造,放到 hchan 的期待队列中,直到被唤醒,把数据放到缓冲区或者间接拷贝给接收者

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        ...
      // 应用 select 关键字能够向 Channel 非阻塞地发送音讯
        if !block {unlock(&c.lock)
            return false
        }
    
      // 获取发送数据应用的 Goroutine
        gp := getg()
      //  获取 runtime.sudog 构造
        mysg := acquireSudog()
      // 设置待发送数据的内存地址
        mysg.elem = ep
      // 设置发送数据的 goroutine
        mysg.g = gp
      mysg.isSelect = false
      // 设置发送的 channel
        mysg.c = c
      // 设置到 goroutine 的 waiting 上
        gp.waiting = mysg
      // 退出到发送期待队列
        c.sendq.enqueue(mysg)
      // 阻塞期待唤醒
        atomic.Store8(&gp.parkingOnChan, 1)
        gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
        KeepAlive(ep)
    
        // someone woke us up.
        if mysg != gp.waiting {throw("G waiting list is corrupted")
        }
        gp.waiting = nil
        gp.activeStackChans = false
        closed := !mysg.success
        gp.param = nil
        if mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)
        }
        mysg.c = nil
        releaseSudog(mysg)
        if closed {
            if c.closed == 0 {throw("chansend: spurious wakeup")
            }
            panic(plainError("send on closed channel"))
        }
        return true
    }

接收数据 (<- ch)

  • 从一个空 Channel 接收数据

    goroutine 会让出使用权,并阻塞期待

        if c == nil {
            if !block {return}
        // 让出使用权
            gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
            throw("unreachable")
        }
    
        // 不获取锁的状况下,查看失败的非阻塞操作
        if !block && empty(c) {
            // 显示未敞开,持续返回 false,因为 channel 不会从新关上
            if atomic.Load(&c.closed) == 0 {return}
    
            if empty(c) {
                // The channel is irreversibly closed and empty.
                if raceenabled {raceacquire(c.raceaddr())
                }
          // Channel 曾经被敞开并且缓冲区中不存在任何数据,那么会革除 ep 指针中的数据并立即返回
                if ep != nil {typedmemclr(c.elemtype, ep)
                }
                return true, false
            }
        }
    
        var t0 int64
        if blockprofilerate > 0 {t0 = cputicks()
        }
    
        lock(&c.lock)
    
        if c.closed != 0 && c.qcount == 0 {
            if raceenabled {raceacquire(c.raceaddr())
            }
            unlock(&c.lock)
        // Channel 曾经被敞开并且缓冲区中不存在任何数据,那么会革除 ep 指针中的数据并立即返回
            if ep != nil {typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
  • 当存在期待的发送者时,通过 runtime.recv 从阻塞的发送者或者缓冲区中获取数据

    如果是无缓冲的 channel,当有接收者进来时,会间接从阻塞的发送者拷贝数据

    如果是有缓冲的 channel,当有接收者进来时,会先从缓冲区拿数据,接着期待的发送者会把数据拷贝到缓冲区

    留神这个时候并没有间接去唤醒发送者,而是放到下次 p 的执行队列中中,下次调度时会唤醒发送者,发送者会做一些开释资源的操作

    if sg := c.sendq.dequeue(); sg != nil {recv(c, sg, ep, func() {unlock(&c.lock) }, 3)
            return true, true
        }
    
    
    func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        if c.dataqsiz == 0 {
            if raceenabled {racesync(c, sg)
            }
            if ep != nil {
                // 如果无缓存,间接从发送者拷贝数据
                recvDirect(c.elemtype, sg, ep)
            }
        } else {
            // 因为队列已满,接收数据的索引和发送数据的索引统一
        qp := chanbuf(c, c.recvx)
            if raceenabled {racenotify(c, c.recvx, nil)
                racenotify(c, c.recvx, sg)
            }
            // 数据从队列拷贝到指标内存地址
            if ep != nil {typedmemmove(c.elemtype, ep, qp)
            }
            // 数据从发送者拷贝到缓冲区
            typedmemmove(c.elemtype, qp, sg.elem)
            c.recvx++
            if c.recvx == c.dataqsiz {c.recvx = 0}
            c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        }
        sg.elem = nil
        gp := sg.g
        unlockf()
        gp.param = unsafe.Pointer(sg)
        sg.success = true
        if sg.releasetime != 0 {sg.releasetime = cputicks()
        }
      // 无论产生哪种状况,运行时都会调用 runtime.goready 将以后处理器的 runnext 设置成发送数据的 Goroutine,在调度器下一次调度时将阻塞的发送方唤醒。goready(gp, skip+1)
    }
  • 当缓冲区存在数据时,从 Channel 的缓冲区中接收数据

    if c.qcount > 0 {
            // 间接从队列取数据
            qp := chanbuf(c, c.recvx)
            if raceenabled {racenotify(c, c.recvx, nil)
            }
          // 放到指标内存
            if ep != nil {typedmemmove(c.elemtype, ep, qp)
            }
          // 清空队列中对应的元素
            typedmemclr(c.elemtype, qp)
          // 接管索引 +1
            c.recvx++
            if c.recvx == c.dataqsiz {c.recvx = 0}
          // 队列元素 -1
            c.qcount--
            unlock(&c.lock)
            return true, true
        }
  • 当缓冲区中不存在数据时,期待其余 Goroutine 向 Channel 发送数据

    if !block {unlock(&c.lock)
            return false, false
        }
    
        // no sender available: block on this channel.
        gp := getg()
        mysg := acquireSudog()
        mysg.releasetime = 0
        if t0 != 0 {mysg.releasetime = -1}
        // No stack splits between assigning elem and enqueuing mysg
        // on gp.waiting where copystack can find it.
        mysg.elem = ep
        mysg.waitlink = nil
        gp.waiting = mysg
        mysg.g = gp
        mysg.isSelect = false
        mysg.c = c
        gp.param = nil
        c.recvq.enqueue(mysg)
        // 阻塞期待,让出使用权
        atomic.Store8(&gp.parkingOnChan, 1)
        gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    
        // 唤醒之后清空 sudog
        if mysg != gp.waiting {throw("G waiting list is corrupted")
        }
        gp.waiting = nil
        gp.activeStackChans = false
        if mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)
        }
        success := mysg.success
        gp.param = nil
        mysg.c = nil
        releaseSudog(mysg)
        return true, success

敞开 channel

  • 当 Channel 是一个空指针或者曾经被敞开时,Go 语言运行时都会间接解体并抛出异样

    func closechan(c *hchan) {
        if c == nil {panic(plainError("close of nil channel"))
        }
    
        lock(&c.lock)
        if c.closed != 0 {unlock(&c.lock)
            panic(plainError("close of closed channel"))
        }
  • recvqsendq 两个队列中的数据退出到 Goroutine 列表 gList 中,与此同时该函数会革除所有 runtime.sudog 上未被解决的元素

    c.closed = 1
    
        var glist gList
    
        // release all readers
        for {sg := c.recvq.dequeue()
            if sg == nil {break}
            if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)
                sg.elem = nil
            }
            if sg.releasetime != 0 {sg.releasetime = cputicks()
            }
            gp := sg.g
            gp.param = unsafe.Pointer(sg)
            sg.success = false
            if raceenabled {raceacquireg(gp, c.raceaddr())
            }
            glist.push(gp)
        }
    
        // release all writers (they will panic)
        for {sg := c.sendq.dequeue()
            if sg == nil {break}
            sg.elem = nil
            if sg.releasetime != 0 {sg.releasetime = cputicks()
            }
            gp := sg.g
            gp.param = unsafe.Pointer(sg)
            sg.success = false
            if raceenabled {raceacquireg(gp, c.raceaddr())
            }
            glist.push(gp)
        }
        unlock(&c.lock)
    
        // 为所有被阻塞的 Goroutine 调用 runtime.goready 触发调度。for !glist.empty() {gp := glist.pop()
            gp.schedlink = 0
            goready(gp, 3)
        }

应用场景

报错情景

  • 往一个敞开的 channel 发送数据会报错:panic: send on closed channel
  • 敞开一个 nil 的 chan 会报错:panic: close of nil channel
  • 敞开一个曾经敞开的 channel 报错:panic: close of closed channel

1、一个经典的算法题

有 4 个 goroutine,编号为 1、2、3、4。每秒钟会有一个 goroutine 打印出本人的编号,要求写一个程序,让输入的编号总是依照 1、2、3、4、1、2、3、4… 的程序打印进去

package main

import (
    "fmt"
    "time"
)

func main() {
  // 4 个 channel
    chs := make([]chan int, 4)
    for i, _ := range chs {chs[i] = make(chan int)
    // 开 4 个协程
        go func(i int) {
            for {
        // 获取以后 channel 值并打印
                v := <-chs[i]
                fmt.Println(v + 1)
                time.Sleep(time.Second)
        // 把下一个值写入下一个 channel,期待下一次生产
                chs[(i+1)%4] <- (v + 1) % 4
            }

        }(i)
    }

  // 往第一个塞入 0
    chs[0] <- 0
    select {}}

2、限流器

package main

import (
    "fmt"
    "time"
)

func main() {
    // 每次解决 3 个申请
    chLimit := make(chan struct{}, 3)
    for i := 0; i < 20; i++ {chLimit <- struct{}{}
        go func(i int) {fmt.Println("上游服务解决逻辑...", i)
            time.Sleep(time.Second * 3)
            <-chLimit
        }(i)
    }
    time.Sleep(30 * time.Second)
}

如果感觉 sleep 太丑太暴力,能够用 waitGroup 管制完结机会

package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func main() {
    // 每次解决 3 个申请
    chLimit := make(chan struct{}, 3)
    for i := 0; i < 20; i++ {chLimit <- struct{}{}
        wg.Add(1)
        go func(i int) {fmt.Println("上游服务解决逻辑...", i)
            time.Sleep(time.Second * 3)
            <-chLimit
            wg.Done()}(i)
    }
    wg.Wait()}

3、优雅退出

package main

import (
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {var closing = make(chan struct{})
    var closed = make(chan struct{})

    go func() {
        for {
            select {
            case <-closing:
                return
            default:
                fmt.Println("业务逻辑...")
                time.Sleep(1 * time.Second)
            }
        }
    }()

    termChan := make(chan os.Signal)
  // 监听退出信号
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
    <-termChan

  // 退出中
    close(closing)

    // 退出之前清理一下
    go doCleanup(closed)

    select {
    case <-closed:
    case <-time.After(time.Second):
        log.Println("清理超时不等了")
    }

    log.Println("优雅退出")
}

func doCleanup(closed chan struct{}) {time.Sleep(time.Minute)
  // 清理完后退出
    close(closed)
}

4、实现互斥锁

初始化一个缓冲区为 1 的 channel,放入元素代表一把锁,谁获取到这个元素就代表获取了这把锁,开释锁的时候再把这个元素放回 channel

package main

import (
    "log"
    "time"
)

type Mutex struct {ch chan struct{}
}

// 初始化锁
func NewMutex() *Mutex {mu := &Mutex{make(chan struct{}, 1)}
    mu.ch <- struct{}{}
    return mu
}

// 加锁,阻塞获取
func (m *Mutex) Lock()  {<- m.ch}

// 开释锁
func (m *Mutex) Unlock()  {
    select {
    // 胜利写入 channel 代表开释胜利
    case m.ch <- struct{}{}:
    default:
        panic("unlock of unlocked mutex")
    }
}

// 尝试获取锁
func (m *Mutex) TryLock() bool {
    select {
    case <-m.ch:
        return true
    default:

    }
    return false
}

func (m *Mutex) LockTimeout(timeout time.Duration) bool {timer := time.NewTimer(timeout)

    select {
    case <-m.ch:
    // 胜利获取锁敞开定时器
        timer.Stop()
        return true
    case <-timer.C:

    }
  // 获取锁超时
    return false
}

// 是否上锁
func (m *Mutex) IsLocked() bool {return len(m.ch) == 0
}


func main()  {m := NewMutex()
    ok := m.TryLock()
    log.Printf("locked v %v\n", ok)
    ok = m.TryLock()
    log.Printf("locked v %v\n", ok)

    go func() {time.Sleep(5*time.Second)
        m.Unlock()}()

    ok = m.LockTimeout(10*time.Second)
    log.Printf("LockTimeout v %v\n", ok)
}

参考:

极刻工夫《go 并发编程实战》

正文完
 0