微服务-07-时间轮

问题背景

一个系统中存在着大量的延迟/定时任务:

  1. 在一个间隔时间之后做某事: 例如在最后一次消息发送的5分钟之后, 断开连接
  2. 在一个间隔时间之后不停的做某事: 例如每隔5分钟之后去发送心跳,检测连接是否正常

如果每个任务都使用自己的调度器来管理任务声明周期的话,浪费CPU的资源而且很低效,比如:

  1. 在定时器的数量增长到百万级之后, 基于最小堆实现的定时器的性能会显著降低
  2. 客户端会定时发送心跳以此来确保连接的可用性。导致每个连接都需要新建一些协程去维护

解决方案

延迟操作,通常可以采用两个方案:

  1. Timer:定时器维护一个优先队列,到时间点执行,然后把需要执行的 task 存储在 map
  2. collection 中的 timingWheel ,维护一个存放任务组的数组,每一个槽都维护一个存储 task 的双向链表。开始执行时,计时器每隔指定时间执行一个槽里面的 tasks

时间轮是一种高效来利用线程资源来进行批量化调度的一种调度模型。把大批量的调度任务全部都绑定到同一个的调度器上面,使用这一个调度器来进行所有任务的管理(manager),触发(trigger)以及运行(runnable)

实现原理

使用一个 类Map 的结构进行时间轮而构建,其中:

  1. 定时器的间隔就是从第n个槽进入到第n+1个槽的时间
  2. 每个槽中的任务通过双向链表进行存储,定时器到达槽的位置之后,并发处理槽的任务(指定时间相同)
timingwheel21

问题1:新建的任务如何添加到对应的位置

timingwheel22

如上图,槽位(numSlots)为12的时间轮,时间轮的定时间隔(interval)为1s,当前正在执行槽(tickedPos)为1位置的任务。

当添加一个延时时间(delay)为5s的任务的时候,那么:

  1. 需要等待的间隔数stepstep = delay / interval 为 5。
  2. 需要放入的槽的位置position:position = (step + tickedPos) % numSlots 为 6。

问题2:新建任务超过槽的数目怎么办?

timingwheel24

如上图,当添加一个延时时间(delay)为18s的时候,根据上面的计算公司

  1. 需要等待的间隔数step = delay / interval = 18
  2. 需要放入的槽的位置 postion = (step + tickedPos) % numSlots = 6。

这个时候就出现一个问题,延迟5s的任务和延迟18s的任务会一起在5s后执行,这个时候就出现了多层环的概念

timingwheel25

当第一层环的时间无法满足任务的延时的时候,可以将任务放置到第n层环上。时间轮还是会按照环的顺序进行执行

环的位置circle: circle = (steps - 1) / numSlots

所以延时5s的任务它的环 circle = (5 - 1) / 12 = 0,延时18s的任务它的环 circle = (18 -1)/12 = 1

随着时间的推移第一层环完成之后再去执行第二层环

问题3:不同环的任务是否都需要构建一个新的环来保存任务

不需要,不同环但是槽相同的任务放入到同一个双向链表中,当执行该槽任务的时候,只需要判断当前的任务if circle == 0,否则就不是当前环的任务,将circle -= 1

这样也有一个缺点:就是在遍历槽中任务的时候,虽然不是相同时间执行,但是槽中所有任务都需要遍历。不过一个微服务中定时任务也不会太多,所以缺点基本可以忽略

问题4:定时任务时间轮都是是如何处理的

根据任务类型可以分为延时任务定时任务。根据任务执行一次可以分为执行一次重复执行。这里有两点需要解决

  1. 不同类型任务的开始时间是怎么计算开始时间的
  2. 需要重复的任务是如何存储的
  • 延时任务
    • 执行一次:根据延时时间计算在时间轮中的位置,定时触发即可
    • 重复执行:每次执行任务完成,判断任务是否重复,然后重新计算任务位置重新插入即可
  • 定时任务:
    • 执行一次:

问题5:如果一个时间间隔内时间任务处理不过来怎么办

通过下一节技术内幕来查看处理逻辑。不过可以预想到的是每个槽都会拉起协程来进行任务处理,如果是顺序执行有可能导致部分任务超出执行时间,所以每个任务都会使用一个协程处理,那么这里就可以考虑协程池以及对象池了。

问题6:如果一个任务的延时时间小于时间轮的时间片间隔,那么任务何时执行

这里有两个解决方案,go-zero使用第一种方案:

  1. 由于小于延时时间小于时间间隔,那么就认为任务不需要等到下一个时间轮的时间片执行,而是立即执行

  2. 利用多层环的原理处理更细粒度的任务。这样就不能重用circle的概念,而是需要 sub circle子环概念

    • circle用来解决延时时间超过当前环的问题
    • sub circle用来解决更细粒度的时间片需求。缺点就是逻辑结构更复杂,每个sub circle都需要有一个更细的时间粒度
    timingwheel27

