关于golang:GO-中-Chan-实现原理分享

46次阅读

共计 8871 个字符,预计需要花费 23 分钟才能阅读完成。

GO 中 Chan 实现原理分享

嗨,我是小魔童哪吒,还记得咱们之前分享过 GO 通道 和 sync 包的应用吗?咱们来回顾一下

  • 分享了通道是什么,通道的品种
  • 无缓冲,有缓冲,单向通道具体对应什么
  • 对于通道的具体实际
  • 分享了对于通道的异常情况整顿
  • 简略分享了 sync 包的应用

要是对上述内容还有点趣味的话,欢送查看文章 GO 通道和 sync 包的分享

chan 是什么?

是一种非凡的类型,是连贯并发 goroutine 的管道

channel 通道是能够让一个 goroutine 协程发送特定值到另一个 goroutine 协程的 通信机制

通道 像一个传送带或者队列,总是遵循 先入先出 (First In First Out)的规定,保障收发数据的程序,这一点和 管道 是一样的

一个协程从通道的一头放入数据,另一个协程从通道的另一头读出数据

每一个通道都是一个具体类型的导管,申明 channel 的时候须要为其指定元素类型。

本篇文章次要是分享对于通道的实现原理,对于通道的应用,能够查看文章 GO 通道和 sync 包的分享,这里有具体的阐明

GO 中 Chan 的底层数据结构

理解每一个组件或者每一个数据类型的实现原理,咱们都会去看源码中的数据结构是如何设计的

同样,咱们一起来看看 GO 的 Chan 的数据结构

GO 的 Chan 的源码实现是在:src/runtime/chan.go

type hchan struct {
   qcount   uint           // total data in the queue
   dataqsiz uint           // size of the circular queue
   buf      unsafe.Pointer // points to an array of dataqsiz elements
   elemsize uint16
   closed   uint32
   elemtype *_type // element type
   sendx    uint   // send index
   recvx    uint   // receive index
   recvq    waitq  // list of recv waiters
   sendq    waitq  // list of send waiters

   // lock protects all fields in hchan, as well as several
   // fields in sudogs blocked on this channel.
   //
   // Do not change another G's status while holding this lock
   // (in particular, do not ready a G), as this can deadlock
   // with stack shrinking.
   lock mutex
}

hchan 是实现通道的外围数据结构,对应的成员也是不少,咱们依据源码正文一个参数一个参数的来看看

tag 阐明
qcount 以后的队列,残余元素个数
dataqsiz 环形队列能够寄存的元素个数,也就是环形队列的长度
buf 指针,指向环形队列
elemsize 指的的队列中每个元素的大小
closed 具体标识敞开的状态
elemtype 见名知意,元素的类型
sendx 发送队列的下标,向队列中写入数据的时候,寄存在队列中的地位
recvx 承受队列的下标,从队列的 这个地位开始读取数据
recvq 协程队列,期待读取音讯的协程队列
sendq 协程队列,期待发送音讯的协程队列
lock 互斥锁,在 chan 中,不能够并发的读写数据

依据下面的参数,咱们或多或少就能够晓得 GO 中的通道实现原理设计了哪些知识点:

  • 指针
  • 环形队列
  • 协程
  • 互斥锁

咱们顺便再来看看上述成员的协程队列 waitq 对应的是啥样的数据结构

type waitq struct {
   first *sudog
   last  *sudog
}

sudog 构造是在 src/runtime/runtime2.go中,咱们顺便多学一手

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
type sudog struct {
   // The following fields are protected by the hchan.lock of the
   // channel this sudog is blocking on. shrinkstack depends on
   // this for sudogs involved in channel ops.

   g *g

   next *sudog
   prev *sudog
   elem unsafe.Pointer // data element (may point to stack)

   // The following fields are never accessed concurrently.
   // For channels, waitlink is only accessed by g.
   // For semaphores, all fields (including the ones above)
   // are only accessed when holding a semaRoot lock.

   acquiretime int64
   releasetime int64
   ticket      uint32

   // isSelect indicates g is participating in a select, so
   // g.selectDone must be CAS'd to win the wake-up race.
   isSelect bool

   // success indicates whether communication over channel c
   // succeeded. It is true if the goroutine was awoken because a
   // value was delivered over channel c, and false if awoken
   // because c was closed.
   success bool

   parent   *sudog // semaRoot binary tree
   waitlink *sudog // g.waiting list or semaRoot
   waittail *sudog // semaRoot
   c        *hchan // channel
}

