关于golang:goroutine并发控制

5次阅读

共计 7719 个字符,预计需要花费 20 分钟才能阅读完成。

通信

共享内存

func  Test() {ordersInfoApp  :=  make([]orderInfoApp, 0, totalCount)
    var  mux sync.Mutex
    wg  := sync.WaitGroup{}

    for  i  :=  0; i <=  10; i++ {wg.Add(1)
        go  func(pageIndex int) {
            // do somethine
            var  ordersInfo orderInfoApp
            mux.Lock()
            ordersInfoApp  =  append(ordersInfoApp, ordersInfo)
            mux.Unlock()

            wg.Done()}(i)
    }

    wg.Wait()}

个别在简略的数据传递下应用

channel

func  Test() {ordersInfoApp  :=  make([]orderInfoApp, 0, totalCount)
    choi  :=  make(chan orderInfoApp, 10)
    wg  := sync.WaitGroup{}

    for  i  :=  0; i <=  10; i++ {wg.Add(1)
        go  func(pageIndex int) {
            // do somethine
            var  ordersInfo orderInfoApp
            choi <- ordersInfo

            wg.Done()}(i)
    }

    go  func() {wg.Wait()
        close(choi)
    }()

    for  v  :=  range choi {ordersInfoApp  =  append(ordersInfoApp, v)
    }
}

绝对简单的数据流动状况

同步和管制

goroutine 退出只能由自身管制,不能从内部强制完结该 goroutine
两种例外情况:main 函数完结或者程序解体完结运行

共享变量管制完结

func  main() {
    running  :=  true
    f  :=  func() {
        for running {fmt.Println("i am running")
            time.Sleep(1  * time.Second)
        }
        fmt.Println("exit")
    }
    go  f()
    go  f()
    go  f()
    time.Sleep(2  * time.Second)
    running  =  false
    time.Sleep(3  * time.Second)
    fmt.Println("main exit")
}

长处
实现简略,不形象,不便,一个变量即可简略管制子 goroutine 的进行。
毛病 :
构造只能是多读一写,不能适应结构复杂的设计,如果有多写,会呈现数据同步问题,须要加锁或者应用 sync.atomic
不适宜用于同级的子 go 程间的通信,全局变量传递的信息太少
因为是单向告诉, 主过程无奈期待所有子 goroutine 退出
这种办法只实用于非常简单的逻辑且并发量不太大的场景

sync.Waitgroup 期待完结

func  main() {
    var  wg sync.WaitGroup
    for  i  :=  0; i <  3; i++ {wg.Add(1)
        go  func() {defer wg.Done()
            // do something
        }()}

    wg.Wait()}

channel 管制完结

// 可扩大到多个 work
func  main() {chClose  :=  make(chan  struct{})
    go  func() {
        for {
            select {
                case  <-chClose:
                return
            }

        // do something
        }
    }()

    //chClose<-struct{}
    close(chClose)
}

留神 channel 的阻塞状况,避免出现死锁。
通常 channel 只能由发送者敞开

  • 向无缓冲的 channel 写入数据会导致该 goroutine 阻塞,直到其余 goroutine 从这个 channel 中读取数据
  • 从无缓冲的 channel 读出数据,如果 channel 中无数据,会导致该 goroutine 阻塞,直到其余 goroutine 向这个 channel 中写入数据
  • 向带缓冲的且缓冲已满的 channel 写入数据会导致该 goroutine 阻塞,直到其余 goroutine 从这个 channel 中读取数据
  • 向带缓冲的且缓冲未满的 channel 写入数据不会导致该 goroutine 阻塞
  • 从带缓冲的 channel 读出数据,如果 channel 中无数据,会导致该 goroutine 阻塞,直到其余 goroutine 向这个 channel 中写入数据
  • 从带缓冲的 channel 读出数据,如果 channel 中有数据,该 goroutine 不会阻塞
// 读完完结
for {
    select {
    case  <-ch:
        default:
        goto finish
    }
}
finish:
  • 如果多个 case 同时就绪时,select 会随机地抉择一个执行
  • case 标签里向 channel 发送或接收数据,case 前面的语句在发送接管胜利后才会执行
  • nil channel(读、写、读写)的 case 标签会被跳过

limitwaitgroup

type  limitwaitgroup  struct {sem chan  struct{}
    wg sync.WaitGroup
}

func  New(n int) *limitwaitgroup {
    return  &limitwaitgroup{sem: make(chan  struct{}, n),
    }
}

func  (l *limitwaitgroup) Add() {l.sem <-  struct{}{}
    l.wg.Add(1)
}

func  (l *limitwaitgroup) Done() {
    <-l.sem
    l.wg.Done()}

