乐趣区

关于golang:了解Goroutine-和-Channel

Goroutine

什么是 Goroutine

  • Goroutine 是 Golang 特有的并发体,是一种轻量级的 ” 线程 ”
  • Go 中最根本的执行单元, 每个 Goroutine 独立执行
  • 每一个 Go 程序至多有一个 Goroutine:主 Goroutine。当程序启动时,它会主动创立。
func main() {say()  // 运行中,期待后果
  
  go say()
  fmt.Println("end")  // 不须要期待 say()的执行后果}

func say(){fmt.Println("hello world")
}

Vs Os Thread

零碎线程 Os Thread

每个零碎线程有固定大小的栈,个别默认 2M,这个栈次要用来保留函数递归调用时参数和局部变量,由内核调度

固定大小的栈就会带来问题

  • 空间节约
  • 空间可能又不够,存在栈溢出的危险

Goroutine

由 Go 的调度器调度,刚创立的时候很小(2kb 或者 4kb),会依据须要动静地伸缩栈的大小(支流实现中栈的最大值可达到 1GB)。

因为启动的代价很小,所以咱们能够轻易地启动成千上万个 Goroutine。

通过示例理解

1. 多个 goroutine 同时运行

运行的程序由调度器决定,不须要相互依赖

func main() {fmt.Println("Started")
   for i := 0; i < 10; i++ {go execute(i)
   }
   time.Sleep(time.Second * 1)
   fmt.Println("Finished")
}
func execute(id int) {fmt.Printf("id: %d\n", id)
}

2. 图片并发下载

func main() {urls := []string{
      "https://pic.netbian.com/uploads/allimg/210925/233922-163258436234e8.jpg",
      "https://pic.netbian.com/uploads/allimg/210920/180354-16321322345f20.jpg",
      "https://pic.netbian.com/uploads/allimg/210916/232432-16318058722f4d.jpg",
   }
   for _,url := range urls{go downloadFile(url)
   }
   time.Sleep(time.Second)
}

func downloadFile(URL string) error {
   //Get the response bytes from the url
   response, err := http.Get(URL)
   if err != nil {return err}
   defer response.Body.Close()

   if response.StatusCode != 200 {return errors.New("Received non 200 response code")
   }
   //Create a empty file
   file, err := os.Create(path.Base(URL))
   if err != nil {return err}
   defer file.Close()

   //Write the bytes to the fiel
   _, err = io.Copy(file, response.Body)
   if err != nil {return err}

   return nil
}

Recover

每个 Goroutine 都要有 recover 机制,因为当一个 Goroutine 抛 panic 的时候只有本身可能捕捉到其它 Goroutine 是没有方法捕获的。

如果没有 recover 机制,整个过程会 crash。

留神:Goroutine 产生 panic 时,只会调用本身的 defer,所以即使主 Goroutine 里写了 recover 逻辑,也无奈 recover。

func main() {go do1()
    go do2()
    time.Sleep(10*time.Second)
}

func do1() {
    for i := 0; i < 100; i++ {fmt.Println("do1", i)
    }
}

func do2() {defer func() {if err := recover(); err != nil {log.Printf("recover: %v", err)
        }
    }()

    for i := 0; i < 100; i++ {
        if i ==5{panic("do panic")
        }
        fmt.Println("do2", i)
    }
}

Channel

根本介绍

Channel 是 Go 内置的数据类型,为初始化的 channel 的值为 nil

通过发送和接管指定元素类型的值来进行通信

  • Channel 提供了 goroutines 之间的同步和通信
  • Goroutine 实现并发 / 并行的轻量级独立执行。

Shard Memory

graph
thread1 --> Memory
thread2 --> Memory
thread3 --> Memory

CSP

Communicating sequential processes 通信程序编程

用于形容两个独立的并发实体通过共享的通信 channel(管道)进行通信的并发模型,

不关注发送音讯的实体,而关注与发送音讯时应用的 channel

不要通过共享内存来通信,而通过通信来共享内存。— Rob Pike

graph LR
Goroutine1 --> Channel --> Goroutine2

数据结构

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32         // denotes weather channel is closed or not
    elemtype *_type         // element type
    sendx    uint           // send index
    recvx    uint           // receive index
    recvq    waitq          // list of recv waiters
    sendq    waitq          // list of send waiters
    lock     mutex
}