依据源码正文,咱们大抵晓得sudog 是干啥的

Sudog示意期待列表中的 g,例如在一个通道上发送 / 接管

Sudog是很必要的,因为 g↔synchronization 对象关系是多对多

一个 g 可能在很多等待队列上,所以一个 g 可能有很多sudogs

而且许多 g 可能在期待同一个同步对象,所以一个对象可能有许多sudogs

咱们抓住主要矛盾

Sudog的数据结构,次要的货色就是一个 g 和一个 elem

g,下面有说到他和 Sudog的对应关系

无论是读通道还是写通道,都会须要 elem

  • 读通道

数据会从 hchan 的队列中,拷贝到 sudogelem

  • 写通道

与读通道相似,是将数据从 sudogelem 处拷贝到 hchan 的队列中

咱们来画个图看看

此处咱们画一个 hchan的构造,次要画一下 recvq期待读取音讯的协程队列,此处的队列,实际上就是用链表来实现的

recvq会对应到 waitq构造,waitq 分为 first 头结点 和 last尾节点 构造别离是 sudog

sudog外面 elem 寄存具体的数据,next 指针指向下一个 sudog,直到指到lastsudog

通过上述的,应该就能明确 GO 中的 chan 根本构造了吧

咱来再来具体看看 hchan 中其余参数都具体是啥意思

  • dataqsiz 对应的环形队列是啥样的
  • sendq和 读 recvq 期待队列是啥样的
  • elemtype元素类型信息又是啥

dataqsiz 对应的环形队列是啥样的

环形队列,故名思议就是 一个首尾连贯,成环状的队列

GO 中的 chan外部的环形队列,次要作用是作为缓冲区

这个环形队列的长度,咱们在创立队列的时候,也就是创立 hchan 构造的时候,就曾经指定好了的

就是 dataqsiz,环形队列的长度

咱们画个图苏醒一下

上图须要表白的意思是这个样子的,上述的队列是循环队列,默认首尾连贯哦

  • dataqsiz 示意 循环队列的长度是 8 个
  • qcount 示意 以后队列中有 5 个元素
  • buf 是指针,指向循环队列头
  • sendx 是发送队列的下标,这里为 1,则指向队列的第 2 个区域,这个参数可选范畴是 [0 , 8)
  • recvx 是接管队列的下标,这里为 4,则指向的是 队列的第 5 个区域进行读取数据

这里顺带提一下,hchan 中读取数据还是写入数据,都是须要去拿 lock 互斥锁的,同一个通道,在同一个时刻只能容许一个协程进行读写

sendq和 读 recvq 期待队列是啥样的

hchan 构造中的 2 个协程队列,一个是用于读取数据,一个是用于发送数据,他们都是期待队列,咱们来看看这个期待队列都是咋放数据下来的,别离有啥个性须要留神

当从通道中读取 或者 发送数据:

  • 若通道的缓冲区为空,或者没有缓冲区,此时从通道中读取数据,则协程是会 被阻塞
  • 若通道缓冲区为满,或者没有缓冲区,此时从通道中写数据,则协程依然也会 被阻塞

这些被阻塞的协程就会被放到期待队列中,依照读 和 写 的动作来进行分类为写 sendq和 读 recvq 队列

那么这些阻塞的协程,啥时候会被唤醒呢?

看过之前的文章 GO 通道和 sync 包的分享,应该就能晓得

咱们在来回顾一下,这篇文章的表格,通道会存在的异常情况:

channel 状态 未初始化的通道(nil) 通道非空 通道是空的 通道满了 通道未满
接收数据 阻塞 接收数据 阻塞 接收数据 接收数据
发送数据 阻塞 发送数据 发送数据 阻塞 发送数据
敞开 panic 敞开通道胜利
待数据读取结束后
返回零值
敞开通道胜利
间接返回零值
敞开通道胜利
待数据读取结束后
返回零值
敞开通道胜利
待数据读取结束后
返回零值

此时,咱们就晓得,具体什么时候被阻塞的协程会被唤醒了

  • 因为读阻塞的协程,会被通道中的写入数据的协程唤醒,反之亦然
  • 因为写阻塞的协程,也会被通道中读取数据的协程唤醒

elemtype元素类型信息又是啥

这个元素类型信息就不难理解了,对于咱们应用通道,创立通道的时候咱们须要填入通道中数据的类型,一个通道,只能写一种数据类型,指的就是这里的elemtype

