关于golang:并发安全Context包的使用

55次阅读

共计 10536 个字符,预计需要花费 27 分钟才能阅读完成。

前言 – 为什么须要 Context

Golang context 是 Golang 利用开发罕用的并发控制技术,它与 WaitGroup 最大的不同点是 context 对于派生 goroutine 有更强的控制力,它能够管制多级的 goroutine。

context 翻译成中文是”上下文”,即它能够管制一组呈树状构造的 goroutine,每个 goroutine 领有雷同的上下文。

典型的应用场景如下图所示:

当一个申请被勾销或超时时,所有用来解决该申请的 goroutine 都应该迅速退出,而后零碎能力开释这些 goroutine 占用的资源。上图中因为 goroutine 派生出子 goroutine,而子 goroutine 又持续派生新的 goroutine,这种状况下应用 WaitGroup 就不太容易,因为子 goroutine 个数不容易确定。而应用 context 就能够很容易实现。

1. 全局变量形式退出

package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup
var exit bool

// 全局变量形式存在的问题:// 1. 应用全局变量在跨包调用时不容易对立
// 2. 如果 worker 中再启动 goroutine,就不太好管制了。func worker() {
    for {fmt.Println("worker")
        time.Sleep(time.Second)
        if exit {break}
    }
    wg.Done()}

func main() {wg.Add(1)
    go worker()
    time.Sleep(time.Second * 3) // sleep3 秒免得程序过快退出
    exit = true                 // 批改全局变量实现子 goroutine 的退出
    wg.Wait()
    fmt.Println("over")
}

2.Channel 的形式退出

package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

// 管道形式存在的问题:// 1. 应用全局变量在跨包调用时不容易实现标准和对立,须要保护一个共用的 channel
func worker(exitChan chan struct{}) {
LOOP:
    for {fmt.Println("worker")
        time.Sleep(time.Second)
        select {
        case <-exitChan: // 期待接管下级告诉
            break LOOP
        default:
        }
    }
    wg.Done()}

func main() {var exitChan = make(chan struct{})
    wg.Add(1)
    go worker(exitChan)
    time.Sleep(time.Second * 3) // sleep3 秒免得程序过快退出
    exitChan <- struct{}{}      // 给子 goroutine 发送退出信号
    close(exitChan)
    wg.Wait()
    fmt.Println("over")
}

3.Context 形式退出

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func worker(ctx context.Context) {
LOOP:
    for {fmt.Println("worker")
        time.Sleep(time.Second)
        select {case <-ctx.Done(): // 期待下级告诉
            break LOOP
        default:
        }
    }
    wg.Done()}

func main() {ctx, cancel := context.WithCancel(context.Background())
    wg.Add(1)
    go worker(ctx)
    time.Sleep(time.Second * 3)
    cancel() // 告诉子 goroutine 完结
    wg.Wait()
    fmt.Println("over")
}

并且当子 goroutine 又开启另外一个 goroutine 时,只须要将 ctx 传入即可:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func worker(ctx context.Context) {go worker2(ctx)
LOOP:
    for {fmt.Println("worker")
        time.Sleep(time.Second)
        select {case <-ctx.Done(): // 期待下级告诉
            break LOOP
        default:
        }
    }
    wg.Done()}

func worker2(ctx context.Context) {
LOOP:
    for {fmt.Println("worker2")
        time.Sleep(time.Second)
        select {case <-ctx.Done(): // 期待下级告诉
            break LOOP
        default:
        }
    }
}

func main() {ctx, cancel := context.WithCancel(context.Background())
    wg.Add(1)
    go worker(ctx)
    time.Sleep(time.Second * 3)
    cancel() // 告诉子 goroutine 完结
    wg.Wait()
    fmt.Println("over")
}

Context 初识

Go1.7 退出了一个新的规范库 context,它定义了 Context 类型,专门用来简化 对于解决单个申请的多个 goroutine 之间与申请域的数据、勾销信号、截止工夫等相干操作,这些操作可能波及多个 API 调用。

对服务器传入的申请应该创立上下文,而对服务器的传出调用应该承受上下文。它们之间的函数调用链必须传递上下文,或者能够应用 WithCancel、WithDeadline、WithTimeout 或 WithValue 创立的派生上下文。当一个上下文被勾销时,它派生的所有上下文也被勾销。

1. Context 实现原理

接口定义

type Context interface {Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}}