根本用法

定义

ChannelType = ("chan" | "chan" "<-" | "<-" "chan") ElementType .

<-运算符指定通道 方向 发送 接管 。如果没有给出方向,则通道是 双向的

chan T          // 能够发送接管 T
chan<- T        // 只能发送 T
<-chan T        // 只能接管 T 

创立

ch := make(chan int)     // 无缓冲 cap 0
ch := make(chan int,100) // 有缓冲 cap 100

操作

ch <- 1. // 发送
<-ch.    // 接管
close(ch)// 敞开

代码示例

func goroutineA(ch <-chan int) {fmt.Println("[goroutineA] want a data")
   val := <-ch
   fmt.Println("[goroutineA] received the data", val)
}

func goroutineB(ch chan<- int) {
   ch <- 1
   fmt.Println("[goroutineB] send the data 1")
}

func main() {ch := make(chan int)
   go goroutineA(ch)
   go goroutineB(ch)
   time.Sleep(time.Second)
}
sequenceDiagram
groutineA->channel: hello,我想要获取一个数据
channel-->groutineA: 我当初还没有数据
groutineA->channel: 那我睡觉了,等有数据再叫醒我
channel-->groutineA: ok
groutineB->channel: hello,我要发送一个数据给你
channel-->groutineB: ok,发过来吧
channel->groutineA: 醒醒,接收数据啦
groutineA-->channel: 来咯

Unbuffered channels

缓冲区大小为 0 的channel

channel 接收者会阻塞,直到收到音讯,channel 发送者会阻塞,直到接收者收到音讯

Buffered channels

领有缓冲区,当缓冲区已满时,发送者会阻塞;当缓冲区为空时,接收者会阻塞

总结

不要关注 channel 的数据结构,更应该关注 channel 的行为

Command nil empty full not full & empty closed
Receive block block success success success
Send block success block success panic
Close panic success success success panic

几条准则

  • channel 上的发送操作总在对应的接管操作实现前产生
  • 如果 channel 敞开后从中接收数据,接受者就会收到该 channel 返回的零值
  • 从无缓冲的 channel 中进行的接管,要产生在对该 channel 进行的发送实现前
  • 不要在数据接管方或者在有多个发送者的状况下敞开通道。换句话说,咱们只应该让一个通道惟一的发送者敞开此通道

示例

package main

import "fmt"

func main() {ch1 := make(chan string)
    ch1 <- "hello world"
    fmt.Println(<-ch1)
}

执行之后会报错

fatal error: all goroutines are asleep - deadlock!

起因?

第 7 行 给通道 ch1 传入值 hello world, 然而对于无缓冲的通道,在接收者未筹备好前发送操作是阻塞的,短少接收者造成死锁

如何解决?

1. 减少接收者
func main() {ch1 := make(chan string)
    go func() {fmt.Println(<-ch1)
    }()
    ch1 <- "hello world"
    time.Sleep(time.Millisecond)
}

func main() {ch1 := make(chan string)
    go func() {ch1 <- "hello world"}()
    fmt.Println(<-ch1)
}
2. 减少 channel 容量
func main() {ch1 := make(chan string,1)
    ch1 <- "hello world"
    fmt.Println(<-ch1)
}

Goroutine & Channel 串起来

常见的并发模式

告诉

  1. 向一个通道发送一个值实现告诉
func main() {ch := make(chan int)
   go do(ch)
   // do something
   <- ch
   fmt.Println("done")
}