另外 hchan 还有一个成员是elemsize,代表上述元素类型的占用空间大小

那么这俩成员有啥作用呢?

elemtypeelemsize 就能够计算指定类型的数据占用空间大小了

前者用于在数据传递的过程中进行赋值

后者能够用来在环形队列中定位具体的元素

创立 chan 是咋实现的?

咱们再来瞅瞅 chan.go 的源码实现,看到源码中的 makechan 具体实现

func makechan(t *chantype, size int) *hchan {
   elem := t.elem

   // compiler checks this but be safe.
   if elem.size >= 1<<16 {throw("makechan: invalid channel element type")
   }
   if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")
   }

   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))
   }

   // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
   // buf points into the same allocation, elemtype is persistent.
   // SudoG's are referenced from their owning thread so they can't be collected.
   // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
   var c *hchan
   switch {
   case mem == 0:
      // Queue or element size is zero.
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      // Race detector uses this location for synchronization.
      c.buf = c.raceaddr()
   case elem.ptrdata == 0:
      // Elements do not contain pointers.
      // Allocate hchan and buf in one call.
      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:
      // Elements contain pointers.
      c = new(hchan)
      c.buf = mallocgc(mem, elem, true)
   }

   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)
   lockInit(&c.lock, lockRankHchan)

   if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
   }
   return c
}

如上源码实际上就是初始化 chan 对应的成员,其中循环队列 buf 的大小,是由 makechan 函数传入的 类型信息和缓冲区长度决定的,也就是makechan 的入参

能够通过上述代码的 3 个地位就能够晓得

// 1
func makechan(t *chantype, size int) *hchan
// 2
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
// 3
var c *hchan
   switch {
   case mem == 0:
      // Queue or element size is zero.
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      // Race detector uses this location for synchronization.
      c.buf = c.raceaddr()
   case elem.ptrdata == 0:
      // Elements do not contain pointers.
      // Allocate hchan and buf in one call.
      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:
      // Elements contain pointers.
      c = new(hchan)
      c.buf = mallocgc(mem, elem, true)
   }

读写 chan 的根本流程

第一张图说明确 向 chan 写入数据的流程

向通道中写入数据,咱们会波及 sendqrecvq 队列,和循环队列的资源问题

依据图示能够看出向通道中写入数据分为 3 种 状况:

  • 写入数据的时候,若recvq 队列为空,且循环队列有空位,那么就间接将数据写入到 循环队列的队尾 即可
  • recvq 队列为空,且循环队列无空位,则将以后的协程放到sendq 期待队列中进行阻塞,期待被唤醒,当被唤醒的时候,须要写入的数据,曾经被读取进去,且曾经实现了写入操作
  • recvq 队列为不为空,那么能够阐明循环队列中没有数据,或者循环队列是空的,即没有缓冲区( 向无缓冲的通道写入数据 ),此时,间接将recvq 期待队列中取出一个 G,写入数据,唤醒 G,实现写入操作

第二张图说明确 向 chan 读取数据的流程

向通道中读取数据,咱们会波及 sendqrecvq 队列,和循环队列的资源问题

依据图示能够看出向通道中读取数据分为 4 种 状况:

  • sendq 为空,且循环队列无元素的时候,那就将以后的协程退出 recvq 期待队列,把 recvq 期待队列对头的一个协程取出来,唤醒,读取数据
  • sendq 为空,且循环队列有元素的时候,间接读取循环队列中的数据即可
  • sendq 有数据,且循环队列有元素的时候,间接读取循环队列中的数据即可,且把 sendq 队列取一个 G 放到循环队列中,进行补充
  • sendq 有数据,且循环队列无元素的时候,则从 sendq 取出一个 G,并且唤醒他,进行数据读取操作

下面说了通道的创立,读写,那么通道咋敞开?

通道的敞开,咱们在利用的时候间接 close 就搞定了,那么对应 close 的时候,底层的队列都是做了啥呢?

若敞开了以后的通道,那么零碎会把recvq 读取数据的期待队列外面的所有协程,全副唤醒,这外面的每一个 G 写入的数据 默认就写个 nil,因为通道敞开了,从敞开的通道外面读取数据,读到的是 nil

零碎还会把 sendq 写数据的期待队列外面的每一个协程唤醒,然而此时就会有问题了,向曾经敞开的协程外面写入数据,会报panic

