工夫轮
用于提早工作的调度。
场景
设想这么一个场景,我须要保护一个连接池,连接池中的连贯是有超时工夫的,连贯会放弃肯定频率发送心跳包比方5s发一次,30s内如果没有收到keep-alive就会过期,到过期工夫的conn断开连接,如何去设计这个连接池?
场景形象
这个场景能够了解为,我收到一个申请之后,在提早30秒后须要执行一个动作,并且如果在30s内收到同样的申请,就把这个工作再推延30s,咱们应该怎么做呢?
可行解
那假如我的连接池最大连接数是1000,简略的办法是每个连贯保护一个最近的keep-alive工夫,启动一个定时器,每秒去遍历一次所有连贯,到工夫了就断开连接。收到心跳时,更新这个连贯的工夫,那如果有10000个连贯呢?每秒就要遍历10000次能力确定哪些工作要删掉,这种形式是很浪费资源的。
更优解
应用工夫轮算法,工夫轮算法的根本思维是将提早工作扩散,不在一个中央去保护,防止因为都放在一个中央,每次都要进行遍历的损失。
怎么把工作进行扩散呢?依照提早工作的最大工夫限度和timer执行的工夫,比方说我最大反对提早60秒,每秒都扫一次,那我就能够建设一个数组叫做timeWheel,timeWheel上放的是这个工夫到期的连贯组成的数组connList,保护一个cur_index,指向timeWheel的某一个index。
执行的流程是从这个index开始向后遍历,每秒挪动一个index,到最大index的时候返回到0持续循环,每次挪动都断开index外面所有连贯。
接管到心跳时,把这个连贯从以后的timeWheel的index中去掉,index减少30后对60取模保障不越界。
优化:尽管每秒做删除的时候不必扫数组了,然而这样的话,收到心跳的时候就要扫数组了,那怎么优化收到心跳时扫数组的操作呢?从timeWheel取连贯的过程能够通过map做conn到timeWheel中index的索引,不用查找整个工夫轮。在conn批改timeWheel的index,
图解
缺点
这个场景中,最大延迟时间是60s,所以一个大小为60的timeWheel就够了,那如果是一天呢?一天是86400s,就须要用到86400大小的timeWheel了。能够通过分层来做优化。
多层工夫轮
多层工夫轮是单层工夫轮的优化计划,用于缓解单层工夫轮在工夫范畴大,跨度小的状况下,timeWheel的大小也须要增大的状况。
场景
把下面的场景的工夫范畴裁减到7天,86400s*7
怎么分层
对于这种场景,咱们能够分为4层工夫轮。
第一层timeWheel1示意秒,大小为60,每个元素示意1s。
第二层timeWheel2示意分,大小为60,每个元素示意1m。
第三层timeWheel3示意小时,大小为24,每个元素示意1h。
第四层timeWheel4示意天,大小为7,每个元素示意1d。
执行流程
前提设定
假如以后timeWheel的状态如下
timeWheel1的60个index都为空,cur_index1在index0地位
timeWheel2的60个index都为空,cur_index2在index0地位
timeWheel3的24个index都为空,cur_index3在index0地位
timeWheel4的7歌index都为空,cur_index4在index0地位
工作增加流程
接管到一个工作1,须要提早7100s之后执行,
计算是否能够落到timeWheel1中。7100比timeWheel1的最大范畴60大,不能放在timeWheel1中。
计算是否能够落到timeWheel2中。7100/60 = 118.333向下取整118,118比timeWheel2的最大范畴60大。不能放在timeWheel2中。
计算是否能够落到timeWheel3中。$7100/(60*60) = 1.97222$,1.97222向下取整得1,1比timeWheel3的最大范畴24小,能够放在timeWheel3中。计算下一级残余多少工夫:$7100 % (60*60) = 3500$,还剩下3500s,存储在cur_index3+1这个地位上,同时保留残余的工夫3500。
这也就是工夫轮的降级过程。
工夫轮运行流程
运行的时候波及到工夫轮的降级过程。
以四层环为例
一个简略暴力的实现思路是,提前定义有多少层环,有多少层环,就开多少个定时器。
下层定时器到工夫,把对应cur_index中的工作放到上层定时器中,这种须要进行这几个环的一个解决程序做同步,不然可能呈现下层放到上层的时候,上层曾经走过了这个地位的状况。
另一个实现形式是只有一个定时器,先走最底层,最底层走完一圈的时候走下层,通过下层这个cur_index中的工作的剩余时间来判断,没有剩余时间的立刻执行,有剩余时间的放到底层的。如果这一层也跑了一圈,再去上一层取工作。以此类推,实现较为简单。
go-zero中的实现是2层循环,能够保护了一个底层slot和一个循环圈数circle,每次扫描的数量尽管多了,然而实现简略,能够在上面的源码中参考一下。
工夫轮在go中的实现
Go语言中工夫轮的实现
齐全兼容golang定时器的高性能工夫轮实现(go-timewheel)
go-zero中工夫轮的实现
go-zero中工夫轮源码正文
go-zero中的timeWheel用于程序外部缓存cache的过期清理操作。以下只关怀工夫轮的实现形式。
// 工夫轮构造体type TimingWheel struct { interval time.Duration //工夫轮每个槽位的工夫距离 ticker timex.Ticker //定时器 slots []*list.List //槽位数组 timers *SafeMap //一个避免内存透露的map,用来装什么的? tickedPos int //当初指向的槽位 numSlots int //槽数 execute Execute //到工夫执行的函数 // 对外提供的办法,通过channel来与timeWheel交互 // 这种交互方式的益处大略是对于不关怀后果的调用方起到解耦成果,不须要同步期待 setChannel chan timingEntry //设置接口 moveChannel chan baseEntry //挪动接口 removeChannel chan interface{} //删除接口 drainChannel chan func(key, value interface{}) // stopChannel chan lang.PlaceholderType //完结接口}//工夫轮元素构造体type timingEntry struct { baseEntry value interface{} circle int diff int removed bool}//根底字段type baseEntry struct { delay time.Duration key interface{}}//key到slot的映射,以及一个timingEntry元素指针,//用于通过key在timers中疾速查找到元素type positionEntry struct { pos int item *timingEntry}//提早工作字段type timingTask struct { key interface{} value interface{}}//办法定义//以下办法是对外提供的办法,通过channel发送信号,run办法中监听各个channel,收到信号执行相应的办法//立即执行所有工作func (tw *TimingWheel) Drain(fn func(key, value interface{})) //工作没到工夫就把工作的延迟时间更新。到工夫就间接执行。func (tw *TimingWheel) MoveTimer(key interface{}, delay time.Duration)//移除一个工作func (tw *TimingWheel) RemoveTimer(key interface{})//减少一个工作func (tw *TimingWheel) SetTimer(key, value interface{}, delay time.Duration)//进行工夫轮func (tw *TimingWheel) Stop()//以下办法是真正执行的办法func (tw *TimingWheel) drainAll(fn func(key, value interface{}))//pos用 延迟时间/槽位示意的工夫 先计算出往后数第几个槽,思考溢出和以后的槽位偏移,最终的pos = (tickedPos+d/interval)%numSlots//circle = (d/interval-1)/numSlotsfunc (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos, circle int)func (tw *TimingWheel) initSlots()func (tw *TimingWheel) moveTask(task baseEntry)func (tw *TimingWheel) onTick()func (tw *TimingWheel) removeTask(key interface{})func (tw *TimingWheel) removeTask(key interface{})func (tw *TimingWheel) run()func (tw *TimingWheel) runTasks(tasks []timingTask)func (tw *TimingWheel) scanAndRunTasks(l *list.List)// func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute, ticker timex.Ticker) ( *TimingWheel, error) { tw := &TimingWheel{ interval: interval, ticker: ticker, slots: make([]*list.List, numSlots), timers: NewSafeMap(), tickedPos: numSlots - 1, // at previous virtual circle execute: execute, numSlots: numSlots, setChannel: make(chan timingEntry), moveChannel: make(chan baseEntry), removeChannel: make(chan interface{}), drainChannel: make(chan func(key, value interface{})), stopChannel: make(chan lang.PlaceholderType), } tw.initSlots() go tw.run() return tw, nil}// Drain drains all items and executes them.func (tw *TimingWheel) Drain(fn func(key, value interface{})) { tw.drainChannel <- fn}//run办法,监听所有channel,在newTimeWheel的时候就启动了。func (tw *TimingWheel) run() { for { select { case <-tw.ticker.Chan(): //到工夫,执行对应slot上须要执行的工作。 tw.onTick() case task := <-tw.setChannel: //往工夫轮上增加一个工作 tw.setTask(&task) case key := <-tw.removeChannel: //从工夫轮上删除一个工作 tw.removeTask(key) case task := <-tw.moveChannel: //更新一个工夫轮上工作的执行工夫 tw.moveTask(task) case fn := <-tw.drainChannel: tw.drainAll(fn) case <-tw.stopChannel: tw.ticker.Stop() return } }}//先看减少,也就是setTask办法//调用:// run -> setTask//逻辑:// 从map索引中确定,这个工作是否曾经存在了// 存在的话就通过moveTask挪动这个工作的地位// 不存在的话,就计算出工作在环中绝对于以后的ticked的定位,以及要转的圈数circle,将工作放在环上,并且保护map索引func (tw *TimingWheel) setTask(task *timingEntry) { if task.delay < tw.interval { task.delay = tw.interval } if val, ok := tw.timers.Get(task.key); ok { entry := val.(*positionEntry) entry.item.value = task.value tw.moveTask(task.baseEntry) } else { pos, circle := tw.getPositionAndCircle(task.delay) task.circle = circle tw.slots[pos].PushBack(task) tw.setTimerPosition(pos, task) }}// 减少看完了,再看一下是怎么执行的,假如曾经扫到了这个工作所在的slot,// 先保护一下曾经扫到的地位,而后从slot中拿出对应的list,扔到scanAndRunTask办法中执行。func (tw *TimingWheel) onTick() { tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots l := tw.slots[tw.tickedPos] tw.scanAndRunTasks(l)}// 次要看scanAndRunTask办法,这个办法是真正在拿到list之后做的操作//逻辑:// 遍历整个list,先革除被删掉工作,再将循环圈数不为0的工作的圈数-1,// 剩下的是圈数为0的无效工作,思考到有更新操作,更新操作的pos会推延到真正要执行的时候做,所以还要依据diff再看一下是不是一个被更新的操作。// 最初后面这些都被过滤掉,剩下来的工作就是这次scan要执行的工作,把他们退出到执行队列中,通过runTask办法并发执行,这个办法中会管制并发数。func (tw *TimingWheel) scanAndRunTasks(l *list.List) { var tasks []timingTask for e := l.Front(); e != nil; { task := e.Value.(*timingEntry) if task.removed { next := e.Next() l.Remove(e) e = next continue } else if task.circle > 0 { task.circle-- e = e.Next() continue } else if task.diff > 0 { next := e.Next() l.Remove(e) // (tw.tickedPos+task.diff)%tw.numSlots // cannot be the same value of tw.tickedPos pos := (tw.tickedPos + task.diff) % tw.numSlots tw.slots[pos].PushBack(task) tw.setTimerPosition(pos, task) task.diff = 0 e = next continue } tasks = append(tasks, timingTask{ key: task.key, value: task.value, }) next := e.Next() l.Remove(e) tw.timers.Del(task.key) e = next } tw.runTasks(tasks)}// 更新timeWheel中已存在的工作的延迟时间// 调用:// run -> setTask -> moveTask 在setTask中判断如果有这个key就moveTask// run -> moveTaskfunc (tw *TimingWheel) moveTask(task baseEntry) { val, ok := tw.timers.Get(task.key) if !ok { return } timer := val.(*positionEntry) //如果task设置的延迟时间太小了,那就间接执行 if task.delay < tw.interval { threading.GoSafe(func() { tw.execute(timer.item.key, timer.item.value) }) return } // 没到工夫,须要扭转地位,依据新的延迟时间计算出新的定位和circle pos, circle := tw.getPositionAndCircle(task.delay) //依据pos和circle还有旧数据,批改task的信息,做一些标记,在扫描到这个task的时候再真正批改和从新定位。 //提早这些更改的益处是,如果某些key频繁改变的话,不须要频繁进行重定位操作,重定位操作须要保障并发平安。 if pos >= timer.pos { //新pos大于等于旧pos,更新工作的circle,pos还是用旧的,而是把工作的diff更新为新pos-旧pos,这里是为什么? //思考场景,先触发tick,再触发move,因为tick运行工作的时候是go进来的,go进来的工作正在执行,这时候如果来move申请,就会有并发问题 //这里记录他的diff是标记曾经被改过了,下次跑到这个工作的时候就会触发pos更新。 timer.item.circle = circle timer.item.diff = pos - timer.pos } else if circle > 0 { //pos提前了,不在这一圈触发,得算一下diff偏移量和走多少圈 circle-- //把diff的一圈扣掉 timer.item.circle = circle //算diff把circle扣的一圈加回来 假如新pos是1,旧pos是2,num是5,diff就是4,示意 2+4=6,6%5=1,计算的时候,这里就会计算为1 timer.item.diff = tw.numSlots + pos - timer.pos } else { //pos提前了,并且就是这次循环,删除旧的增加新的 // 这里是不是没有思考到并发的状况?如果正在执行这个工作,那这里会不会有问题? // 如果思考就算执行完,也要生成新的继续执行,是正当的。 // 不思考并发,不思考雷同key只执行一次,这里是能够的,如果要思考的话,这里可能会被执行两次 timer.item.removed = true newItem := &timingEntry{ baseEntry: task, value: timer.item.value, } tw.slots[pos].PushBack(newItem) tw.setTimerPosition(pos, newItem) }}//最初看一下是怎么删除的,其实这外面只是通过索引找到这个task,而后把task标记为删除。//下面看scanAndRunTask办法的时候曾经看到了解决被删除的节点的逻辑func (tw *TimingWheel) removeTask(key interface{}) { val, ok := tw.timers.Get(key) if !ok { return } timer := val.(*positionEntry) timer.item.removed = true tw.timers.Del(key)}