技术内幕

代码路径:core/collection/timingwheel.go,前一节就是 go-zero 中时间轮的时间原理

时间轮对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// A TimingWheel is a timing wheel object to schedule tasks.
TimingWheel struct {
interval time.Duration //时间划分刻度
ticker timex.Ticker
slots []*list.List //数组内为双向链表指针
timers *SafeMap //
tickedPos int //环的位置
numSlots int //时间槽
execute Execute //时间点执行任务的方法
//以下为不同任务的隧道
setChannel chan timingEntry //设置任务隧道
moveChannel chan baseEntry //移动任务隧道
removeChannel chan interface{} //删除任务隧道
drainChannel chan func(key, value interface{}) //并发执行任务隧道
stopChannel chan lang.PlaceholderType //时间轮停止通知隧道
}

//Execute 执行任务的方法
Execute func(key, value interface{})

//timingEntry
timingEntry struct { //时间轮实体对象
baseEntry
value interface{}
circle int
diff int
removed bool //是否删除
}

//baseEntry 基础属性
baseEntry struct {
delay time.Duration
key interface{}
}

//positionEntry 位置对象
positionEntry struct {
pos int
item *timingEntry
}

//timingTask 时间任务
timingTask struct {
key interface{}
value interface{}
}

其中:

  1. 每个槽中因为是链表,所以并没有数量限制

  2. 时间轮实体timingEntry中有两个值需要关注 diffremoved

  3. 时间轮结构中 timers 作用往下看

    timers 使用的 SafeMap 不仅仅是为了处理并发安全,还有就是原生map内存泄露问题的临时替代品(详见《GoZero-SafeMap》)

初始化时间轮

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// NewTimingWheel 初始化并返回一个时间轮
func NewTimingWheel(interval time.Duration, numSlots int, execute Execute) (*TimingWheel, error) {
if interval <= 0 || numSlots <= 0 || execute == nil {
return nil, fmt.Errorf("interval: %v, slots: %d, execute: %p",
interval, numSlots, execute)
}

return newTimingWheelWithClock(interval, numSlots, execute, timex.NewTicker(interval))
}

//newTimingWheelWithClock 初始化时间轮
//interval:时间划分刻度
//numSlots:时间槽
//execute:时间点执行函数
func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute,
ticker timex.Ticker) (*TimingWheel, error) {
tw := &TimingWheel{
interval: interval, // 单个时间格时间间隔
ticker: ticker, // 定时器,做时间推动,以interval为单位推进
slots: make([]*list.List, numSlots), // 时间轮
timers: NewSafeMap(), // 存储task{key, value}的map [执行execute所需要的参数]
tickedPos: numSlots - 1, // at previous virtual circle
execute: execute, // 执行函数
numSlots: numSlots, // 初始化 slots num
setChannel: make(chan timingEntry), // 以下几个channel是做task传递的
moveChannel: make(chan baseEntry),
removeChannel: make(chan interface{}),
drainChannel: make(chan func(key, value interface{})),
stopChannel: make(chan lang.PlaceholderType),
}

tw.initSlots() //使用list.New()初始化每个槽,构建双向链表保存任务
go tw.run() //开启协程,使用channel来做task时间任务接收与处理

return tw, nil
}

//根据隧道类型等待不同的任务执行
func (tw *TimingWheel) run() {
for {
select {
case <-tw.ticker.Chan():
tw.onTick()
case task := <-tw.setChannel: //收到设置任务
tw.setTask(&task)
case key := <-tw.removeChannel: //删除隧道
tw.removeTask(key)
case task := <-tw.moveChannel: //重复任务隧道
tw.moveTask(task)
case fn := <-tw.drainChannel: //并发任务
tw.drainAll(fn)
case <-tw.stopChannel: //停止隧道
tw.ticker.Stop()
return
}
}
}
timingwheel26

在时间轮拉起协程进行隧道的监听与处理,这里需要注意的是:

  1. 除了时间轮的定时器隧道,其他隧道都可以通过外部接口将消息传入处理
  2. 其中moveChanneldrainChannel需要解释:
    • moveChannel:更新任务延时时间
    • drainChannel:使用自定义函数并发执行所有任务

