关于golang:时间轮与gozero中时间轮的实现解析

62次阅读

共计 8067 个字符,预计需要花费 21 分钟才能阅读完成。

工夫轮

用于提早工作的调度。

场景

设想这么一个场景,我须要保护一个连接池,连接池中的连贯是有超时工夫的,连贯会放弃肯定频率发送心跳包比方 5s 发一次,30s 内如果没有收到 keep-alive 就会过期,到过期工夫的 conn 断开连接,如何去设计这个连接池?

场景形象

这个场景能够了解为,我收到一个申请之后,在提早 30 秒后须要执行一个动作,并且如果在 30s 内收到同样的申请,就把这个工作再推延 30s,咱们应该怎么做呢?

可行解

那假如我的连接池最大连接数是 1000,简略的办法是每个连贯保护一个最近的 keep-alive 工夫,启动一个定时器,每秒去遍历一次所有连贯,到工夫了就断开连接。收到心跳时,更新这个连贯的工夫,那如果有 10000 个连贯呢?每秒就要遍历 10000 次能力确定哪些工作要删掉,这种形式是很浪费资源的。

更优解

应用工夫轮算法,工夫轮算法的根本思维是将提早工作扩散,不在一个中央去保护,防止因为都放在一个中央,每次都要进行遍历的损失。
怎么把工作进行扩散呢?依照提早工作的最大工夫限度和 timer 执行的工夫,比方说我最大反对提早 60 秒,每秒都扫一次,那我就能够建设一个数组叫做 timeWheel,timeWheel 上放的是这个工夫到期的连贯组成的数组 connList,保护一个 cur_index,指向 timeWheel 的某一个 index。
执行的流程是从这个 index 开始向后遍历,每秒挪动一个 index,到最大 index 的时候返回到 0 持续循环,每次挪动都断开 index 外面所有连贯。
接管到心跳时,把这个连贯从以后的 timeWheel 的 index 中去掉,index 减少 30 后对 60 取模保障不越界。

优化:尽管每秒做删除的时候不必扫数组了,然而这样的话,收到心跳的时候就要扫数组了,那怎么优化收到心跳时扫数组的操作呢?从 timeWheel 取连贯的过程能够通过 map 做 conn 到 timeWheel 中 index 的索引,不用查找整个工夫轮。在 conn 批改 timeWheel 的 index,

图解

缺点

这个场景中,最大延迟时间是 60s,所以一个大小为 60 的 timeWheel 就够了,那如果是一天呢?一天是 86400s,就须要用到 86400 大小的 timeWheel 了。能够通过分层来做优化。

多层工夫轮

多层工夫轮是单层工夫轮的优化计划,用于缓解单层工夫轮在工夫范畴大,跨度小的状况下,timeWheel 的大小也须要增大的状况。

场景

把下面的场景的工夫范畴裁减到 7 天,86400s*7

怎么分层

对于这种场景,咱们能够分为 4 层工夫轮。
第一层 timeWheel1 示意秒,大小为 60,每个元素示意 1s。
第二层 timeWheel2 示意分,大小为 60,每个元素示意 1m。
第三层 timeWheel3 示意小时,大小为 24,每个元素示意 1h。
第四层 timeWheel4 示意天,大小为 7,每个元素示意 1d。

执行流程

前提设定

假如以后 timeWheel 的状态如下
timeWheel1 的 60 个 index 都为空,cur_index1 在 index0 地位
timeWheel2 的 60 个 index 都为空,cur_index2 在 index0 地位
timeWheel3 的 24 个 index 都为空,cur_index3 在 index0 地位
timeWheel4 的 7 歌 index 都为空,cur_index4 在 index0 地位

工作增加流程