咱们再来梳理一下,什么状况下对通道操作,会报panic,咱们当初对之前提到的表格再来补充一波

channel 状态 未初始化的通道(nil) 通道非空 通道是空的 通道满了 通道未满 敞开的通道
接收数据 阻塞 接收数据 阻塞 接收数据 接收数据 nil
发送数据 阻塞 发送数据 发送数据 阻塞 发送数据 panic
敞开 panic 敞开通道胜利
待数据读取结束后
返回零值
敞开通道胜利
间接返回零值
敞开通道胜利
待数据读取结束后
返回零值
敞开通道胜利
待数据读取结束后
返回零值
panic
  • 敞开一个曾经被敞开了的通道,会报panic
  • 敞开一个未初始化的通道,即为 nil 的通道,也会报panic
  • 向一个曾经敞开的通道写入数据,会报panic

你认为这就完了吗?

GO 外面 Chan 个别会和 select 搭配应用,咱们最初来简略说一下GO 的 通道咋和 select 应用

GO 外面 select 就和 C/C++ 外面的 多路 IO 复用 相似,在 C/C++ 中多路 IO 复用有如下几种形式

  • SELECT
  • POLL
  • EPOLL

都能够本人去模仿实现多路 IO 复用,各有利弊,个别应用的最多的是 EPOLL,且 C /C++ 也有对应的网络库

当咱们写 GO 的多路 IO 复用的时候,那就相当爽了,GO 默认反对select 关键字

SELECT 简略应用

咱们就来看看都是咋用的,不废话,咱间接上 DEMO

package main

import (
   "log"
   "time"
)

func main() {

   // 简略设置 log 参数
   log.SetFlags(log.Lshortfile | log.LstdFlags)

   // 创立 2 个通道,元素数据类型为 int,缓冲区大小为 5
   var ch1 = make(chan int, 5)
   var ch2 = make(chan int, 5)

   // 别离向通道中各自写入数据,咱默认写 1 吧
   // 间接写一个匿名函数 向通道中增加数据
   go func (){
      var num = 1
      for {
         ch1 <- num
         num += 1
         time.Sleep(1 * time.Second)
      }
   }()

   go func (){
      var num = 1
      for {
         ch2 <- num
         num += 1
         time.Sleep(1 * time.Second)
      }
   }()

   for {
      select {// 读取数据
      case num := <-ch1:
         log.Printf("read ch1 data is  %d\n", num)

      case num := <-ch2:
         log.Printf("read ch2 data is: %d\n", num)

      default:
         log.Printf("ch1 and ch2 is empty\n")
          // 劳动 1s 再读
         time.Sleep(1 * time.Second)
      }
   }
}

运行成果

2021/06/18 17:43:06 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:07 main.go:48: read ch1 data is  1
2021/06/18 17:43:07 main.go:48: read ch1 data is  2
2021/06/18 17:43:07 main.go:51: read ch2 data is: 1
2021/06/18 17:43:07 main.go:51: read ch2 data is: 2
2021/06/18 17:43:07 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:08 main.go:48: read ch1 data is  3
2021/06/18 17:43:08 main.go:51: read ch2 data is: 3
2021/06/18 17:43:08 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:09 main.go:48: read ch1 data is  4
2021/06/18 17:43:09 main.go:51: read ch2 data is: 4
2021/06/18 17:43:09 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:10 main.go:51: read ch2 data is: 5
2021/06/18 17:43:10 main.go:48: read ch1 data is  5

从运行后果来看,select 监控的 2 个 通道,读取到的数据是随机的

可是咱们看到 case 这个关键字,是不是会想到 switch ... case...,此处的的case 是程序运行的(GO 中没有 switch),select 外面的 case 应该也是程序运行才对呀,为啥后果是随机的?

大家要是感兴趣的话,能够深入研究一下,咱们明天就先到这里了。

总结

  • 分享了 GO 中通道是什么
  • 通道的底层数据结构具体解析
  • 通道在 GO 源码中是如何实现的
  • Chan 读写的基本原理
  • 敞开通道会呈现哪些异样,panic
  • select 的简略利用

欢送点赞,关注,珍藏

敌人们,你的反对和激励,是我保持分享,提高质量的能源

好了,本次就到这里,下一次 GO 中 defer 的实现原理分享

技术是凋谢的,咱们的心态,更应是凋谢的。拥抱变动,背阴而生,致力向前行。

我是 小魔童哪吒,欢送点赞关注珍藏,下次见~

正文完
 0