重试机制是为了在短暂异常情况下能够正常恢复,也能防止长时间异常而不退出。这里看一下grpc-go 中的重试机制以及使用方法
带着问题看世界:
- grpc-go 的重试策略的使用
- grpc-go 的重试机制是怎样的
- grpc-go 的重试机制都应用在哪些地方
基本使用
通过 Dial
可选参数设置自定义重试策略
1 | var ( |
首先,这里有两种重试策略 hedgingPolicy
(重试) 与 retryPolicy
(对冲)
重试中参数的解释
MaxAttempts
:最大重试次数InitialBackoff
:默认退避时间MaxBackoff
:最大退避时间BackoffMultiplier
:退避时间增加倍率RetryableStatusCodes
: 服务端返回什么错误码才重试
对冲是指在不等待响应的情况下主动发送单词调用的多个请求。通俗解释就是 如果HedgingDelay
时间没有响应,那么直接发送第二次请求,以此类推,直到达到最大次数 MaxAttempts
MaxAttempts
:最大重试次数HedgingDelay
:等待响应时间
也可以通过可选参数关闭重试策略
1 | func WithDisableRetry() DialOption { |
重试策略
这两种策略都是通过实现统一的重试接口进行加载
1 | type isMethodConfig_RetryOrHedgingPolicy interface { |
通过转换而进行重试测试
1 | pushback := 0 //记录重试间隔时间 |
-
Trailers
是一种消息元数据,用于在相应结束时传递关于次消息的附加信息,通常包含状态码,消息和错误信息的传递。Trailers 通常会在流关闭之前发送。Trailers
发送的情况有几下几种- RPC正常结束,
Trailers
中携带调用的状态、错误码、响应时间等信息; - RPC出错结束,
Trailers
中携带错误信息,例如错误码、错误消息等; - 服务器关闭了stream,
Trailers
中携带stream关闭的原因; - 客户端取消了RPC调用,
Trailers
中携带调用取消的原因
它的特点:
-
Trailers
并不是每个RPC调用都会发,而是流关闭的时候才会发!!! 可以包含在最后一个数据帧中附加一个Trailers
,所以抓包看不到一个独立的Trailers
。 -
如果流中不仅仅有
Trailers
,那么无法直接从流中获取Trailers
,而是需要其他办法,比如拦截器 -
包含元数据存储在ctx中
1
2
3ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(
"grpc-retry-pushback-ms", "5000",
))
- RPC正常结束,
-
这里还多了一个
retryThrottler.throttle
,在下面的重试速率中说明 -
如果有 没有配置
grpc-retry-pushback-ms
,那么就使用重试策略,1
2
3
4
5
6
7
8
9
10
11
12
13var dur time.Duration
if hasPushback { //设置了重试元数据
dur = time.Millisecond * time.Duration(pushback)
cs.numRetriesSincePushback = 0
} else {
fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
cur := float64(rp.InitialBackoff) * fact
if max := float64(rp.MaxBackoff); cur > max { //最大判断
cur = max
}
dur = time.Duration(grpcrand.Int63n(int64(cur)))
cs.numRetriesSincePushback++
}可以看出,退避时间随着重试次数指数级增长
InitialBackoff * math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
根据上述计算的退避时间执行等待逻辑
1 | t := time.NewTimer(dur) //开启一个定时器 |
重试速率
重试过程中还能看到 RetryThrottlingPolicy
做了一层重试限制,其实除了配置重试策略,还能控制重试的速率,防止重试的次数太多给服务器造成太大压力
1 | policy := grpcutil.RetryThrottlingPolicy{ |
MaxTokens
:最大令牌数,任何时间点同时允许的最大令牌数TokenRate
:每秒产生的令牌数,每个请求都会消耗一个令牌RetryBudget
:重试预算,制定重试操作可以使用的总时间,如果一个请求需要重试,但是时间已经超过了RetryBudget
,那么不在进行重试
限速原理
第一步:初始化速率配置
结构体如下:
1 | type retryThrottler struct { |
转换规则
1 | if cc.sc.retryThrottling != nil { |
第二步:计算是否进行限速
1 | func (rt *retryThrottler) throttle() bool { |
第三步:成功调用则更新可用 tokens
1 | func (rt *retryThrottler) successfulRPC() { |
这里需要注意的是:一个流中只有只有成功调用的时候才会将增加可用 token
数量
业务重试
业务重试一般直接使用重试封装逻辑(已新建流为例)
1 | op := func(a *csAttempt) error { return a.newStream() } |
内部逻辑如下
1 | func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error { |
-
committed
:当客户端调用CloseSend
方法关闭发送流的时候,会标记ClientStream
为已提交,表示客户端不再发送任何消息有个奇怪的地方如下所示,为什么不发送任何东西还要继续
op
1
2
3
4if cs.committed { //流不在发送任何东西
cs.mu.Unlock()
return toRPCErr(op(cs.attempt))
}原因是如果是正在处理中的请求(尝试重试、取消等操作),还是会尝试执行
op
并返回错误结束尝试 -
如果在重试过程中,重试任务发生了变化,那么也会跳过直接进行下一次重试
成功
如果执行成功,那么只需要执行后续的 OnSuccess
函数即可,一般有有两种
-
如果是
commitAttemptLocked
,那么表示执行成功更新状态即可1
2
3
4
5
6
7func (cs *clientStream) commitAttemptLocked() {
if !cs.committed && cs.onCommit != nil {
cs.onCommit() //配置的重试逻辑
}
cs.committed = true //重试逻辑执行成功
cs.buffer = nil //清空这个重试缓存
} -
如果是
bufferForRetryLocked
,那么会缓存当前的重试操作到回放缓冲区(后续发生失败的时候再执行一次)1
2
3
4
5
6
7
8
9
10
11
12func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
// Note: we still will buffer if retry is disabled (for transparent retries).
if cs.committed { //如果重试逻辑执行成功结束
return
}
cs.bufferSize += sz
if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize { //回放缓冲区太大就不回放了
cs.commitAttemptLocked()
return
}
cs.buffer = append(cs.buffer, op)
}
失败
1 | func (cs *clientStream) retryLocked(lastErr error) error { |
- 首先,当前的尝试
cs.attempt
将被标记为已完成,其结果会被传递给cs.finish()
- 然后,会调用
cs.shouldRetry()
判断当前错误是否应该重试(服务端错误DoNotTransparentRetry
)- 如果返回的错误可以重试,则会通过
cs.newAttemptLocked()
创建新的尝试,即新的流,并在其上发起新的请求 - 如果没有返回新的错误,则尝试从回放缓冲区
cs.replayBufferLocked()
中获取上一次重试的错误
- 如果返回的错误可以重试,则会通过
- 如果在回放缓冲区中找到了错误,继续进行下一次重试,否则返回 nil 表示重试成功
前面提到了,有一种成功即使执行成功也会加入到 buffer中,在执行失败的之后重新再执行一遍
1 | func (cs *clientStream) replayBufferLocked() error { |
是在构建流的时候加入的,如果构建流成功,那么会将构建流的函数加入到buffer
,如果后续有一次临时性的失败,会将所有成功的构建流全部执行一遍。为什么其中一个创建流失败之后,需要将其他创建成功的逻辑都进行重试?
答:如果一次失败了,那么执行之前的操作。前提是能够保证和之前成功的那次执行得到相同的结果。这里创建流与发送消息都会将成功的操作放入到缓冲区中,当消息发送完毕,那么这个流就结束了。所以在发送过程中,如果其中一个帧发送失败,那么它会将从流构建到消息发送再次传输一遍(相当于一个新的请求)
总结
- 重试机制
withRetry
包含的业务- 建立流链接
- 发送头帧
- 发送消息
- 接收消息
- 关闭发送
- 服务端响应的时候并没有重试机制
- 重试速率限制是针对整个连接而不是每个流一个
- 同时执行一个重试任务,并且执行的都是最新的那个
- 只有重试成功,才会更新令牌
tokens
数量。那是不是影响其他调用的重试,会但影响不大。第一是因为只要有一个成功,那么token数量都会更新。第二是重试次数过多,单次调用会因为超时停止重试。 - 如果在发送过程中,某一个帧发送失败,那么它会从创建流开始重新发起调用