Grpc-05-重试机制

重试机制是为了在短暂异常情况下能够正常恢复,也能防止长时间异常而不退出。这里看一下grpc-go 中的重试机制以及使用方法

带着问题看世界:

  1. grpc-go 的重试策略的使用
  2. grpc-go 的重试机制是怎样的
  3. grpc-go 的重试机制都应用在哪些地方

基本使用

通过 Dial 可选参数设置自定义重试策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
var (
addr = flag.String("addr", "localhost:50052", "the address to connect to")
// see https://github.com/grpc/grpc/blob/master/doc/service_config.md to know more about service config
retryPolicy = `{
"methodConfig": [{
"name": [{"service": "grpc.examples.echo.Echo"}],
"waitForReady": true,
"retryPolicy": {
"MaxAttempts": 4,
"InitialBackoff": ".01s",
"MaxBackoff": ".01s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": [ "UNAVAILABLE" ]
},
"hedgingPolicy": {
"MaxAttempts": "",
"HedgingDelay": "",
"NonFatalStatusCodes": [""]
}
}]}`
)

// use grpc.WithDefaultServiceConfig() to set service config
func retryDial() (*grpc.ClientConn, error) {
return grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicy))
}

首先,这里有两种重试策略 hedgingPolicy (重试) 与 retryPolicy(对冲)

重试中参数的解释

  1. MaxAttempts:最大重试次数
  2. InitialBackoff:默认退避时间
  3. MaxBackoff:最大退避时间
  4. BackoffMultiplier:退避时间增加倍率
  5. RetryableStatusCodes: 服务端返回什么错误码才重试

对冲是指在不等待响应的情况下主动发送单词调用的多个请求。通俗解释就是 如果HedgingDelay时间没有响应,那么直接发送第二次请求,以此类推,直到达到最大次数 MaxAttempts

  1. MaxAttempts:最大重试次数
  2. HedgingDelay:等待响应时间

也可以通过可选参数关闭重试策略

1
2
3
4
5
func WithDisableRetry() DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.disableRetry = true
})
}

重试策略

这两种策略都是通过实现统一的重试接口进行加载

1
2
3
type isMethodConfig_RetryOrHedgingPolicy interface {
isMethodConfig_RetryOrHedgingPolicy()
}

通过转换而进行重试测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
pushback := 0					//记录重试间隔时间
hasPushback := false //记录是否要重试
if cs.attempt.s != nil { //存在重试任务
if !cs.attempt.s.TrailersOnly() { //如果流不是只有trailer,则无法直接读取元数据
return false, err
}

sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"] //获取元数据
if len(sps) == 1 {
var e error
if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
cs.retryThrottler.throttle() // 记录重试次数
return false, err
}
hasPushback = true //重试
} else if len(sps) > 1 { //如果存在多个 sps 表示异常
cs.retryThrottler.throttle() // 记录重试次数(用于重试失败限速)
return false, err
}
}
  1. Trailers 是一种消息元数据,用于在相应结束时传递关于次消息的附加信息,通常包含状态码,消息和错误信息的传递。Trailers 通常会在流关闭之前发送。Trailers 发送的情况有几下几种

    • RPC正常结束,Trailers中携带调用的状态、错误码、响应时间等信息;
    • RPC出错结束,Trailers中携带错误信息,例如错误码、错误消息等;
    • 服务器关闭了stream,Trailers中携带stream关闭的原因;
    • 客户端取消了RPC调用,Trailers中携带调用取消的原因

    它的特点:

    • Trailers 并不是每个RPC调用都会发,而是流关闭的时候才会发!!! 可以包含在最后一个数据帧中附加一个Trailers,所以抓包看不到一个独立的 Trailers

    • 如果流中不仅仅有Trailers,那么无法直接从流中获取 Trailers,而是需要其他办法,比如拦截器

    • 包含元数据存储在ctx中

      1
      2
      3
      ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(
      "grpc-retry-pushback-ms", "5000",
      ))
  2. 这里还多了一个 retryThrottler.throttle,在下面的重试速率中说明

  3. 如果有 没有配置 grpc-retry-pushback-ms,那么就使用重试策略,

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    var dur time.Duration
    if hasPushback { //设置了重试元数据
    dur = time.Millisecond * time.Duration(pushback)
    cs.numRetriesSincePushback = 0
    } else {
    fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
    cur := float64(rp.InitialBackoff) * fact
    if max := float64(rp.MaxBackoff); cur > max { //最大判断
    cur = max
    }
    dur = time.Duration(grpcrand.Int63n(int64(cur)))
    cs.numRetriesSincePushback++
    }

    可以看出,退避时间随着重试次数指数级增长 InitialBackoff * math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))