func do(ch chan int){
   // 长时间操作
   time.Sleep(3*time.Second)
   fmt.Println("doing")
   ch <- 1
}
  1. 从一个通道接管值实现告诉
func main() {ch := make(chan int)

   go do(ch)
   // do something
   ch <- 1
   fmt.Println("done")
}

func do(ch chan int){
   // 长时间操作
   time.Sleep(3*time.Second)
   fmt.Println("doing")
   <-ch
}

互斥锁

func main() {mutex := make(chan struct{}, 1) // 容量必须为 1

   counter := 0
   increase := func() {mutex <- struct{}{} // 加锁
      counter++
      <-mutex // 解锁
   }

   increase1000 := func(done chan<- struct{}) {
      for i := 0; i < 1000; i++ {increase()
      }
      done <- struct{}{}
   }

   done := make(chan struct{})
   go increase1000(done)
   go increase1000(done)
   <-done; <-done
   fmt.Println(counter) // 2000
}

管制协程的并发数量

不限度的场景

func main() {
   for i := 0; i < math.MaxInt32; i++ {go func(i int) {log.Println(i)
         time.Sleep(time.Second)
      }(i)
   }

   for {time.Sleep(time.Second)
     }
}

运行后果

$ go run main.go
...
150577
150578
panic: too many concurrent operations on a single file or socket (max 1048575)

问题:如何限度协程的数量?

// main_chan.go
func main() {ch := make(chan struct{}, 4)

    for i := 0; i < 20; i++ {ch <- struct{}{}
        go func(i int) {log.Println(i)
            time.Sleep(time.Second)
            <-ch
        }(i)
    }

    for {time.Sleep(time.Second)
    }
}

生产者消费者模型

// 生产者: 生成 factor 整数倍的序列
func Producer(factor int, out chan<- int) {
    for i := 0; i < 10; i++ {out <- i * factor}
}

// 消费者
func Consumer(in <-chan int) {
    for v := range in {fmt.Println(v)
    }
}
func main() {ch := make(chan int, 3) // 成绩队列

    go Producer(3, ch) // 生成 3 的倍数的序列
    go Producer(5, ch) // 生成 5 的倍数的序列
    go Consumer(ch)    // 生产 生成的队列

    time.Sleep(5 * time.Second)
}

返回最优的后果

func main() {ch := make(chan string, 32)

   go func() {ch <- searchByGoogle("golang")
   }()
   go func() {ch <- searchByBaidu("golang")
   }()

   fmt.Println(<-ch)
}

func searchByGoogle(search string) string {time.Sleep(2 * time.Second)
   return "google result:" + search
}

func searchByBaidu(search string) string {time.Sleep(time.Second)
   return "baidu result" + search
}
问题 1:

当取得想要的后果之后,如何告诉或者平安退出其余还在执行的协程?

func main() {ch := make(chan string, 32)
    cancel := make(chan struct{},2)
    
    go func() {ch <- searchByGoogle("golang",cancel)
    }()
    go func() {ch <- searchByBaidu("golang",cancel)
    }()

    fmt.Println(<-ch)
    cancel <- struct{}{}

    time.Sleep(time.Second)
}

func searchByGoogle(search string,cancel chan struct{}) string {done := make(chan struct{})
    go func() {time.Sleep(2 * time.Second)
        done <- struct{}{}
    }()

    select {
    case <- done:
        return "google result" + search
    case <- cancel:
        fmt.Println("google cancel")
        return "google cancel"
    }
}

func searchByBaidu(search string,cancel chan struct{}) string {done := make(chan struct{})
    go func() {time.Sleep(1 * time.Second)
        done <- struct{}{}
    }()

    select {
    case <- done:
        return "baidu result" + search
    case <- cancel:
        fmt.Println("google cancel")
        return "baidu cancel"
    }
}
问题 2:

如何做超时管制?

Goroutine 透露

1. 被忘记的发送者