接管到一个工作 1,须要提早 7100s 之后执行,
计算是否能够落到 timeWheel1 中。7100 比 timeWheel1 的最大范畴 60 大,不能放在 timeWheel1 中。
计算是否能够落到 timeWheel2 中。7100/60 = 118.333 向下取整 118,118 比 timeWheel2 的最大范畴 60 大。不能放在 timeWheel2 中。
计算是否能够落到 timeWheel3 中。$7100/(60*60) = 1.97222$,1.97222 向下取整得 1,1 比 timeWheel3 的最大范畴 24 小,能够放在 timeWheel3 中。计算下一级残余多少工夫:$7100 % (60*60) = 3500$,还剩下 3500s,存储在 cur_index3+ 1 这个地位上,同时保留残余的工夫 3500。
这也就是工夫轮的降级过程。

工夫轮运行流程

运行的时候波及到工夫轮的降级过程。
以四层环为例
一个简略暴力的实现思路是,提前定义有多少层环,有多少层环,就开多少个定时器。
下层定时器到工夫,把对应 cur_index 中的工作放到上层定时器中,这种须要进行这几个环的一个解决程序做同步,不然可能呈现下层放到上层的时候,上层曾经走过了这个地位的状况。

另一个实现形式是只有一个定时器,先走最底层,最底层走完一圈的时候走下层,通过下层这个 cur_index 中的工作的剩余时间来判断,没有剩余时间的立刻执行,有剩余时间的放到底层的。如果这一层也跑了一圈,再去上一层取工作。以此类推,实现较为简单。

go-zero 中的实现是 2 层循环,能够保护了一个底层 slot 和一个循环圈数 circle,每次扫描的数量尽管多了,然而实现简略,能够在上面的源码中参考一下。

工夫轮在 go 中的实现

Go 语言中工夫轮的实现
齐全兼容 golang 定时器的高性能工夫轮实现(go-timewheel)
go-zero 中工夫轮的实现

go-zero 中工夫轮源码正文

go-zero 中的 timeWheel 用于程序外部缓存 cache 的过期清理操作。以下只关怀工夫轮的实现形式。

// 工夫轮构造体
type TimingWheel struct {
        interval      time.Duration // 工夫轮每个槽位的工夫距离
        ticker        timex.Ticker // 定时器
        slots         []*list.List // 槽位数组
        timers        *SafeMap // 一个避免内存透露的 map,用来装什么的?tickedPos     int // 当初指向的槽位
        numSlots      int // 槽数
        execute       Execute // 到工夫执行的函数
        // 对外提供的办法,通过 channel 来与 timeWheel 交互
        // 这种交互方式的益处大略是对于不关怀后果的调用方起到解耦成果,不须要同步期待
        setChannel    chan timingEntry // 设置接口
        moveChannel   chan baseEntry // 挪动接口
        removeChannel chan interface{} // 删除接口
        drainChannel  chan func(key, value interface{}) //
        stopChannel   chan lang.PlaceholderType // 完结接口
}

// 工夫轮元素构造体
type timingEntry struct {
        baseEntry
        value   interface{}
        circle  int
        diff    int
        removed bool
}

// 根底字段
type baseEntry struct {
        delay time.Duration
        key   interface{}}

//key 到 slot 的映射,以及一个 timingEntry 元素指针,// 用于通过 key 在 timers 中疾速查找到元素
type positionEntry struct {
        pos  int
        item *timingEntry
}

// 提早工作字段
type timingTask struct {key   interface{}
        value interface{}}



// 办法定义
// 以下办法是对外提供的办法,通过 channel 发送信号,run 办法中监听各个 channel,收到信号执行相应的办法
// 立即执行所有工作
func (tw *TimingWheel) Drain(fn func(key, value interface{})) 
// 工作没到工夫就把工作的延迟时间更新。到工夫就间接执行。func (tw *TimingWheel) MoveTimer(key interface{}, delay time.Duration)
// 移除一个工作
func (tw *TimingWheel) RemoveTimer(key interface{})
// 减少一个工作
func (tw *TimingWheel) SetTimer(key, value interface{}, delay time.Duration)
// 进行工夫轮
func (tw *TimingWheel) Stop()

