微服务-12-自适应降载

问题背景

调用链路错综复杂,做为服务的提供者需要有一种保护自己的机制,防止调用方无脑调用压垮自己,保证自身服务的高可用。自适应降载能根据服务自身的系统负载动态判断是否需要降载,它的目标:

  1. 保证系统不被拖垮
  2. 在系统稳定的前提下,保持系统的吞吐量

问题:服务怎么知道自己需要降载?

通过CPU负载并发数判断往往存在较大波动,这种被称为毛刺的现象可能导致系统一致频繁的自动进行降载操作。所以如果能通过统计最近一段时间内的指标均值使均值更加平滑

实现原理

统计学上有一种算法:滑动平均(exponential moving average),用来估算变量的局部均值,使得变量的更新与历史一段时间的历史取值有关,无需记录所有的历史局部变量就可以实现平均值估算

变量 V 在 t 时刻记为 Vt,θt 为变量 V 在 t 时刻的取值,即在不使用滑动平均模型时 Vt=θt,在使用滑动平均模型后,Vt 的更新公式如下:

Vt=βVt1+(1β)θtVt=β⋅Vt−1+(1−β)⋅θt

  • β = 0 时 Vt = θt
  • β = 0.9 时,大致相当于过去 10 个 θt 值的平均
  • β = 0.99 时,大致相当于过去 100 个 θt 值的平均

而统计最近一段时间内的数据则可以使用 滑动窗口算法,接下来看看如何进行自适应降载判断

技术内幕

来看看 go-zero 的自适应降载的实现

代码:core/load/adaptiveshedder.go

gozero-zishiying-1

使用案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func UnarySheddingInterceptor(shedder load.Shedder, metrics *stat.Metrics) grpc.UnaryServerInterceptor {
ensureSheddingStat()
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (val interface{}, err error) {
sheddingStat.IncrementTotal()
var promise load.Promise
// 检查是否被降载
promise, err = shedder.Allow()
// 降载,记录相关日志与指标
if err != nil {
metrics.AddDrop()
sheddingStat.IncrementDrop()
return
}
// 最后回调执行结果
defer func() {
// 执行失败
if err == context.DeadlineExceeded {
promise.Fail()
// 执行成功
} else {
sheddingStat.IncrementPass()
promise.Pass()
}
}()
// 执行业务方法
return handler(ctx, req)
}
}

仅需要调用 Allow 接口进行降载逻辑初始化

  1. 如果降载 shedder.Allow(),那么直接记录信息并返回
  2. 否则执行业务逻辑
    • 成功则执行 promise.Pass
    • 失败则执行 promise.Fail,业务调用错误这种算执行成功,这里使用业务超时(DeadlineExceeded)表示执行失败需要降载。

这里的promise变量就是上图中的返回结果句柄,用于将业务逻辑结果更新到降载器中

接口定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
const (
defaultBuckets = 50 //默认滑动窗口槽
defaultWindow = time.Second * 5 //默认滑动窗口大小
defaultCpuThreshold = 900 //CPU阈值
defaultMinRt = float64(time.Second / time.Millisecond) //最小速率
flyingBeta = 0.9 //平均请求 系数
coolOffDuration = time.Second //冷静时间
)
type (
//回到函数结果处理
Promise interface {
// 请求成功时回调此函数
Pass()
// 请求失败时回调此函数
Fail()
}

//降载接口
Shedder interface {
// 降载检查
// 1. 允许调用,需手动执行 Promise.accept()/reject()上报实际执行任务结构
// 2. 拒绝调用,将会直接返回err:服务过载错误 ErrServiceOverloaded
Allow() (Promise, error)
}

// ShedderOption lets caller customize the Shedder.
ShedderOption func(opts *shedderOptions)

shedderOptions struct {
window time.Duration
buckets int
cpuThreshold int64
}

adaptiveShedder struct {
cpuThreshold int64 //CPU阈值
windows int64 //滑动窗口大小
flying int64 //调度统计
avgFlying float64 //平均调度
avgFlyingLock syncx.SpinLock //调度锁
dropTime *syncx.AtomicDuration //降载时间
droppedRecently *syncx.AtomicBool //降载标识
passCounter *collection.RollingWindow //滑动窗口 通过统计
rtCounter *collection.RollingWindow //滑动窗口 速率统计
}
)

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//构建一个自适应熔断调度器
func NewAdaptiveShedder(opts ...ShedderOption) Shedder {
if !enabled.True() {
return newNopShedder()
}

//... 可选参数执行

bucketDuration := options.window / time.Duration(options.buckets) //滑动窗口槽的大小
return &adaptiveShedder{
cpuThreshold: options.cpuThreshold, //CPU阈值
windows: int64(time.Second / bucketDuration), //1s滑动窗口个数
dropTime: syncx.NewAtomicDuration(), //熔断时间
droppedRecently: syncx.NewAtomicBool(), //最近是否熔断
passCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
collection.IgnoreCurrentBucket()), //滑动窗口通过统计
rtCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
collection.IgnoreCurrentBucket()), //滑动窗口速率统计
}
}

