关于golang:Go进阶基础特性定时器

41次阅读

共计 7642 个字符,预计需要花费 20 分钟才能阅读完成。

在理论的利用中,咱们常常会须要在特定的提早后,或者定时去做某件事情。这时就须要用到定时器了,Go 语言提供了一次性定时器 time.Timer 和周期型定时器 time.Ticker。

如何应用

Timer 是一次性的定时器,通过指定的工夫后触发一个事件,这个事件通过其自身提供的 channel 进行告诉。与之相干的次要办法如下:

// 创立 Timer
func NewTimer(d Duration) *Timer
// 进行 Timer
func (t *Timer) Stop() bool
// 重置 Timer
func (t *Timer) Reset(d Duration) bool
// 创立 Timer,返回它的 channel
func After(d Duration) <-chan Time
// 创立一个提早执行 f 函数的 Timer
func AfterFunc(d Duration, f func()) *Timer

Timer 的次要应用场景有:

  1. 设定超时工夫;
  2. 提早执行某个办法。

Ticker 是周期型定时器,即周期性的触发一个事件,通过 Ticker 提供的管道将事件传递进来。次要办法有:

// 创立 Ticker
func NewTicker(d Duration) *Ticker
// 进行 Ticker,Ticker 在应用完后务必要开释,否则会产生资源泄露
func (t *Ticker) Stop()
// 启动一个匿名的 Ticker(无奈进行)func Tick(d Duration) <-chan Time

Ticker 的应用场景都和定时工作无关,例如定时进行聚合等批量解决。

实现原理

咱们先来看看 Timer 和 Ticker 的数据结构:

type Timer struct {
    C <-chan Time
    r runtimeTimer
}

type Ticker struct {
    C <-chan Time
    r runtimeTimer
}

发现二者截然不同,而且都蕴含有 runtimeTimer 字段,这个 runtimeTimer 才是定时器底层真正的数据结构。翻看 Timer 和 Ticker 的相干实现代码,与这两个构造自身相干的逻辑都很简略,真正简单的是底层 runtimeTimer 的设计与保护,这也是咱们要重点介绍的内容。

演进历史

Go 语言的计时器实现经验过很多个版本的迭代,到最新的版本为止,计时器的实现别离经验了以下几段历史:

  1. Go 1.9 版本之前,所有的计时器由全局惟一的四叉堆保护;
  2. Go 1.10 ~ 1.13 版本,全局应用 64 个四叉堆保护所有的计时器,每个处理器(P)创立的计时器会由对应的四叉堆保护;
  3. Go 1.14 版本之后,每个处理器独自治理计时器并通过网络轮询器触发。

在最开始的实现中,运行时创立的所有计时器都会退出到全局惟一的四叉堆中,而后有一个专门的协程 timerproc 来治理这些计时器,运行时会在计时器到期或者退出了更早的计时器时唤醒 timerproc 来解决。那这样一来,就会产生两个性能上的问题,第一个就是全局惟一的四叉堆带来的锁争用问题,第二个就是唤醒 timerproc 带来的上下文切换问题。

Go 1.10 版本中将全局的四叉堆宰割成了 64 个更小的四叉堆,这种分片的形式,升高了锁的粒度,解决了下面提到的第一个问题,然而第二个问题还是悬而未决。

在最新版本的实现中,所有的计时器都以最小四叉堆的模式存储在处理器 runtime.p 中,这样的设计形式让两个性能问题都迎刃而解。

数据结构

runtime.timer 是 Go 语言计时器的外部示意,每一个计时器都存储在对应处理器的最小四叉堆中,上面是运行时计时器对应的构造体:

