Go-Context-使用和源码分析

23次阅读

共计 7403 个字符,预计需要花费 19 分钟才能阅读完成。

概述

Go 语言中的 Goroutine 是 go 语言中的最重要的一部分,是一个用户级的线程是 Go 语言实现高并发高性能的重要原因。但是如何停止一个已经开启的 Goroutine 呢?一般有几种方法:

  • 使用共享内存来停止 goroutine,比如通过判断一个全局变量来判断是否要停止 goroutine
  • 使用文件系统来停止 goroutine,跟使用内存相同用文件来判断
  • 使用 context 上下文,context 也是大家最推荐的一种方式。并且可以结束嵌套的 goroutine。

简单使用

context 库中,有 4 个关键方法:

  • WithCancel 返回一个 cancel 函数,调用这个函数则可以主动停止 goroutine。
  • WithValue WithValue 可以设置一个 key/value 的键值对,可以在下游任何一个嵌套的 context 中通过 key 获取 value。但是不建议使用这种来做 goroutine 之间的通信。
  • WithTimeout 函数可以设置一个 time.Duration,到了这个时间则会 cancel 这个 context。
  • WithDeadline WithDeadline 函数跟 WithTimeout 很相近,只是 WithDeadline 设置的是一个时间点。
package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    //cancel
    ctx, cancel := context.WithCancel(context.Background())
    go work(ctx, "work1")

    time.Sleep(time.Second * 3)
    cancel()
    time.Sleep(time.Second * 1)

    // with value
    ctx1, valueCancel := context.WithCancel(context.Background())
    valueCtx := context.WithValue(ctx1, "key", "test value context")
    go workWithValue(valueCtx, "value work", "key")
    time.Sleep(time.Second * 3)
    valueCancel()

    // timeout
    ctx2, timeCancel := context.WithTimeout(context.Background(), time.Second*3)
    go work(ctx2, "time cancel")
    time.Sleep(time.Second * 5)
    timeCancel()

    // deadline
    ctx3, deadlineCancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*3))
    go work(ctx3, "deadline cancel")
    time.Sleep(time.Second * 5)
    deadlineCancel()

    time.Sleep(time.Second * 3)

}

func workWithValue(ctx context.Context, name string, key string) {
    for {
        select {case <-ctx.Done():
            fmt.Println(ctx.Value(key))
            println(name, "get message to quit")
            return
        default:
            println(name, "is running", time.Now().String())
            time.Sleep(time.Second)
        }
    }
}

func work(ctx context.Context, name string) {
    for {
        select {case <-ctx.Done():
            println(name, "get message to quit")
            return
        default:
            println(name, "is running", time.Now().String())
            time.Sleep(time.Second)
        }
    }
}

源码分析

context 的原理其实就是利用了 channel struct{} 的特性,使用 select 获取 channel 数据。一旦关闭这个 channel 则会收到数据退出 goroutine 中的逻辑。context 也是支持嵌套使用,结构就如下图显示利用的是一个 map 类型来存储子 context。关闭一个节点就会循环关闭这个节点下面的所有子节点,就实现了优雅的退出 goroutine 的功能。下面我们看具体接口对象和源码逻辑。

Context 接口和核心对象

context interface 有 4 个方法

  • Deadline 该方法返回一个 deadline 和标识是否已设置 deadline 的 bool 值,如果没有设置 deadline,则 ok == false,此时 deadline 为一个初始值的 time.Time 值
  • Done 返回一个 channel。当 timeout 或者调用 cancel 方法时,将会 close 掉
  • Err 返回一个错误
  • Value 返回 WithValue 设置的值
type Context interface {Deadline() (deadline time.Time, ok bool)

    Done() <-chan struct{}

    Err() error

    Value(key interface{}) interface{}}

emptyCtx