// 以下办法是真正执行的办法
func (tw *TimingWheel) drainAll(fn func(key, value interface{}))
//pos 用 延迟时间 / 槽位示意的工夫 先计算出往后数第几个槽,思考溢出和以后的槽位偏移,最终的 pos = (tickedPos+d/interval)%numSlots
//circle = (d/interval-1)/numSlots
func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos, circle int)
func (tw *TimingWheel) initSlots()
func (tw *TimingWheel) moveTask(task baseEntry)
func (tw *TimingWheel) onTick()
func (tw *TimingWheel) removeTask(key interface{})
func (tw *TimingWheel) removeTask(key interface{})
func (tw *TimingWheel) run()
func (tw *TimingWheel) runTasks(tasks []timingTask)
func (tw *TimingWheel) scanAndRunTasks(l *list.List)

// 
func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute, ticker timex.Ticker) (*TimingWheel, error) {
    tw := &TimingWheel{
        interval:      interval,
        ticker:        ticker,
        slots:         make([]*list.List, numSlots),
        timers:        NewSafeMap(),
        tickedPos:     numSlots - 1, // at previous virtual circle
        execute:       execute,
        numSlots:      numSlots,
        setChannel:    make(chan timingEntry),
        moveChannel:   make(chan baseEntry),
        removeChannel: make(chan interface{}),
        drainChannel:  make(chan func(key, value interface{})),
        stopChannel:   make(chan lang.PlaceholderType),
    }

    tw.initSlots()
    go tw.run()

    return tw, nil
}

// Drain drains all items and executes them.
func (tw *TimingWheel) Drain(fn func(key, value interface{})) {tw.drainChannel <- fn}

//run 办法,监听所有 channel,在 newTimeWheel 的时候就启动了。func (tw *TimingWheel) run() {
    for {
        select {case <-tw.ticker.Chan(): // 到工夫,执行对应 slot 上须要执行的工作。tw.onTick()
        case task := <-tw.setChannel: // 往工夫轮上增加一个工作
            tw.setTask(&task)
        case key := <-tw.removeChannel: // 从工夫轮上删除一个工作
            tw.removeTask(key)
        case task := <-tw.moveChannel: // 更新一个工夫轮上工作的执行工夫
            tw.moveTask(task)
        case fn := <-tw.drainChannel:
            tw.drainAll(fn)
        case <-tw.stopChannel:
            tw.ticker.Stop()
            return
        }
    }
}

// 先看减少,也就是 setTask 办法
// 调用://  run -> setTask
// 逻辑://  从 map 索引中确定,这个工作是否曾经存在了
//      存在的话就通过 moveTask 挪动这个工作的地位
//      不存在的话,就计算出工作在环中绝对于以后的 ticked 的定位,以及要转的圈数 circle,将工作放在环上,并且保护 map 索引
func (tw *TimingWheel) setTask(task *timingEntry) {
    if task.delay < tw.interval {task.delay = tw.interval}

    if val, ok := tw.timers.Get(task.key); ok {entry := val.(*positionEntry)
        entry.item.value = task.value
        tw.moveTask(task.baseEntry)
    } else {pos, circle := tw.getPositionAndCircle(task.delay)
        task.circle = circle
        tw.slots[pos].PushBack(task)
        tw.setTimerPosition(pos, task)
    }
}


// 减少看完了,再看一下是怎么执行的,假如曾经扫到了这个工作所在的 slot,// 先保护一下曾经扫到的地位,而后从 slot 中拿出对应的 list,扔到 scanAndRunTask 办法中执行。func (tw *TimingWheel) onTick() {tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
    l := tw.slots[tw.tickedPos]
    tw.scanAndRunTasks(l)
}