其中:

  • Deadline 办法须要返回以后 Context 被勾销的工夫,也就是实现工作的截止工夫(deadline);
  • Done 办法须要返回一个 Channel,这个 Channel 会在当前工作实现或者上下文被勾销之后敞开,屡次调用 Done 办法会返回同一个 Channel;
  • Err 办法会返回以后 Context 完结的起因,它只会在 Done 返回的 Channel 被敞开时才会返回非空的值;

    • 如果以后 Context 被勾销就会返回 Canceled 谬误;
    • 如果以后 Context 超时就会返回 DeadlineExceeded 谬误;
  • Value 办法会从 Context 中返回键对应的值,对于同一个上下文来说,屡次调用 Value 并传入雷同的 Key 会返回雷同的后果,该办法仅用于传递跨 API 和过程间跟申请域的数据;

2. emptyCtx–Background() 和 TODO()

context 包中定义了一个空的 context,名为 emptyCtx,用于 context 的根节点,空的 context 只是简略的实现了 Context,自身不蕴含任何值,仅用于其余 context 的父节点。

emptyCtx 类型定义如下代码所示:

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {return}

func (*emptyCtx) Done() <-chan struct{} {return nil}

func (*emptyCtx) Err() error {return nil}

func (*emptyCtx) Value(key interface{}) interface{} {return nil}

context 包中定义了一个专用的 emptCtx 全局变量,名为 background,能够应用 context.Background() 获取它,实现代码如下所示:

var background = new(emptyCtx)
func Background() Context {return background}

Background() 次要用于 main 函数、初始化以及测试代码中,作为 Context 这个树结构的最顶层的 Context,也就是根 Context。

TODO(),如果咱们不晓得该应用什么 Context 的时候,能够应用这个。

Background 和 TODO 实质上都是 emptyCtx 构造体类型,是一个不可勾销,没有设置截止工夫,没有携带任何值的 Context。

context 包中实现 Context 接口的 struct,除了 emptyCtx 外,还有 cancelCtx、timerCtx 和 valueCtx 三种,正是基于这三种 context 实例,实现了上面 4 种类型的 context,应用这四个办法时如果没有父 context,都须要传入 backgroud,即 backgroud 作为其父节点

  • WithCancel()
  • WithDeadline()
  • WithTimeout()
  • WithValue()

context 包中各 context 类型之间的关系,如下图所示:

3.cancelCtx–WithCancel()

构造定义

type cancelCtx struct {
    Context

    mu       sync.Mutex            // protects following fields
    done     chan struct{}         // created lazily, closed by first cancel call
    children map[canceler]struct{} // set to nil by the first cancel call
    err      error                 // set to non-nil by the first cancel call
}

cancel() 接口实现

cancel() 外部办法是了解 cancelCtx 的最要害的办法,其作用是敞开本人和其后辈,其后辈存储在 cancelCtx.children 的 map 中,其中 key 值即后辈对象,value 值并没有意义,这里应用 map 只是为了不便查问而已。

cancel 办法实现代码如下所示:

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    if err == nil {panic("context: internal error: missing cancel error")
    }
    c.mu.Lock()
    if c.err != nil {c.mu.Unlock()
        return // already canceled
    }
    c.err = err
    if c.done == nil {c.done = closedchan} else {close(c.done)
    }
    for child := range c.children {
        // NOTE: acquiring the child's lock while holding parent's lock.
        child.cancel(false, err)
    }
    c.children = nil
    c.mu.Unlock()

    if removeFromParent {removeChild(c.Context, c)
    }
}

WithCancel() 办法实现

WithCancel() 办法作了三件事:

  • 初始化一个 cancelCtx 实例
  • 将 cancelCtx 实例增加到其父节点的 children 中 (如果父节点也能够被 cancel 的话)
  • 返回 cancelCtx 实例和 cancel() 办法

其实现源码如下所示:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
    if parent == nil {panic("cannot create context from nil parent")
    }
    c := newCancelCtx(parent)
    propagateCancel(parent, &c)   // 将本身增加到父节点
    return &c, func() { c.cancel(true, Canceled) }
}

这里将本身增加到父节点的过程有必要简略阐明一下:

  1. 如果父节点也反对 cancel,也就是说其父节点必定有 children 成员,那么把新 context 增加到 children 里即可;
  2. 如果父节点不反对 cancel,就持续向上查问,直到找到一个反对 cancel 的节点,把新 context 增加到 children 里;
  3. 如果所有的父节点均不反对 cancel,则启动一个协程期待父节点完结,而后再把以后 context 完结。

典型利用案例

package main

import (
    "fmt"
    "time"
    "context"
)

