Goroutine
什么是 Goroutine
- Goroutine 是 Golang 特有的并发体,是一种轻量级的 ” 线程 ”
- Go 中最根本的执行单元, 每个 Goroutine 独立执行
- 每一个 Go 程序至多有一个 Goroutine:主 Goroutine。当程序启动时,它会主动创立。
func main() {say() // 运行中,期待后果
go say()
fmt.Println("end") // 不须要期待 say()的执行后果}
func say(){fmt.Println("hello world")
}
Vs Os Thread
零碎线程 Os Thread
每个零碎线程有固定大小的栈,个别默认 2M,这个栈次要用来保留函数递归调用时参数和局部变量,由内核调度
固定大小的栈就会带来问题
- 空间节约
- 空间可能又不够,存在栈溢出的危险
Goroutine
由 Go 的调度器调度,刚创立的时候很小(2kb 或者 4kb),会依据须要动静地伸缩栈的大小(支流实现中栈的最大值可达到 1GB)。
因为启动的代价很小,所以咱们能够轻易地启动成千上万个 Goroutine。
通过示例理解
1. 多个 goroutine 同时运行
运行的程序由调度器决定,不须要相互依赖
func main() {fmt.Println("Started")
for i := 0; i < 10; i++ {go execute(i)
}
time.Sleep(time.Second * 1)
fmt.Println("Finished")
}
func execute(id int) {fmt.Printf("id: %d\n", id)
}
2. 图片并发下载
func main() {urls := []string{
"https://pic.netbian.com/uploads/allimg/210925/233922-163258436234e8.jpg",
"https://pic.netbian.com/uploads/allimg/210920/180354-16321322345f20.jpg",
"https://pic.netbian.com/uploads/allimg/210916/232432-16318058722f4d.jpg",
}
for _,url := range urls{go downloadFile(url)
}
time.Sleep(time.Second)
}
func downloadFile(URL string) error {
//Get the response bytes from the url
response, err := http.Get(URL)
if err != nil {return err}
defer response.Body.Close()
if response.StatusCode != 200 {return errors.New("Received non 200 response code")
}
//Create a empty file
file, err := os.Create(path.Base(URL))
if err != nil {return err}
defer file.Close()
//Write the bytes to the fiel
_, err = io.Copy(file, response.Body)
if err != nil {return err}
return nil
}
Recover
每个 Goroutine 都要有 recover 机制,因为当一个 Goroutine 抛 panic 的时候只有本身可能捕捉到其它 Goroutine 是没有方法捕获的。
如果没有 recover 机制,整个过程会 crash。
留神:Goroutine 产生 panic 时,只会调用本身的 defer,所以即使主 Goroutine 里写了 recover 逻辑,也无奈 recover。
func main() {go do1()
go do2()
time.Sleep(10*time.Second)
}
func do1() {
for i := 0; i < 100; i++ {fmt.Println("do1", i)
}
}
func do2() {defer func() {if err := recover(); err != nil {log.Printf("recover: %v", err)
}
}()
for i := 0; i < 100; i++ {
if i ==5{panic("do panic")
}
fmt.Println("do2", i)
}
}
Channel
根本介绍
Channel 是 Go 内置的数据类型,为初始化的 channel 的值为 nil
通过发送和接管指定元素类型的值来进行通信
- Channel 提供了 goroutines 之间的同步和通信
- Goroutine 实现并发 / 并行的轻量级独立执行。
Shard Memory
graph
thread1 --> Memory
thread2 --> Memory
thread3 --> Memory
CSP
Communicating sequential processes 通信程序编程
用于形容两个独立的并发实体通过共享的通信 channel(管道)进行通信的并发模型,
不关注发送音讯的实体,而关注与发送音讯时应用的 channel
不要通过共享内存来通信,而通过通信来共享内存。— Rob Pike
graph LR
Goroutine1 --> Channel --> Goroutine2
数据结构
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32 // denotes weather channel is closed or not
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}
根本用法
定义
ChannelType = ("chan" | "chan" "<-" | "<-" "chan") ElementType .
<-
运算符指定通道 方向 , 发送 或接管 。如果没有给出方向,则通道是 双向的
chan T // 能够发送接管 T
chan<- T // 只能发送 T
<-chan T // 只能接管 T
创立
ch := make(chan int) // 无缓冲 cap 0
ch := make(chan int,100) // 有缓冲 cap 100
操作
ch <- 1. // 发送
<-ch. // 接管
close(ch)// 敞开
代码示例
func goroutineA(ch <-chan int) {fmt.Println("[goroutineA] want a data")
val := <-ch
fmt.Println("[goroutineA] received the data", val)
}
func goroutineB(ch chan<- int) {
ch <- 1
fmt.Println("[goroutineB] send the data 1")
}
func main() {ch := make(chan int)
go goroutineA(ch)
go goroutineB(ch)
time.Sleep(time.Second)
}
sequenceDiagram
groutineA->channel: hello,我想要获取一个数据
channel-->groutineA: 我当初还没有数据
groutineA->channel: 那我睡觉了,等有数据再叫醒我
channel-->groutineA: ok
groutineB->channel: hello,我要发送一个数据给你
channel-->groutineB: ok,发过来吧
channel->groutineA: 醒醒,接收数据啦
groutineA-->channel: 来咯
Unbuffered channels
缓冲区大小为 0 的channel
channel 接收者会阻塞,直到收到音讯,channel 发送者会阻塞,直到接收者收到音讯
Buffered channels
领有缓冲区,当缓冲区已满时,发送者会阻塞;当缓冲区为空时,接收者会阻塞
总结
不要关注 channel 的数据结构,更应该关注 channel 的行为
Command | nil | empty | full | not full & empty | closed |
---|---|---|---|---|---|
Receive | block | block | success | success | success |
Send | block | success | block | success | panic |
Close | panic | success | success | success | panic |
几条准则
- channel 上的发送操作总在对应的接管操作实现前产生
- 如果 channel 敞开后从中接收数据,接受者就会收到该 channel 返回的零值
- 从无缓冲的 channel 中进行的接管,要产生在对该 channel 进行的发送实现前
- 不要在数据接管方或者在有多个发送者的状况下敞开通道。换句话说,咱们只应该让一个通道惟一的发送者敞开此通道
示例
package main
import "fmt"
func main() {ch1 := make(chan string)
ch1 <- "hello world"
fmt.Println(<-ch1)
}
执行之后会报错
fatal error: all goroutines are asleep - deadlock!
起因?
第 7 行 给通道 ch1 传入值 hello world
, 然而对于无缓冲的通道,在接收者未筹备好前发送操作是阻塞的,短少接收者造成死锁
如何解决?
1. 减少接收者
func main() {ch1 := make(chan string)
go func() {fmt.Println(<-ch1)
}()
ch1 <- "hello world"
time.Sleep(time.Millisecond)
}
func main() {ch1 := make(chan string)
go func() {ch1 <- "hello world"}()
fmt.Println(<-ch1)
}
2. 减少 channel 容量
func main() {ch1 := make(chan string,1)
ch1 <- "hello world"
fmt.Println(<-ch1)
}
Goroutine & Channel 串起来
常见的并发模式
告诉
- 向一个通道发送一个值实现告诉
func main() {ch := make(chan int)
go do(ch)
// do something
<- ch
fmt.Println("done")
}
func do(ch chan int){
// 长时间操作
time.Sleep(3*time.Second)
fmt.Println("doing")
ch <- 1
}
- 从一个通道接管值实现告诉
func main() {ch := make(chan int)
go do(ch)
// do something
ch <- 1
fmt.Println("done")
}
func do(ch chan int){
// 长时间操作
time.Sleep(3*time.Second)
fmt.Println("doing")
<-ch
}
互斥锁
func main() {mutex := make(chan struct{}, 1) // 容量必须为 1
counter := 0
increase := func() {mutex <- struct{}{} // 加锁
counter++
<-mutex // 解锁
}
increase1000 := func(done chan<- struct{}) {
for i := 0; i < 1000; i++ {increase()
}
done <- struct{}{}
}
done := make(chan struct{})
go increase1000(done)
go increase1000(done)
<-done; <-done
fmt.Println(counter) // 2000
}
管制协程的并发数量
不限度的场景
func main() {
for i := 0; i < math.MaxInt32; i++ {go func(i int) {log.Println(i)
time.Sleep(time.Second)
}(i)
}
for {time.Sleep(time.Second)
}
}
运行后果
$ go run main.go
...
150577
150578
panic: too many concurrent operations on a single file or socket (max 1048575)
问题:如何限度协程的数量?
// main_chan.go
func main() {ch := make(chan struct{}, 4)
for i := 0; i < 20; i++ {ch <- struct{}{}
go func(i int) {log.Println(i)
time.Sleep(time.Second)
<-ch
}(i)
}
for {time.Sleep(time.Second)
}
}
生产者消费者模型
// 生产者: 生成 factor 整数倍的序列
func Producer(factor int, out chan<- int) {
for i := 0; i < 10; i++ {out <- i * factor}
}
// 消费者
func Consumer(in <-chan int) {
for v := range in {fmt.Println(v)
}
}
func main() {ch := make(chan int, 3) // 成绩队列
go Producer(3, ch) // 生成 3 的倍数的序列
go Producer(5, ch) // 生成 5 的倍数的序列
go Consumer(ch) // 生产 生成的队列
time.Sleep(5 * time.Second)
}
返回最优的后果
func main() {ch := make(chan string, 32)
go func() {ch <- searchByGoogle("golang")
}()
go func() {ch <- searchByBaidu("golang")
}()
fmt.Println(<-ch)
}
func searchByGoogle(search string) string {time.Sleep(2 * time.Second)
return "google result:" + search
}
func searchByBaidu(search string) string {time.Sleep(time.Second)
return "baidu result" + search
}
问题 1:
当取得想要的后果之后,如何告诉或者平安退出其余还在执行的协程?
func main() {ch := make(chan string, 32)
cancel := make(chan struct{},2)
go func() {ch <- searchByGoogle("golang",cancel)
}()
go func() {ch <- searchByBaidu("golang",cancel)
}()
fmt.Println(<-ch)
cancel <- struct{}{}
time.Sleep(time.Second)
}
func searchByGoogle(search string,cancel chan struct{}) string {done := make(chan struct{})
go func() {time.Sleep(2 * time.Second)
done <- struct{}{}
}()
select {
case <- done:
return "google result" + search
case <- cancel:
fmt.Println("google cancel")
return "google cancel"
}
}
func searchByBaidu(search string,cancel chan struct{}) string {done := make(chan struct{})
go func() {time.Sleep(1 * time.Second)
done <- struct{}{}
}()
select {
case <- done:
return "baidu result" + search
case <- cancel:
fmt.Println("google cancel")
return "baidu cancel"
}
}
问题 2:
如何做超时管制?
Goroutine 透露
1. 被忘记的发送者
func searchByBaidu(search string,cancel chan struct{}) string {done := make(chan struct{})
go func() {time.Sleep(1 * time.Second)
done <- struct{}{}
}()
select {
case <- done:
return "baidu result" + search
case <- cancel:
fmt.Println("google cancel")
return "baidu cancel"
}
}
case <- done
和 case <- cancel
不确定会执行哪一个,如果执行 <-cancel
,则第五行 done <- struct{}{}
会永远阻塞,Goroutine 无奈退出
如何解决?
减少 channel 容量
done := make(chan struct{})
还有其余方法吗?
func searchByBaidu(search string,cancel chan struct{}) string {done := make(chan struct{})
go func() {time.Sleep(1 * time.Second)
select {case done <- struct{}{}:
default:
return
}
}()
select {
case <- done:
return "baidu result" + search
case <- cancel:
fmt.Println("google cancel")
return "baidu cancel"
}
}
2. 被忘记的接收者
import (
"errors"
"fmt"
"io"
"net/http"
"os"
"path"
"runtime"
)
type result struct {
url string
err error
}
func main() {startGNum := runtime.NumGoroutine()
urls := []string{
"https://pic.netbian.com/uploads/allimg/210925/233922-163258436234e8.jpg",
"https://pic.netbian.com/uploads/allimg/210920/180354-16321322345f20.jpg",
"https://pic.netbian.com/uploads/allimg/210916/232432-16318058722f4d.jpg",
}
total := len(urls)
// 填充输出
input := make(chan string, total)
for _, url := range urls {input <- url}
// close(input)
output := make(chan *result, total)
// 启动 4 个 goroutine
for i := 0; i < 4; i++ {go download(input, output)
}
// 期待后果
for i := 0; i < total; i++ {
ret := <-output
fmt.Println(ret.url, ret.err)
}
time.Sleep(2*time.Second) // 期待 download 协程的退出
endGNum := runtime.NumGoroutine()
fmt.Println("start goroutine", startGNum)
fmt.Println("end goroutine", endGNum)
}
func download(input <-chan string, output chan<- *result) {
for v := range input {err := downloadFile(v)
output <- &result{
url: v,
err: err,
}
}
fmt.Println("download finish!!!")
}
func downloadFile(URL string) error {
//Get the response bytes from the url
response, err := http.Get(URL)
if err != nil {return err}
defer response.Body.Close()
if response.StatusCode != 200 {return errors.New("Received non 200 response code")
}
//Create a empty file
file, err := os.Create(path.Base(URL))
if err != nil {return err}
defer file.Close()
//Write the bytes to the fiel
_, err = io.Copy(file, response.Body)
if err != nil {return err}
return nil
}
这个会产生 Goroutine 的透露,起因是第 49 行的for v := range input
当没有数据输出的时候还在持续期待输出,所有应该在没有数据输出的时候通知它,让它不要傻傻的等
如何解决?
- 敞开 input 通道
- 传递一个通道通知它后果
准则
-
永远不要在不晓得如何进行的状况下启动 Goroutine,当咱们启动一个 Goroutine 的时候须要思考几个问题
- 什么时候进行?
- 能够通过什么形式终止它?
-
将并发留给调用者
- 请将是否异步调用的选择权交给调用者,不然很有可能调用者并不知道你在这个函数外面应用了 Goroutine
- 如果你的函数启动了一个 Goroutine,您必须为调用者提供一种明确进行该 Goroutine 的办法。将异步执行函数的决定留给该函数的调用者通常更容易。
总结
Concurrency is a useful tool, but it must be used with caution.
并发是一个有用的工具,然而必须审慎应用
参考链接
- the way to go
- golang channel
- 常见的并发模式
- the-forgotten-sender
- the-abandoned-receivers
- concurrency