算法之美-令牌桶算法

在高并发系统中有三把利器用来保护系统:缓存、降级和限流

其中常用的限流算法有 **漏桶算法 **和 令牌桶算法

漏桶算法

把请求比作是水,水来了都先放进桶里,并以限定的速度出水,当水来得过猛而出水不够快时就会导致水直接溢出,即拒绝服务

img

漏斗有一个进水口 和 一个出水口,出水口以一定速率出水,并且有一个最大出水速率:

在漏斗中没有水的时候,

  • 如果进水速率小于等于最大出水速率,那么,出水速率等于进水速率,此时,不会积水
  • 如果进水速率大于最大出水速率,那么,漏斗以最大速率出水,此时,多余的水会积在漏斗中

在漏斗中有水的时候

  • 出水口以最大速率出水
  • 如果漏斗未满,且有进水的话,那么这些水会积在漏斗中
  • 如果漏斗已满,且有进水的话,那么这些水会溢出到漏斗之外
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package ratelimit // import "go.uber.org/ratelimit"

import (
"time"

"sync/atomic"
"unsafe"
)

type state struct {
last time.Time
sleepFor time.Duration
}

type atomicLimiter struct {
state unsafe.Pointer //用于存储上一次执行的时间以及需要sleep的时间
padding [56]byte //是一个无意义的填充数据,为了提高性能,避免 cpu 缓存的 false sharing

perRequest time.Duration //只单位,默认为秒
maxSlack time.Duration //松弛时间,也就是可以允许的突发流量的大小, 默认是 Pre/10
clock Clock //时钟,用于在测试的时候可以 mock 掉不使用真实的时间
}

// newAtomicBased returns a new atomic based limiter.
func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
// TODO consider moving config building to the implementation
// independent code.
config := buildConfig(opts)
perRequest := config.per / time.Duration(rate)
l := &atomicLimiter{
perRequest: perRequest,
maxSlack: -1 * time.Duration(config.slack) * perRequest,
clock: config.clock,
}

initialState := state{
last: time.Time{},
sleepFor: 0,
}
atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
return l
}

// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *atomicLimiter) Take() time.Time {
var (
newState state // 状态
taken bool // 用于表示原子操作是否成功
interval time.Duration // 需要 sleep 的时间
)
for !taken { // 如果 CAS 操作不成功就一直尝试
now := t.clock.Now() //获取当前时间

previousStatePointer := atomic.LoadPointer(&t.state) // load 出上一次调用的时间
oldState := (*state)(previousStatePointer)

newState = state{
last: now,
sleepFor: oldState.sleepFor,
}

// 如果 last 是零值的话,表示之前就没用过,直接保存返回即可
if oldState.last.IsZero() {
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
continue
}

// sleepFor 是需要睡眠的时间,由于引入了松弛时间,所以 sleepFor 可能是一个
// maxSlack ~ 0 之间的一个值,所以这里需要将现在的需要 sleep 的时间和上一次
// sleepFor 的值相加
newState.sleepFor += t.perRequest - now.Sub(oldState.last)
// 如果距离上一次调用已经很久了,sleepFor 可能会是一个很小的值
// 最小值只能是 maxSlack 的大小
if newState.sleepFor < t.maxSlack {
newState.sleepFor = t.maxSlack
}
// 如果 sleepFor 大于 0 的话,计算出需要 sleep 的时间
// 然后将 state.sleepFor 置零
if newState.sleepFor > 0 {
newState.last = newState.last.Add(newState.sleepFor)
interval, newState.sleepFor = newState.sleepFor, 0
}
// 保存状态
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
}
t.clock.Sleep(interval)
return newState.last
}

令牌桶算法

令牌桶算法的原理是系统以恒定的速率产生令牌,然后把令牌放到令牌桶中,令牌桶有一个容量,当令牌桶满了的时候,再向其中放令牌,那么多余的令牌会被丢弃;当想要处理一个请求的时候,需要从令牌桶中取出一个令牌,如果此时令牌桶中没有令牌,那么则拒绝该请求

img

源码:https://github.com/beefsack/go-rate/blob/master/rate.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package rate

import (
"container/list"
"sync"
"time"
)

// A RateLimiter limits the rate at which an action can be performed. It
// applies neither smoothing (like one could achieve in a token bucket system)
// nor does it offer any conception of warmup, wherein the rate of actions
// granted are steadily increased until a steady throughput equilibrium is
// reached.
type RateLimiter struct {
limit int
interval time.Duration
mtx sync.Mutex
times list.List //双向链表
}

// New creates a new rate limiter for the limit and interval.
func New(limit int, interval time.Duration) *RateLimiter {
lim := &RateLimiter{
limit: limit,
interval: interval,
}
lim.times.Init()
return lim
}

// Wait blocks if the rate limit has been reached. Wait offers no guarantees
// of fairness for multiple actors if the allowed rate has been temporarily
// exhausted.
func (r *RateLimiter) Wait() {
for {
ok, remaining := r.Try()
if ok {
break
}
time.Sleep(remaining)
}
}

// Try returns true if under the rate limit, or false if over and the
// remaining time before the rate limit expires.
func (r *RateLimiter) Try() (ok bool, remaining time.Duration) {
r.mtx.Lock()
defer r.mtx.Unlock()
now := time.Now()
if l := r.times.Len(); l < r.limit {
r.times.PushBack(now)
return true, 0
}
frnt := r.times.Front()
if diff := now.Sub(frnt.Value.(time.Time)); diff < r.interval {
return false, r.interval - diff
}
frnt.Value = now
r.times.MoveToBack(frnt)
return true, 0
}

漏桶VS令牌桶

漏桶算法 能够强行限制数据的传输速率,

令牌桶算法 在能够限制数据的平均传输速率外,只要桶中存在令牌,就允许突发地传输数据直到达到用户配置的门限,所以也适合于具有突发特性的流量

参考链接

  1. https://www.cnblogs.com/xuwc/p/9123078.html
  2. https://www.jianshu.com/p/d6250493308b
  3. https://segmentfault.com/a/1190000015967922