func HandelRequest(ctx context.Context) {go WriteRedis(ctx)
    go WriteDatabase(ctx)
    for {
        select {case <-ctx.Done():
            fmt.Println("HandelRequest Done.")
            return
        default:
            fmt.Println("HandelRequest running")
            time.Sleep(2 * time.Second)
        }
    }
}

func WriteRedis(ctx context.Context) {
    for {
        select {case <-ctx.Done():
            fmt.Println("WriteRedis Done.")
            return
        default:
            fmt.Println("WriteRedis running")
            time.Sleep(2 * time.Second)
        }
    }
}

func WriteDatabase(ctx context.Context) {
    for {
        select {case <-ctx.Done():
            fmt.Println("WriteDatabase Done.")
            return
        default:
            fmt.Println("WriteDatabase running")
            time.Sleep(2 * time.Second)
        }
    }
}

func main() {ctx, cancel := context.WithCancel(context.Background())
    go HandelRequest(ctx)

    time.Sleep(5 * time.Second)
    fmt.Println("It's time to stop all sub goroutines!")
    cancel()

    //Just for test whether sub goroutines exit or not
    time.Sleep(5 * time.Second)
}

下面代码中协程 HandelRequest() 用于解决某个申请,其又会创立两个协程:WriteRedis()、WriteDatabase(),main 协程创立 context,并把 context 在各子协程间传递,main 协程在适当的机会能够 cancel 掉所有子协程。

4. timerCtx–WithTimerout()

构造定义

type timerCtx struct {
    cancelCtx
    timer *time.Timer // Under cancelCtx.mu.

    deadline time.Time
}

timerCtx 在 cancelCtx 根底上减少了 deadline 用于标示主动 cancel 的最终工夫,而 timer 就是一个触发主动 cancel 的定时器。

由此,衍生出 WithDeadline() 和 WithTimeout()。实现上这两种类型实现原理一样,只不过应用语境不一样:

  • deadline: 指定最初期限,比方 context 将 2018.10.20 00:00:00 之时主动完结
  • timeout: 指定最长存活工夫,比方 context 将在 30s 后完结。

对于接口来说,timerCtx 在 cancelCtx 根底上还须要实现 Deadline() 和 cancel() 办法,其中 cancel() 办法是重写的。

cancel() 接口实现

func (c *timerCtx) cancel(removeFromParent bool, err error) {c.cancelCtx.cancel(false, err)
    if removeFromParent {
        // Remove this timerCtx from its parent cancelCtx's children.
        removeChild(c.cancelCtx.Context, c)
    }
    c.mu.Lock()
    if c.timer != nil {c.timer.Stop()
        c.timer = nil
    }
    c.mu.Unlock()}

cancel() 办法根本继承 cancelCtx,只须要额定把 timer 敞开。

timerCtx 被敞开后,timerCtx.cancelCtx.err 将会存储敞开起因:

  • 如果 deadline 到来之前手动敞开,则敞开起因与 cancelCtx 显示统一;
  • 如果 deadline 到来时主动敞开,则起因为:”context deadline exceeded”

WithDeadline() 办法实现

WithDeadline() 办法实现步骤如下:

  • 初始化一个 timerCtx 实例
  • 将 timerCtx 实例增加到其父节点的 children 中 (如果父节点也能够被 cancel 的话)
  • 启动定时器,定时器到期后会主动 cancel 本 context
  • 返回 timerCtx 实例和 cancel() 办法

也就是说,timerCtx 类型的 context 不仅反对手动 cancel,也会在定时器到来后主动 cancel。

其实现源码如下:

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
    if parent == nil {panic("cannot create context from nil parent")
    }
    if cur, ok := parent.Deadline(); ok && cur.Before(d) {
        // The current deadline is already sooner than the new one.
        return WithCancel(parent)
    }
    c := &timerCtx{cancelCtx: newCancelCtx(parent),
        deadline:  d,
    }
    propagateCancel(parent, c)
    dur := time.Until(d)
    if dur <= 0 {c.cancel(true, DeadlineExceeded) // deadline has already passed
        return c, func() { c.cancel(false, Canceled) }
    }
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.err == nil {c.timer = time.AfterFunc(dur, func() {c.cancel(true, DeadlineExceeded)
        })
    }
    return c, func() { c.cancel(true, Canceled) }
}

WithTimeout() 办法实现

WithTimeout() 理论调用了 WithDeadline,二者实现原理统一。

看代码会十分清晰:

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {return WithDeadline(parent, time.Now().Add(timeout))
}

