grpc-go
通信的超时处理有两种方式
- 业务出现异常或判断处理超时主动取消
grpc-go
存在截止时间,超过截止时间自动停止
带着问题看世界
- 取消发生在不同时机是如何处理的
- 链路建立阶段
- 客户端发送阶段
- 请求传输阶段
- 服务端处理阶段
- 消息响应阶段
- 截止时间又是如何做到的
主动取消
基本使用
客户端使用context
并创建双向流,发送两条之后取消再次发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) stream, err := c.BidirectionalStreamingEcho(ctx) if err != nil { log.Fatalf("error creating stream: %v", err) }
if err := sendMessage(stream, "hello"); err != nil { log.Fatalf("error sending on stream: %v", err) } if err := sendMessage(stream, "world"); err != nil { log.Fatalf("error sending on stream: %v", err) }
recvMessage(stream, codes.OK) recvMessage(stream, codes.OK)
fmt.Println("cancelling context") cancel()
sendMessage(stream, "closed")
|
服务端则是响应流的处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error { for { in, err := stream.Recv() if err != nil { fmt.Printf("server: error receiving from stream: %v\n", err) if err == io.EOF { return nil } return err } fmt.Printf("echoing message %q\n", in.Message) stream.Send(&pb.EchoResponse{Message: in.Message}) } }
|
实现原理
客户端
从调用方法的内部实现,超时控制大致分为三个阶段 构建流阶段、发送消息阶段、接收消息阶段
1 2 3 4 5 6 7 8 9 10
| func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...) if err != nil { return err } if err := cs.SendMsg(req); err != nil { return err } return cs.RecvMsg(reply) }
|
根据实际情况将通过一下几个阶段进行分析
-
构建流阶段
-
客户端发送阶段
-
请求传输阶段
-
服务端处理阶段
-
消息响应阶段
构建流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) { var cancel context.CancelFunc if mc.Timeout != nil && *mc.Timeout >= 0 { ctx, cancel = context.WithTimeout(ctx, *mc.Timeout) } else { ctx, cancel = context.WithCancel(ctx) } defer func() { if err != nil { cancel() } }() if desc != unaryStreamDesc { go func() { select { case <-cc.ctx.Done(): cs.finish(ErrClientConnClosing) case <-ctx.Done(): cs.finish(toRPCErr(ctx.Err())) } }() } }
|
如果配置了超时时间则使用超时时间,否则默认ctx,这个超时时间以前是为了针对 WithTimeout
,后来被 DialContext
替代
接着,将超时时间封装到一个流的头帧的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { ctx = peer.NewContext(ctx, t.getPeer()) headerFields, err := t.createHeaderFields(ctx, callHdr) if err != nil { return nil, err } s := t.newStream(ctx, callHdr) hdr := &headerFrame{ hf: headerFields, } for { success, err := t.controlBuf.executeAndPut(func(it interface{}) bool { if !checkForStreamQuota(it) { return false } if !checkForHeaderListSize(it) { return false } return true }, hdr) select { case <-ch: case <-s.ctx.Done(): return nil, ContextErr(s.ctx.Err()) case <-t.goAway: return nil, errStreamDrain case <-t.ctx.Done(): return nil, ErrConnClosing } } }
|
构建流的过程中,只发送到队列中,那么流就算发送成功,
所以在构建流的过程中,如果触发 context.Done()
,那么
- 首先会返回
context
中的错误
- 其次更新状态流的状态
服务端
原理分析
客户端向服务端请求的一个完整阶段,需要经历:
- 链路建立阶段
- 流建立阶段
- 数据发送阶段
- 数据接收阶段
在创建完客户端流clientStream
后,取消功能以异步方式启动,开始监听是否有取消指令,有的话,就开始执行取消指令。因此,取消功能可以发生在数据发送阶段和数据接收阶段,且两个阶段的流程是一样的
- 取消是流级别的而不是链路级别的,也就是可以用于取消单个请求
- 当客户端主动发起 cancel请求的时候 会构建
cleanupStream
的请求关闭流,并发送RST帧,将流ID,取消状态码封装到RST帧中,发送给服务端
- 服务端收到
RST
帧之后,会停止读取 stream 中的数据,并移除对应的流
截止时间
截止时间与cancel函数的原理类似,主动取消是通过
参考文档
- https://blog.csdn.net/u011582922/article/details/119834758
- https://blog.csdn.net/u011582922/article/details/119944259