Go 的并发编程

go 关键字的任何返回值都会被疏忽

就像 Java 的 Thread, Java的 Thread 你须要写一堆货色,然而 go 语言你只须要写一个 go

goroutine

在 go 语言外面, 启动一个 goroutine 是简略的, 简单的是:

  1. 如何取到 goroutine 的返回值
  2. 如何确保 goroutine 不泄露
  3. 如何优雅的敞开 goroutine
  4. 如何让 goroutine 和 channel 擦出火花

范式:go + chan

in <-chan int, out chan<- int

go 关键字配合 chan 传递出参和入参

watchout! 出参和入参都是参数, 不是间接 return 的
import "fmt"func process(val int) int {    fmt.Println(val)    return val}func runThingConcurrently(in ***<-chan*** int, out ***chan<-*** int) {    go func() {        for val := range in {            result := process(val)            out <- result        }    }()}

通道

通道是援用类型 (什么是援用类型,除了援用类型之外,还有其余什么类型,我怎么晓得什么识货该应用什么类型?

通道的零值是nil

每个写入通道的值只能被读取一次, 这可就有点像 rabbitmq 了(不齐全像)

对通道的 for-range 将始终继续,除非通道敞开、bread、return

通道敞开

  • 写操作触发 panic(包含反复的敞开)
  • 如果值还没有被读取(音讯队列中还积压了音讯),只将会被一次返回
  • 如果没有未被读取的值(音讯队列没有积压),返回通道类型的0值

范式:应用逗号ok模式来检测通道是否敞开

v, ok := <-ch

通道的行为

应用没有缓存的通道时, 如果消费者没有生产, 那么生产者的goroutine 是不会退出的.

应用有缓存的通道, 能够解耦消费者和生产者, eg: 消费者在写入实现后就能够立即退出, 不须要等到消费者读取.

范式: sync.waitGroup

只有一个 gorouting 写入操作完结时,敞开通道是比拟容易的; 多个goroutine同时向一个goroutne写入数据时,应用 sync.waitGroup 敞开通道更容易.(反复的敞开会导致通道panic)

应用缓存通道的场景

还是作为音讯队列在思考, 想一下什么时候须要让音讯队列有缓冲

  • 咱们晓得要启动多少个消费者, 限度消费者 goroutine 的数量
  • 不要阻塞生产者

select

无序的从任何一个可执行的分支语句中筛选一个

避免以不统一的程序获取锁,防止出现死锁(select会执行任何一个可执行的)

范式: for-select 循环

for {        select {        case v := <-ch:            fmt.Println(v)        case <-done:            break        }    }

范式: 应用 selcet 实现非阻塞式操作

就像java里的tryXXX

for {        select {        case v := <-ch:            fmt.Println(v)        default:            fmt.Println("no wait")        }    }

范式: 在 select 中跳过有效 case

select 读取一个敞开的通道永远会执行胜利, 上面例子会始终输入 close

func main() {   c := make(chan struct{})   close(c)   for i := 0; i < 10; i++ {      select {      case <-c:         fmt.Println("close") // 永远执行这个 case, 如许恐怖      default:         fmt.Println("default")      }   }}

下面的例子中, 如果想让 default 语句失效, 能够给 channel 赋值 nil

func main() {    c := make(chan struct{})    close(c)    **c = nil // 赋值为 nil, select 语句就不会执行对应 case**    for i := 0; i < 10; i++ {        select {        case <-c:            fmt.Println("close")        default:            fmt.Println("default")        }    }}

并发实际与模式(探讨)

API 应该暗藏并发实现细节

小心变量被笼罩

func main() {    a := []int{2, 4, 6, 8, 10}    ch := make(chan int, len((a)))    for _, val := range a {        *val := val // 小心变量被笼罩, 这种比拟显著的谬误, idea 可能给出正告, 然而简单一个点的, idea 就给不出正告了*        go func() {            ch <- val * 2        }()    }    for i := 0; i < len(a); i++ {        fmt.Println(<-ch)    }}

避免 goroutine 泄露

让你写一个造成 goroutine泄露的代码,你能写进去吗?

还是利用生产者 - 消费者模型, 让消费者提前 break, 生产者因为不晓得产生了什么, 始终在期待 channel 中的变量被生产, 从而导致 goroutine 泄露

func main() {    ch := make(chan int, 0)    produce(ch)    consume(ch)}func produce(ch chan<- int) {    go func() {        for i := 0; i < 100; i++ {            ch <- i        }    }()}func consume(ch <-chan int) {    for i := range ch {        if i > 5 {            fmt.Println("break")            break        }    }}

包装器类型的通道

范式: 通道完结模式

应用两个通道, 一个通道负责传递音讯,另一个负责日常保护经营

这个看起来容易,写起来挺烦的

范式: 应用勾销函数完结 goroutine

定义一个勾销函数, 和channel 一起返回给调用方, 调用方在解决完业务逻辑的时候调用一下这个函数

上面的例子中, 须要在 for 循环完结之后调用

func countTo(max int) (<-chan int, func()) {    done := make(chan struct{})    result := make(chan int)    cancel := func() {        close(done)    }    go func() {        for i := 0; i < max; i++ {            select {            case <-done:                return            default:                result <- i            }        }    }()    return result, cancel}func main() {    ch, cancel := countTo(10)    for i := range ch {        if i > 5 {            break        }    }    cancel()}

范式: 应用缓冲通道限度并发的groutine 的数量

这个例子感觉没有 get 到书上的精华
func process(i int) int {    fmt.Println("process", i)    return i}func processWithMaxGoroutine() []int {    const conc = 10    results := make(chan int, conc)    for i := 0; i < conc; i++ {        results <- process(i)    }    var r []int    for i := 0; i < conc; i++ {        r = append(r, <-results)    }    return r}func main() {    r := processWithMaxGoroutine()    fmt.Println(r)}

范式: 背压

背压是指在异步场景中,被观察者发送事件的速度远快于观察者的处理速度的状况下的一种策略,此策略通知上游的被观察者升高发送速度。简而言之,背压是一种限流策略。

利用缓冲通道和 select 实现限流, 管制最大并发的申请量

原理介绍: channel 作为缓冲队列, 当缓冲队列空的时候, select 会触发 default

func main() {    c := make(chan struct{}, 10)    for i := 0; i < 10; i++ {        c <- struct{}{}    }    for i := 0; i < 20; i++ {        select {        case <-c:            fmt.Println("do", i)        default:            fmt.Println("back press")        }    }}

封装降级一下

type BackPressGauge struct {    gauge chan struct{}}func New(limit int) *BackPressGauge {    gauge := make(chan struct{}, limit)    for i := 0; i < limit; i++ {        gauge <- struct{}{}    }    return &BackPressGauge{        gauge: gauge,    }}func (g BackPressGauge) Process(c func()) error {    select {    case <-g.gauge:        c()        return nil    default:        return errors.New("out of limit")    }}func main() {    gauge := New(10)    for i := 0; i < 20; i++ {        err := gauge.Process(func() {            fmt.Println(i)        })        if err != nil {            fmt.Println(err)        }    }}

简略的超时解决

这种形式能够实现超时返回, 然而这个模式不能让goroutine 在超时后进行运行, 咱们只是不解决goroutine返回的后果

func main() {    i, err := processWithTimeout()    if err != nil {        fmt.Println(err)    } else {        fmt.Println(i)    }}func processWithTimeout() (int, error) {    select {    case v := <-process():        return v, nil    case <-time.After(2 * time.Second):        return -1, errors.New("timeout")    }}func process() <-chan int {    c := make(chan int)    go func() {        time.Sleep(3 * time.Second)        c <- 1        close(c)    }()    return c}

范式: 应用 sync.waitGroup 期待所有goroutine 执行实现

func main() {    var wait sync.WaitGroup    wait.Add(3)    for i := 0; i < 10; i++ {        i := i        go func() {            defer wait.Done()            time.Sleep(3 * time.Second)            fmt.Println(i)        }()    }    wait.Wait()    fmt.Println("wait finish")}

范式: 应用 errorGroup 实现只有一个报错就退出

func main() {    g := new(errgroup.Group)    for i := 0; i < 3; i++ {        i := i        g.Go(func() error {            time.Sleep(3 * time.Second)            if i == 2 {                return errors.New("error")            }            return nil        })    }    if err := g.Wait(); err != nil {        fmt.Println(err)    }}

范式: 只执行一次的代码 sync.Once

type Parser interface {   Parse() string}var parser Parservar once sync.Oncefunc Parse() string {   once.Do(func() {      fmt.Println("init")      parser = initParse()   })   return parser.Parse()}

实战: 先调用A, B , 再调用 C, 任何一步出错或者超过30s 就退出

应用JAVA实现一个

原始人版本

func main() {    ab := make(chan ProcessContext, 2)    go func() {        ab <- processA()    }()    go func() {        ab <- processB()    }()    left := 30    for i := 0; i < 2; i++ {        r := <-ab        if r.err != nil {            fmt.Println(r)            return        }        fmt.Println(r.result)        if left > r.left {            left = r.left        }    }    c := processC(left)    fmt.Println(c)}func request() chan int {    result := make(chan int)    go func(result chan int) {        time.Sleep(3 * time.Second)        result <- 100    }(result)    return result}type ProcessContext struct {    result int    left   int    err    error}func (c ProcessContext) Result() (int, int, error) {    return c.result, c.left, c.err}func processA() ProcessContext {    result := make(chan ProcessContext)    go func() {        fmt.Println("start process a")        select {        case <-time.After(30 * time.Second):            fmt.Println("timeout a")            result <- ProcessContext{                result: -1,                left:   0,                err:    errors.New("a timeout"),            }            break        case r := <-request():            fmt.Println("return a")            result <- ProcessContext{                result: r,                left:   15,                err:    nil,            }            break        }    }()    return <-result}func processB() ProcessContext {    result := make(chan ProcessContext)    go func() {        fmt.Println("start process b")        select {        case <-time.After(30 * time.Second):            fmt.Println("timeout b")            result <- ProcessContext{                result: -1,                left:   0,                err:    errors.New("b timeout"),            }            break        case r := <-request():            fmt.Println("return b")            result <- ProcessContext{                result: r,                left:   15,                err:    nil,            }            break        }    }()    return <-result}func processC(left int) ProcessContext {    result := make(chan ProcessContext)    go func() {        fmt.Println("start process c")        select {        case <-time.After(time.Duration(left) * time.Second):            fmt.Println("c timeout")            result <- ProcessContext{                result: -1,                left:   -1,                err:    errors.New("timeout c"),            }            break        case r := <-request():            fmt.Println("c result")            result <- ProcessContext{                result: r,                left:   0,                err:    nil,            }            break        }    }()    return <-result}

应用 time 优化的版本

func main() {    i, err := process()    if err != nil {        fmt.Println(err)    }    fmt.Println(i)}type Processor struct {    // 这谁能想到    a    chan int    b    chan int    ab   chan int    c    chan int    errs chan error    left time.Duration}func process() (int, error) {    p := Processor{        a:    make(chan int, 1), // 这里应用的缓冲 channel, 生产者 goroutine 能够及时开释        b:    make(chan int, 1),        ab:   make(chan int, 1),        c:    make(chan int, 1),        errs: make(chan error, 2), // ab 会同时解决, 长度为2        left: 30,    }    p.Launch()    ab, err := p.waitForAB() // waitXX 的办法都是 for-select 的套路    if err != nil {        return -1, err    }    ***p.ab <- ab // 确保 ab 先执行实现***    return p.waitForC()}func (p *Processor) Launch() {    // 一步到位, 启动三个 goroutine. 这个谁能想到    left := p.left    go func(left time.Duration) {        a, left, err := processWithTimeout(left)        if err != nil {            p.errs <- err            return        }        p.a <- a        if p.left > left {            p.left = left        }    }(left)    go func(left time.Duration) {        b, left, err := processWithTimeout(left)        if err != nil {            p.errs <- err            return        }        p.b <- b        if p.left > left {            p.left = left        }    }(left)    go func() {        ab := <-p.ab // 阻塞住, 等到 ab 执行实现之后才会开始 c        fmt.Println(ab)        c, left, err := processWithTimeout(p.left)        if err != nil {            p.errs <- err            return        }        p.c <- c        if p.left > left {            p.left = left        }    }()}func (p *Processor) waitForAB() (int, error) {    var ab int    for i := 0; i < 2; i++ {        select {        case a := <-p.a:            ab += a        case b := <-p.b:            ab += b        case err := <-p.errs:            return ab, err        }    }    return ab, nil}func (p *Processor) waitForC() (int, error) {    select {    case err := <-p.errs:        return -1, err    case c := <-p.c:        return c, nil    }}func processWithTimeout(left time.Duration) (int, time.Duration, error) {    fmt.Println("processWithTimeout")    result := make(chan int)    err := make(chan error)    go func() {        select {        case r := <-request():            result <- r            break        case <-time.After(left * time.Second):            err <- errors.New("timeout")        }    }()    select {    case result := <-result:        return result, 10, nil    case err := <-err:        return -1, -1, err    }}func request() chan int {    result := make(chan int)    go func(result chan int) {        time.Sleep(3 * time.Second)        result <- 100    }(result)    return result}

Context

指针

int32 → 32位 → 4字节

独立寻址的最小单元是一个字节。

指针的零值是nil。

&是地址运算符。它位于值类型之前,并返回存储该值的内存地位的地址:

x := "hello"pointerToX := &x

*是间接寻址运算符, 位于指针类型的变量后面,并返回所指向的值。这称为解援用.

在解援用一个指针之前,必须确保这个指针不是空, 否则会引发 panic

& 是下套, * 是解套

new: 创立一个指向所提供的类型的0值实例的指针

// x是斧正类型var x = new(int)fmt.Println(*x) // 0// y也是指针类型var y = new(Man)fmt.Println(y)  // &{0}fmt.Println(*y) // {0}

无奈在根本类型应用 &

func main() {    var z = &Man{        age: 7,        name: "吴梓祺" // 编译会报错    }    fmt.Println(z)}type Man struct {    age int    name *string // 只能传指针类型的 }func main() {    var x = "吴梓祺"    var z = &Man{        age: 7,        name: &x, //这样就不会了    }    fmt.Println(z)}

Go是一种传值调用的语言,传递给函数的值是正本。

和 Java 一样

只管当一个指针被传递给一个函数时,该函数会失去该指针的一个正本。但因为指针依然指向原始数据,所以原始数据能够被调用的函数批改。

惟一应该应用指针参数来批改变量的时候是当函数冀望一个接口时。

go.mod

  • require: 依赖的模块
  • replace:笼罩依赖模块的路劲
  • exclude

Init函数

申明一个init函数,它不承受参数也不返回任何值,在这个包第一次被其余包援用时,就会执行init函数

包援用会触发相应的init函数

Go容许咱们在一个包中定义多个init函数,甚至能够在包的同一个文件中定义多个init函数

只管Go容许定义多个init函数,然而在一个包中应该只申明一个init函数

循环依赖

Go不容许包与包之间存在循环依赖。这意味着如果包A间接或间接导入了包B,那么包B就不能间接或间接导入包A。

模块

Go中的模块零碎遵循抉择最小版本的准则。也就是说咱们会应用go.mod中定义依赖的最低适配版本

Context

包装模式
激励通过函数显式的传递数据是go编程的常规!
激励通过函数显式的传递数据是go编程的常规!
激励通过函数显式的传递数据是go编程的常规!

  • Context.Background()

    构建办法:

构建一个空的 Context

ctx := context.BackendGroup()result, err := logic(ctx, “param”)
  • Context.TOOD

在开发过程中长期应用的, 生产环境中不应该应用 TODO

  • context.WithContext()

接管并包装原来的Context, 返回一个新的 Context

context + select 实现超时勾销

func main() {   ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)   defer cancel() // 这个不要忘了   result, err := longRunningThingManager(ctx)   if err != nil {      fmt.Println(err)      return   }   fmt.Println(result)}func longRunningThingManager(ctx context.Context) (string, error) {     // 包装器类型的通道   type wrapper struct {      result string      err    error   }   ch := make(chan wrapper, 1)   go func() {      result, err := longRunningThing()      ch <- wrapper{result, err}   }()   select { // 通过select-case 实现勾销   case data := <-ch:      return data.result, data.err   case <-ctx.Done():      return "", ctx.Err()   }}func longRunningThing() (string, error) {   time.Sleep(5 * time.Second)   return "result after waiting 10s", nil}
  • context.WithTimeout()
  • context.WithDeadline()

如果把过来的工夫传递给context.WithDeadline,那么context在创立时就曾经被勾销了。

咱们在子context上设置的任何超时都受到在父context上设置的超时的限度

  • context.WithValue()

值传递, context值的键必须是可比拟的

因为键的类型和常量都是未导出(private)的,所以以后包之外的任何代码都不能向context中写入数据,从而防止了抵触。

函数用值创立context时,函数名也应该以ContextWith结尾。如果函数返回context中的某个值,则函数名应该以FromContext结尾

在大多数状况下,咱们应该从申请处理程序的context中获取值,而后显式地将其传递给业务逻辑。你不应该在Go语言中用context代替函数显式地传递参数

应用context来通过规范的API传递数值。在须要解决业务逻辑时,将context中的值复制到显式参数中。这样的形式还能够用于在context中拜访系统维护信息。

勾销

context.WithCancel

包装模式。context被视为一个不可变的实例,每当向一个context增加信息时,咱们通过包装已有的context来创立一个新的context,已有的作为父context,新创建则是子context。

Golang Context 原理与实战

type Context interface {  Deadline() (deadline time.Time, ok bool)  Done() <-chan struct{}  Err() error  Value(key interface{}) interface{}}
  • Deadline() 返回的是上下文的截至工夫,如果没有设定,ok 为 false
  • Done() 当执行的上下文被勾销后,Done返回的chan就会被close。如果这个上下文不会被勾销,返回nil.

Done办法返回一个struct{}的通道(之所以抉择这种返回类型,是因为空构造体不应用内存)。当context因为计时器或勾销函数的调用而被勾销时,该通道被敞开。记住,当你试图读取通道时,一个敞开的通道总是立刻返回其零值。

  • Err() 有几种状况:

    • 如果Done() 返回 chan 没有敞开,返回nil
    • 如果Done() 返回的chan 敞开了, Err 返回一个非nil的值,解释为什么会Done()

      • 如果Canceled,返回 "Canceled"
      • 如果超过了 Deadline,返回 "DeadlineEsceeded"
  • Value(key) 返回上下文中 key 对应的 value 值

Done 和 Err 办法的例子

// 阻塞从req中获取链接,如果超时,间接返回select {case <-ctx.Done():  // 获取链接超时了,间接返回谬误  // do something  return nil, ctx.Err()case ret, ok := <-req:  // 拿到链接,校验并返回  return ret.conn, ret.err}