典型利用案例

上面例子中应用 WithTimeout() 取得一个 context 并在其子协程中传递:

package main

import (
    "fmt"
    "time"
    "context"
)

func HandelRequest(ctx context.Context) {go WriteRedis(ctx)
    go WriteDatabase(ctx)
    for {
        select {case <-ctx.Done():
            fmt.Println("HandelRequest Done.")
            return
        default:
            fmt.Println("HandelRequest running")
            time.Sleep(2 * time.Second)
        }
    }
}

func WriteRedis(ctx context.Context) {
    for {
        select {case <-ctx.Done():
            fmt.Println("WriteRedis Done.")
            return
        default:
            fmt.Println("WriteRedis running")
            time.Sleep(2 * time.Second)
        }
    }
}

func WriteDatabase(ctx context.Context) {
    for {
        select {case <-ctx.Done():
            fmt.Println("WriteDatabase Done.")
            return
        default:
            fmt.Println("WriteDatabase running")
            time.Sleep(2 * time.Second)
        }
    }
}

func main() {ctx, _ := context.WithTimeout(context.Background(), 5 * time.Second)
    go HandelRequest(ctx)

    time.Sleep(10 * time.Second)
}

主协程中创立一个 10s 超时的 context,并将其传递给子协程,5s 主动敞开 context。

5. valueCtx–WithValue()

构造定义

type valueCtx struct {
    Context
    key, val interface{}}

valueCtx 只是在 Context 根底上减少了一个 key-value 对,用于在各级协程间传递一些数据。因为 valueCtx 既不须要 cancel,也不须要 deadline,那么只须要实现 Value() 接口即可。

Value() 接口实现

func (c *valueCtx) Value(key interface{}) interface{} {
    if c.key == key {return c.val}
    return c.Context.Value(key)
}

由 valueCtx 数据结构定义可见,valueCtx.key 和 valueCtx.val 别离代表其 key 和 value 值。实现也很简略,这里有个细节须要关注一下,即以后 context 查找不到 key 时,会向父节点查找,如果查问不到则最终返回 interface{}。也就是说,能够通过子 context 查问到父的 value 值。

WithValue() 办法实现

其源码如下:

func WithValue(parent Context, key, val interface{}) Context {
    if parent == nil {panic("cannot create context from nil parent")
    }
    if key == nil {panic("nil key")
    }
    if !reflectlite.TypeOf(key).Comparable() {panic("key is not comparable")
    }
    return &valueCtx{parent, key, val}
}

典型利用案例

package main

import (
    "fmt"
    "time"
    "context"
)

func HandelRequest(ctx context.Context) {
    for {
        select {case <-ctx.Done():
            fmt.Println("HandelRequest Done.")
            return
        default:
            fmt.Println("HandelRequest running, parameter:", ctx.Value("parameter"))
            time.Sleep(2 * time.Second)
        }
    }
}

func main() {ctx := context.WithValue(context.Background(), "parameter", "1")
    go HandelRequest(ctx)

    time.Sleep(10 * time.Second)
}

上例 main() 中通过 WithValue() 办法取得一个 context,须要指定一个父 context、key 和 value。而后通将该 context 传递给子协程 HandelRequest,子协程能够读取到 context 的 key-value。

留神:本例中子协程无奈主动完结,因为 context 是不反对 cancle 的,也就是说 <-ctx.Done() 永远无奈返回。如果须要返回,须要在创立 context 时指定一个能够 cancel 的 context 作为父节点,应用父节点的 cancel() 在适当的机会完结整个 context。

总结

  • Context 仅仅是一个接口定义,依据实现的不同,能够衍生出不同的 context 类型;
  • cancelCtx 实现了 Context 接口,通过 WithCancel() 创立 cancelCtx 实例;
  • timerCtx 实现了 Context 接口,通过 WithDeadline() 和 WithTimeout() 创立 timerCtx 实例;
  • valueCtx 实现了 Context 接口,通过 WithValue() 创立 valueCtx 实例;
  • 三种 context 实例可互为父节点,从而能够组合成不同的利用模式;
  • 举荐以参数的形式显示传递 Context
  • 以 Context 作为参数的函数办法,应该把 Context 作为第一个参数。
  • 给一个函数办法传递 Context 的时候,不要传递 nil,如果不晓得传递什么,就应用 context.TODO()
  • Context 的 Value 相干办法应该传递申请域的必要数据,不应该用于传递可选参数
  • Context 是线程平安的,能够释怀的在多个 goroutine 中传递

正文完
 0