在上面的例子中我们可以看到函数 context.Background(),这个函数返回的就是一个 emptyCtx
emptyCtx 经常被用作在跟节点或者说是最上层的 context,因为 context 是可以嵌套的。在上面的 Withvalue 的例子中已经看到,先用 emptyCtx 创建一个 context,然后再使用 withValue 把之前创建的 context 传入。这个操作会在下面的分析中详细了解的。
下面就是 emptyCtx,其实实现很简单所有的方法几乎返回的都是 nil。
ToDo 函数返回的也是

var (background = new(emptyCtx)
    todo       = new(emptyCtx)
)

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {return}

func (*emptyCtx) Done() <-chan struct{} {return nil}

func (*emptyCtx) Err() error {return nil}

func (*emptyCtx) Value(key interface{}) interface{} {return nil}

func (e *emptyCtx) String() string {
    switch e {
    case background:
        return "context.Background"
    case todo:
        return "context.TODO"
    }
    return "unknown empty Context"
}

var (background = new(emptyCtx)
    todo       = new(emptyCtx)
)

func Background() Context {return background}

func TODO() Context {return todo}

cancelCtx

cancelCtx 是 context 实现里最重要的一环,context 的取消几乎都是使用了这个对象。WithDeadline WithTimeout 其实最终都是调用的 cancel 的 cancel 函数来实现的。
对象中的字段:

  • Context 保存 parent Context
  • mu 用来保护数据
  • done 用来标识是否已被 cancel。当外部触发 cancel、或者父 Context 的 channel 关闭时,此 done 也会关闭
  • children 保存它的所有子 canceler
  • err 已经 cancel 则 err!= nil

cancel 主要函数:

Done

Done 函数返回一个 chan struct{} 的 channel,用来判断 context 是否已经被 close 了。从上面的例子可以看到使用一个 select 来判断 context 是否被关闭。一旦从外部调用 cancel 函数关闭了 context 的 done 属性,select 则可以拿到输出,最终关闭这个 context

Cancel

Cancel 函数用来在外部调用,调用之后主要操作:

  1. 加锁避免多出操作
  2. 如果 cancelCtx 的 done 未被初始化则初始化一个(这个属于 lazyload)
  3. 调用 close(c.done) 来关闭 channel,由于 make(chan struct{}) 的特性,上面的 Done channel 则会接收到数据
  4. 循环调用 context.children 的 cancel 方法,关闭所有嵌套的 context。
  5. 释放锁 c.mu.Unlock()
  6. 根据参数 removeFromParent 来判断是否要
type cancelCtx struct {
    Context
    mu       sync.Mutex            // protects following fields
    done     chan struct{}         // created lazily, closed by first cancel call
    children map[canceler]struct{} // set to nil by the first cancel call
    err      error                 // set to non-nil by the first cancel call
}

// 可以被 cancel 的对象,实现者是 *cancelCtx 和 *timerCtx.
type canceler interface {cancel(removeFromParent bool, err error)
    Done() <-chan struct{}
}

func (c *cancelCtx) Done() <-chan struct{} {c.mu.Lock()
    if c.done == nil {c.done = make(chan struct{})
    }
    d := c.done
    c.mu.Unlock()
    return d
}

func (c *cancelCtx) Err() error {c.mu.Lock()
    defer c.mu.Unlock()
    return c.err
}

func (c *cancelCtx) String() string {return fmt.Sprintf("%v.WithCancel", c.Context)
}

// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) C(removeFromParent bool, err error) {
    if err == nil {panic("context: internal error: missing cancel error")
    }
    c.mu.Lock()
    if c.err != nil {c.mu.Unlock()
        return // already canceled
    }
    c.err = err
    if c.done == nil {c.done = closedchan} else {close(c.done)
    }
    for child := range c.children {
        // NOTE: acquiring the child's lock while holding parent's lock.
        child.cancel(false, err)
    }
    c.children = nil
    c.mu.Unlock()

    if removeFromParent {removeChild(c.Context, c)
    }
}

timerCtx

timeCtx 其实是在 cancelCtx 基础上增加 timer 属性。其中的 cancel 函数也是调用 cancelCtx 的 Cancel 函数。