func  (l *limitwaitgroup) Wait() {l.wg.Wait()
}

// 例子
wg  := limitwaitgroup.New(6)
for  i  :=  0; i <=  10; i++ {wg.Add()
    go  func(index int){defer wg.Done()
        // do something
    }(i)
}
wg.Wait()

context

上下文 go 1.7 作为第一个参数在 goroutine 里传递

Context 的接口定义

type  Context  interface {Deadline() (deadline time.Time, ok bool)
    Done() <-chan  struct{}
    Err() error
    Value(key interface{}) interface{}}

Deadline获取设置的截止工夫(WithDeadline、WithTimeout),第一个返回值代表截止工夫,第二个返回值代表是否设置了截止工夫,false 时示意没有设置截止工夫

Done办法返回一个敞开的只读的 chan,类型为 struct{},在 goroutine 中,如果该办法返回的 chan 能够读取,则意味着 parent context 曾经发动了勾销申请,咱们通过Done 办法收到这个信号后,就应该做清理操作,而后退出 goroutine,开释资源。

Errcontext 没有被完结,返回 nil;已被完结,返回完结的起因(被勾销、超时)。

Value办法通过一个 Key 获取该 Context 上绑定的值,拜访这个值是线程平安的。key 个别定义以后包的一个新的未导出类型的变量(最好不要应用内置类型),防止和其余 goroutine 的 key 抵触。

  • Context 衍生
func  WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func  WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func  WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func  WithValue(parent Context, key, val interface{}) Context

这四个 With 函数,接管的都有一个 partent 参数,就是父 Context,咱们要基于这个父 Context 创立出子 Context 的意思,这种形式能够了解为子 Context 对父 Context 的继承,也能够了解为基于父 Context 的衍生。

通过这些函数,就创立了一颗 Context 树,树的每个节点都能够有任意多个子节点,节点层级能够有任意多个。

WithCancel函数,传递一个父 Context 作为参数,返回子 Context,以及一个勾销函数用来勾销 Context。WithDeadline函数,和 WithCancel 差不多,它会多传递一个截止工夫参数,意味着到了这个工夫点,会主动勾销 Context,也能够不等到这个时候,能够提前通过勾销函数进行勾销。

WithTimeoutWithDeadline 基本上一样,这个示意是超时主动勾销,设置多少工夫后主动勾销 Context。

WithValue函数和勾销 Context 无关,生成一个绑定了一个键值对数据的 Context,这个绑定的数据能够通过 Context.Value 办法拜访到

  • 例子
    WithCancel
func  main() {ctx, cancel  := context.WithCancel(context.Background())
    go  watch(ctx, "goroutine 1")
    go  watch(ctx, "goroutine 2")
    go  watch(ctx, "goroutine 3")
    time.Sleep(10  * time.Second)
    fmt.Println("开始完结 goroutine")
    cancel()
    time.Sleep(5  * time.Second)
    fmt.Println(ctx.Err())
}
func  watch(ctx context.Context, name string) {
  for {
      select {case  <-ctx.Done():
          fmt.Println(name, "over")
          return
      default:
          fmt.Println(name, "running")
          time.Sleep(2  * time.Second)
      }
  }
}

// output:
goroutine 1 running
goroutine 2 running
goroutine 3 running
goroutine 1 running
goroutine 2 running
goroutine 3 running
goroutine 1 running
goroutine 2 running
goroutine 3 running
goroutine 2 running
goroutine 3 running
goroutine 1 running
goroutine 3 running
goroutine 2 running
goroutine 1 running
开始完结 goroutine
goroutine 1 over
goroutine 2 over
goroutine 3 over
context canceled

WithDeadline

func  main() {ctx, cancel  := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
    go  watch(ctx, "goroutine 1")
    go  watch(ctx, "goroutine 2")
    go  watch(ctx, "goroutine 3")
    _  = cancel
    time.Sleep(8  * time.Second)
    fmt.Println(ctx.Err())
}

func  watch(ctx context.Context, name string) {
  for {
      select {case  <-ctx.Done():
          fmt.Println(name, "over")
          return
      default:
          fmt.Println(name, "running")
          time.Sleep(2  * time.Second)
      }
  }
}

// output:
goroutine 3 running
goroutine 2 running
goroutine 1 running
goroutine 3 running
goroutine 1 running
goroutine 2 running
goroutine 1 running
goroutine 3 running
goroutine 2 running
goroutine 3 over
goroutine 1 over
goroutine 2 over
context deadline exceeded

WithTimeout

