Grpc-02-流

上一节中,不管是单次调用还是流式调用都会有一个流的概念,这里分析一下流作用与实现原理

回顾协议说明,在 HTTP/2 中,二进制分帧之后,HTTP /2 不再依赖 TCP 链接去实现多流并行,

  • 同域名下所有通信都在单个连接上完成。
  • 单个连接可以承载任意数量的双向数据流。
  • 数据流以消息的形式发送,而消息又由一个或多个帧组成,多个帧之间可以乱序发送,因为根据帧首部的流标识可以重新组装。

二进制分帧与grpc分片的区别:

  • 实现方式:TCP 分包是在传输层进行的,即将一个大的数据包分割成多个小的 TCP 包进行传输。而 gRPC 的二进制分帧是在应用层进行的,即将一个大的数据流分割成多个小的 gRPC 帧进行传输。

  • 作用不同:TCP 分包的主要作用是将一个大的数据包分割成多个小的 TCP 包进行传输,以避免网络传输时发生丢包或拥塞等问题。而 gRPC 的二进制分帧的主要作用是将一个大的数据流分割成多个小的 gRPC 帧进行传输,以避免在传输大量数据时需要等待全部数据传输完成的问题。此外,gRPC 的二进制分帧还可以实现多路复用和流控等功能,从而提高数据传输的效率和可靠性。

  • 数据格式不同:TCP 分包只是将一个大的数据包分割成多个小的 TCP 包进行传输,数据格式不变。而 gRPC 的二进制分帧将一个大的数据流分割成多个小的 gRPC 帧进行传输,每个 gRPC 帧都是由消息头和消息体组成的二进制数据,其中消息头包含了帧的元数据信息,如帧的长度、类型、标识符等。

尽管 gRPC 的二进制分帧和 TCP 的分包是不同的概念,但是它们都是为了将一个大的数据流分割成多个小块进行传输,从而提高数据传输的效率和可靠性。

实现原理

代码路径:stream.go

构建一个流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
//...数据搜集

//第一次调用的时候等待需要等待地址解析完成
if err := cc.waitForResolvedAddrs(ctx); err != nil {
return nil, err
}

var mc serviceconfig.MethodConfig
var onCommit func()
var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
}

//... 如果有拦截器,则构建拦截器的流

return newStream(ctx, func() {})
}

拦截器后续再看,这里需要进一步查看创建流的过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
c := defaultCallInfo()
if mc.WaitForReady != nil {
c.failFast = !*mc.WaitForReady
}

//...

callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,
ContentSubtype: c.contentSubtype,
DoneFunc: doneFunc,
}

//...

cs := &clientStream{
//...
}
//...

if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
cs.finish(err)
return nil, err
}

op := func(a *csAttempt) error { return a.newStream() }
if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
cs.finish(err)
return nil, err
}

//...

if desc != unaryStreamDesc { //非一元调用
go func() {
select {
case <-cc.ctx.Done():
cs.finish(ErrClientConnClosing)
case <-ctx.Done():
cs.finish(toRPCErr(ctx.Err()))
}
}()
}
return cs, nil
}

核心有两点

  1. 构建流的过程 使用的是 http2_client库,这个时候才会去发送 头帧

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    type HeaderField struct {
    Name, Value string

    // Sensitive means that this header field should never be
    // indexed.
    Sensitive bool
    }

    //头帧
    hdr := &headerFrame{
    hf: headerFields, //切片 键值对
    endStream: false,
    initStream: func(id uint32) error {
    t.mu.Lock()
    if state := t.state; state != reachable {
    t.mu.Unlock()
    // Do a quick cleanup.
    err := error(errStreamDrain)
    if state == closing {
    err = ErrConnClosing
    }
    cleanup(err)
    return err
    }
    t.activeStreams[id] = s //加入到 t 的activeStreams中
    //...
    t.mu.Unlock()
    return nil
    },
    onOrphaned: cleanup,
    wq: s.wq,
    }
  2. 如果非一元调用,那么就需要使用一个独立协程等待流结束,关闭流

发送消息