逻辑判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//降载入口
func (as *adaptiveShedder) Allow() (Promise, error) {
if as.shouldDrop() {
as.dropTime.Set(timex.Now())
as.droppedRecently.Set(true)

return nil, ErrServiceOverloaded
}

as.addFlying(1)

return &promise{
start: timex.Now(),
shedder: as,
}, nil
}
结果句柄操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type promise struct {
start time.Duration
shedder *adaptiveShedder
}

func (p *promise) Fail() {
p.shedder.addFlying(-1)
}

func (p *promise) Pass() {
rt := float64(timex.Since(p.start)) / float64(time.Millisecond) //花费的毫秒数
p.shedder.addFlying(-1)
p.shedder.rtCounter.Add(math.Ceil(rt))
p.shedder.passCounter.Add(1)
}

注意

  1. 其中的 p.shedder.addFlying(-1) 也就是说 flying 变量用于更新调度请求数量的
  2. 失败并不会记录到调度统计中,因为计算平均请求不需要失败
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (as *adaptiveShedder) addFlying(delta int64) {
flying := atomic.AddInt64(&as.flying, delta) //请求数量更新

// 当请求完成时更新 avgFlying。
// 这个策略使得 avgFlying 相对 flying 有一点延迟,并且更平滑。
// 当 flying 请求快速增加时,avgFlying 增加较慢,接受更多请求。
// 当 flying 请求快速下降时,avgFlying 下降较慢,接受较少的请求。
// 它使服务尽可能多地处理请求。
if delta < 0 { //当 < 0 表示 请求完成,计算平均请求
as.avgFlyingLock.Lock()
as.avgFlying = as.avgFlying*flyingBeta + float64(flying)*(1-flyingBeta) //滑动平均算法
as.avgFlyingLock.Unlock()
}
}
CPU超过限制

这里的CPU也是经过定时统计得出的最近一段时间CPU负载,防止毛刺

1
2
3
systemOverloadChecker = func(cpuThreshold int64) bool {
return stat.CpuUsage() >= cpuThreshold
}
过载中判断

如果是正在过载中则,超过一段时间冷静期就恢复正常

gozero-zishiying-2

过载标识/时间 是当初过载时候设置的 dropTimedroppedRecently

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//过载中判断
func (as *adaptiveShedder) stillHot() bool {
if !as.droppedRecently.True() { //是否过热中
return false
}

dropTime := as.dropTime.Load() //加载降载时间
if dropTime == 0 {
return false
}

hot := timex.Since(dropTime) < coolOffDuration //是否超过冷静期
if !hot {
as.droppedRecently.Set(false) //更新降载标识
}

return hot
}
过载判断

过载判断 的逻辑是

1
2
//平均请求数大于 且 当前未完成请求的数量超过了最大请求数
avgFlying > maxFlight && flying > maxFlight

这个 最大并发数 又是怎样计算的呢?

当前系统的最大并发数 = 窗口单位时间内的最大通过数量 * 窗口单位时间内的最小响应时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (as *adaptiveShedder) maxFlight() int64 {
// windows = buckets per second
// maxQPS = maxPASS * windows
// minRT = min average response time in milliseconds
// maxQPS * minRT / milliseconds_per_second
// as.maxPass()*as.windows - 每个桶最大的qps * 1s内包含桶的数量
// as.minRt()/1e3 - 窗口所有桶中最小的平均响应时间 / 1000ms这里是为了转换成秒
return int64(math.Max(1, float64(as.maxPass()*as.windows)*(as.minRt()/1e3)))
}

//当前滑动窗口中的最大请求的统计
func (as *adaptiveShedder) maxPass() int64 {
var result float64 = 1

as.passCounter.Reduce(func(b *collection.Bucket) {
if b.Sum > result {
result = b.Sum
}
})

return int64(result)
}

//当前滑动窗口中最小速率的统计
func (as *adaptiveShedder) minRt() float64 {
result := defaultMinRt

as.rtCounter.Reduce(func(b *collection.Bucket) {
if b.Count <= 0 {
return
}

avg := math.Round(b.Sum / float64(b.Count))
if avg < result {
result = avg
}
})

return result
}

总结

  1. 自适应降载逻辑处理 当请求突然增大的时候,虽然没有达到服务能够承受的极限,也有可能出现降载。因为平均请求数量以及最大请求数量 都超过了最近一段时间能承载的最大水平
  2. 按照第一条逻辑,如果服务刚启动那会请求确实比较多,是不是就会出现降载了。不会,这里在计算 最大并发数的时候,给定了一个最小最大并发数 1 * defaultMinRt / milliseconds_per_second 。也就是并发数低于 1000的时候也不会触发降载

参考文档

  1. https://talkgo.org/t/topic/3058
  2. https://www.cnblogs.com/wuliytTaotao/p/9479958.html