问题背景
使用负载均衡策略是一种避免超负载的处理方式,但服务的容量是有限的。部分服务还是会出现超载的情况,如果优雅的处理过载则对可靠的服务至关重要。
在高并发场景下,为了应对依赖服务过载,服务不可用等情况,提出了熔断、限流与降级方案。这里主要描述熔断的原理,这里存在几个问题:
- 都有哪些熔断的解决方案
- 熔断器的实现原理是什么
使用较多的熔断组件:
- hystrix circuit breaker(不再维护)
- hystrix-go
- resilience4j(推荐)
- sentinel(推荐)
熔断器原理
熔断器一般具有三个状态:
- 关闭: 默认状态,请求能被到达目标服务,同时统计在窗口时间成功和失败次数,如果达到错误率阈值将会进入断开状态。
- 断开: 此状态下将会直接返回错误,如果有 fallback 配置则直接调用 fallback 方法。
- 半断开: 进行断开状态会维护一个超时时间,到达超时时间开始进入 半断开 状态,尝试允许一部分请求正常通过并统计成功数量,如果请求正常则认为此时目标服务已恢复进入 关闭 状态,否则进入 断开 状态
基于熔断器的原理,通常熔断器主要关注以下参数:
- 错误比例阈值: 达到该阈值进入 断开 状态
- 断开状态超时时间: 超时后进入 半断开 状态
- 半断开状态允许请求数量
- 窗口时间大小
这里有更将详细可参考的参考以及算法说明
- https://resilience4j.readme.io/docs/circuitbreaker
- https://sre.google/sre-book/handling-overload/
由于go-zero
的熔断器是基于google文章实现,来看下基本算法
技术内幕
- 无论什么熔断器都得依靠指标统计来转换状态,而统计指标一般要求是最近的一段时间内的数据,所以通常采用一个
滑动时间窗口
数据结构 来存储统计数据。同时熔断器的状态也需要依靠指标统计来实现可观测性。
- 外部服务请求结果各式各样,所以需要提供一个自定义的判断方法,判断请求是否成功。熔断器需要实时收集此数据。
- 当外部服务被熔断时使用者往往需要自定义快速失败的逻辑,考虑提供自定义的
fallback()
功能。
接口定义
代码路径:core/breaker/breaker.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| window = time.Second * 10 buckets = 40 k = 1.5 protection = 5
Acceptable func(err error) bool
Breaker interface { Name() string
Allow() (Promise, error)
Do(req func() error) error
DoWithAcceptable(req func() error, acceptable Acceptable) error
DoWithFallback(req func() error, fallback func(err error) error) error
DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error }
|
断路器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| circuitBreaker struct { name string throttle }
throttle interface { allow() (Promise, error) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error }
type googleBreaker struct { k float64 stat *collection.RollingWindow proba *mathx.Proba }
func newGoogleBreaker() *googleBreaker { bucketDuration := time.Duration(int64(window) / int64(buckets)) st := collection.NewRollingWindow(buckets, bucketDuration) return &googleBreaker{ stat: st, k: k, proba: mathx.NewProba(), } }
|
数据记录
其实是将成功或者失败记录到滑动窗口中
1 2 3 4 5 6 7
| func (b *googleBreaker) markSuccess() { b.stat.Add(1) }
func (b *googleBreaker) markFailure() { b.stat.Add(0) }
|
请求操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| func (b *googleBreaker) history() (accepts, total int64) { b.stat.Reduce(func(b *collection.Bucket) { accepts += int64(b.Sum) total += b.Count })
return }
func (b *googleBreaker) accept() error { accepts, total := b.history() weightedAccepts := b.k * float64(accepts) dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1)) if dropRatio <= 0 { return nil }
if b.proba.TrueOnProba(dropRatio) { return ErrServiceUnavailable }
return nil }
func (b *googleBreaker) allow() (internalPromise, error) { if err := b.accept(); err != nil { return nil, err }
return googlePromise{ b: b, }, nil }
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error { if err := b.accept(); err != nil { if fallback != nil { return fallback(err) }
return err }
defer func() { if e := recover(); e != nil { b.markFailure() panic(e) } }()
err := req() if acceptable(err) { b.markSuccess() } else { b.markFailure() }
return err }
|
总结
-
通过滑动窗口进行最近一段数据(成功失败次数)的统计
-
是否断路则是通过指定的公式计算
失败率=总数−可接收误差−k∗成功/失败
参考文档
- https://talkgo.org/t/topic/3035