type timer struct {
    pp puintptr

    when     int64
    period   int64
    f        func(interface{}, uintptr)
    arg      interface{}
    seq      uintptr
    nextwhen int64
    status   uint32
}
  • pp:计时器所在的处理器 P 的指针地址。
  • when:计时器被唤醒的工夫。
  • period:计时器再次被唤醒的工夫距离,只有 Ticker 会用到。
  • f:回调函数,每次在计时器被唤醒时都会调用。
  • arg:回调函数 f 的参数。
  • seq:回调函数 f 的参数,该参数仅在 netpoll 的利用场景下应用。
  • nextwhen:当计时器状态为 timerModifiedXX 时,将会应用 nextwhen 的值设置到 when 字段上。
  • status:计时器的以后状态值。

状态

现阶段计时器所蕴含的状态有上面几种:

状态 含意
timerNoStatus 计时器尚未设置状态
timerWaiting 期待计时器启动
timerRunning 运行计时器的回调办法
timerDeleted 计时器曾经被删除,但依然在某些 P 的堆中
timerRemoving 计时器正在被删除
timerRemoved 计时器曾经进行,且不在任何 P 的堆中
timerModifying 计时器正在被批改
timerModifiedEarlier 计时器已被批改为更早的工夫
timerModifiedLater 计时器已被批改为更晚的工夫
timerMoving 计时器曾经被批改,正在被挪动
  • 处于 timerRunning、timerRemoving、timerModifying 和 timerMoving 状态的工夫比拟短。
  • 处于 timerWaiting、timerRunning、timerDeleted、timerRemoving、timerModifying、timerModifiedEarlier、timerModifiedLater 和 timerMoving 状态时计时器在处理器(P)的堆上。
  • 处于 timerNoStatus 和 timerRemoved 状态时计时器不在堆上。
  • 处于 timerModifiedEarlier 和 timerModifiedLater 状态时计时器尽管在堆上,然而可能位于谬误的地位上,须要从新排序。

相干操作

增加计时器

当咱们调用 time.NewTimer 或 time.NewTicker 时,会执行 runtime.addtimer 函数增加计时器:

func addtimer(t *timer) {
    if t.when < 0 {t.when = maxWhen}
    if t.status != timerNoStatus {throw("addtimer called with initialized timer")
    }
    t.status = timerWaiting

    when := t.when

    pp := getg().m.p.ptr()
    lock(&pp.timersLock)
    cleantimers(pp)
    doaddtimer(pp, t)
    unlock(&pp.timersLock)

    wakeNetPoller(when)
}
  1. 边界解决以及状态判断;
  2. 调用 cleantimers 清理处理器中的计时器;
  3. 调用 doaddtimer 初始化网络轮询器,并将以后计时器退出处理器的 timers 四叉堆中;
  4. 调用 wakeNetPoller 中断正在阻塞的网络轮询,依据工夫判断是否须要唤醒网络轮询器中休眠的线程。
删除计时器

在计时器的应用中,个别会调用 timer.Stop() 办法来进行计时器,实质上就是让这个 timer 从轮询器中隐没,也就是从处理器 P 的堆中移除 timer:

func deltimer(t *timer) bool {
    for {switch s := atomic.Load(&t.status); s {
        case timerWaiting, timerModifiedLater:
            // timerWaiting/timerModifiedLater -> timerDeleted
            ...
        case timerModifiedEarlier:
            // timerModifiedEarlier -> timerModifying -> timerDeleted
            ...
        case timerDeleted, timerRemoving, timerRemoved:
            // timerDeleted/timerRemoving/timerRemoved 
            return false
        case timerRunning, timerMoving:
            // timerRunning/timerMoving
            osyield()
        case timerNoStatus:
            return false
        case timerModifying:
            osyield()
        default:
            badTimer()}
    }
}

在 deltimer 中遵循了根本的规定解决:

  1. timerWaiting/timerModifiedLater -> timerDeleted。
  2. timerModifiedEarlier -> timerModifying -> timerDeleted。
  3. timerDeleted/timerRemoving/timerRemoved -> 无需变更,曾经满足条件。
  4. timerRunning/timerMoving/timerModifying -> 正在执行、挪动中,无奈进行,期待下一次状态查看再解决。
  5. timerNoStatus -> 无奈进行,不满足条件。