流构建成功之后,直接调用流的发送消息逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (cs *clientStream) SendMsg(m interface{}) (err error) {
defer func() {
if err != nil && err != io.EOF {
// Call finish on the client stream for errors generated by this SendMsg
// call, as these indicate problems created by this client. (Transport
// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
// error will be returned from RecvMsg eventually in that case, or be
// retried.)
cs.finish(err)
}
}()
if cs.sentLast {
return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
}

//如果是客户端流
if !cs.desc.ClientStreams {
cs.sentLast = true
}

//将数据准备为三部分 头 + 负载 + data
hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
if err != nil {
return err
}

//... 发送大小限制判断

msgBytes := data // Store the pointer before setting to nil. For binary logging.
op := func(a *csAttempt) error {
err := a.sendMsg(m, hdr, payload, data)
// nil out the message and uncomp when replaying; they are only needed for
// stats which is disabled for subsequent attempts.
m, data = nil, nil
return err
}
//...使用重试机制进行消息发送
err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })

return
}
消息准备

将消息体转换为发送前的两部分

1
2
3
4
5
6
7
8
9
10
11
12
func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
//消息重试中可以直接使用
if preparedMsg, ok := m.(*PreparedMsg); ok {
return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil
}

//1. 消息序列化
//2. 消息压缩
//3. 消息头部与负载
hdr, payload = msgHeader(data, compData)
return hdr, payload, data, nil
}

消息发送前的准备最重要的就是 msgHeader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func msgHeader(data, compData []byte) (hdr []byte, payload []byte) {
hdr = make([]byte, headerLen) // headerLen 5字节
//是否压缩
if compData != nil {
hdr[0] = byte(compressionMade)
data = compData
} else {
hdr[0] = byte(compressionNone)
}
//四个字节用来记录数据长度,并进行大小端准换
// Write length of payload into buf
binary.BigEndian.PutUint32(hdr[payloadLen:], uint32(len(data)))
return hdr, data
}

在大小端中,字节的存储顺序是按照其地址的高低位来决定的。

  • **大端模式(Big Endian)**是指将高位字节存储在低地址处,低位字节存储在高地址处
  • **小端模式(Little Endian)**则是将低位字节存储在低地址处,高位字节存储在高地址处

举个例子,对于一个 4 字节的整数 0x12345678,它在大端模式下的存储顺序为 0x12 0x34 0x56 0x78,而在小端模式下的存储顺序为 0x78 0x56 0x34 0x12

消息发送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
// should proceed only if Write returns nil.
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
df := &dataFrame{
streamID: s.id,
endStream: opts.Last,
}
if hdr != nil || data != nil { // If it's not an empty data frame.
// Add some data to grpc message header so that we can equally
// distribute bytes across frames.
emptyLen := http2MaxFrameLen - len(hdr)
if emptyLen > len(data) {
emptyLen = len(data)
}
hdr = append(hdr, data[:emptyLen]...)
data = data[emptyLen:]
df.h, df.d = hdr, data
// TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler.
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
return err
}
}
return t.controlBuf.put(df)
}
  1. hdr + body 组建成数据帧,然后放入帧发送缓存器中
消息的响应
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (cs *clientStream) RecvMsg(m interface{}) error {
//...
var recvInfo *payloadInfo
if cs.binlog != nil {
recvInfo = &payloadInfo{}
}
err := cs.withRetry(func(a *csAttempt) error {
return a.recvMsg(m, recvInfo)
}, cs.commitAttemptLocked)
//...
if err != nil || !cs.desc.ServerStreams {
// err != nil or non-server-streaming indicates end of stream.
cs.finish(err)

//...
}
return err
}

实际的读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
if _, err := p.r.Read(p.header[:]); err != nil { //先读取头
return 0, nil, err
}

pf = payloadFormat(p.header[0])
length := binary.BigEndian.Uint32(p.header[1:]) //大小端转换

if length == 0 {
return pf, nil, nil
}
if int64(length) > int64(maxInt) {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt)
}
if int(length) > maxReceiveMessageSize { //长度限制
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
}

msg = make([]byte, int(length)) //body读取
if _, err := p.r.Read(msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return 0, nil, err
}
return pf, msg, nil
}

与发送类似,发送分为四步

  1. 读取帧的头信息(不是头帧,而是每个帧都有头信息)
  2. 大小端转换
  3. 长度限制
  4. 根据帧头的长度阅读消息

然后通过解压缩(非必要)与反序列化转化为对应的消息,那么通信就完成了

接收消息

流的处理

上面是流处理消息的流程,服务端的流是怎么建立的呢?从前面可以知道,客户端建流其实是发送一个头帧 headerFrame 过去,那么,服务端就需要根据头帧建立不同的流

1
2
3
4
5
6
7
func (s *Server) initServerWorkers() {
s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
s.serverWorkerChannels[i] = make(chan *serverWorkerData)
go s.serverWorker(s.serverWorkerChannels[i])
}
}

