在理论的利用中,咱们常常会须要在特定的提早后,或者定时去做某件事情。这时就须要用到定时器了,Go 语言提供了一次性定时器 time.Timer 和周期型定时器 time.Ticker。
如何应用
Timer 是一次性的定时器,通过指定的工夫后触发一个事件,这个事件通过其自身提供的 channel 进行告诉。与之相干的次要办法如下:
// 创立 Timer
func NewTimer(d Duration) *Timer
// 进行 Timer
func (t *Timer) Stop() bool
// 重置 Timer
func (t *Timer) Reset(d Duration) bool
// 创立 Timer,返回它的 channel
func After(d Duration) <-chan Time
// 创立一个提早执行 f 函数的 Timer
func AfterFunc(d Duration, f func()) *Timer
Timer 的次要应用场景有:
- 设定超时工夫;
- 提早执行某个办法。
Ticker 是周期型定时器,即周期性的触发一个事件,通过 Ticker 提供的管道将事件传递进来。次要办法有:
// 创立 Ticker
func NewTicker(d Duration) *Ticker
// 进行 Ticker,Ticker 在应用完后务必要开释,否则会产生资源泄露
func (t *Ticker) Stop()
// 启动一个匿名的 Ticker(无奈进行)func Tick(d Duration) <-chan Time
Ticker 的应用场景都和定时工作无关,例如定时进行聚合等批量解决。
实现原理
咱们先来看看 Timer 和 Ticker 的数据结构:
type Timer struct {
C <-chan Time
r runtimeTimer
}
type Ticker struct {
C <-chan Time
r runtimeTimer
}
发现二者截然不同,而且都蕴含有 runtimeTimer 字段,这个 runtimeTimer 才是定时器底层真正的数据结构。翻看 Timer 和 Ticker 的相干实现代码,与这两个构造自身相干的逻辑都很简略,真正简单的是底层 runtimeTimer 的设计与保护,这也是咱们要重点介绍的内容。
演进历史
Go 语言的计时器实现经验过很多个版本的迭代,到最新的版本为止,计时器的实现别离经验了以下几段历史:
- Go 1.9 版本之前,所有的计时器由全局惟一的四叉堆保护;
- Go 1.10 ~ 1.13 版本,全局应用 64 个四叉堆保护所有的计时器,每个处理器(P)创立的计时器会由对应的四叉堆保护;
- Go 1.14 版本之后,每个处理器独自治理计时器并通过网络轮询器触发。
在最开始的实现中,运行时创立的所有计时器都会退出到全局惟一的四叉堆中,而后有一个专门的协程 timerproc 来治理这些计时器,运行时会在计时器到期或者退出了更早的计时器时唤醒 timerproc 来解决。那这样一来,就会产生两个性能上的问题,第一个就是全局惟一的四叉堆带来的锁争用问题,第二个就是唤醒 timerproc 带来的上下文切换问题。
Go 1.10 版本中将全局的四叉堆宰割成了 64 个更小的四叉堆,这种分片的形式,升高了锁的粒度,解决了下面提到的第一个问题,然而第二个问题还是悬而未决。
在最新版本的实现中,所有的计时器都以最小四叉堆的模式存储在处理器 runtime.p 中,这样的设计形式让两个性能问题都迎刃而解。
数据结构
runtime.timer 是 Go 语言计时器的外部示意,每一个计时器都存储在对应处理器的最小四叉堆中,上面是运行时计时器对应的构造体:
type timer struct {
pp puintptr
when int64
period int64
f func(interface{}, uintptr)
arg interface{}
seq uintptr
nextwhen int64
status uint32
}
- pp:计时器所在的处理器 P 的指针地址。
- when:计时器被唤醒的工夫。
- period:计时器再次被唤醒的工夫距离,只有 Ticker 会用到。
- f:回调函数,每次在计时器被唤醒时都会调用。
- arg:回调函数 f 的参数。
- seq:回调函数 f 的参数,该参数仅在 netpoll 的利用场景下应用。
- nextwhen:当计时器状态为 timerModifiedXX 时,将会应用 nextwhen 的值设置到 when 字段上。
- status:计时器的以后状态值。
状态
现阶段计时器所蕴含的状态有上面几种:
状态 | 含意 |
---|---|
timerNoStatus | 计时器尚未设置状态 |
timerWaiting | 期待计时器启动 |
timerRunning | 运行计时器的回调办法 |
timerDeleted | 计时器曾经被删除,但依然在某些 P 的堆中 |
timerRemoving | 计时器正在被删除 |
timerRemoved | 计时器曾经进行,且不在任何 P 的堆中 |
timerModifying | 计时器正在被批改 |
timerModifiedEarlier | 计时器已被批改为更早的工夫 |
timerModifiedLater | 计时器已被批改为更晚的工夫 |
timerMoving | 计时器曾经被批改,正在被挪动 |
- 处于 timerRunning、timerRemoving、timerModifying 和 timerMoving 状态的工夫比拟短。
- 处于 timerWaiting、timerRunning、timerDeleted、timerRemoving、timerModifying、timerModifiedEarlier、timerModifiedLater 和 timerMoving 状态时计时器在处理器(P)的堆上。
- 处于 timerNoStatus 和 timerRemoved 状态时计时器不在堆上。
- 处于 timerModifiedEarlier 和 timerModifiedLater 状态时计时器尽管在堆上,然而可能位于谬误的地位上,须要从新排序。
相干操作
增加计时器
当咱们调用 time.NewTimer 或 time.NewTicker 时,会执行 runtime.addtimer 函数增加计时器:
func addtimer(t *timer) {
if t.when < 0 {t.when = maxWhen}
if t.status != timerNoStatus {throw("addtimer called with initialized timer")
}
t.status = timerWaiting
when := t.when
pp := getg().m.p.ptr()
lock(&pp.timersLock)
cleantimers(pp)
doaddtimer(pp, t)
unlock(&pp.timersLock)
wakeNetPoller(when)
}
- 边界解决以及状态判断;
- 调用 cleantimers 清理处理器中的计时器;
- 调用 doaddtimer 初始化网络轮询器,并将以后计时器退出处理器的 timers 四叉堆中;
- 调用 wakeNetPoller 中断正在阻塞的网络轮询,依据工夫判断是否须要唤醒网络轮询器中休眠的线程。
删除计时器
在计时器的应用中,个别会调用 timer.Stop() 办法来进行计时器,实质上就是让这个 timer 从轮询器中隐没,也就是从处理器 P 的堆中移除 timer:
func deltimer(t *timer) bool {
for {switch s := atomic.Load(&t.status); s {
case timerWaiting, timerModifiedLater:
// timerWaiting/timerModifiedLater -> timerDeleted
...
case timerModifiedEarlier:
// timerModifiedEarlier -> timerModifying -> timerDeleted
...
case timerDeleted, timerRemoving, timerRemoved:
// timerDeleted/timerRemoving/timerRemoved
return false
case timerRunning, timerMoving:
// timerRunning/timerMoving
osyield()
case timerNoStatus:
return false
case timerModifying:
osyield()
default:
badTimer()}
}
}
在 deltimer 中遵循了根本的规定解决:
- timerWaiting/timerModifiedLater -> timerDeleted。
- timerModifiedEarlier -> timerModifying -> timerDeleted。
- timerDeleted/timerRemoving/timerRemoved -> 无需变更,曾经满足条件。
- timerRunning/timerMoving/timerModifying -> 正在执行、挪动中,无奈进行,期待下一次状态查看再解决。
- timerNoStatus -> 无奈进行,不满足条件。
批改计时器
在咱们调用 timer.Reset 办法来从新设置 Duration 值的时候,咱们就是在对底层的计时器进行批改,对应的是 runtime.modtimer 办法。这个办法比较复杂,就不具体介绍了,有趣味的能够本人钻研一下。modtimer 遵循下述规定解决:
- timerWaiting -> timerModifying -> timerModifiedXX。
- timerModifiedXX -> timerModifying -> timerModifiedYY。
- timerNoStatus -> timerModifying -> timerWaiting。
- timerRemoved -> timerModifying -> timerWaiting。
- timerDeleted -> timerModifying -> timerModifiedXX。
- timerRunning -> 期待状态扭转,才能够进行下一步。
- timerMoving -> 期待状态扭转,才能够进行下一步。
- timerRemoving -> 期待状态扭转,才能够进行下一步。
- timerModifying -> 期待状态扭转,才能够进行下一步。
在实现了计时器的状态解决后,会分为两种状况解决:
- 待批改的计时器曾经被删除:因为既有的计时器曾经没有了,因而会调用 doaddtimer 办法创立一个新的计时器,并将本来的 timer 属性赋值过来,再调用 wakeNetPoller 办法在预约工夫唤醒网络轮询器。
- 失常逻辑解决:如果批改后的计时器的触发工夫小于本来的触发工夫,则批改该计时器的状态为 timerModifiedEarlier,并且调用 wakeNetPoller 办法在预约工夫唤醒网络轮询器。
触发计时器
Go 语言会在两种场景下触发计时器,运行计时器中保留的函数:
- 调度器调度时会查看处理器中的计时器是否准备就绪;
- 系统监控会查看是否有未执行的到期计时器。
调度器的触发一共分两种状况,一种是在调度循环 schedule 中,另一种是以后处理器 P 没有可执行的 G 和计时器,去其余 P 窃取计时器和 G 的 findrunnable 函数中。触发计时器时执行的是 checkTimers 函数,来分析一下它的大抵过程。
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {if atomic.Load(&pp.adjustTimers) == 0 {next := int64(atomic.Load64(&pp.timer0When))
if next == 0 {return now, 0, false}
if now == 0 {now = nanotime()
}
if now < next {if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {return now, next, false}
}
}
lock(&pp.timersLock)
adjusttimers(pp)
这一段是调整堆中计时器的过程:
-
起始先通过 pp.adjustTimers 查看以后处理器 P 中是否有须要调整的计时器,如果没有的话:
- 当没有须要执行的计时器时,间接返回;
- 当下一个计时器没有到期并且须要删除的计时器不多于总数的 1/4 时都会间接返回。
- 如果处理器中存在须要调整的计时器,会调用 runtime.adjusttimers 依据工夫将 timers 切片重新排列。
rnow = now
if len(pp.timers) > 0 {
if rnow == 0 {rnow = nanotime()
}
for len(pp.timers) > 0 {if tw := runtimer(pp, rnow); tw != 0 {
if tw > 0 {pollUntil = tw}
break
}
ran = true
}
}
执行完调整阶段的逻辑后,就是运行计时器的代码。这一段通过 runtime.runtimer 查找并执行堆中须要执行的计时器:
- 如果胜利执行,runtimer 返回 0;
- 如果没有须要执行的计时器,runtimer 返回最近的计时器的触发工夫,记录这个工夫并返回。
if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {clearDeletedTimers(pp)
}
unlock(&pp.timersLock)
return rnow, pollUntil, ran
}
在最初的删除阶段,如果以后 Goroutine 的处理器和传入的处理器雷同,并且处理器中被删除(timerDeleted 状态)的计时器占堆中计时器的 1/4 以上,就会调用 runtime.clearDeletedTimers 清理处理器中全副被标记为 timerDeleted 的计时器。
即便是通过每次调度器调度和窃取的时候触发,但毕竟还是具备肯定的不确定性,因而 Go 中应用系统监控触发来做一个兜底:
func sysmon() {
...
for {
...
now := nanotime()
next, _ := timeSleepUntil()
...
lastpoll := int64(atomic.Load64(&sched.lastpoll))
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
list := netpoll(0)
if !list.empty() {incidlelocked(-1)
injectglist(&list)
incidlelocked(1)
}
}
if next < now {startm(nil, false)
}
...
}
- 调用 runtime.timeSleepUntil 获取计时器的到期工夫以及持有该计时器的堆;
- 如果超过 10ms 的工夫没有网络轮询,调用 runtime.netpoll 轮询;
- 如果以后有应该运行的计时器没有执行,可能存在无奈被抢占的处理器,则启动新的线程解决计时器。
运行计时器
runtime.runtimer 函数会查看处理器四叉堆上最顶上的计时器,该函数也会解决计时器的删除和更新:
func runtimer(pp *p, now int64) int64 {
for {t := pp.timers[0]
switch s := atomic.Load(&t.status); s {
case timerWaiting:
if t.when > now {return t.when}
runOneTimer(pp, t, now)
return 0
case timerDeleted:
// 删除堆中的计时器
case timerModifiedEarlier, timerModifiedLater:
// 批改计时器的工夫
case timerModifying:
osyield()
case timerNoStatus, timerRemoved:
badTimer()
case timerRunning, timerRemoving, timerMoving:
badTimer()
default:
badTimer()}
}
}
它会遵循以下的规定解决计时器:
- timerNoStatus -> 解体:未初始化的计时器
- timerWaiting -> timerWaiting
- timerWaiting -> timerRunning -> timerNoStatus
- timerWaiting -> timerRunning -> timerWaiting
- timerModifying -> 期待状态扭转
- timerModifiedXX -> timerMoving -> timerWaiting
- timerDeleted -> timerRemoving -> timerRemoved
- timerRunning -> 解体:并发调用该函数
- timerRemoved、timerRemoving、timerMoving -> 解体:计时器堆不统一
如果处理器四叉堆顶部的计时器没有到触发工夫会间接返回,否则调用 runtime.runOneTimer 运行堆顶的计时器:
func runOneTimer(pp *p, t *timer, now int64) {
f := t.f
arg := t.arg
seq := t.seq
if t.period > 0 {
delta := t.when - now
t.when += t.period * (1 + -delta/t.period)
siftdownTimer(pp.timers, 0)
if !atomic.Cas(&t.status, timerRunning, timerWaiting) {badTimer()
}
updateTimer0When(pp)
} else {dodeltimer0(pp)
if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {badTimer()
}
}
unlock(&pp.timersLock)
f(arg, seq)
lock(&pp.timersLock)
}
依据计时器的 period 字段,上述函数会做出不同的解决:
-
如果 period 字段大于 0,则代表它是一个 Ticker,须要周期性触发:
- 批改计时器下一次触发的工夫并更新其在堆中的地位;
- 将计时器的状态更新为 timerWaiting;
- 调用 runtime.updateTimer0When 函数设置处理器的 timer0When 字段。
-
如果 period 字段小于或者等于 0,阐明它是一个 Timer,只需触发一次即可:
- 调用 runtime.dodeltimer0 函数删除计时器;
- 将计时器的状态更新至 timerNoStatus。
在实现更新计时器后,上互斥锁,调用计时器的回调办法 f,传入相应参数。实现整个流程。