Goroutine

什么是Goroutine

  • Goroutine是Golang特有的并发体,是一种轻量级的"线程"
  • Go中最根本的执行单元,每个Goroutine独立执行
  • 每一个Go程序至多有一个Goroutine:主Goroutine。当程序启动时,它会主动创立。
func main() {  say()  // 运行中,期待后果    go say()  fmt.Println("end")  // 不须要期待say()的执行后果}func say(){   fmt.Println("hello world")}

Vs Os Thread

零碎线程 Os Thread

每个零碎线程有固定大小的栈,个别默认2M,这个栈次要用来保留函数递归调用时参数和局部变量,由内核调度

固定大小的栈就会带来问题

  • 空间节约
  • 空间可能又不够,存在栈溢出的危险

Goroutine

由Go的调度器调度,刚创立的时候很小(2kb或者4kb),会依据须要动静地伸缩栈的大小(支流实现中栈的最大值可达到1GB)。

因为启动的代价很小,所以咱们能够轻易地启动成千上万个Goroutine。

通过示例理解

1. 多个goroutine同时运行

运行的程序由调度器决定,不须要相互依赖

func main() {   fmt.Println("Started")   for i := 0; i < 10; i++ {      go execute(i)   }   time.Sleep(time.Second * 1)   fmt.Println("Finished")}func execute(id int) {   fmt.Printf("id: %d\n", id)}

2. 图片并发下载

func main() {   urls := []string{      "https://pic.netbian.com/uploads/allimg/210925/233922-163258436234e8.jpg",      "https://pic.netbian.com/uploads/allimg/210920/180354-16321322345f20.jpg",      "https://pic.netbian.com/uploads/allimg/210916/232432-16318058722f4d.jpg",   }   for _,url := range urls{      go downloadFile(url)   }   time.Sleep(time.Second)}func downloadFile(URL string) error {   //Get the response bytes from the url   response, err := http.Get(URL)   if err != nil {      return err   }   defer response.Body.Close()   if response.StatusCode != 200 {      return errors.New("Received non 200 response code")   }   //Create a empty file   file, err := os.Create(path.Base(URL))   if err != nil {      return err   }   defer file.Close()   //Write the bytes to the fiel   _, err = io.Copy(file, response.Body)   if err != nil {      return err   }   return nil}

Recover

每个Goroutine都要有recover机制,因为当一个Goroutine抛panic的时候只有本身可能捕捉到其它Goroutine是没有方法捕获的。

如果没有recover机制,整个过程会crash。

留神:Goroutine产生panic时,只会调用本身的defer,所以即使主Goroutine里写了recover逻辑,也无奈recover。

func main() {    go do1()    go do2()    time.Sleep(10*time.Second)}func do1() {    for i := 0; i < 100; i++ {        fmt.Println("do1", i)    }}func do2() {    defer func() {        if err := recover(); err != nil {            log.Printf("recover: %v", err)        }    }()    for i := 0; i < 100; i++ {        if i ==5{            panic("do panic")        }        fmt.Println("do2", i)    }}

Channel

根本介绍

Channel是Go内置的数据类型,为初始化的channel的值为nil

通过发送和接管指定元素类型的值来进行通信

  • Channel 提供了 goroutines 之间的同步和通信
  • Goroutine 实现并发/并行的轻量级独立执行。

Shard Memory

graphthread1 --> Memorythread2 --> Memorythread3 --> Memory

CSP

Communicating sequential processes 通信程序编程

用于形容两个独立的并发实体通过共享的通信 channel(管道)进行通信的并发模型,

不关注发送音讯的实体,而关注与发送音讯时应用的channel

不要通过共享内存来通信,而通过通信来共享内存。 -- Rob Pike
graph LRGoroutine1 --> Channel --> Goroutine2

数据结构

type hchan struct {    qcount   uint           // total data in the queue    dataqsiz uint           // size of the circular queue    buf      unsafe.Pointer // points to an array of dataqsiz elements    elemsize uint16    closed   uint32         // denotes weather channel is closed or not    elemtype *_type         // element type    sendx    uint           // send index    recvx    uint           // receive index    recvq    waitq          // list of recv waiters    sendq    waitq          // list of send waiters    lock     mutex}

根本用法

定义

ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" ) ElementType .

<-运算符指定通道方向发送接管。如果没有给出方向,则通道是 双向的

chan T          // 能够发送接管Tchan<- T        // 只能发送T<-chan T        // 只能接管T

创立

ch := make(chan int)     // 无缓冲 cap 0ch := make(chan int,100) // 有缓冲 cap 100

操作

ch <- 1. // 发送<-ch.    // 接管close(ch)// 敞开

代码示例

func goroutineA(ch <-chan int) {   fmt.Println("[goroutineA] want a data")   val := <-ch   fmt.Println("[goroutineA] received the data", val)}func goroutineB(ch chan<- int) {   ch <- 1   fmt.Println("[goroutineB] send the data 1")}func main() {   ch := make(chan int)   go goroutineA(ch)   go goroutineB(ch)   time.Sleep(time.Second)}
sequenceDiagramgroutineA->channel: hello,我想要获取一个数据channel-->groutineA: 我当初还没有数据groutineA->channel: 那我睡觉了,等有数据再叫醒我channel-->groutineA: okgroutineB->channel: hello,我要发送一个数据给你channel-->groutineB: ok,发过来吧channel->groutineA: 醒醒,接收数据啦groutineA-->channel: 来咯

Unbuffered channels

缓冲区大小为0的channel

channel接收者会阻塞,直到收到音讯,channel发送者会阻塞,直到接收者收到音讯

Buffered channels

领有缓冲区,当缓冲区已满时,发送者会阻塞;当缓冲区为空时,接收者会阻塞

总结

不要关注channel的数据结构,更应该关注channel的行为

Commandnilemptyfullnot full & emptyclosed
Receiveblockblocksuccesssuccesssuccess
Sendblocksuccessblocksuccesspanic
Closepanicsuccesssuccesssuccesspanic

几条准则

  • channel 上的发送操作总在对应的接管操作实现前产生
  • 如果 channel 敞开后从中接收数据,接受者就会收到该 channel 返回的零值
  • 从无缓冲的 channel 中进行的接管,要产生在对该 channel 进行的发送实现前
  • 不要在数据接管方或者在有多个发送者的状况下敞开通道。换句话说,咱们只应该让一个通道惟一的发送者敞开此通道

示例

package mainimport "fmt"func main() {    ch1 := make(chan string)    ch1 <- "hello world"    fmt.Println(<-ch1)}

执行之后会报错

fatal error: all goroutines are asleep - deadlock!

起因?

第7行 给通道ch1传入值 hello world,然而对于无缓冲的通道,在接收者未筹备好前发送操作是阻塞的,短少接收者造成死锁

如何解决?

1. 减少接收者
func main() {    ch1 := make(chan string)    go func() {        fmt.Println(<-ch1)    }()    ch1 <- "hello world"    time.Sleep(time.Millisecond)}func main() {    ch1 := make(chan string)    go func() {        ch1 <- "hello world"    }()    fmt.Println(<-ch1)}
2. 减少channel容量
func main() {    ch1 := make(chan string,1)    ch1 <- "hello world"    fmt.Println(<-ch1)}

Goroutine & Channel 串起来

常见的并发模式

告诉

  1. 向一个通道发送一个值实现告诉
func main() {   ch := make(chan int)   go do(ch)   // do something   <- ch   fmt.Println("done")}func do(ch chan int){   // 长时间操作   time.Sleep(3*time.Second)   fmt.Println("doing")   ch <- 1}
  1. 从一个通道接管值实现告诉
func main() {   ch := make(chan int)   go do(ch)   // do something   ch <- 1   fmt.Println("done")}func do(ch chan int){   // 长时间操作   time.Sleep(3*time.Second)   fmt.Println("doing")   <-ch}

互斥锁

func main() {   mutex := make(chan struct{}, 1) // 容量必须为1   counter := 0   increase := func() {      mutex <- struct{}{} // 加锁      counter++      <-mutex // 解锁   }   increase1000 := func(done chan<- struct{}) {      for i := 0; i < 1000; i++ {         increase()      }      done <- struct{}{}   }   done := make(chan struct{})   go increase1000(done)   go increase1000(done)   <-done; <-done   fmt.Println(counter) // 2000}

管制协程的并发数量

不限度的场景

func main() {   for i := 0; i < math.MaxInt32; i++ {      go func(i int) {         log.Println(i)         time.Sleep(time.Second)      }(i)   }   for {         time.Sleep(time.Second)     }}

运行后果

$ go run main.go...150577150578panic: too many concurrent operations on a single file or socket (max 1048575)

问题:如何限度协程的数量?

// main_chan.gofunc main() {    ch := make(chan struct{}, 4)    for i := 0; i < 20; i++ {        ch <- struct{}{}        go func(i int) {            log.Println(i)            time.Sleep(time.Second)            <-ch        }(i)    }    for {        time.Sleep(time.Second)    }}

生产者消费者模型

// 生产者: 生成 factor 整数倍的序列func Producer(factor int, out chan<- int) {    for i := 0; i < 10; i++ {        out <- i * factor    }}// 消费者func Consumer(in <-chan int) {    for v := range in {        fmt.Println(v)    }}func main() {    ch := make(chan int, 3) // 成绩队列    go Producer(3, ch) // 生成 3 的倍数的序列    go Producer(5, ch) // 生成 5 的倍数的序列    go Consumer(ch)    // 生产 生成的队列    time.Sleep(5 * time.Second)}

返回最优的后果

func main() {   ch := make(chan string, 32)   go func() {      ch <- searchByGoogle("golang")   }()   go func() {      ch <- searchByBaidu("golang")   }()   fmt.Println(<-ch)}func searchByGoogle(search string) string {   time.Sleep(2 * time.Second)   return "google result: " + search}func searchByBaidu(search string) string {   time.Sleep(time.Second)   return "baidu result " + search}
问题1:

当取得想要的后果之后,如何告诉或者平安退出其余还在执行的协程?

func main() {    ch := make(chan string, 32)    cancel := make(chan struct{},2)        go func() {        ch <- searchByGoogle("golang",cancel)    }()    go func() {        ch <- searchByBaidu("golang",cancel)    }()    fmt.Println(<-ch)    cancel <- struct{}{}    time.Sleep(time.Second)}func searchByGoogle(search string,cancel chan struct{}) string {    done := make(chan struct{})    go func() {        time.Sleep(2 * time.Second)        done <- struct{}{}    }()    select {    case <- done:        return "google result " + search    case <- cancel:        fmt.Println("google cancel")        return "google cancel"    }}func searchByBaidu(search string,cancel chan struct{}) string {    done := make(chan struct{})    go func() {        time.Sleep(1 * time.Second)        done <- struct{}{}    }()    select {    case <- done:        return "baidu result " + search    case <- cancel:        fmt.Println("google cancel")        return "baidu cancel"    }}
问题2:

如何做超时管制?

Goroutine 透露

1. 被忘记的发送者

func searchByBaidu(search string,cancel chan struct{}) string {    done := make(chan struct{})     go func() {        time.Sleep(1 * time.Second)        done <- struct{}{}    }()    select {    case <- done:        return "baidu result " + search    case <- cancel:        fmt.Println("google cancel")        return "baidu cancel"    }}

case <- done case <- cancel不确定会执行哪一个,如果执行 <-cancel ,则第五行 done <- struct{}{} 会永远阻塞,Goroutine无奈退出

如何解决?

减少channel容量

done := make(chan struct{})

还有其余方法吗?

func searchByBaidu(search string,cancel chan struct{}) string {   done := make(chan struct{})   go func() {      time.Sleep(1 * time.Second)      select {      case done <- struct{}{}:      default:         return      }   }()   select {   case <- done:      return "baidu result " + search   case <- cancel:      fmt.Println("google cancel")      return "baidu cancel"   }}

2. 被忘记的接收者

import (   "errors"   "fmt"   "io"   "net/http"   "os"   "path"   "runtime")type result struct {   url string   err error}func main() {   startGNum := runtime.NumGoroutine()   urls := []string{      "https://pic.netbian.com/uploads/allimg/210925/233922-163258436234e8.jpg",      "https://pic.netbian.com/uploads/allimg/210920/180354-16321322345f20.jpg",      "https://pic.netbian.com/uploads/allimg/210916/232432-16318058722f4d.jpg",   }   total := len(urls)   // 填充输出   input := make(chan string, total)   for _, url := range urls {      input <- url   }   // close(input)   output := make(chan *result, total)   // 启动4个goroutine   for i := 0; i < 4; i++ {      go download(input, output)   }   // 期待后果   for i := 0; i < total; i++ {      ret := <-output      fmt.Println(ret.url, ret.err)   }   time.Sleep(2*time.Second)  // 期待download协程的退出   endGNum := runtime.NumGoroutine()   fmt.Println("start goroutine", startGNum)   fmt.Println("end goroutine", endGNum)}func download(input <-chan string, output chan<- *result) {   for v := range input {      err := downloadFile(v)      output <- &result{         url: v,         err: err,      }   }   fmt.Println("download finish!!!")}func downloadFile(URL string) error {   //Get the response bytes from the url   response, err := http.Get(URL)   if err != nil {      return err   }   defer response.Body.Close()   if response.StatusCode != 200 {      return errors.New("Received non 200 response code")   }   //Create a empty file   file, err := os.Create(path.Base(URL))   if err != nil {      return err   }   defer file.Close()   //Write the bytes to the fiel   _, err = io.Copy(file, response.Body)   if err != nil {      return err   }   return nil}

这个会产生Goroutine的透露,起因是第49行的for v := range input 当没有数据输出的时候还在持续期待输出,所有应该在没有数据输出的时候通知它,让它不要傻傻的等

如何解决?

  1. 敞开input 通道
  2. 传递一个通道通知它后果

准则

  1. 永远不要在不晓得如何进行的状况下启动Goroutine,当咱们启动一个Goroutine的时候须要思考几个问题

    • 什么时候进行?
    • 能够通过什么形式终止它?
  2. 将并发留给调用者

    • 请将是否异步调用的选择权交给调用者,不然很有可能调用者并不知道你在这个函数外面应用了Goroutine
    • 如果你的函数启动了一个 Goroutine,您必须为调用者提供一种明确进行该 Goroutine 的办法。将异步执行函数的决定留给该函数的调用者通常更容易。

总结

Concurrency is a useful tool, but it must be used with caution.

并发是一个有用的工具,然而必须审慎应用

参考链接

  • the way to go
  • golang channel
  • 常见的并发模式
  • the-forgotten-sender
  • the-abandoned-receivers
  • concurrency