关于go:25-GolangGo并发编程管道chan

39次阅读

共计 7969 个字符,预计需要花费 20 分钟才能阅读完成。

  Go 语言实现了两种多线程同步计划,一种是传统多线程语言相似,基于共享内存计划;另一种称之为基于协程 - 管道的 CSP(communicating sequential processes)并发编程模型,这也是 Go 语言举荐的形式。本篇文章次要解说管道在并发编程中的典型利用,以及管道的底层实现原理。

典型利用场景

  顾名思义,管道能够从一端写入数据,一端读取数据,用户程序能够很不便的通过管道实现协程间通信,如下列形式:

package main

import (
    "fmt"
    "time"
)

func main() {queue := make(chan int, 1)
    go func() {
        for {
            data := <- queue      // 读取
            fmt.Print(data, " ")  //0 1 2 3 4 5 6 7 8 9 
        }
    }()

    for i := 0; i < 10; i ++ {queue <- i                // 写入}
    time.Sleep(time.Second)
}

  管道能够在多协程间传递数据,管道的申明如 ”chan int” 形式,申明蕴含了传递的数据类型。make 初始化管道时候,第二个参数用于设置管道最大能够存储的数据量:管道容量满了之后,写入数据会阻塞以后协程;管道容量为空时,读取数据也会阻塞以后协程。那如果 make 初始化管道时,第二个参数是 0 呢?这意味着该管道最大容量为 0,也就是,向管道写入数据时如果没有协程恰好期待读,肯定会阻塞以后写协程;相应的,从管道读取数据时如果没有协程恰好期待写入,也肯定会阻塞以后读协程。

  管道容量不为 0 时,咱们通常称该管道为有缓冲管道,对应的管道容量为 0 就是无缓冲管道。有缓冲管道可供多个协程协同解决,在肯定水平上能够进步程序的并发,这句话怎么了解呢?构想有这么一个需要:有一个脚本,从 kafka 等队列生产音讯并解决,然而解决逻辑比拟耗时,单线程 / 协程生产 + 解决效率太低,那就多协程解决呗。一个协程生产 kafka 等队列音讯,写入管道,多个异步协程从管道获取音讯并解决,这里咱们就通过有缓冲管道 + 多协程进步了程序的并发。程序实例如下:

package main

import ("fmt")

func main() {
    // 有缓冲管道
    queue := make(chan int, 100)
    // 启动 10 个子协程生产管道音讯
    for i := 0; i < 10; i ++ {go func() {
            for {
                data := <- queue
                fmt.Println(data)
            }
        }()}

    // 主协程循环向管道写入音讯
    for j := 0; j < 1000; j ++ {queue <- j}
}

  管道的写入或者读取可能会阻塞以后协程,问题就是以后管道是否可读或者可写是不晓得的,如果一个协程须要同时操作多个管道呢?比方有多个异步协程从管道抓取数据(耗时),写入数据管道(每一个异步协程对应一个数据管道),主协程从多个数据管道生产数据,写入本地文件。主协程怎么同时读取多个管道呢?要晓得读取管道可能会导致主协程阻塞的。Go 语言还有一个关键字 select,能够同时监听多个管道,十分相似 IO 多路复用的概念,如 epoll。这时候程序应该是这样的:

package main

import (
    "fmt"
    "time"
)

func main() {c1 := make(chan int, 10)
    c2 := make(chan int, 10)
    // 协程 1,循环向管道 c1 写入数据
    go func() {
        for i := 0; i < 1000; i ++ {
            c1 <- i
            time.Sleep(time.Second)
        }
    }()
    // 协程 2,循环向管道 c2 写入数据
    go func() {
        for i := 1000; i < 2000; i ++ {
            c2 <- i
            time.Sleep(time.Millisecond * 500)
        }
    }()

    // 主协程,select case 同时监听 c1 和 c2 两个管道,哪个管道先变为可读,先执行哪个 case
    for {
        select {
        case data := <- c1:
            fmt.Println(data)
        case data := <- c2:
            fmt.Println(data)
        }
    }
}

  管道的读写操作可能导致协程的阻塞,有没有可能不阻塞协程呢?其实也能够,同样能够用 select 实现,不过这里还须要增加一个非凡的分支,default,意思是默认分支,即其余分支阻塞的时候,执行 default 分支。

package main

import (
    "fmt"
    "strconv"
)

func main() {queue := make(chan int, 0)
    for i := 0; i < 10; i ++ {
        select {
        case queue <- i:
            fmt.Println("insert:" + strconv.Itoa(i))
        default:
            fmt.Println("skip:" + strconv.Itoa(i))
        }
    }
}

  queue 是无缓冲管道,实践上主协程向管道 queue 写入数据都会阻塞,然而通过 select default 的组合,管道的写入变成非阻塞了。此时,如果无奈向管道写入数据,执行 defualt 分支,并没有阻塞协程。

  select 与 default 的组合能够实现管道的非阻塞操作,而 select 与定时器的组合,能够为管道的操作加上超时工夫(其实就是 select 监听多个管道),也就是如果管道不可读或不可写,会阻塞协程,然而待定时器触发时,协程就会解除阻塞。

package main

import (
    "fmt"
    "time"
)

func main() {queue := make(chan int, 0)
    // 定时器 1 秒后触发;t := time.After(time.Second)
    go func() {
        select {
        case <- queue:
            fmt.Println("recv data")
        case <- t:
            fmt.Println("timeout")    //time.After 返回的其实就是管道,1 秒后管道 t 变为可读;}
    }()

    time.Sleep(time.Second * 3)
}

  咱们后面介绍,管道的申明个别蕴含传递的数据类型,然而在某些场景,咱们应用管道只是想传递一个信号,比方下面的程序你会关怀定时器管道 t 读取的数据吗?再比方上面的程序,主协程须要期待子协程运行完结后再退出,就能通过管道实现,而这里管道申明为 chan struct{},因为数据不重要,咱们只关注他的可读可写状态。初始主协程读管道而阻塞,而等到子协程执行结束后,向管道写入任意数据,主协程就会解除阻塞,复原执行。

package main

import (
    "fmt"
    "time"
)

func main() {queue := make(chan struct{}, 0)
    go func() {time.Sleep(time.Second)
        queue <- struct{}{}
    }()

    <- queue
    fmt.Println("time end")
}

实现原理

  chan 是如何实现在多个协程间传递数据呢?思考一下,有缓冲管道是不是须要存储数据,那必定须要一个数组了,而且这个数组应该作为循环队列应用(一边写入数据一边读取数据,数组没必要有限扩容,而且管道是 FIFO 模式,先写入的数据先读取,循环队列就能满足条件);另外,协程操作管道时还有可能被阻塞,阻塞的协程也有可能因为其余协程的写入或者读取而解除阻塞,阻塞的协程队列保留在哪呢?存储在管道变量就能够了;最初,多个协程可能并发的操作管道,所以必定是须要加锁的。

  联合这三点思考,管道的数据类型定义也跃然纸上了:

// runtime/chan.go
type hchan struct {
    // 以后管道存储的元素数目
    qcount   uint           // total data in the queue
    // 管道容量
    dataqsiz uint           // size of the circular queue
    // 数组
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    
    // 标识管道是否被 close
    closed   uint32

    // 管道存储的元素类型 & 元素大小
    elemtype *_type // element type
    elemsize uint16

    // 读 / 写索引,循环队列
    sendx    uint   // send index
    recvx    uint   // receive index

    // 读阻塞协程队列,写协程梗塞队列
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // 锁
    lock mutex
}

  管道数据结构定义如下图所示:

  文件 runtime/chan.go 不仅定义了管道的数据类型,好包含基本操作办法:

// chan 初始化;size 就是 chan 容量
func makechan(t *chantype, size int) *hchan
// 从 chan 读取数据;ep 指针,读取到的数据就存储在 ep;block 示意如果 chan 不可读,是否阻塞协程
func chanrecv(c *hchan, ep unsafe.Pointer, block bool)
// 向 chan 写入数据;ep 指针,待写入的数据就存储在 ep;block 示意如果 chan 不可写,是否阻塞协程
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)
// chan 敞开
func closechan(c *hchan)

  咱们以 chansend 函数为例,钻研 chan 的基本操作:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 加锁
    lock(&c.lock)

    // 如果有协程在期待读,间接将数据交给指标协程,并唤醒该协程
    if sg := c.recvq.dequeue(); sg != nil {send(c, sg, ep, func() {unlock(&c.lock) }, 3)
        return true
    }

    // 如果管道还有残余容量
    if c.qcount < c.dataqsiz {
        // 拷贝数据到 chan 数组
        qp := chanbuf(c, c.sendx)
        typedmemmove(c.elemtype, qp, ep)
        // 更新写入索引
        c.sendx++
        // 循环队列,到最初一个索引了,从头开始
        if c.sendx == c.dataqsiz {c.sendx = 0}
        // 管道目前存储元素数目
        c.qcount++
        // 开释锁
        unlock(&c.lock)
        return true
    }

    // 管道容量曾经满了,间接返回 false 或者阻塞协程
    //block 为 false 示意不阻塞协程
    if !block {unlock(&c.lock)
        return false
    }

    // 协程阻塞时,会转化为 sudog 对象存储在管道的阻塞队列
    mysg := acquireSudog()
    mysg.g = gp
    mysg.elem = ep
    // 阻塞协程入队
    c.sendq.enqueue(mysg)

    // 协程换出
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

    // 走到这里,阐明协程复原执行,会执行一些开释工作
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 数据拷贝
    if sg.elem != nil {sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()

    // 唤醒阻塞协程
    goready(gp, skip+1)
}

  向管道写入数据时,如果以后有协程在阻塞期待读,send 函数会调用 goready 唤醒该协程,即变更该协程状态为可运行_Grunnable,同时将该协程从新增加到 P 的协程队列。另外,协程阻塞时不是会转换为 sudog 对象么,而 sudog.elem 专用于数据的传递,send 函数也会间接将待写入管道的数据,通过 sudog.elem 传递给读阻塞的协程。

  咱们还留神到,参数 block 示意如果协程不可读或者不可写,是否阻塞协程。一般的协程读写都是阻塞时的,然而上一大节咱们提到,select + default 能够实现协程的非阻塞读写,这种语法会转换为 runtime.selectnbrecv 函数调用,其正文如下:

// compiler implements
//
//    select {
//    case v, ok = <-c:
//        ... foo
//    default:
//        ... bar
//    }
//
// as
//
//    if selected, ok = selectnbrecv(&v, c); selected {
//        ... foo
//    } else {
//        ... bar
//    }
//
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool)

  咱们再思考一个问题,管道如果被 close 了,或者是管道没有初始化(nil),这时候如果读、或者写、或者甚至 close 管道,会呈现什么状况呢?阻塞吗?还是会抛 panic 异样?

