Goroutine
什么是Goroutine
- Goroutine是Golang特有的并发体,是一种轻量级的"线程"
- Go中最根本的执行单元,每个Goroutine独立执行
- 每一个Go程序至多有一个Goroutine:主Goroutine。当程序启动时,它会主动创立。
func main() { say() // 运行中,期待后果 go say() fmt.Println("end") // 不须要期待say()的执行后果}func say(){ fmt.Println("hello world")}
Vs Os Thread
零碎线程 Os Thread
每个零碎线程有固定大小的栈,个别默认2M,这个栈次要用来保留函数递归调用时参数和局部变量,由内核调度
固定大小的栈就会带来问题
- 空间节约
- 空间可能又不够,存在栈溢出的危险
Goroutine
由Go的调度器调度,刚创立的时候很小(2kb或者4kb),会依据须要动静地伸缩栈的大小(支流实现中栈的最大值可达到1GB)。
因为启动的代价很小,所以咱们能够轻易地启动成千上万个Goroutine。
通过示例理解
1. 多个goroutine同时运行
运行的程序由调度器决定,不须要相互依赖
func main() { fmt.Println("Started") for i := 0; i < 10; i++ { go execute(i) } time.Sleep(time.Second * 1) fmt.Println("Finished")}func execute(id int) { fmt.Printf("id: %d\n", id)}
2. 图片并发下载
func main() { urls := []string{ "https://pic.netbian.com/uploads/allimg/210925/233922-163258436234e8.jpg", "https://pic.netbian.com/uploads/allimg/210920/180354-16321322345f20.jpg", "https://pic.netbian.com/uploads/allimg/210916/232432-16318058722f4d.jpg", } for _,url := range urls{ go downloadFile(url) } time.Sleep(time.Second)}func downloadFile(URL string) error { //Get the response bytes from the url response, err := http.Get(URL) if err != nil { return err } defer response.Body.Close() if response.StatusCode != 200 { return errors.New("Received non 200 response code") } //Create a empty file file, err := os.Create(path.Base(URL)) if err != nil { return err } defer file.Close() //Write the bytes to the fiel _, err = io.Copy(file, response.Body) if err != nil { return err } return nil}
Recover
每个Goroutine都要有recover机制,因为当一个Goroutine抛panic的时候只有本身可能捕捉到其它Goroutine是没有方法捕获的。
如果没有recover机制,整个过程会crash。
留神:Goroutine产生panic时,只会调用本身的defer,所以即使主Goroutine里写了recover逻辑,也无奈recover。
func main() { go do1() go do2() time.Sleep(10*time.Second)}func do1() { for i := 0; i < 100; i++ { fmt.Println("do1", i) }}func do2() { defer func() { if err := recover(); err != nil { log.Printf("recover: %v", err) } }() for i := 0; i < 100; i++ { if i ==5{ panic("do panic") } fmt.Println("do2", i) }}
Channel
根本介绍
Channel是Go内置的数据类型,为初始化的channel的值为nil
通过发送和接管指定元素类型的值来进行通信
- Channel 提供了 goroutines 之间的同步和通信
- Goroutine 实现并发/并行的轻量级独立执行。
Shard Memory
graphthread1 --> Memorythread2 --> Memorythread3 --> Memory
CSP
Communicating sequential processes 通信程序编程
用于形容两个独立的并发实体通过共享的通信 channel(管道)进行通信的并发模型,
不关注发送音讯的实体,而关注与发送音讯时应用的channel
不要通过共享内存来通信,而通过通信来共享内存。 -- Rob Pike
graph LRGoroutine1 --> Channel --> Goroutine2
数据结构
type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 // denotes weather channel is closed or not elemtype *_type // element type sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters lock mutex}
根本用法
定义
ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" ) ElementType .
<-
运算符指定通道方向, 发送或接管。如果没有给出方向,则通道是 双向的
chan T // 能够发送接管Tchan<- T // 只能发送T<-chan T // 只能接管T
创立
ch := make(chan int) // 无缓冲 cap 0ch := make(chan int,100) // 有缓冲 cap 100
操作
ch <- 1. // 发送<-ch. // 接管close(ch)// 敞开
代码示例
func goroutineA(ch <-chan int) { fmt.Println("[goroutineA] want a data") val := <-ch fmt.Println("[goroutineA] received the data", val)}func goroutineB(ch chan<- int) { ch <- 1 fmt.Println("[goroutineB] send the data 1")}func main() { ch := make(chan int) go goroutineA(ch) go goroutineB(ch) time.Sleep(time.Second)}
sequenceDiagramgroutineA->channel: hello,我想要获取一个数据channel-->groutineA: 我当初还没有数据groutineA->channel: 那我睡觉了,等有数据再叫醒我channel-->groutineA: okgroutineB->channel: hello,我要发送一个数据给你channel-->groutineB: ok,发过来吧channel->groutineA: 醒醒,接收数据啦groutineA-->channel: 来咯
Unbuffered channels
缓冲区大小为0的channel
channel接收者会阻塞,直到收到音讯,channel发送者会阻塞,直到接收者收到音讯
Buffered channels
领有缓冲区,当缓冲区已满时,发送者会阻塞;当缓冲区为空时,接收者会阻塞
总结
不要关注channel的数据结构,更应该关注channel的行为
Command | nil | empty | full | not full & empty | closed |
---|---|---|---|---|---|
Receive | block | block | success | success | success |
Send | block | success | block | success | panic |
Close | panic | success | success | success | panic |
几条准则
- channel 上的发送操作总在对应的接管操作实现前产生
- 如果 channel 敞开后从中接收数据,接受者就会收到该 channel 返回的零值
- 从无缓冲的 channel 中进行的接管,要产生在对该 channel 进行的发送实现前
- 不要在数据接管方或者在有多个发送者的状况下敞开通道。换句话说,咱们只应该让一个通道惟一的发送者敞开此通道
示例
package mainimport "fmt"func main() { ch1 := make(chan string) ch1 <- "hello world" fmt.Println(<-ch1)}
执行之后会报错
fatal error: all goroutines are asleep - deadlock!
起因?
第7行 给通道ch1传入值 hello world
,然而对于无缓冲的通道,在接收者未筹备好前发送操作是阻塞的,短少接收者造成死锁
如何解决?
1. 减少接收者
func main() { ch1 := make(chan string) go func() { fmt.Println(<-ch1) }() ch1 <- "hello world" time.Sleep(time.Millisecond)}func main() { ch1 := make(chan string) go func() { ch1 <- "hello world" }() fmt.Println(<-ch1)}
2. 减少channel容量
func main() { ch1 := make(chan string,1) ch1 <- "hello world" fmt.Println(<-ch1)}
Goroutine & Channel 串起来
常见的并发模式
告诉
- 向一个通道发送一个值实现告诉
func main() { ch := make(chan int) go do(ch) // do something <- ch fmt.Println("done")}func do(ch chan int){ // 长时间操作 time.Sleep(3*time.Second) fmt.Println("doing") ch <- 1}
- 从一个通道接管值实现告诉
func main() { ch := make(chan int) go do(ch) // do something ch <- 1 fmt.Println("done")}func do(ch chan int){ // 长时间操作 time.Sleep(3*time.Second) fmt.Println("doing") <-ch}
互斥锁
func main() { mutex := make(chan struct{}, 1) // 容量必须为1 counter := 0 increase := func() { mutex <- struct{}{} // 加锁 counter++ <-mutex // 解锁 } increase1000 := func(done chan<- struct{}) { for i := 0; i < 1000; i++ { increase() } done <- struct{}{} } done := make(chan struct{}) go increase1000(done) go increase1000(done) <-done; <-done fmt.Println(counter) // 2000}
管制协程的并发数量
不限度的场景
func main() { for i := 0; i < math.MaxInt32; i++ { go func(i int) { log.Println(i) time.Sleep(time.Second) }(i) } for { time.Sleep(time.Second) }}
运行后果
$ go run main.go...150577150578panic: too many concurrent operations on a single file or socket (max 1048575)
问题:如何限度协程的数量?
// main_chan.gofunc main() { ch := make(chan struct{}, 4) for i := 0; i < 20; i++ { ch <- struct{}{} go func(i int) { log.Println(i) time.Sleep(time.Second) <-ch }(i) } for { time.Sleep(time.Second) }}
生产者消费者模型
// 生产者: 生成 factor 整数倍的序列func Producer(factor int, out chan<- int) { for i := 0; i < 10; i++ { out <- i * factor }}// 消费者func Consumer(in <-chan int) { for v := range in { fmt.Println(v) }}func main() { ch := make(chan int, 3) // 成绩队列 go Producer(3, ch) // 生成 3 的倍数的序列 go Producer(5, ch) // 生成 5 的倍数的序列 go Consumer(ch) // 生产 生成的队列 time.Sleep(5 * time.Second)}
返回最优的后果
func main() { ch := make(chan string, 32) go func() { ch <- searchByGoogle("golang") }() go func() { ch <- searchByBaidu("golang") }() fmt.Println(<-ch)}func searchByGoogle(search string) string { time.Sleep(2 * time.Second) return "google result: " + search}func searchByBaidu(search string) string { time.Sleep(time.Second) return "baidu result " + search}
问题1:
当取得想要的后果之后,如何告诉或者平安退出其余还在执行的协程?
func main() { ch := make(chan string, 32) cancel := make(chan struct{},2) go func() { ch <- searchByGoogle("golang",cancel) }() go func() { ch <- searchByBaidu("golang",cancel) }() fmt.Println(<-ch) cancel <- struct{}{} time.Sleep(time.Second)}func searchByGoogle(search string,cancel chan struct{}) string { done := make(chan struct{}) go func() { time.Sleep(2 * time.Second) done <- struct{}{} }() select { case <- done: return "google result " + search case <- cancel: fmt.Println("google cancel") return "google cancel" }}func searchByBaidu(search string,cancel chan struct{}) string { done := make(chan struct{}) go func() { time.Sleep(1 * time.Second) done <- struct{}{} }() select { case <- done: return "baidu result " + search case <- cancel: fmt.Println("google cancel") return "baidu cancel" }}
问题2:
如何做超时管制?
Goroutine 透露
1. 被忘记的发送者
func searchByBaidu(search string,cancel chan struct{}) string { done := make(chan struct{}) go func() { time.Sleep(1 * time.Second) done <- struct{}{} }() select { case <- done: return "baidu result " + search case <- cancel: fmt.Println("google cancel") return "baidu cancel" }}
case <- done
和 case <- cancel
不确定会执行哪一个,如果执行 <-cancel
,则第五行 done <- struct{}{}
会永远阻塞,Goroutine无奈退出
如何解决?
减少channel容量
done := make(chan struct{})
还有其余方法吗?
func searchByBaidu(search string,cancel chan struct{}) string { done := make(chan struct{}) go func() { time.Sleep(1 * time.Second) select { case done <- struct{}{}: default: return } }() select { case <- done: return "baidu result " + search case <- cancel: fmt.Println("google cancel") return "baidu cancel" }}
2. 被忘记的接收者
import ( "errors" "fmt" "io" "net/http" "os" "path" "runtime")type result struct { url string err error}func main() { startGNum := runtime.NumGoroutine() urls := []string{ "https://pic.netbian.com/uploads/allimg/210925/233922-163258436234e8.jpg", "https://pic.netbian.com/uploads/allimg/210920/180354-16321322345f20.jpg", "https://pic.netbian.com/uploads/allimg/210916/232432-16318058722f4d.jpg", } total := len(urls) // 填充输出 input := make(chan string, total) for _, url := range urls { input <- url } // close(input) output := make(chan *result, total) // 启动4个goroutine for i := 0; i < 4; i++ { go download(input, output) } // 期待后果 for i := 0; i < total; i++ { ret := <-output fmt.Println(ret.url, ret.err) } time.Sleep(2*time.Second) // 期待download协程的退出 endGNum := runtime.NumGoroutine() fmt.Println("start goroutine", startGNum) fmt.Println("end goroutine", endGNum)}func download(input <-chan string, output chan<- *result) { for v := range input { err := downloadFile(v) output <- &result{ url: v, err: err, } } fmt.Println("download finish!!!")}func downloadFile(URL string) error { //Get the response bytes from the url response, err := http.Get(URL) if err != nil { return err } defer response.Body.Close() if response.StatusCode != 200 { return errors.New("Received non 200 response code") } //Create a empty file file, err := os.Create(path.Base(URL)) if err != nil { return err } defer file.Close() //Write the bytes to the fiel _, err = io.Copy(file, response.Body) if err != nil { return err } return nil}
这个会产生Goroutine的透露,起因是第49行的for v := range input
当没有数据输出的时候还在持续期待输出,所有应该在没有数据输出的时候通知它,让它不要傻傻的等
如何解决?
- 敞开input 通道
- 传递一个通道通知它后果
准则
永远不要在不晓得如何进行的状况下启动Goroutine,当咱们启动一个Goroutine的时候须要思考几个问题
- 什么时候进行?
- 能够通过什么形式终止它?
将并发留给调用者
- 请将是否异步调用的选择权交给调用者,不然很有可能调用者并不知道你在这个函数外面应用了Goroutine
- 如果你的函数启动了一个 Goroutine,您必须为调用者提供一种明确进行该 Goroutine 的办法。将异步执行函数的决定留给该函数的调用者通常更容易。
总结
Concurrency is a useful tool, but it must be used with caution.
并发是一个有用的工具,然而必须审慎应用
参考链接
- the way to go
- golang channel
- 常见的并发模式
- the-forgotten-sender
- the-abandoned-receivers
- concurrency