type timerCtx struct {
    cancelCtx
    timer *time.Timer // Under cancelCtx.mu.

    deadline time.Time
}

func (c *timerCtx) cancel(removeFromParent bool, err error) {c.cancelCtx.cancel(false, err)
    if removeFromParent {
        // Remove this timerCtx from its parent cancelCtx's children.
        removeChild(c.cancelCtx.Context, c)
    }
    c.mu.Lock()
    if c.timer != nil {c.timer.Stop()
        c.timer = nil
    }
    c.mu.Unlock()}

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {return c.deadline, true}

WithCancel WithDeadline WithTimeout WithValue

这三个方法是对于 context 使用的一个封装,在最上边的例子里我们可以看到是如何使用的。在这段我们是要看的是如何实现的源码。

WithCancel

WithCancel 函数返回 context 和一个主动取消的函数,外部只要调用这个函数则会 close context 中 channel。
返回的函数测试 cancelCtx 中测 cancel 函数,在上面已经有了详细说明这里就不过多描述了。

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {c := newCancelCtx(parent)
    propagateCancel(parent, &c)
    return &c, func() { c.cancel(true, Canceled) }
}

WithDeadline

  1. 判断父节点中的 deadline 是否比父节点的早,如果是则直接调用 WithCancel
  2. 创建一个 timerCtx,timerCtx 的具体描述也在上面详细分析过了
  3. 使用 time.afterFunc 设置 dur,当时间到了则执行 timerCtx.Cancel 最终执行的也是 cancelCtx.Cancel
  4. 返回 Cancel 函数,方便外部调用
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {if cur, ok := parent.Deadline(); ok && cur.Before(d) {
        // The current deadline is already sooner than the new one.
        return WithCancel(parent)
    }
    c := &timerCtx{cancelCtx: newCancelCtx(parent),
        deadline:  d,
    }
    propagateCancel(parent, c)
    dur := time.Until(d)
    if dur <= 0 {c.cancel(true, DeadlineExceeded) // deadline has already passed
        return c, func() { c.cancel(true, Canceled) }
    }
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.err == nil {c.timer = time.AfterFunc(dur, func() {c.cancel(true, DeadlineExceeded)
        })
    }
    return c, func() { c.cancel(true, Canceled) }
}

WithTimeout

WithTimeout 实现很简单,其实就是调用了 WithDeadline 方法,传入已经计算过的 deadline。

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {return WithDeadline(parent, time.Now().Add(timeout))
}

WithValue

WithValue 不返回 cancel 函数,只是把传入的 key 和 value 保存起来。方便上下游节点根据 key 获取 value。

type valueCtx struct {
    Context
    key, val interface{}}

func (c *valueCtx) String() string {return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}

func (c *valueCtx) Value(key interface{}) interface{} {
    if c.key == key {return c.val}
    return c.Context.Value(key)
}

func WithValue(parent Context, key, val interface{}) Context {
    if key == nil {panic("nil key")
    }
    if !reflect.TypeOf(key).Comparable() {panic("key is not comparable")
    }
    return &valueCtx{parent, key, val}
}

使用原则

从网上看到了一些使用原则,把他摘抄下来:

  • 不要把 Context 存在一个结构体当中,显式地传入函数。Context 变量需要作为第一个参数使用,一般命名为 ctx。
  • 即使方法允许,也不要传入一个 nil 的 Context,如果你不确定你要用什么 Context 的时候传一个 context.TODO。
  • 使用 context 的 Value 相关方法只应该用于在程序和接口中传递的和请求相关的元数据,不要用它来传递一些可选的参数。
  • 同样的 Context 可以用来传递到不同的 goroutine 中,Context 在多个 goroutine 中是安全的

总结

上面讲述了 context 的用法和源码,其实有很多框架都实现了自己的 context。其实只要继承了 context 接口就是一个 context 对象。Context 是大家都比较推荐的一种停止 goroutine 的一种方式,并且 context 支持嵌套,停止跟节点它下面所有的子节点都会停止。

正文完
 0