Grpc-08-超时处理

grpc-go 通信的超时处理有两种方式

  1. 业务出现异常或判断处理超时主动取消
  2. grpc-go 存在截止时间,超过截止时间自动停止

带着问题看世界

  1. 取消发生在不同时机是如何处理的
    • 链路建立阶段
    • 客户端发送阶段
    • 请求传输阶段
    • 服务端处理阶段
    • 消息响应阶段
  2. 截止时间又是如何做到的
    • 服务端是如何在截止时间停止的

主动取消

基本使用

客户端使用context并创建双向流,发送两条之后取消再次发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
stream, err := c.BidirectionalStreamingEcho(ctx)
if err != nil {
log.Fatalf("error creating stream: %v", err)
}

// Send some test messages.
if err := sendMessage(stream, "hello"); err != nil {
log.Fatalf("error sending on stream: %v", err)
}
if err := sendMessage(stream, "world"); err != nil {
log.Fatalf("error sending on stream: %v", err)
}

// Ensure the RPC is working.
recvMessage(stream, codes.OK)
recvMessage(stream, codes.OK)

fmt.Println("cancelling context")
cancel()

//失败
sendMessage(stream, "closed")

服务端则是响应流的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
for {
in, err := stream.Recv()
if err != nil {
fmt.Printf("server: error receiving from stream: %v\n", err)
if err == io.EOF {
return nil
}
return err
}
fmt.Printf("echoing message %q\n", in.Message)
stream.Send(&pb.EchoResponse{Message: in.Message})
}
}

实现原理

客户端

从调用方法的内部实现,超时控制大致分为三个阶段 构建流阶段、发送消息阶段、接收消息阶段

1
2
3
4
5
6
7
8
9
10
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
if err := cs.SendMsg(req); err != nil {
return err
}
return cs.RecvMsg(reply)
}

根据实际情况将通过一下几个阶段进行分析

  • 构建流阶段

  • 客户端发送阶段

  • 请求传输阶段

  • 服务端处理阶段

  • 消息响应阶段

构建流
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
//...
var cancel context.CancelFunc
if mc.Timeout != nil && *mc.Timeout >= 0 {
ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
} else {
ctx, cancel = context.WithCancel(ctx)
}
defer func() {
if err != nil {
cancel()
}
}()
//...
//如果一元调用
if desc != unaryStreamDesc { //var unaryStreamDesc = &StreamDesc{ServerStreams: false, ClientStreams: false}
go func() { //使用协程来清理结束时候的流
select {
case <-cc.ctx.Done():
cs.finish(ErrClientConnClosing)
case <-ctx.Done():
cs.finish(toRPCErr(ctx.Err()))
}
}()
}
}

如果配置了超时时间则使用超时时间,否则默认ctx,这个超时时间以前是为了针对 WithTimeout,后来被 DialContext替代

接着,将超时时间封装到一个流的头帧的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
//...
//解析成头帧的KV字段
ctx = peer.NewContext(ctx, t.getPeer())
headerFields, err := t.createHeaderFields(ctx, callHdr)
if err != nil {
return nil, err
}
s := t.newStream(ctx, callHdr)
//...
//封装到头帧中
hdr := &headerFrame{
hf: headerFields,
//...
}
for {
//发送头帧
success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
if !checkForStreamQuota(it) {
return false
}
if !checkForHeaderListSize(it) {
return false
}
return true
}, hdr)

//...
select {
case <-ch: //配额没有了
case <-s.ctx.Done(): //流结束
return nil, ContextErr(s.ctx.Err())
case <-t.goAway: //返回关闭流的帧
return nil, errStreamDrain
case <-t.ctx.Done(): //连接结束
return nil, ErrConnClosing
}
}
}

构建流的过程中,只发送到队列中,那么流就算发送成功,

所以在构建流的过程中,如果触发 context.Done(),那么

  1. 首先会返回 context中的错误
  2. 其次更新状态流的状态
服务端

原理分析

客户端向服务端请求的一个完整阶段,需要经历:

  1. 链路建立阶段
  2. 流建立阶段
  3. 数据发送阶段
  4. 数据接收阶段

在创建完客户端流clientStream后,取消功能以异步方式启动,开始监听是否有取消指令,有的话,就开始执行取消指令。因此,取消功能可以发生在数据发送阶段和数据接收阶段,且两个阶段的流程是一样的

  1. 取消是流级别的而不是链路级别的,也就是可以用于取消单个请求
  2. 当客户端主动发起 cancel请求的时候 会构建 cleanupStream的请求关闭流,并发送RST帧,将流ID,取消状态码封装到RST帧中,发送给服务端
  3. 服务端收到RST帧之后,会停止读取 stream 中的数据,并移除对应的流

截止时间

截止时间与cancel函数的原理类似,主动取消是通过

参考文档

  1. https://blog.csdn.net/u011582922/article/details/119834758
  2. https://blog.csdn.net/u011582922/article/details/119944259