批改计时器

在咱们调用 timer.Reset 办法来从新设置 Duration 值的时候,咱们就是在对底层的计时器进行批改,对应的是 runtime.modtimer 办法。这个办法比较复杂,就不具体介绍了,有趣味的能够本人钻研一下。modtimer 遵循下述规定解决:

  1. timerWaiting -> timerModifying -> timerModifiedXX。
  2. timerModifiedXX -> timerModifying -> timerModifiedYY。
  3. timerNoStatus -> timerModifying -> timerWaiting。
  4. timerRemoved -> timerModifying -> timerWaiting。
  5. timerDeleted -> timerModifying -> timerModifiedXX。
  6. timerRunning -> 期待状态扭转,才能够进行下一步。
  7. timerMoving -> 期待状态扭转,才能够进行下一步。
  8. timerRemoving -> 期待状态扭转,才能够进行下一步。
  9. timerModifying -> 期待状态扭转,才能够进行下一步。

在实现了计时器的状态解决后,会分为两种状况解决:

  1. 待批改的计时器曾经被删除:因为既有的计时器曾经没有了,因而会调用 doaddtimer 办法创立一个新的计时器,并将本来的 timer 属性赋值过来,再调用 wakeNetPoller 办法在预约工夫唤醒网络轮询器。
  2. 失常逻辑解决:如果批改后的计时器的触发工夫小于本来的触发工夫,则批改该计时器的状态为 timerModifiedEarlier,并且调用 wakeNetPoller 办法在预约工夫唤醒网络轮询器。
触发计时器

Go 语言会在两种场景下触发计时器,运行计时器中保留的函数:

  • 调度器调度时会查看处理器中的计时器是否准备就绪;
  • 系统监控会查看是否有未执行的到期计时器。

调度器的触发一共分两种状况,一种是在调度循环 schedule 中,另一种是以后处理器 P 没有可执行的 G 和计时器,去其余 P 窃取计时器和 G 的 findrunnable 函数中。触发计时器时执行的是 checkTimers 函数,来分析一下它的大抵过程。

func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {if atomic.Load(&pp.adjustTimers) == 0 {next := int64(atomic.Load64(&pp.timer0When))
        if next == 0 {return now, 0, false}
        if now == 0 {now = nanotime()
        }
        if now < next {if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {return now, next, false}
        }
    }

    lock(&pp.timersLock)

    adjusttimers(pp)

这一段是调整堆中计时器的过程:

  • 起始先通过 pp.adjustTimers 查看以后处理器 P 中是否有须要调整的计时器,如果没有的话:

    • 当没有须要执行的计时器时,间接返回;
    • 当下一个计时器没有到期并且须要删除的计时器不多于总数的 1/4 时都会间接返回。
  • 如果处理器中存在须要调整的计时器,会调用 runtime.adjusttimers 依据工夫将 timers 切片重新排列。
rnow = now
    if len(pp.timers) > 0 {
        if rnow == 0 {rnow = nanotime()
        }
        for len(pp.timers) > 0 {if tw := runtimer(pp, rnow); tw != 0 {
                if tw > 0 {pollUntil = tw}
                break
            }
            ran = true
        }
    }

执行完调整阶段的逻辑后,就是运行计时器的代码。这一段通过 runtime.runtimer 查找并执行堆中须要执行的计时器:

  • 如果胜利执行,runtimer 返回 0;
  • 如果没有须要执行的计时器,runtimer 返回最近的计时器的触发工夫,记录这个工夫并返回。
if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {clearDeletedTimers(pp)
    }

    unlock(&pp.timersLock)
    return rnow, pollUntil, ran
}