func closechan(c *hchan) {
    // 抛 panic
    if c == nil {panic(plainError("close of nil channel"))
    }

    // 抛 panic
    if c.closed != 0 {unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }
    // 标识被敞开
    c.closed = 1

    // 唤醒所有读阻塞、写阻塞的协程
    release all readers
    for {sg := c.recvq.dequeue()
    }

    // release all writers (they will panic)
    for {sg := c.sendq.dequeue()
    }
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {return false}
        // 永恒阻塞
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // 如果敞开,不可写,抛 panic
    if c.closed != 0 {unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
}

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    if c == nil {
        if !block {return}
        // 永恒阻塞
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // 如果管道没有数据,返回该类型空数据
    if c.closed != 0 && c.qcount == 0 {unlock(&c.lock)
        if ep != nil {typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

    // 失常数据读取流程
}

  首先明确了一件事件,管道只能敞开一次,并且如果管道为 nil,也是不能敞开的;而且管道敞开时,也会唤醒所有因为该管道而阻塞的协程。当管道为 nil 时,如果 block 为 true,读写管道都会导致协程的永恒阻塞。当管道被 close 时,向管道写入数据是会抛 panic 的,然而能够失常读取数据,即便管道为空,读取也会立刻返回(空数据)。

  最初,select 时如何实现同时监听多个管道的呢?设想一下如果将以后协程增加多多个管道的阻塞队列呢,是不是任意管道可读或可写时,都会唤醒该协程?select 的实现逻辑有些简单,这里咱们就不再赘述,有趣味的能够钻研下 runtime.selectgo 函数:

// selectgo implements the select statement.
// cas0 指向多个 case 数组首地址,nsends、nrecvs 读、写管道的数目;block 是否阻塞
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) 

管道与调度器 schedule

  还记得之前介绍的吗?协程因为某些起因阻塞了(chan 的读写,socket 的读写等等),或者是协程执行完结了,这时候也是须要从新调度其余协程的。协程阻塞通常是通过 runtime.gopark 函数实现的,而灰度协程调度通常是通过函数 runtime.goready 实现。

  管道的读操作以及写操作都有可能阻塞协程,参考函数 chanrecv 以及 chansend:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 协程阻塞
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 协程阻塞
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
}

  协程因管道阻塞后,什么时候能复原执行呢?当然是其余协程读 / 写管道时了,从函数 chansend 的流程能够看到,协程阻塞时,转换为 sudog 构造,存储在 sendq 阻塞队列。所以在 chanrecv 函数中,必定能够找到对应的从 sendq 获取协程并复原调度的逻辑。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {if sg := c.sendq.dequeue(); sg != nil {recv(c, sg, ep, func() {unlock(&c.lock) }, 3)
        return true, true
    }
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 {
        // 无缓冲管道,间接拷贝数据
        if ep != nil {
            // copy data from sender
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // 有缓冲管道,该阻塞协程是因为管道满了
        qp := chanbuf(c, c.recvx)
        // 从缓冲区拷贝数据
        if ep != nil {typedmemmove(c.elemtype, ep, qp)
        }
        // 拷贝发送协程的数据到管道
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {c.recvx = 0}
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    
    // 复原协程调度
    goready(gp, skip+1)
}

总结

  管道是 Go 语言并发编程十分重要的数据类型,本篇文章先介绍了管道的一些典型利用场景,最初深刻底层,解说了管道读写操作的实现逻辑,以及管道与调度器 schedule 之间的关系。

正文完
 0