func  main() {ctx, cancel  := context.WithTimeout(context.Background(), 5*time.Second)
    go  watch(ctx, "goroutine 1")
    go  watch(ctx, "goroutine 2")
    go  watch(ctx, "goroutine 3")
    _  = cancel
    time.Sleep(8  * time.Second)
    fmt.Println(ctx.Err())
}

func  watch(ctx context.Context, name string) {
  for {
      select {case  <-ctx.Done():
          fmt.Println(name, "over")
          return
      default:
          fmt.Println(name, "running")
          time.Sleep(2  * time.Second)
      }
  }
}

// output:
goroutine 3 running
goroutine 1 running
goroutine 2 running
goroutine 3 running
goroutine 2 running
goroutine 1 running
goroutine 2 running
goroutine 1 running
goroutine 3 running
goroutine 2 over
goroutine 3 over
goroutine 1 over
context deadline exceeded

WithValue

type  key  int  // 未导出的包公有类型
var  kkk key =  0

func  main() {ctx, cancel  := context.WithCancel(context.Background())
    // WithValue 是没有勾销函数的
    ctx  = context.WithValue(ctx, kkk, "100W")
    
    go  watch(ctx, "goroutine 1")
    go  watch(ctx, "goroutine 2")
    go  watch(ctx, "goroutine 3")
    
    time.Sleep(8  * time.Second)
    
    fmt.Println("开始完结 goroutine")
    cancel()
    
    time.Sleep(3  * time.Second)
    fmt.Println(ctx.Err())
}

func  watch(ctx context.Context, name string) {
  for {
      select {case  <-ctx.Done():
          fmt.Println(name, "over")
          return
      default:
          fmt.Println(name, "running", "爸爸给我了", ctx.Value(kkk).(string))
          time.Sleep(2  * time.Second)
      }
  }
}
// output:
goroutine 1 running 爸爸给我了 100W
goroutine 2 running 爸爸给我了 100W
goroutine 3 running 爸爸给我了 100W
goroutine 2 running 爸爸给我了 100W
goroutine 1 running 爸爸给我了 100W
goroutine 3 running 爸爸给我了 100W
goroutine 1 running 爸爸给我了 100W
goroutine 2 running 爸爸给我了 100W
goroutine 3 running 爸爸给我了 100W
goroutine 1 running 爸爸给我了 100W
goroutine 3 running 爸爸给我了 100W
goroutine 2 running 爸爸给我了 100W
开始完结 goroutine
goroutine 2 over
goroutine 3 over
goroutine 1 running 爸爸给我了 100W
goroutine 1 over
context canceled

管制多个 goroutine

func  main() {http.HandleFunc("/", func(W http.ResponseWriter, r *http.Request) {fmt.Println("收到申请")
        
    ctx, cancel  := context.WithCancel(context.Background())
    go  worker(ctx, 2)
    go  worker(ctx, 3)
        
    time.Sleep(time.Second *  10)
    cancel()
    fmt.Println(ctx.Err())
    })
    http.ListenAndServe(":9290", nil)
}
func  worker(ctx context.Context, speed int) {reader  :=  func(n int) {
        for {
            select {case  <-ctx.Done():
                return
                default:
                break
            }
            fmt.Println("reader:", n)
            time.Sleep(time.Duration(n) * time.Second)
        }
    }
    
    go  reader(2)
    go  reader(1)
    
    for {
        select {case  <-ctx.Done():
            return
            default:
            break
        }
        fmt.Println("worker:", speed)
        time.Sleep(time.Duration(speed) * time.Second)
    }
}

// output:
收到申请
worker: 2
reader: 1
worker: 3
reader: 1
reader: 2
reader: 2
reader: 1
reader: 1
reader: 1
reader: 2
worker: 2
reader: 2
reader: 1
reader: 1
reader: 1
worker: 3
reader: 1
worker: 2
reader: 2
reader: 1
reader: 2
reader: 1
context canceled
  • 应用规定
    应用 Context 的程序应遵循以下这些规定来放弃跨包接口的统一和不便动态剖析工具 (go vet) 来查看上下文流传是否有潜在问题。

    • 不要将 Context 存储在构造类型中,而是显式的传递给每个须要的函数;Context 应该作为函数的第一个参数传递,通常命名为 ctx:
    func  DoSomething(ctx context.Context, arg Arg) error {// ... use ctx ...}   
    • 即便函数能够承受 nil 值,也不要传递 nil Context。如果不确定要应用哪个 Context,请传递 context.TODO。
    • 应用 context 的 Value 相干办法只应该用于在程序和接口中传递和申请相干的元数据,不要用它来传递一些可选的参数
    • 雷同的 Context 能够传递给在不同 goroutine 中运行的函数; Context 对于多个 goroutine 同时应用是平安的。
正文完
 0