func(cs *clientStream)SendMsg(m interface{})(err error) { deferfunc() { if err != nil && err != io.EOF { // Call finish on the client stream for errors generated by this SendMsg // call, as these indicate problems created by this client. (Transport // errors are converted to an io.EOF error in csAttempt.sendMsg; the real // error will be returned from RecvMsg eventually in that case, or be // retried.) cs.finish(err) } }() if cs.sentLast { return status.Errorf(codes.Internal, "SendMsg called after CloseSend") } //如果是客户端流 if !cs.desc.ClientStreams { cs.sentLast = true }
//... 发送大小限制判断 msgBytes := data // Store the pointer before setting to nil. For binary logging. op := func(a *csAttempt)error { err := a.sendMsg(m, hdr, payload, data) // nil out the message and uncomp when replaying; they are only needed for // stats which is disabled for subsequent attempts. m, data = nil, nil return err } //...使用重试机制进行消息发送 err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) }) return }
消息准备
将消息体转换为发送前的两部分
1 2 3 4 5 6 7 8 9 10 11 12
funcprepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor)(hdr, payload, data []byte, err error) { //消息重试中可以直接使用 if preparedMsg, ok := m.(*PreparedMsg); ok { return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil }
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller // should proceed only if Write returns nil. func(t *http2Client)Write(s *Stream, hdr []byte, data []byte, opts *Options)error { df := &dataFrame{ streamID: s.id, endStream: opts.Last, } if hdr != nil || data != nil { // If it's not an empty data frame. // Add some data to grpc message header so that we can equally // distribute bytes across frames. emptyLen := http2MaxFrameLen - len(hdr) if emptyLen > len(data) { emptyLen = len(data) } hdr = append(hdr, data[:emptyLen]...) data = data[emptyLen:] df.h, df.d = hdr, data // TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler. if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { return err } } return t.controlBuf.put(df) }
将 hdr 和 data 组装成数据帧(dataFrame),然后放入帧发送缓冲区(controlBuf)中
消息的响应
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
func(cs *clientStream)RecvMsg(m interface{})error { //... var recvInfo *payloadInfo if cs.binlog != nil { recvInfo = &payloadInfo{} } err := cs.withRetry(func(a *csAttempt)error { return a.recvMsg(m, recvInfo) }, cs.commitAttemptLocked) //... if err != nil || !cs.desc.ServerStreams { // err != nil or non-server-streaming indicates end of stream. cs.finish(err)
// StreamError is an error that only affects one stream within an
// HTTP/2 connection.
type StreamError struct {
	// StreamID is presumably the ID of the affected stream — confirm against users of this type.
	StreamID uint32
	// Code is the ErrCode associated with the error.
	Code ErrCode
	Cause error // optional additional detail
}