根据上述计算的退避时间执行等待逻辑

1
2
3
4
5
6
7
8
9
t := time.NewTimer(dur)	//开启一个定时器
select {
case <-t.C: //定时器触发,重试次数+1
cs.numRetries++
return false, nil
case <-cs.ctx.Done(): //流程结束停止重试
t.Stop()
return false, status.FromContextError(cs.ctx.Err()).Err()
}

重试速率

重试过程中还能看到 RetryThrottlingPolicy 做了一层重试限制,其实除了配置重试策略,还能控制重试的速率,防止重试的次数太多给服务器造成太大压力

1
2
3
4
5
6
7
8
9
10
policy := grpcutil.RetryThrottlingPolicy{
MaxTokens: 10, // 最大令牌数
TokenRate: 1, // 每秒产生的令牌数
RetryBudget: 100 * time.Millisecond, // 重试预算
}

// 创建一个 DialOption,使用上面的 RetryThrottlingPolicy
dialOption := grpc.WithRetryThrottlingPolicy(policy)

// 使用 dialOption 创建 gRPC 客户端
  • MaxTokens:最大令牌数,任何时间点同时允许的最大令牌数
  • TokenRate:每秒产生的令牌数,每个请求都会消耗一个令牌
  • RetryBudget:重试预算,制定重试操作可以使用的总时间,如果一个请求需要重试,但是时间已经超过了 RetryBudget,那么不在进行重试

限速原理

第一步:初始化速率配置

结构体如下:

1
2
3
4
5
6
7
8
type retryThrottler struct {
max float64 //最大令牌数
thresh float64 //预算
ratio float64 //每秒令牌数

mu sync.Mutex
tokens float64 //可用令牌数
}

转换规则

1
2
3
4
5
6
7
8
9
10
11
if cc.sc.retryThrottling != nil {
newThrottler := &retryThrottler{
tokens: cc.sc.retryThrottling.MaxTokens,
max: cc.sc.retryThrottling.MaxTokens,
thresh: cc.sc.retryThrottling.MaxTokens / 2,
ratio: cc.sc.retryThrottling.TokenRatio,
}
cc.retryThrottler.Store(newThrottler)
} else {
cc.retryThrottler.Store((*retryThrottler)(nil))
}

第二步:计算是否进行限速

1
2
3
4
5
6
7
8
9
10
11
12
func (rt *retryThrottler) throttle() bool {
if rt == nil {
return false
}
rt.mu.Lock()
defer rt.mu.Unlock()
rt.tokens--
if rt.tokens < 0 {
rt.tokens = 0
}
return rt.tokens <= rt.thresh
}

第三步:成功调用则更新可用 tokens

1
2
3
4
5
6
7
8
9
10
11
func (rt *retryThrottler) successfulRPC() {
if rt == nil {
return
}
rt.mu.Lock()
defer rt.mu.Unlock()
rt.tokens += rt.ratio
if rt.tokens > rt.max {
rt.tokens = rt.max
}
}

这里需要注意的是:一个流中只有只有成功调用的时候才会将增加可用 token 数量

业务重试

业务重试一般直接使用重试封装逻辑(已新建流为例)

1
2
3
4
5
op := func(a *csAttempt) error { return a.newStream() }
if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
cs.finish(err)
return nil, err
}

内部逻辑如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
cs.mu.Lock() //获取流的写锁,防止在重试过程又出现新的重试
for { //使用for循环不断尝试
if cs.committed { //流不在发送任何东西
cs.mu.Unlock()
return toRPCErr(op(cs.attempt))
}
a := cs.attempt
cs.mu.Unlock()
err := op(a) //执行操作
cs.mu.Lock()
if a != cs.attempt { //如果重试对象已经变更,则直接跳过继续下一次操作
// We started another attempt already.
continue
}
if err == io.EOF {
<-a.s.Done()
}
//执行成功的后续操作
if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
onSuccess()
cs.mu.Unlock()
return err
}
//执行失败则尝试重试
if err := cs.retryLocked(err); err != nil {
cs.mu.Unlock()
return err
}
}
}
  • committed:当客户端调用 CloseSend 方法关闭发送流的时候,会标记 ClientStream为已提交,表示客户端不再发送任何消息

    有个奇怪的地方如下所示,为什么不发送任何东西还要继续 op

    1
    2
    3
    4
    if cs.committed {	//流不在发送任何东西
    cs.mu.Unlock()
    return toRPCErr(op(cs.attempt))
    }

    原因是如果是正在处理中的请求(尝试重试、取消等操作),还是会尝试执行 op并返回错误结束尝试

  • 如果在重试过程中,重试任务发生了变化,那么也会跳过直接进行下一次重试