// 次要看 scanAndRunTask 办法,这个办法是真正在拿到 list 之后做的操作
// 逻辑://  遍历整个 list,先革除被删掉工作,再将循环圈数不为 0 的工作的圈数 -1,//  剩下的是圈数为 0 的无效工作,思考到有更新操作,更新操作的 pos 会推延到真正要执行的时候做,所以还要依据 diff 再看一下是不是一个被更新的操作。//  最初后面这些都被过滤掉,剩下来的工作就是这次 scan 要执行的工作,把他们退出到执行队列中,通过 runTask 办法并发执行,这个办法中会管制并发数。func (tw *TimingWheel) scanAndRunTasks(l *list.List) {var tasks []timingTask

    for e := l.Front(); e != nil; {task := e.Value.(*timingEntry)
        if task.removed {next := e.Next()
            l.Remove(e)
            e = next
            continue
        } else if task.circle > 0 {
            task.circle--
            e = e.Next()
            continue
        } else if task.diff > 0 {next := e.Next()
            l.Remove(e)
            // (tw.tickedPos+task.diff)%tw.numSlots
            // cannot be the same value of tw.tickedPos
            pos := (tw.tickedPos + task.diff) % tw.numSlots
            tw.slots[pos].PushBack(task)
            tw.setTimerPosition(pos, task)
            task.diff = 0
            e = next
            continue
        }

        tasks = append(tasks, timingTask{
            key:   task.key,
            value: task.value,
        })
        next := e.Next()
        l.Remove(e)
        tw.timers.Del(task.key)
        e = next
    }

    tw.runTasks(tasks)
}



// 更新 timeWheel 中已存在的工作的延迟时间
// 调用://    run -> setTask -> moveTask 在 setTask 中判断如果有这个 key 就 moveTask
//    run -> moveTask
func (tw *TimingWheel) moveTask(task baseEntry) {val, ok := tw.timers.Get(task.key)
    if !ok {return}

    timer := val.(*positionEntry)
    // 如果 task 设置的延迟时间太小了,那就间接执行
    if task.delay < tw.interval {threading.GoSafe(func() {tw.execute(timer.item.key, timer.item.value)
        })
        return
    }

    // 没到工夫,须要扭转地位,依据新的延迟时间计算出新的定位和 circle
    pos, circle := tw.getPositionAndCircle(task.delay)
    // 依据 pos 和 circle 还有旧数据,批改 task 的信息,做一些标记,在扫描到这个 task 的时候再真正批改和从新定位。// 提早这些更改的益处是,如果某些 key 频繁改变的话,不须要频繁进行重定位操作,重定位操作须要保障并发平安。if pos >= timer.pos { 
        // 新 pos 大于等于旧 pos,更新工作的 circle,pos 还是用旧的,而是把工作的 diff 更新为新 pos- 旧 pos,这里是为什么?// 思考场景,先触发 tick,再触发 move,因为 tick 运行工作的时候是 go 进来的,go 进来的工作正在执行,这时候如果来 move 申请,就会有并发问题
        // 这里记录他的 diff 是标记曾经被改过了,下次跑到这个工作的时候就会触发 pos 更新。timer.item.circle = circle
        timer.item.diff = pos - timer.pos
    } else if circle > 0 {
        //pos 提前了,不在这一圈触发,得算一下 diff 偏移量和走多少圈
        circle-- // 把 diff 的一圈扣掉
        timer.item.circle = circle 
        // 算 diff 把 circle 扣的一圈加回来 假如新 pos 是 1,旧 pos 是 2,num 是 5,diff 就是 4,示意 2+4=6,6%5=1,计算的时候,这里就会计算为 1
        timer.item.diff = tw.numSlots + pos - timer.pos 
    } else {
        //pos 提前了,并且就是这次循环,删除旧的增加新的
        // 这里是不是没有思考到并发的状况?如果正在执行这个工作,那这里会不会有问题?// 如果思考就算执行完,也要生成新的继续执行,是正当的。// 不思考并发,不思考雷同 key 只执行一次,这里是能够的,如果要思考的话,这里可能会被执行两次
        timer.item.removed = true
        newItem := &timingEntry{
            baseEntry: task,
            value:     timer.item.value,
        }
        tw.slots[pos].PushBack(newItem)
        tw.setTimerPosition(pos, newItem)
    }
}


// 最初看一下是怎么删除的,其实这外面只是通过索引找到这个 task,而后把 task 标记为删除。// 下面看 scanAndRunTask 办法的时候曾经看到了解决被删除的节点的逻辑
func (tw *TimingWheel) removeTask(key interface{}) {val, ok := tw.timers.Get(key)
    if !ok {return}

    timer := val.(*positionEntry)
    timer.item.removed = true
    tw.timers.Del(key)
}

正文完
 0