添加任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// SetTimer 设置过期时间
func (tw *TimingWheel) SetTimer(key, value interface{}, delay time.Duration) error {
if delay <= 0 || key == nil {
return ErrArgument
}

select {
case tw.setChannel <- timingEntry{ //向设置隧道中添加一个基础的任务对象
baseEntry: baseEntry{
delay: delay,
key: key,
},
value: value,
}:
return nil //设置完直接结束
case <-tw.stopChannel: //当设置过程中收到退出通知则直接退出
return ErrClosed
}
}

//setTask 时间轮收到设置任务任务之后进行任务添加
func (tw *TimingWheel) setTask(task *timingEntry) {
if task.delay < tw.interval { //如果任务延迟时间小于时间轮的刻度,那么延迟时间等于刻度
task.delay = tw.interval
}

if val, ok := tw.timers.Get(task.key); ok { //如果已经存在这个任务,更新任务
entry := val.(*positionEntry)
entry.item.value = task.value
tw.moveTask(task.baseEntry)
} else {
pos, circle := tw.getPositionAndCircle(task.delay) //根据延时获取任务位置
task.circle = circle //设置任务所在的环
tw.slots[pos].PushBack(task) //将任务添加到对应槽中
tw.setTimerPosition(pos, task) //更新任务最新位置
}
}

注意:

  • 在添加任务的时候这里的 value 类型是 interface 类型,而不是指定的任务对象
  • 注意这里使用 setTimerPosition 又缓存任务,它的作用往下看

实现原理 中介绍的那样计算任务所在的以及

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//getPositionAndCircle 获取位置与环
//@param: d 任务延时时间
//@return:
// pos 任务所在槽
// circle 任务环
func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos, circle int) {
//如果将任务的延迟时间按照时间轮刻度划分,那么 steps 就是在第几个刻度内
steps := int(d / tw.interval)
// 时间轮的相对位置 + 延迟相对时间刻度的位置 % 槽的数量 = 任务在时间轮中的位置
pos = (tw.tickedPos + steps) % tw.numSlots
circle = (steps - 1) / tw.numSlots

return
}

设置任务的位置

1
2
3
4
5
6
7
8
9
10
11
12
13
//setTimerPosition 设置任务位置
func (tw *TimingWheel) setTimerPosition(pos int, task *timingEntry) {
if val, ok := tw.timers.Get(task.key); ok { //如果任务缓存已经存在,那么更新槽的位置
timer := val.(*positionEntry)
timer.item = task
timer.pos = pos
} else {
tw.timers.Set(task.key, &positionEntry{ //如果任务缓存不存在,则设置任务缓存
pos: pos,
item: task,
})
}
}

所以 时间轮中 timers其实是通过key保存任务,并把完成任务和位置都保存

更新任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
//MoveTimer移动任务
func (tw *TimingWheel) MoveTimer(key interface{}, delay time.Duration) error {
if delay <= 0 || key == nil {
return ErrArgument
}

select {
case tw.moveChannel <- baseEntry{
delay: delay,
key: key,
}:
return nil
case <-tw.stopChannel:
return ErrClosed
}
}

//moveTask 更新任务执行位置
func (tw *TimingWheel) moveTask(task baseEntry) {
val, ok := tw.timers.Get(task.key) //通过key获取任务(因为是移动,所以当任务不存在,那么直接结束。)
if !ok {
return
}

timer := val.(*positionEntry)
if task.delay < tw.interval { //如果任务的delay小于时间片,那么立即执行
threading.GoSafe(func() { //拉起协程进行执行
tw.execute(timer.item.key, timer.item.value)
})
return
}

pos, circle := tw.getPositionAndCircle(task.delay) //计算任务位置
if pos >= timer.pos {
timer.item.circle = circle
timer.item.diff = pos - timer.pos //计算任务位置与缓存中任务位置的不同
} else if circle > 0 { //不是当前环的任务,那么 circl - 1
circle--
timer.item.circle = circle
timer.item.diff = tw.numSlots + pos - timer.pos //不同 加上一个环的槽数,判断位置是否相同
} else {
timer.item.removed = true
newItem := &timingEntry{
baseEntry: task,
value: timer.item.value,
}
tw.slots[pos].PushBack(newItem)
tw.setTimerPosition(pos, newItem)
}
}

注意:

  1. 移动任务MoveTimer(key interface{}, delay time.Duration) error {... }并没有参数 Value,用来移动更新已有的任务

执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
//定时器触发任务
func (tw *TimingWheel) onTick() {
//获取需要执行任务的槽位置
tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
l := tw.slots[tw.tickedPos] //获取槽的对应链表
tw.scanAndRunTasks(l)
}