它是通过 serverWorkerChannels 的隧道,来进行流的处理的,而在协程的处理中,首先由一个专门的函数将数据传输到 serverWorkerChannels 隧道中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
if s.opts.numServerWorkers > 0 {
data := &serverWorkerData{st: st, wg: &wg, stream: stream}
select {
case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
default:
// 如果隧道是满的,那么再起一个协程进行流的处理
go func() {
s.handleStream(st, stream, s.traceInfo(st, stream))
wg.Done()
}()
}
} else { //单消费者模式
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}
}, //...)

那么数据又是如何转换为流的呢

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
defer close(t.readerDone)
for {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame() //循环读取帧
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano()) //记录最近一次读取时间
if err != nil {
if se, ok := err.(http2.StreamError); ok { //如果发生流错误
if logger.V(logLevel) {
logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
}
t.mu.Lock()
s := t.activeStreams[se.StreamID] //拿出来关掉
t.mu.Unlock()
if s != nil {
t.closeStream(s, true, se.Code, false)
} else {
t.controlBuf.put(&cleanupStream{
streamID: se.StreamID,
rst: true,
rstCode: se.Code,
onWrite: func() {},
})
}
continue
}
//其他错误
t.Close()
return
}
//不同类型帧的处理
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if t.operateHeaders(frame, handle, traceCtx) {
t.Close()
break
}
case *http2.DataFrame:
t.handleData(frame)
//... 不同帧的处理
}
}
}

注意事项:

  1. 流错误只会影响一条流,关闭流之后 发送 cleanupStream的帧,StreamError 的目的就是记录流错误

    1
    2
    3
    4
    5
    6
    7
    // StreamError is an error that only affects one stream within an
    // HTTP/2 connection.
    type StreamError struct {
    StreamID uint32
    Code ErrCode
    Cause error // optional additional detail
    }

来看一下头帧(构建流的时候作用)的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
// Acquire max stream ID lock for entire duration
t.maxStreamMu.Lock()
defer t.maxStreamMu.Unlock()

streamID := frame.Header().StreamID //流id

// 当头部大小超过限制,那么服务端会直接清理掉这条流
if frame.Truncated {
t.controlBuf.put(&cleanupStream{
streamID: streamID,
rst: true,
rstCode: http2.ErrCodeFrameSize,
onWrite: func() {},
})
return false
}

//流ID必定是单数,且大于当前最大流ID
if streamID%2 != 1 || streamID <= t.maxStreamID {
return true
}
t.maxStreamID = streamID

buf := newRecvBuffer()
s := &Stream{
id: streamID,
st: t,
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
}
//...

for _, hf := range frame.Fields {
//解析 fields
}
//通过头部field帧 grpc-timeout 设置超时时间
if timeoutSet {
s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)
} else {
s.ctx, s.cancel = context.WithCancel(t.ctx)
}

// Attach the received metadata to the context.
//跟踪
//...

t.activeStreams[streamID] = s //流加入到 活动流 中
//...

// Register the stream with loopy. //注册流的帧
t.controlBuf.put(&registerStream{
streamID: s.id,
wq: s.wq,
})
handle(s) //处理流
return false
}

问题1streamID%2 != 1 为什么要限制为单数,

答:防止两端流ID冲突,客户端发起的流具有奇数ID,服务器端发起的流具有偶数ID

可以看到,收到头枕之后会加入到帧缓冲器中,其实也是在服务端会转换为一个 registerStream 的帧放入到帧缓冲器中

消息的处理

首先,消息是通过服务端work对应的隧道进行生产者-消费者处理的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
sm := stream.Method()
//解析 服务与方法
service := sm[:pos]
method := sm[pos+1:]

srv, knownService := s.services[service] //查找proto定义的犯法进行处理
if knownService {
if md, ok := srv.methods[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
if sd, ok := srv.streams[method]; ok {
s.processStreamingRPC(t, stream, srv, sd, trInfo)
return
}
}
//未知的service 或者 method 异常情况处理
//信息记录
}

通过 processUnaryRPC 进入到定义中,只需要关心 reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt) 执行业务逻辑

总结

  1. 每个RPC调用都可以看做一个stream,客户端发送请求会创建一个新的流,服务器也会创建流进行消息响应

  2. 流的状态流转

    1
    2
    3
    4
    5
    6
    7
    const (
    stateIdle streamState = iota
    stateOpen
    stateHalfClosedLocal
    stateHalfClosedRemote
    stateClosed
    )

参考文档

  1. https://blog.csdn.net/u011582922/article/details/120578941