问题背景
一个系统中存在着大量的延迟/定时任务:
在一个间隔时间之后做某事: 例如在最后一次消息发送的5分钟之后, 断开连接
在一个间隔时间之后不停的做某事: 例如每隔5分钟之后去发送心跳,检测连接是否正常
如果每个任务都使用自己的调度器来管理任务声明周期的话,浪费CPU的资源而且很低效,比如:
在定时器的数量增长到百万级之后, 基于最小堆实现的定时器的性能会显著降低
客户端会定时发送心跳以此来确保连接的可用性。导致每个连接都需要新建一些协程去维护
解决方案
延迟操作,通常可以采用两个方案:
Timer
:定时器维护一个优先队列,到时间点执行,然后把需要执行的 task
存储在 map
中
collection
中的 timingWheel
,维护一个存放任务组的数组,每一个槽都维护一个存储 task
的双向链表。开始执行时,计时器每隔指定时间执行一个槽里面的 tasks
时间轮是一种高效来利用线程资源来进行批量化调度的一种调度模型。把大批量的调度任务全部都绑定到同一个的调度器上面,使用这一个调度器来进行所有任务的管理(manager)
,触发(trigger)
以及运行(runnable)
实现原理
使用一个 类Map
的结构进行时间轮而构建,其中:
定时器的间隔就是从第n个槽进入到第n+1个槽的时间
每个槽中的任务通过双向链表进行存储,定时器到达槽的位置之后,并发处理槽的任务(指定时间相同)
问题1 :新建的任务如何添加到对应的位置
如上图,槽位(numSlots)
为12的时间轮,时间轮的定时间隔(interval)
为1s,当前正在执行槽(tickedPos)
为1位置的任务。
当添加一个延时时间(delay)
为5s的任务的时候,那么:
需要等待的间隔数step
:step = delay / interval
为 5。
需要放入的槽的位置position
:position = (step + tickedPos) % numSlots
为 6。
问题2 :新建任务超过槽的数目怎么办?
如上图,当添加一个延时时间(delay)
为18s的时候,根据上面的计算公司
需要等待的间隔数step = delay / interval =
18
需要放入的槽的位置 postion = (step + tickedPos) % numSlots =
6。
这个时候就出现一个问题,延迟5s的任务和延迟18s的任务会一起在5s后执行,这个时候就出现了多层环的概念
当第一层环的时间无法满足任务的延时的时候,可以将任务放置到第n层环上。时间轮还是会按照环的顺序进行执行
环的位置circle
: circle = (steps - 1) / numSlots
所以延时5s的任务它的环 circle = (5 - 1) / 12 = 0
,延时18s的任务它的环 circle = (18 -1)/12 = 1
随着时间的推移第一层环完成之后再去执行第二层环
问题3 :不同环的任务是否都需要构建一个新的环来保存任务
不需要 ,不同环但是槽相同的任务放入到同一个双向链表中,当执行该槽任务的时候,只需要判断当前的任务if circle == 0
,否则就不是当前环的任务,将circle -= 1
这样也有一个缺点:就是在遍历槽中任务的时候,虽然不是相同时间执行,但是槽中所有任务都需要遍历。不过一个微服务中定时任务也不会太多,所以缺点基本可以忽略
问题4 :定时任务时间轮都是是如何处理的
根据任务类型可以分为延时任务 与定时任务 。根据任务执行一次可以分为执行一次 和重复执行 。这里有两点需要解决
不同类型任务的开始时间是怎么计算开始时间的
需要重复的任务是如何存储的
延时任务
执行一次:根据延时时间计算在时间轮中的位置,定时触发即可
重复执行:每次执行任务完成,判断任务是否重复,然后重新计算任务位置重新插入即可
定时任务:
问题5 :如果一个时间间隔内时间任务处理不过来怎么办
通过下一节技术内幕来查看处理逻辑。不过可以预想到的是每个槽都会拉起协程来进行任务处理,如果是顺序执行有可能导致部分任务超出执行时间,所以每个任务都会使用一个协程处理,那么这里就可以考虑协程池以及对象池了。
问题6 :如果一个任务的延时时间小于时间轮的时间片间隔,那么任务何时执行
这里有两个解决方案,go-zero
使用第一种方案:
由于小于延时时间小于时间间隔,那么就认为任务不需要等到下一个时间轮的时间片执行,而是立即执行
利用多层环的原理处理更细粒度的任务。这样就不能重用circle
的概念,而是需要 sub circle
子环概念
circle
用来解决延时时间超过当前环的问题
sub circle
用来解决更细粒度的时间片需求。缺点就是逻辑结构更复杂,每个sub circle
都需要有一个更细的时间粒度
技术内幕
代码路径:core/collection/timingwheel.go
,前一节就是 go-zero
中时间轮的时间原理
时间轮对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 TimingWheel struct { interval time.Duration ticker timex.Ticker slots []*list.List timers *SafeMap tickedPos int numSlots int execute Execute setChannel chan timingEntry moveChannel chan baseEntry removeChannel chan interface {} drainChannel chan func (key, value interface {}) //并发执行任务隧道 stopChannel chan lang.PlaceholderType } Execute func (key, value interface {}) timingEntry struct { baseEntry value interface {} circle int diff int removed bool } baseEntry struct { delay time.Duration key interface {} } positionEntry struct { pos int item *timingEntry } timingTask struct { key interface {} value interface {} }
其中:
每个槽中因为是链表,所以并没有数量限制
时间轮实体timingEntry
中有两个值需要关注 diff
和 removed
时间轮结构中 timers
作用往下看
timers
使用的 SafeMap
不仅仅是为了处理并发安全,还有就是原生map
内存泄露问题的临时替代品(详见《GoZero-SafeMap》)
初始化时间轮
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 func NewTimingWheel (interval time.Duration, numSlots int , execute Execute) (*TimingWheel, error) { if interval <= 0 || numSlots <= 0 || execute == nil { return nil , fmt.Errorf("interval: %v, slots: %d, execute: %p" , interval, numSlots, execute) } return newTimingWheelWithClock(interval, numSlots, execute, timex.NewTicker(interval)) } func newTimingWheelWithClock (interval time.Duration, numSlots int , execute Execute, ticker timex.Ticker) (*TimingWheel, error) { tw := &TimingWheel{ interval: interval, ticker: ticker, slots: make ([]*list.List, numSlots), timers: NewSafeMap(), tickedPos: numSlots - 1 , execute: execute, numSlots: numSlots, setChannel: make (chan timingEntry), moveChannel: make (chan baseEntry), removeChannel: make (chan interface {}), drainChannel: make (chan func (key, value interface {}) ), stopChannel: make (chan lang.PlaceholderType), } tw.initSlots() go tw.run() return tw, nil } func (tw *TimingWheel) run () { for { select { case <-tw.ticker.Chan(): tw.onTick() case task := <-tw.setChannel: tw.setTask(&task) case key := <-tw.removeChannel: tw.removeTask(key) case task := <-tw.moveChannel: tw.moveTask(task) case fn := <-tw.drainChannel: tw.drainAll(fn) case <-tw.stopChannel: tw.ticker.Stop() return } } }
在时间轮拉起协程进行隧道的监听与处理,这里需要注意的是:
除了时间轮的定时器隧道,其他隧道都可以通过外部接口将消息传入处理
其中moveChannel
和drainChannel
需要解释:
moveChannel
:更新任务延时时间
drainChannel
:使用自定义函数并发执行所有任务
添加任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 func (tw *TimingWheel) SetTimer (key, value interface {}, delay time.Duration) error { if delay <= 0 || key == nil { return ErrArgument } select { case tw.setChannel <- timingEntry{ baseEntry: baseEntry{ delay: delay, key: key, }, value: value, }: return nil case <-tw.stopChannel: return ErrClosed } } func (tw *TimingWheel) setTask (task *timingEntry) { if task.delay < tw.interval { task.delay = tw.interval } if val, ok := tw.timers.Get(task.key); ok { entry := val.(*positionEntry) entry.item.value = task.value tw.moveTask(task.baseEntry) } else { pos, circle := tw.getPositionAndCircle(task.delay) task.circle = circle tw.slots[pos].PushBack(task) tw.setTimerPosition(pos, task) } }
注意:
在添加任务的时候这里的 value 类型是 interface
类型,而不是指定的任务对象
注意这里使用 setTimerPosition
又缓存任务,它的作用往下看
如 实现原理 中介绍的那样计算任务所在的槽
以及环
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func (tw *TimingWheel) getPositionAndCircle (d time.Duration) (pos, circle int ) { steps := int (d / tw.interval) pos = (tw.tickedPos + steps) % tw.numSlots circle = (steps - 1 ) / tw.numSlots return }
设置任务的位置
1 2 3 4 5 6 7 8 9 10 11 12 13 func (tw *TimingWheel) setTimerPosition (pos int , task *timingEntry) { if val, ok := tw.timers.Get(task.key); ok { timer := val.(*positionEntry) timer.item = task timer.pos = pos } else { tw.timers.Set(task.key, &positionEntry{ pos: pos, item: task, }) } }
所以 时间轮中 timers
其实是通过key
保存任务,并把完成任务和位置都保存
更新任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 func (tw *TimingWheel) MoveTimer (key interface {}, delay time.Duration) error { if delay <= 0 || key == nil { return ErrArgument } select { case tw.moveChannel <- baseEntry{ delay: delay, key: key, }: return nil case <-tw.stopChannel: return ErrClosed } } func (tw *TimingWheel) moveTask (task baseEntry) { val, ok := tw.timers.Get(task.key) if !ok { return } timer := val.(*positionEntry) if task.delay < tw.interval { threading.GoSafe(func () { tw.execute(timer.item.key, timer.item.value) }) return } pos, circle := tw.getPositionAndCircle(task.delay) if pos >= timer.pos { timer.item.circle = circle timer.item.diff = pos - timer.pos } else if circle > 0 { circle-- timer.item.circle = circle timer.item.diff = tw.numSlots + pos - timer.pos } else { timer.item.removed = true newItem := &timingEntry{ baseEntry: task, value: timer.item.value, } tw.slots[pos].PushBack(newItem) tw.setTimerPosition(pos, newItem) } }
注意:
移动任务MoveTimer(key interface{}, delay time.Duration) error {... }
并没有参数 Value
,用来移动更新已有的任务
执行任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 func (tw *TimingWheel) onTick () { tw.tickedPos = (tw.tickedPos + 1 ) % tw.numSlots l := tw.slots[tw.tickedPos] tw.scanAndRunTasks(l) } func (tw *TimingWheel) scanAndRunTasks (l *list.List) { var tasks []timingTask for e := l.Front(); e != nil ; { task := e.Value.(*timingEntry) if task.removed { next := e.Next() l.Remove(e) e = next continue } else if task.circle > 0 { task.circle-- e = e.Next() continue } else if task.diff > 0 { next := e.Next() l.Remove(e) pos := (tw.tickedPos + task.diff) % tw.numSlots tw.slots[pos].PushBack(task) tw.setTimerPosition(pos, task) task.diff = 0 e = next continue } tasks = append (tasks, timingTask{ key: task.key, value: task.value, }) next := e.Next() l.Remove(e) tw.timers.Del(task.key) e = next } tw.runTasks(tasks) }
具体执行
1 2 3 4 5 6 7 8 9 10 11 12 13 func (tw *TimingWheel) runTasks (tasks []timingTask) { if len (tasks) == 0 { return } go func () { for i := range tasks { threading.RunSafe(func () { tw.execute(tasks[i].key, tasks[i].value) }) } }() }
注意:每个任务都使用一个协程进行执行,这里使用的是 go-zero
封装的协程池,这个go 会等待所有任务执行完毕才会结束
删除任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func (tw *TimingWheel) RemoveTimer (key interface {}) error { if key == nil { return ErrArgument } select { case tw.removeChannel <- key: return nil case <-tw.stopChannel: return ErrClosed } } func (tw *TimingWheel) removeTask (key interface {}) { val, ok := tw.timers.Get(key) if !ok { return } timer := val.(*positionEntry) timer.item.removed = true tw.timers.Del(key) }
排空任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func (tw *TimingWheel) Drain (fn func (key, value interface {}) ) error { select { case tw.drainChannel <- fn: return nil case <-tw.stopChannel: return ErrClosed } } func (tw *TimingWheel) drainAll (fn func (key, value interface {}) ) { runner := threading.NewTaskRunner(drainWorkers) for _, slot := range tw.slots { for e := slot.Front(); e != nil ; { task := e.Value.(*timingEntry) next := e.Next() slot.Remove(e) e = next if !task.removed { runner.Schedule(func () { fn(task.key, task.value) }) } } } }
排空任务并不是直接删除,而是提供一个自定义函数接口用于处理剩下的任务,有利于安全退出。但需要注意的是每个任务都会拉一个协程进行处理,也就是不能立即执行 时间轮的 Stop
停止时间轮
1 2 3 4 func (tw *TimingWheel) Stop () { close (tw.stopChannel) }
总结
task
从 优先队列 O(nlog(n))
降到 双向链表 O(1)
,而执行task也只要轮询一个时间点的tasks O(N)
,不需要像优先队列,放入和删除元素 O(nlog(n))
。
时间轮中的多层环是一种虚拟概念,用来记录超出范围(nusSlots * interval
)的任务
使用chan
接收外部接口调用的好处是并发安全,当然内部逻辑实现还是需要注意map
安全
补充:
可以添加一个在协程池中讲到的时间轮状态,保证不会有新的任务在退出的时候加进来
可以使用协程池和内存池的节省内存空间,但如果协程池处理不过来使后续任务阻塞,可能会导致时间轮功能异常
分布式系统中系统的定时调用则需要使用分布式定时器,这在另外的章节中学习
技术应用
go-zero用于缓存的定时删除
1 2 3 4 5 6 7 8 9 10 11 12 13 timingWheel, err := NewTimingWheel(time.Second, slots, func (k, v interface {}) { key, ok := k.(string ) if !ok { return } cache.Del(key) }) if err != nil { return nil , err } cache.timingWheel = timingWheel
参考链接
https://www.ericcai.fun/detail/16
https://juejin.cn/post/6844904110399946766
https://xiaorui.cc/archives/6160
https://zhuanlan.zhihu.com/p/264826698
https://lk668.github.io/2021/04/05/2021-04-05-手把手教你如何用golang实现一个timewheel/
https://go-zero.dev/cn/docs/blog/principle/timing-wheel