成功

如果执行成功,那么只需要执行后续的 OnSuccess 函数即可,一般有有两种

  • 如果是 commitAttemptLocked,那么表示执行成功更新状态即可

    1
    2
    3
    4
    5
    6
    7
    func (cs *clientStream) commitAttemptLocked() {
    if !cs.committed && cs.onCommit != nil {
    cs.onCommit() //配置的重试逻辑
    }
    cs.committed = true //重试逻辑执行成功
    cs.buffer = nil //清空这个重试缓存
    }
  • 如果是 bufferForRetryLocked,那么会缓存当前的重试操作到回放缓冲区(后续发生失败的时候再执行一次)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
    // Note: we still will buffer if retry is disabled (for transparent retries).
    if cs.committed { //如果重试逻辑执行成功结束
    return
    }
    cs.bufferSize += sz
    if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize { //回放缓冲区太大就不回放了
    cs.commitAttemptLocked()
    return
    }
    cs.buffer = append(cs.buffer, op)
    }

失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (cs *clientStream) retryLocked(lastErr error) error {
for {
cs.attempt.finish(toRPCErr(lastErr))
isTransparent, err := cs.shouldRetry(lastErr)
if err != nil {
cs.commitAttemptLocked()
return err
}
cs.firstAttempt = false
if err := cs.newAttemptLocked(isTransparent); err != nil { //创建一个新attemp
return err
}
if lastErr = cs.replayBufferLocked(); lastErr == nil { //回放缓冲区,
return nil
}
}
}
  1. 首先,当前的尝试 cs.attempt 将被标记为已完成,其结果会被传递给 cs.finish()
  2. 然后,会调用 cs.shouldRetry() 判断当前错误是否应该重试(服务端错误 DoNotTransparentRetry )
    • 如果返回的错误可以重试,则会通过 cs.newAttemptLocked() 创建新的尝试,即新的流,并在其上发起新的请求
    • 如果没有返回新的错误,则尝试从回放缓冲区 cs.replayBufferLocked() 中获取上一次重试的错误
  3. 如果在回放缓冲区中找到了错误,继续进行下一次重试,否则返回 nil 表示重试成功

前面提到了,有一种成功即使执行成功也会加入到 buffer中,在执行失败的之后重新再执行一遍

1
2
3
4
5
6
7
8
9
func (cs *clientStream) replayBufferLocked() error {
a := cs.attempt
for _, f := range cs.buffer {
if err := f(a); err != nil {
return err
}
}
return nil
}

是在构建流的时候加入的,如果构建流成功,那么会将构建流的函数加入到buffer,如果后续有一次临时性的失败,会将所有成功的构建流全部执行一遍。为什么其中一个创建流失败之后,需要将其他创建成功的逻辑都进行重试?

答:如果一次失败了,那么执行之前的操作。前提是能够保证和之前成功的那次执行得到相同的结果。这里创建流与发送消息都会将成功的操作放入到缓冲区中,当消息发送完毕,那么这个流就结束了。所以在发送过程中,如果其中一个帧发送失败,那么它会将从流构建到消息发送再次传输一遍(相当于一个新的请求)

总结

  1. 重试机制 withRetry 包含的业务
    • 建立流链接
    • 发送头帧
    • 发送消息
    • 接收消息
    • 关闭发送
  2. 服务端响应的时候并没有重试机制
  3. 重试速率限制是针对整个连接而不是每个流一个
  4. 同时执行一个重试任务,并且执行的都是最新的那个
  5. 只有重试成功,才会更新令牌tokens数量。那是不是影响其他调用的重试,会但影响不大。第一是因为只要有一个成功,那么token数量都会更新。第二是重试次数过多,单次调用会因为超时停止重试。
  6. 如果在发送过程中,某一个帧发送失败,那么它会从创建流开始重新发起调用

参考文档

  1. https://blog.csdn.net/u011582922/article/details/120578941