在最初的删除阶段,如果以后 Goroutine 的处理器和传入的处理器雷同,并且处理器中被删除(timerDeleted 状态)的计时器占堆中计时器的 1/4 以上,就会调用 runtime.clearDeletedTimers 清理处理器中全副被标记为 timerDeleted 的计时器。

即便是通过每次调度器调度和窃取的时候触发,但毕竟还是具备肯定的不确定性,因而 Go 中应用系统监控触发来做一个兜底:

func sysmon() {
    ...
    for {
        ...
        now := nanotime()
        next, _ := timeSleepUntil()
        ...
        lastpoll := int64(atomic.Load64(&sched.lastpoll))
        if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
            list := netpoll(0)
            if !list.empty() {incidlelocked(-1)
                injectglist(&list)
                incidlelocked(1)
            }
        }
        if next < now {startm(nil, false)
        }
        ...
}
  • 调用 runtime.timeSleepUntil 获取计时器的到期工夫以及持有该计时器的堆;
  • 如果超过 10ms 的工夫没有网络轮询,调用 runtime.netpoll 轮询;
  • 如果以后有应该运行的计时器没有执行,可能存在无奈被抢占的处理器,则启动新的线程解决计时器。
运行计时器

runtime.runtimer 函数会查看处理器四叉堆上最顶上的计时器,该函数也会解决计时器的删除和更新:

func runtimer(pp *p, now int64) int64 {
    for {t := pp.timers[0]
        switch s := atomic.Load(&t.status); s {
        case timerWaiting:
            if t.when > now {return t.when}

            runOneTimer(pp, t, now)
            return 0

        case timerDeleted:
            // 删除堆中的计时器
        case timerModifiedEarlier, timerModifiedLater:
            // 批改计时器的工夫
        case timerModifying:
            osyield()
        case timerNoStatus, timerRemoved:
            badTimer()
        case timerRunning, timerRemoving, timerMoving:
            badTimer()
        default:
            badTimer()}
    }
}

它会遵循以下的规定解决计时器:

  • timerNoStatus -> 解体:未初始化的计时器
  • timerWaiting -> timerWaiting
  • timerWaiting -> timerRunning -> timerNoStatus
  • timerWaiting -> timerRunning -> timerWaiting
  • timerModifying -> 期待状态扭转
  • timerModifiedXX -> timerMoving -> timerWaiting
  • timerDeleted -> timerRemoving -> timerRemoved
  • timerRunning -> 解体:并发调用该函数
  • timerRemoved、timerRemoving、timerMoving -> 解体:计时器堆不统一

如果处理器四叉堆顶部的计时器没有到触发工夫会间接返回,否则调用 runtime.runOneTimer 运行堆顶的计时器:

func runOneTimer(pp *p, t *timer, now int64) {
    f := t.f
    arg := t.arg
    seq := t.seq

    if t.period > 0 {
        delta := t.when - now
        t.when += t.period * (1 + -delta/t.period)
        siftdownTimer(pp.timers, 0)
        if !atomic.Cas(&t.status, timerRunning, timerWaiting) {badTimer()
        }
        updateTimer0When(pp)
    } else {dodeltimer0(pp)
        if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {badTimer()
        }
    }

    unlock(&pp.timersLock)
    f(arg, seq)
    lock(&pp.timersLock)
}

依据计时器的 period 字段,上述函数会做出不同的解决:

  • 如果 period 字段大于 0,则代表它是一个 Ticker,须要周期性触发:

    • 批改计时器下一次触发的工夫并更新其在堆中的地位;
    • 将计时器的状态更新为 timerWaiting;
    • 调用 runtime.updateTimer0When 函数设置处理器的 timer0When 字段。
  • 如果 period 字段小于或者等于 0,阐明它是一个 Timer,只需触发一次即可:

    • 调用 runtime.dodeltimer0 函数删除计时器;
    • 将计时器的状态更新至 timerNoStatus。

在实现更新计时器后,上互斥锁,调用计时器的回调办法 f,传入相应参数。实现整个流程。

正文完
 0