func searchByBaidu(search string,cancel chan struct{}) string {done := make(chan struct{}) 
    go func() {time.Sleep(1 * time.Second)
        done <- struct{}{}
    }()

    select {
    case <- done:
        return "baidu result" + search
    case <- cancel:
        fmt.Println("google cancel")
        return "baidu cancel"
    }
}

case <- done case <- cancel不确定会执行哪一个,如果执行 <-cancel,则第五行 done <- struct{}{} 会永远阻塞,Goroutine 无奈退出

如何解决?

减少 channel 容量

done := make(chan struct{})

还有其余方法吗?

func searchByBaidu(search string,cancel chan struct{}) string {done := make(chan struct{})
   go func() {time.Sleep(1 * time.Second)
      select {case done <- struct{}{}:
      default:
         return
      }
   }()

   select {
   case <- done:
      return "baidu result" + search
   case <- cancel:
      fmt.Println("google cancel")
      return "baidu cancel"
   }
}

2. 被忘记的接收者

import (
   "errors"
   "fmt"
   "io"
   "net/http"
   "os"
   "path"
   "runtime"
)

type result struct {
   url string
   err error
}

func main() {startGNum := runtime.NumGoroutine()
   urls := []string{
      "https://pic.netbian.com/uploads/allimg/210925/233922-163258436234e8.jpg",
      "https://pic.netbian.com/uploads/allimg/210920/180354-16321322345f20.jpg",
      "https://pic.netbian.com/uploads/allimg/210916/232432-16318058722f4d.jpg",
   }

   total := len(urls)
   // 填充输出
   input := make(chan string, total)
   for _, url := range urls {input <- url}
   // close(input)
   output := make(chan *result, total)
   // 启动 4 个 goroutine
   for i := 0; i < 4; i++ {go download(input, output)
   }
   // 期待后果
   for i := 0; i < total; i++ {
      ret := <-output
      fmt.Println(ret.url, ret.err)
   }
   time.Sleep(2*time.Second)  // 期待 download 协程的退出
   endGNum := runtime.NumGoroutine()
   fmt.Println("start goroutine", startGNum)
   fmt.Println("end goroutine", endGNum)
}

func download(input <-chan string, output chan<- *result) {

   for v := range input {err := downloadFile(v)
      output <- &result{
         url: v,
         err: err,
      }
   }
   fmt.Println("download finish!!!")
}

func downloadFile(URL string) error {
   //Get the response bytes from the url
   response, err := http.Get(URL)
   if err != nil {return err}
   defer response.Body.Close()

   if response.StatusCode != 200 {return errors.New("Received non 200 response code")
   }
   //Create a empty file
   file, err := os.Create(path.Base(URL))
   if err != nil {return err}
   defer file.Close()

   //Write the bytes to the fiel
   _, err = io.Copy(file, response.Body)
   if err != nil {return err}

   return nil
}

这个会产生 Goroutine 的透露,起因是第 49 行的for v := range input 当没有数据输出的时候还在持续期待输出,所有应该在没有数据输出的时候通知它,让它不要傻傻的等

如何解决?

  1. 敞开 input 通道
  2. 传递一个通道通知它后果

准则

  1. 永远不要在不晓得如何进行的状况下启动 Goroutine,当咱们启动一个 Goroutine 的时候须要思考几个问题

    • 什么时候进行?
    • 能够通过什么形式终止它?
  2. 将并发留给调用者

    • 请将是否异步调用的选择权交给调用者,不然很有可能调用者并不知道你在这个函数外面应用了 Goroutine
    • 如果你的函数启动了一个 Goroutine,您必须为调用者提供一种明确进行该 Goroutine 的办法。将异步执行函数的决定留给该函数的调用者通常更容易。

总结

Concurrency is a useful tool, but it must be used with caution.

并发是一个有用的工具,然而必须审慎应用

参考链接

  • the way to go
  • golang channel
  • 常见的并发模式
  • the-forgotten-sender
  • the-abandoned-receivers
  • concurrency
退出移动版