func (tw *TimingWheel) scanAndRunTasks(l *list.List) {
var tasks []timingTask

for e := l.Front(); e != nil; { //遍历链表
task := e.Value.(*timingEntry)
if task.removed { //如果任务状态为删除,则从任务链表中删除任务
next := e.Next()
l.Remove(e)
e = next
continue
} else if task.circle > 0 { //如果任务所在的环大于0
task.circle--
e = e.Next()
continue
} else if task.diff > 0 { //如果任务位置发生了变化
next := e.Next()
l.Remove(e) //先删掉任务
// (tw.tickedPos+task.diff)%tw.numSlots
// cannot be the same value of tw.tickedPos
pos := (tw.tickedPos + task.diff) % tw.numSlots //根据任务最新的位置计算位置重新放入任务
tw.slots[pos].PushBack(task)
tw.setTimerPosition(pos, task)
task.diff = 0
e = next
continue
}

tasks = append(tasks, timingTask{
key: task.key,
value: task.value,
})
next := e.Next()
l.Remove(e)
tw.timers.Del(task.key)
e = next
}

tw.runTasks(tasks)
}

具体执行

1
2
3
4
5
6
7
8
9
10
11
12
13
func (tw *TimingWheel) runTasks(tasks []timingTask) {
if len(tasks) == 0 {
return
}

go func() {
for i := range tasks { //拉起一个协程,不断执行任务(多任务非并发)
threading.RunSafe(func() {
tw.execute(tasks[i].key, tasks[i].value)
})
}
}()
}

注意:每个任务都使用一个协程进行执行,这里使用的是 go-zero封装的协程池,这个go 会等待所有任务执行完毕才会结束

删除任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// RemoveTimer removes the task with the given key.
func (tw *TimingWheel) RemoveTimer(key interface{}) error {
if key == nil {
return ErrArgument
}

select {
case tw.removeChannel <- key:
return nil
case <-tw.stopChannel:
return ErrClosed
}
}

//removeTask
func (tw *TimingWheel) removeTask(key interface{}) {
val, ok := tw.timers.Get(key) //任务不存在
if !ok {
return
}

timer := val.(*positionEntry)
timer.item.removed = true //增加一个状态表示已经被删除
tw.timers.Del(key)
}

排空任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//Drain 使用 fn 排空任务
func (tw *TimingWheel) Drain(fn func(key, value interface{})) error {
select {
case tw.drainChannel <- fn:
return nil
case <-tw.stopChannel:
return ErrClosed
}
}

//drainAll 排空任务
func (tw *TimingWheel) drainAll(fn func(key, value interface{})) {
runner := threading.NewTaskRunner(drainWorkers)
for _, slot := range tw.slots {
for e := slot.Front(); e != nil; {
task := e.Value.(*timingEntry)
next := e.Next()
slot.Remove(e)
e = next
if !task.removed {
runner.Schedule(func() {
fn(task.key, task.value)
})
}
}
}
}

排空任务并不是直接删除,而是提供一个自定义函数接口用于处理剩下的任务,有利于安全退出。但需要注意的是每个任务都会拉一个协程进行处理,也就是不能立即执行 时间轮的 Stop

停止时间轮

1
2
3
4
// Stop stops tw. No more actions after stopping a TimingWheel.
func (tw *TimingWheel) Stop() {
close(tw.stopChannel)
}

总结

  1. task优先队列 O(nlog(n)) 降到 双向链表 O(1),而执行task也只要轮询一个时间点的tasks O(N),不需要像优先队列,放入和删除元素 O(nlog(n))
  2. 时间轮中的多层环是一种虚拟概念,用来记录超出范围(nusSlots * interval)的任务
  3. 使用chan接收外部接口调用的好处是并发安全,当然内部逻辑实现还是需要注意map安全

补充:

  1. 可以添加一个在协程池中讲到的时间轮状态,保证不会有新的任务在退出的时候加进来
  2. 可以使用协程池和内存池的节省内存空间,但如果协程池处理不过来使后续任务阻塞,可能会导致时间轮功能异常
  3. 分布式系统中系统的定时调用则需要使用分布式定时器,这在另外的章节中学习

技术应用

go-zero用于缓存的定时删除

1
2
3
4
5
6
7
8
9
10
11
12
13
timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v interface{}) {
key, ok := k.(string)
if !ok {
return
}

cache.Del(key) //到达则删除缓存
})
if err != nil {
return nil, err
}

cache.timingWheel = timingWheel

参考链接

  1. https://www.ericcai.fun/detail/16
  2. https://juejin.cn/post/6844904110399946766
  3. https://xiaorui.cc/archives/6160
  4. https://zhuanlan.zhihu.com/p/264826698
  5. https://lk668.github.io/2021/04/05/2021-04-05-手把手教你如何用golang实现一个timewheel/
  6. https://go-zero.dev/cn/docs/blog/principle/timing-wheel