共计 10943 个字符,预计需要花费 28 分钟才能阅读完成。
Go 的并发编程
go 关键字的任何返回值都会被疏忽
就像 Java 的 Thread,Java 的 Thread 你须要写一堆货色,然而 go 语言你只须要写一个 go
goroutine
在 go 语言外面, 启动一个 goroutine 是简略的, 简单的是:
- 如何取到 goroutine 的返回值
- 如何确保 goroutine 不泄露
- 如何优雅的敞开 goroutine
- 如何让 goroutine 和 channel 擦出火花
范式:go + chan
in <-chan int, out chan<- int
go 关键字配合 chan 传递 出参和入参
watchout! 出参和入参都是参数, 不是间接 return 的
import "fmt"
func process(val int) int {fmt.Println(val)
return val
}
func runThingConcurrently(in ***<-chan*** int, out ***chan<-*** int) {go func() {
for val := range in {result := process(val)
out <- result
}
}()}
通道
通道是援用类型(什么是援用类型,除了援用类型之外,还有其余什么类型,我怎么晓得什么识货该应用什么类型?)
通道的零值是 nil
每个写入通道的值只能被读取一次, 这可就有点像 rabbitmq 了(不齐全像)
对通道的 for-range 将始终继续,除非通道敞开、bread、return
通道敞开
- 写操作触发 panic(包含反复的敞开)
- 如果值还没有被读取(音讯队列中还积压了音讯),只将会被一次返回
- 如果没有未被读取的值(音讯队列没有积压),返回通道类型的 0 值
范式:应用 逗号 ok 模式 来检测通道是否敞开
v, ok := <-ch
通道的行为
应用没有缓存的通道时, 如果消费者没有生产, 那么生产者的 goroutine 是不会退出的.
应用有缓存的通道, 能够解耦消费者和生产者, eg: 消费者在写入实现后就能够立即退出, 不须要等到消费者读取.
范式: sync.waitGroup
只有一个 gorouting 写入操作完结时, 敞开通道是比拟容易的; 多个 goroutine 同时向一个 goroutne 写入数据时, 应用 sync.waitGroup 敞开通道更容易.(反复的敞开会导致通道 panic)
应用缓存通道的场景
还是作为音讯队列在思考, 想一下什么时候须要让音讯队列有缓冲
- 咱们晓得要启动多少个消费者, 限度消费者 goroutine 的数量
- 不要阻塞生产者
select
无序的从任何一个可执行的分支语句中筛选一个
避免以不统一的程序获取锁, 防止出现死锁(select 会执行任何一个可执行的)
范式: for-select 循环
for {
select {
case v := <-ch:
fmt.Println(v)
case <-done:
break
}
}
范式: 应用 selcet 实现非阻塞式操作
就像 java 里的 tryXXX
for {
select {
case v := <-ch:
fmt.Println(v)
default:
fmt.Println("no wait")
}
}
范式: 在 select 中跳过有效 case
select 读取一个敞开的通道永远会执行胜利, 上面例子会始终输入 close
func main() {c := make(chan struct{})
close(c)
for i := 0; i < 10; i++ {
select {
case <-c:
fmt.Println("close") // 永远执行这个 case, 如许恐怖
default:
fmt.Println("default")
}
}
}
下面的例子中, 如果想让 default 语句失效, 能够给 channel 赋值 nil
func main() {c := make(chan struct{})
close(c)
**c = nil // 赋值为 nil, select 语句就不会执行对应 case**
for i := 0; i < 10; i++ {
select {
case <-c:
fmt.Println("close")
default:
fmt.Println("default")
}
}
}
并发实际与模式(探讨)
API 应该暗藏并发实现细节
小心变量被笼罩
func main() {a := []int{2, 4, 6, 8, 10}
ch := make(chan int, len((a)))
for _, val := range a {
*val := val // 小心变量被笼罩, 这种比拟显著的谬误, idea 可能给出正告, 然而简单一个点的, idea 就给不出正告了 *
go func() {ch <- val * 2}()}
for i := 0; i < len(a); i++ {fmt.Println(<-ch)
}
}
避免 goroutine 泄露
让你写一个造成 goroutine 泄露的代码, 你能写进去吗?
还是利用生产者 – 消费者模型, 让消费者提前 break, 生产者因为不晓得产生了什么, 始终在期待 channel 中的变量被生产, 从而导致 goroutine 泄露
func main() {ch := make(chan int, 0)
produce(ch)
consume(ch)
}
func produce(ch chan<- int) {go func() {
for i := 0; i < 100; i++ {ch <- i}
}()}
func consume(ch <-chan int) {
for i := range ch {
if i > 5 {fmt.Println("break")
break
}
}
}
包装器类型的通道
范式: 通道完结模式
应用两个通道, 一个通道负责传递音讯, 另一个负责日常保护经营
这个看起来容易, 写起来挺烦的
范式: 应用勾销函数完结 goroutine
定义一个勾销函数, 和 channel 一起返回给调用方, 调用方在解决完业务逻辑的时候调用一下这个函数
上面的例子中, 须要在 for 循环完结之后调用
func countTo(max int) (<-chan int, func()) {done := make(chan struct{})
result := make(chan int)
cancel := func() {close(done)
}
go func() {
for i := 0; i < max; i++ {
select {
case <-done:
return
default:
result <- i
}
}
}()
return result, cancel
}
func main() {ch, cancel := countTo(10)
for i := range ch {
if i > 5 {break}
}
cancel()}
范式: 应用缓冲通道限度并发的 groutine 的数量
💀 这个例子感觉没有 get 到书上的精华
func process(i int) int {fmt.Println("process", i)
return i
}
func processWithMaxGoroutine() []int {
const conc = 10
results := make(chan int, conc)
for i := 0; i < conc; i++ {results <- process(i)
}
var r []int
for i := 0; i < conc; i++ {r = append(r, <-results)
}
return r
}
func main() {r := processWithMaxGoroutine()
fmt.Println(r)
}
范式: 背压
背压是指在异步场景中,被观察者发送事件的速度远快于观察者的处理速度的状况下的一种策略,此策略通知上游的被观察者升高发送速度。简而言之,背压是一种限流策略。
利用缓冲通道和 select 实现限流, 管制最大并发的申请量
原理介绍: channel 作为缓冲队列, 当缓冲队列空的时候, select 会触发 default
func main() {c := make(chan struct{}, 10)
for i := 0; i < 10; i++ {c <- struct{}{}}
for i := 0; i < 20; i++ {
select {
case <-c:
fmt.Println("do", i)
default:
fmt.Println("back press")
}
}
}
封装降级一下
type BackPressGauge struct {gauge chan struct{}
}
func New(limit int) *BackPressGauge {gauge := make(chan struct{}, limit)
for i := 0; i < limit; i++ {gauge <- struct{}{}}
return &BackPressGauge{gauge: gauge,}
}
func (g BackPressGauge) Process(c func()) error {
select {
case <-g.gauge:
c()
return nil
default:
return errors.New("out of limit")
}
}
func main() {gauge := New(10)
for i := 0; i < 20; i++ {err := gauge.Process(func() {fmt.Println(i)
})
if err != nil {fmt.Println(err)
}
}
}
简略的超时解决
这种形式能够实现超时返回, 然而这个模式不能让 goroutine 在超时后进行运行, 咱们只是不解决 goroutine 返回的后果
func main() {i, err := processWithTimeout()
if err != nil {fmt.Println(err)
} else {fmt.Println(i)
}
}
func processWithTimeout() (int, error) {
select {case v := <-process():
return v, nil
case <-time.After(2 * time.Second):
return -1, errors.New("timeout")
}
}
func process() <-chan int {c := make(chan int)
go func() {time.Sleep(3 * time.Second)
c <- 1
close(c)
}()
return c
}
范式: 应用 sync.waitGroup
期待所有 goroutine 执行实现
func main() {
var wait sync.WaitGroup
wait.Add(3)
for i := 0; i < 10; i++ {
i := i
go func() {defer wait.Done()
time.Sleep(3 * time.Second)
fmt.Println(i)
}()}
wait.Wait()
fmt.Println("wait finish")
}
范式: 应用 errorGroup
实现只有一个报错就退出
func main() {g := new(errgroup.Group)
for i := 0; i < 3; i++ {
i := i
g.Go(func() error {time.Sleep(3 * time.Second)
if i == 2 {return errors.New("error")
}
return nil
})
}
if err := g.Wait(); err != nil {fmt.Println(err)
}
}
范式: 只执行一次的代码 sync.Once
type Parser interface {Parse() string
}
var parser Parser
var once sync.Once
func Parse() string {once.Do(func() {fmt.Println("init")
parser = initParse()})
return parser.Parse()}
实战: 先调用 A, B , 再调用 C, 任何一步出错或者超过 30s 就退出
应用 JAVA 实现一个
原始人版本
func main() {ab := make(chan ProcessContext, 2)
go func() {ab <- processA()
}()
go func() {ab <- processB()
}()
left := 30
for i := 0; i < 2; i++ {
r := <-ab
if r.err != nil {fmt.Println(r)
return
}
fmt.Println(r.result)
if left > r.left {left = r.left}
}
c := processC(left)
fmt.Println(c)
}
func request() chan int {result := make(chan int)
go func(result chan int) {time.Sleep(3 * time.Second)
result <- 100
}(result)
return result
}
type ProcessContext struct {
result int
left int
err error
}
func (c ProcessContext) Result() (int, int, error) {return c.result, c.left, c.err}
func processA() ProcessContext {result := make(chan ProcessContext)
go func() {fmt.Println("start process a")
select {case <-time.After(30 * time.Second):
fmt.Println("timeout a")
result <- ProcessContext{
result: -1,
left: 0,
err: errors.New("a timeout"),
}
break
case r := <-request():
fmt.Println("return a")
result <- ProcessContext{
result: r,
left: 15,
err: nil,
}
break
}
}()
return <-result
}
func processB() ProcessContext {result := make(chan ProcessContext)
go func() {fmt.Println("start process b")
select {case <-time.After(30 * time.Second):
fmt.Println("timeout b")
result <- ProcessContext{
result: -1,
left: 0,
err: errors.New("b timeout"),
}
break
case r := <-request():
fmt.Println("return b")
result <- ProcessContext{
result: r,
left: 15,
err: nil,
}
break
}
}()
return <-result
}
func processC(left int) ProcessContext {result := make(chan ProcessContext)
go func() {fmt.Println("start process c")
select {case <-time.After(time.Duration(left) * time.Second):
fmt.Println("c timeout")
result <- ProcessContext{
result: -1,
left: -1,
err: errors.New("timeout c"),
}
break
case r := <-request():
fmt.Println("c result")
result <- ProcessContext{
result: r,
left: 0,
err: nil,
}
break
}
}()
return <-result
}
应用 time 优化的版本
func main() {i, err := process()
if err != nil {fmt.Println(err)
}
fmt.Println(i)
}
type Processor struct {
// 这谁能想到
a chan int
b chan int
ab chan int
c chan int
errs chan error
left time.Duration
}
func process() (int, error) {
p := Processor{a: make(chan int, 1), // 这里应用的缓冲 channel, 生产者 goroutine 能够及时开释
b: make(chan int, 1),
ab: make(chan int, 1),
c: make(chan int, 1),
errs: make(chan error, 2), // ab 会同时解决, 长度为 2
left: 30,
}
p.Launch()
ab, err := p.waitForAB() // waitXX 的办法都是 for-select 的套路
if err != nil {return -1, err}
***p.ab <- ab // 确保 ab 先执行实现 ***
return p.waitForC()}
func (p *Processor) Launch() {
// 一步到位, 启动三个 goroutine. 这个谁能想到
left := p.left
go func(left time.Duration) {a, left, err := processWithTimeout(left)
if err != nil {
p.errs <- err
return
}
p.a <- a
if p.left > left {p.left = left}
}(left)
go func(left time.Duration) {b, left, err := processWithTimeout(left)
if err != nil {
p.errs <- err
return
}
p.b <- b
if p.left > left {p.left = left}
}(left)
go func() {
ab := <-p.ab // 阻塞住, 等到 ab 执行实现之后才会开始 c
fmt.Println(ab)
c, left, err := processWithTimeout(p.left)
if err != nil {
p.errs <- err
return
}
p.c <- c
if p.left > left {p.left = left}
}()}
func (p *Processor) waitForAB() (int, error) {
var ab int
for i := 0; i < 2; i++ {
select {
case a := <-p.a:
ab += a
case b := <-p.b:
ab += b
case err := <-p.errs:
return ab, err
}
}
return ab, nil
}
func (p *Processor) waitForC() (int, error) {
select {
case err := <-p.errs:
return -1, err
case c := <-p.c:
return c, nil
}
}
func processWithTimeout(left time.Duration) (int, time.Duration, error) {fmt.Println("processWithTimeout")
result := make(chan int)
err := make(chan error)
go func() {
select {case r := <-request():
result <- r
break
case <-time.After(left * time.Second):
err <- errors.New("timeout")
}
}()
select {
case result := <-result:
return result, 10, nil
case err := <-err:
return -1, -1, err
}
}
func request() chan int {result := make(chan int)
go func(result chan int) {time.Sleep(3 * time.Second)
result <- 100
}(result)
return result
}
Context
指针
int32 → 32 位 → 4 字节
独立寻址的最小单元是一个字节。
指针的零值是 nil。
& 是地址运算符。它位于值类型之前,并返回存储该值的内存地位的地址:
x := "hello"
pointerToX := &x
* 是间接寻址运算符, 位于指针类型的变量后面,并返回所指向的值。这称为 解援用.
在解援用一个指针之前,必须确保这个指针不是空,否则会引发 panic
& 是下套,* 是解套
new: 创立一个指向所提供的类型的 0 值实例的指针
// x 是斧正类型
var x = new(int)
fmt.Println(*x) // 0
// y 也是指针类型
var y = new(Man)
fmt.Println(y) // &{0}
fmt.Println(*y) // {0}
无奈在根本类型应用 &
func main() {
var z = &Man{
age: 7,
name: "吴梓祺" // 编译会报错
}
fmt.Println(z)
}
type Man struct {
age int
name *string // 只能传指针类型的
}
func main() {
var x = "吴梓祺"
var z = &Man{
age: 7,
name: &x, // 这样就不会了
}
fmt.Println(z)
}
Go 是一种传值调用的语言,传递给函数的值是正本。
和 Java 一样
只管当一个指针被传递给一个函数时,该函数会失去该指针的一个正本。但因为指针依然指向原始数据,所以原始数据能够被调用的函数批改。
惟一应该应用指针参数来批改变量的时候是当函数冀望一个接口时。
包
go.mod
- require: 依赖的模块
- replace:笼罩依赖模块的路劲
- exclude
Init 函数
申明一个 init 函数,它不承受参数也不返回任何值,在这个包第一次被其余包援用时,就会执行 init 函数
包援用会触发相应的 init 函数
Go 容许咱们在一个包中定义多个 init 函数,甚至能够在包的同一个文件中定义多个 init 函数
只管 Go 容许定义多个 init 函数,然而在一个包中应该只申明一个 init 函数
循环依赖
Go 不容许包与包之间存在循环依赖。这意味着如果包 A 间接或间接导入了包 B,那么包 B 就不能间接或间接导入包 A。
模块
Go 中的模块零碎遵循抉择最小版本的准则。也就是说咱们会应用 go.mod 中定义依赖的最低适配版本
Context
包装模式
激励通过函数显式的传递数据是 go 编程的常规!
激励通过函数显式的传递数据是 go 编程的常规!
激励通过函数显式的传递数据是 go 编程的常规!
-
Context.Background()
构建办法:
构建一个空的 Context
ctx := context.BackendGroup()
result,err := logic(ctx,“param”)
- Context.TOOD
在开发过程中长期应用的, 生产环境中不应该应用 TODO
- context.WithContext()
接管并包装原来的 Context, 返回一个新的 Context
context + select 实现超时勾销
func main() {ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() // 这个不要忘了
result, err := longRunningThingManager(ctx)
if err != nil {fmt.Println(err)
return
}
fmt.Println(result)
}
func longRunningThingManager(ctx context.Context) (string, error) {
// 包装器类型的通道
type wrapper struct {
result string
err error
}
ch := make(chan wrapper, 1)
go func() {result, err := longRunningThing()
ch <- wrapper{result, err}
}()
select { // 通过 select-case 实现勾销
case data := <-ch:
return data.result, data.err
case <-ctx.Done():
return "", ctx.Err()}
}
func longRunningThing() (string, error) {time.Sleep(5 * time.Second)
return "result after waiting 10s", nil
}
- context.WithTimeout()
- context.WithDeadline()
如果把过来的工夫传递给 context.WithDeadline,那么 context 在创立时就曾经被勾销了。
咱们在子 context 上设置的任何超时 都受到在父 context 上设置的超时的限度
- context.WithValue()
值传递, context 值的键必须是可比拟的
因为键的类型和常量都是 未导出 (private) 的,所以以后包之外的任何代码都不能向 context 中写入数据,从而防止了抵触。
函数用值创立 context 时,函数名也应该以 ContextWith
结尾。如果函数返回 context 中的某个值,则函数名应该以 FromContext
结尾
在大多数状况下,咱们应该从申请处理程序的 context 中获取值,而后显式地将其传递给业务逻辑。你不应该在 Go 语言中用 context 代替函数显式地传递参数
应用 context 来通过规范的 API 传递数值。在须要解决业务逻辑时,将 context 中的值复制到显式参数中。这样的形式还能够用于在 context 中拜访系统维护信息。
勾销
context.WithCancel
包装模式。context 被视为一个不可变的实例,每当向一个 context 增加信息时,咱们通过包装已有的 context 来创立一个新的 context,已有的作为父 context,新创建则是子 context。
Golang Context 原理与实战
type Context interface {Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}}
- Deadline() 返回的是上下文的截至工夫,如果没有设定,ok 为 false
- Done() 当执行的上下文被勾销后,Done 返回的 chan 就会被 close。如果这个上下文不会被勾销,返回 nil.
Done 办法返回一个 struct{}的通道(之所以抉择这种返回类型,是因为空构造体不应用内存)。当 context 因为计时器或勾销函数的调用而被勾销时,该通道被敞开。记住,当你试图读取通道时,一个敞开的通道总是立刻返回其零值。
-
Err() 有几种状况:
- 如果 Done() 返回 chan 没有敞开,返回 nil
-
如果 Done() 返回的 chan 敞开了,Err 返回一个非 nil 的值,解释为什么会 Done()
- 如果 Canceled,返回 “Canceled”
- 如果超过了 Deadline,返回 “DeadlineEsceeded”
- Value(key) 返回上下文中 key 对应的 value 值
Done 和 Err 办法的例子
// 阻塞从 req 中获取链接,如果超时,间接返回
select {case <-ctx.Done():
// 获取链接超时了,间接返回谬误
// do something
return nil, ctx.Err()
case ret, ok := <-req:
// 拿到链接,校验并返回
return ret.conn, ret.err
}