Grpc-03-帧

从前面可知,流单独负责一次调用,并且在服务端使用多路复用实现消息的处理。流其实是通过 ID 标志的虚拟概念,真正传输的其实是帧,帧通过发送器与帧接收器进行发送处理。

带着问题看世界:

  1. 帧的类型有哪些,具体结构是怎样的
  2. 当消息体过大的时候分帧是如何处理的
  3. 当帧的缓冲区是否也会满,什么时候清理

在 grpc-go 中,通过对 HTTP/2 中协议的封装,增加了grpc-go自己的业务类型。看源码的时候注意不要弄混了。这里说的协议帧 是 HTTP/2 中定义实际传输过程中的帧 是由 net 库实现的;而应用帧则是 grpc-go 根据自己的实际需要由转换了一层

协议帧

每个协议帧都是由头部负载两个部分组成

头部

协议帧指的就是HTTP2中帧的结构,每个帧也分为头部与负载

1
2
3
+--------+--------+--------+--------+--------+--------+--------+--------+
| Length(24) |Type(8) |Flags(8)|R| StreamID(31) |
+--------+--------+--------+--------+--------+--------+--------+--------+

虽然Length 是24位,但这不意味着就能处理 2^24 16M大小的帧,一般是默认只支持2^16 16k以下的帧,而2^16 - 2^24 16M 的帧 需要接收端公布自己可以处理这么大的帧,需要在 SETTINGS_MAX_FRAME_SIZE 帧中告知

将二进制buffer转换为帧头部结构

1
2
3
4
5
6
7
type FrameHeader struct {
valid bool // 是否有效,供内部使用
Type FrameType // 帧的类型
Flags Flags // 标志位
Length uint32 // 帧的负载长度(即不包括头部)
StreamID uint32 // 流ID,有的帧没有流,那么StreamID == 0
}

所以头部的整体解析流程就变成了

1
2
3
4
5
6
7
8
9
10
11
12
13
func readFrameHeader(buf []byte, r io.Reader) (FrameHeader, error) {
_, err := io.ReadFull(r, buf[:frameHeaderLen]) //读取9B到buf中
if err != nil {
return FrameHeader{}, err
}
return FrameHeader{ //解析成头部接头
Length: (uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])), //头部长度
Type: FrameType(buf[3]), //类型
Flags: Flags(buf[4]), //标志位
StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1),
valid: true, //是否有效
}, nil
}

值得注意的是:

  1. binary.BigEndian.Uint32 这个作用是将 4个字节的二进制数据转换为 uint32类型的数据
  2. 1<<31 - 1 用于将高位设置为0,确保能被正确的解析为uint32位的值

最后数据帧的格式如下

帧的类型

其中 FrameType 代表的帧类型,一共有10中

帧类型 说明 字符
FrameData 表示数据帧,用于传输 HTTP 报文的实际数据部分 0x0 “DATA”
FrameHeaders 携带请求或响应头部,也可以分为多个帧进行传输 0x1 “HEADERS”
FramePriority 表示某个流的优先级,用于流量控制 0x2 “PRIORITY”
FrameRSTStream 重置某个流,表示这个流将不再使用 0x3 “RST_STREAM”
FrameSettings 表示设置帧,用于在连接和流级别上传输参数设置 0x4 “SETTINGS”
FramePushPromise 允许服务端在客户端还没有请求的情况下发送响应头部,用于服务端推送(server push)功能 0x5 “PUSH_PROMISE”
FramePing 表示 Ping 帧,用于检测连接是否存活 0x6 “PING”
FrameGoAway 表示断开帧,用于通知对端关闭连接 0x7 “GOAWAY”
FrameWindowUpdate 用于流量控制,通知对端窗口大小的变化 0x8 “WINDOW_UPDATE”
FrameContinuation 表示继续帧,将头部帧或推送帧拆分为多个帧进行传输时,用于指示后续帧属于同一个头部块 0x9 “CONTINUATION”

帧的标志位

另外一个标志位Flags则是帧

帧类型 Flags类型 说明
FrameData FlagDataEndStream 标志位用于 DATA 帧,表示这是最后一个 DATA 帧,即流已经结束,接收方不应该再等待更多的 DATA 0x1
FlagDataPadded 数据帧的填充(Padded)标志,表示数据帧后面跟随一个填充字段 0x8
FrameHeaders FlagHeadersEndStream 首部帧(Headers Frame)的结束流标志,表示发送方已经发送完整个消息 0x1
FlagHeadersEndHeaders 首部帧的结束头(End Headers)标志,表示这个首部块是消息的最后一个首部块 0x4
FlagHeadersPadded 首部帧的填充标志,表示首部帧后面跟随一个填充字段 0x8
FlagHeadersPriority 首部帧的优先级(Priority)标志,表示这个首部块包含了一个优先级信息 0x20
FrameSettings FlagSettingsAck 设置帧(Settings Frame)的确认(Ack)标志,表示这是一个确认帧 0x1
FramePing FlagPingAck Ping帧的确认标志,表示这是一个确认帧 0x1
FrameContinuation FlagContinuationEndHeaders 连续帧(Continuation Frame)的结束头标志,表示这个连续块是消息的最后一个连续块 0x4
FramePushPromise FlagPushPromiseEndHeaders 推送帧(Push Promise Frame)的结束头标志,表示这个推送帧包含了一个完整的首部块 0x4
FlagPushPromisePadded 推送帧的填充标志,表示推送帧后面跟随一个填充字段 0x8

有了上述的帧的关键字段说明,接下来来看看各个数据帧的解析

负载

头部定义之后,负载通过头部长度 Length 进行解析,负载解析如下

1
2
3
4
5
6
7
8
9
10
11
payload := fr.getReadBuf(fh.Length)	//获取负载长度
if _, err := io.ReadFull(fr.r, payload); err != nil { //读取负载长度
return nil, err
}
f, err := typeFrameParser(fh.Type)(fr.frameCache, fh, payload) //将负载根据类型解析为对应的帧 fh是上一步读取的帧头
if err != nil {
if ce, ok := err.(connError); ok {
return nil, fr.connError(ce.Code, ce.Reason)
}
return nil, err
}

数据帧的解析是通过类型参数进行解析

1
f, err := typeFrameParser(fh.Type)(fr.frameCache, fh, payload)

其中

  1. fh.Type 是从头部解析出来的帧的类型,用于找到指定类型的数据帧解析器
  2. fr.frameCache 流都具有自己的帧内存缓存池。在需要发送帧时,可以从自己的缓存池中获取内存,并在完成后将其返回以供重用
  3. fh 之前解析出的帧的头部,用来与解析完的负载合并成一个完整的帧
  4. payload 则是帧的负载,用于解析帧的数据部分

需要关注的是

  1. 是如何转换成其他类型的
  2. 如果有存在分帧,又是如何进行解析的
  3. 如果有帧的丢失怎么办
FrameData

数据帧的解析对应的是 parseDataFrame

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func parseDataFrame(fc *frameCache, fh FrameHeader, payload []byte) (Frame, error) {
if fh.StreamID == 0 { //数据帧StreamID必不为0
return nil, connError{ErrCodeProtocol, "DATA frame with stream ID 0"}
}
f := fc.getDataFrame() //从内存池获取一个数据帧结构
f.FrameHeader = fh //帧头部

var padSize byte
if fh.Flags.Has(FlagDataPadded) { //如果有追加
var err error
payload, padSize, err = readByte(payload)
if err != nil {
return nil, err
}
}
if int(padSize) > len(payload) { //当追加的长度大于负载的长度
return nil, connError{ErrCodeProtocol, "pad size larger than data payload"}
}
f.data = payload[:len(payload)-int(padSize)] //帧的负载就是 负载 - 追加
return f, nil
}

当存在追加帧 FlagDataPadded 的时候 使用 readByte进行解析出去除追加长度的负载

1
2
3
4
5
6
func readByte(p []byte) (remain []byte, b byte, err error) {
if len(p) == 0 {
return nil, 0, io.ErrUnexpectedEOF
}
return p[1:], p[0], nil
}

所以,正常情况只有负载,但是如果数据帧存在追加 FlagDataPadded,数据帧的负载还有一个长度

1
2
3
4
5
6
7
+--------+--------+--------+--------+
| padSize(32) |
+--------+--------+--------+--------+
| data ...
+--------+--------+--------+--------+
| padding ...
+--------+--------+--------+--------+

追加的内容在解析成数据帧的过程中,丢掉了!!!!! 这是因为填充字段仅仅是为了将数据帧填充到规定的长度

最终组成数据帧

1
2
3
4
type DataFrame struct {
FrameHeader
data []byte
}

那么另外一个标志位 FlagDataEndStream 是在应用gRPC使用

  • 服务端收到数据帧结束流之后,设置流的状态并结束继续读取流数据
  • 客户端收到数据帧结束流之后,关闭流(结束读取并清理流)
1
2
3
4
5
6
7
8
9
10
11
//客户端
if f.StreamEnded() {
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
}

//服务端
if f.StreamEnded() {
// Received the end of stream from the client.
s.compareAndSwapState(streamActive, streamReadDone)
s.write(recvMsg{err: io.EOF})
}

其他注意事项

  1. 如果数据帧中 StreamID == 0 ,那么这个数据帧异常,也就是说数据帧必须绑定一个流
FrameHeaders

头帧也是由头部与负载组成,通过 parseHeadersFrame 解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func parseHeadersFrame(_ *frameCache, fh FrameHeader, p []byte) (_ Frame, err error) {
hf := &HeadersFrame{
FrameHeader: fh,
}
if fh.StreamID == 0 { //头帧流ID不能为0
return nil, connError{ErrCodeProtocol, "HEADERS frame with stream ID 0"}
}
var padLength uint8
if fh.Flags.Has(FlagHeadersPadded) { //如果有头部追加,同理解析出负载与追加长度
if p, padLength, err = readByte(p); err != nil {
return
}
}
if fh.Flags.Has(FlagHeadersPriority) { //如果有优先级
var v uint32
p, v, err = readUint32(p)
if err != nil {
return nil, err
}
hf.Priority.StreamDep = v & 0x7fffffff
hf.Priority.Exclusive = (v != hf.Priority.StreamDep) // high bit was set
p, hf.Priority.Weight, err = readByte(p)
if err != nil {
return nil, err
}
}
if len(p)-int(padLength) <= 0 {
return nil, streamError(fh.StreamID, ErrCodeProtocol)
}
hf.headerFragBuf = p[:len(p)-int(padLength)] //去掉追加部分
return hf, nil
}

整体来看,头帧结构如下所示

1
2
3
4
5
6
7
8
9
10
11
+--------+--------+--------+--------+
| padSize(32) |
+--------+--------+--------+--------+
| priority(32) |
+--------+--------+--------+--------+
|weight(8)|
+--------+--------+--------+--------+
| payload |
+--------+--------+--------+--------+
| padding |
+--------+--------+--------+--------+

这里比数据帧多的第一个标志位就是 优先级 FlagHeadersPriority,它的作用是表示流的优先级

1
2
3
4
5
type PriorityParam struct {
StreamDep uint32
Exclusive bool
Weight uint8
}
  1. StreamDep:一个31位的流标识符,表示此流依赖的流。如果为0,表示没有依赖关系。
  2. Exclusive:表示此流是否具有互斥性。如果此字段为true,表示此流是在其依赖的流和同级的兄弟流之间互斥的。
  3. Weight:此流的权重值,是一个0-255的值。根据规范,应该将此字段与StreamDep一起设置,或者两者都不设置。为了获得1到256之间的权重,请将此值加1

为什么流有依赖关系或者互斥关系???

流之间的依赖关系或者互斥关系可以帮助控制流之间的优先级和竞争关系,从而更有效地利用网络带宽和资源。

例如1:如果一个网页需要加载多个资源,比如图片、脚本和样式表,那么这些资源就可以分别被分配到不同的流中。为了更快地呈现网页,图片这类占用带宽较大的资源可以被赋予更高的优先级,使其在竞争网络资源时更优先被传输,从而减少用户等待时间。

例如2:如果一个客户端同时向服务器发起了多个请求,这些请求可能会在服务器端形成竞争关系,造成某些请求的延迟和等待时间过长。通过为请求之间建立依赖关系和互斥关系,可以更好地控制请求的执行顺序和资源利用,提高服务质量和用户体验

最后组成结构体

1
2
3
4
5
type HeadersFrame struct {
FrameHeader
Priority PriorityParam
headerFragBuf []byte // not owned
}

另外两个标志位 FlagHeadersEndStreamFlagHeadersEndHeaders 同理,是在应用层使用

1
2
3
4
5
6
7
8
switch fh.Type {
case FrameHeaders, FrameContinuation:
if fh.Flags.Has(FlagHeadersEndHeaders) {
fr.lastHeaderStream = 0
} else {
fr.lastHeaderStream = fh.StreamID
}
}
FramePriority

虽然头帧中可以设置流的优先级,但是如果有大量的流需要设置优先级,将会导致头帧变得非常庞大,传输的效率也会受到影响。优先级帧可以独立于其他帧来设置流的优先级。同时,PRIORITY 帧也可以用于修改流的依赖关系,因此它具有比头帧更强的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func parsePriorityFrame(_ *frameCache, fh FrameHeader, payload []byte) (Frame, error) {
if fh.StreamID == 0 {
return nil, connError{ErrCodeProtocol, "PRIORITY frame with stream ID 0"}
}
if len(payload) != 5 { //负载必须是5
return nil, connError{ErrCodeFrameSize, fmt.Sprintf("PRIORITY frame payload size was %d; want 5", len(payload))}
}
v := binary.BigEndian.Uint32(payload[:4])
streamID := v & 0x7fffffff // mask off high bit
return &PriorityFrame{
FrameHeader: fh,
PriorityParam: PriorityParam{
Weight: payload[4], //256
StreamDep: streamID,
Exclusive: streamID != v, //是否是独立
},
}, nil
}

组成的优先级帧如下

1
2
3
4
5
+--------+--------+--------+--------+
| StreamDep(32) |
+--------+--------+--------+--------+
|Weight(8)|
+--------+
FrameRSTStream

重置帧则只需要负载的前4个字节用于标志重置原因

1
2
3
4
5
6
7
8
9
func parseRSTStreamFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
if len(p) != 4 {
return nil, ConnectionError(ErrCodeFrameSize)
}
if fh.StreamID == 0 {
return nil, ConnectionError(ErrCodeProtocol)
}
return &RSTStreamFrame{fh, ErrCode(binary.BigEndian.Uint32(p[:4]))}, nil
}

解析成如下结构

1
2
3
+--------+--------+--------+--------+
| ErrCode(32) |
+--------+--------+--------+--------+

组成的数据结构

1
2
3
4
type RSTStreamFrame struct {
FrameHeader
ErrCode ErrCode
}
FrameSettings

设置帧是通过 parseSettingsFrame 解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func parseSettingsFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
if fh.Flags.Has(FlagSettingsAck) && fh.Length > 0 { //如果负载长度大于0,则肯定不是ACK帧
return nil, ConnectionError(ErrCodeFrameSize)
}
if fh.StreamID != 0 {
return nil, ConnectionError(ErrCodeProtocol)
}
if len(p)%6 != 0 { //设置帧固定是 6的倍数
return nil, ConnectionError(ErrCodeFrameSize)
}
f := &SettingsFrame{FrameHeader: fh, p: p} //直接将负载存入设置帧中
if v, ok := f.Value(SettingInitialWindowSize); ok && v > (1<<31)-1 {
return nil, ConnectionError(ErrCodeFlowControl)
}
return f, nil
}

它因为直接使用了KV结构,所以固定每一对的长度是6,那么就能统计出每一对的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (f *SettingsFrame) Value(id SettingID) (v uint32, ok bool) {
f.checkValid()
for i := 0; i < f.NumSettings(); i++ {
if s := f.Setting(i); s.ID == id {
return s.Val, true
}
}
return 0, false
}

func (f *SettingsFrame) Setting(i int) Setting {
buf := f.p
return Setting{
ID: SettingID(binary.BigEndian.Uint16(buf[i*6 : i*6+2])),
Val: binary.BigEndian.Uint32(buf[i*6+2 : i*6+6]),
}
}

一共存在6对

标志 说明
SettingHeaderTableSize 客户端和服务器通信时,指定用于HPACK头表大小的值 0x1
SettingEnablePush 服务器指示客户端是否可以推送资源 0x2
SettingMaxConcurrentStreams 指定客户端可以同时使用的最大流数 0x3
SettingInitialWindowSize 指定流控制窗口的初始大小 0x4
SettingMaxFrameSize 指定帧大小的最大值 0x5
SettingMaxHeaderListSize 指定头部列表大小的最大值 0x6

所以对应的结构就是

1
2
3
4
type SettingsFrame struct {
FrameHeader
p []byte
}

初始化窗口大小不能大于 v > (1<<31)-1

FramePushPromise

FramePushPromise 通过 parsePushPromise 进行数据解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func parsePushPromise(_ *frameCache, fh FrameHeader, countError func(string), p []byte) (_ Frame, err error) {
pp := &PushPromiseFrame{
FrameHeader: fh,
}
if pp.StreamID == 0 {
countError("frame_pushpromise_zero_stream")
return nil, ConnectionError(ErrCodeProtocol)
}
//如果有追加,那么就读取追加长度
var padLength uint8
if fh.Flags.Has(FlagPushPromisePadded) {
if p, padLength, err = readByte(p); err != nil {
countError("frame_pushpromise_pad_short")
return
}
}

p, pp.PromiseID, err = readUint32(p) //读取PromiseID
if err != nil {
countError("frame_pushpromise_promiseid_short")
return
}
pp.PromiseID = pp.PromiseID & (1<<31 - 1)

if int(padLength) > len(p) {
return nil, ConnectionError(ErrCodeProtocol)
}
pp.headerFragBuf = p[:len(p)-int(padLength)]
return pp, nil
}

所以二进制可以看出

1
2
3
4
5
6
7
8
9
+--------+--------+--------+--------+
| padLength(32) |
+--------+--------+--------+--------+
| PromiseID(32) |
+--------+--------+--------+--------+
| payload |
+--------+--------+--------+--------+
| padding |
+--------+--------+--------+--------+

所以最后组成的结构体

1
2
3
4
5
type PushPromiseFrame struct {
FrameHeader
PromiseID uint32
headerFragBuf []byte // not owned
}
FramePing

FramePing 通过 parsePingFrame 进行数据解析

1
2
3
4
5
6
7
8
9
10
11
12
13
func parsePingFrame(_ *frameCache, fh FrameHeader, countError func(string), payload []byte) (Frame, error) {
if len(payload) != 8 {
countError("frame_ping_length")
return nil, ConnectionError(ErrCodeFrameSize)
}
if fh.StreamID != 0 {
countError("frame_ping_has_stream")
return nil, ConnectionError(ErrCodeProtocol)
}
f := &PingFrame{FrameHeader: fh}
copy(f.Data[:], payload)
return f, nil
}

组成的结构体

1
2
3
4
type PingFrame struct {
FrameHeader
Data [8]byte
}

ping帧中很特别的是负载是指定长度的8,通常是时间戳

FrameGoAway

FrameGoAway 通过 parseGoAwayFrame 进行数据解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func parseGoAwayFrame(_ *frameCache, fh FrameHeader, countError func(string), p []byte) (Frame, error) {
if fh.StreamID != 0 {
countError("frame_goaway_has_stream")
return nil, ConnectionError(ErrCodeProtocol)
}
if len(p) < 8 {
countError("frame_goaway_short")
return nil, ConnectionError(ErrCodeFrameSize)
}
return &GoAwayFrame{
FrameHeader: fh,
LastStreamID: binary.BigEndian.Uint32(p[:4]) & (1<<31 - 1),
ErrCode: ErrCode(binary.BigEndian.Uint32(p[4:8])),
debugData: p[8:],
}, nil
}

GoAway帧用于通知对端,当前的连接即将关闭,以及关闭的原因。LastStreamID 表示这个流之后的所有流都将被关闭

组成的结构体

1
2
3
4
5
6
type GoAwayFrame struct {
FrameHeader
LastStreamID uint32
ErrCode ErrCode
debugData []byte
}
FrameWindowUpdate

FrameWindowUpdate 通过 parseWindowUpdateFrame 进行数据解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func parseWindowUpdateFrame(_ *frameCache, fh FrameHeader, countError func(string), p []byte) (Frame, error) {
if len(p) != 4 {
countError("frame_windowupdate_bad_len")
return nil, ConnectionError(ErrCodeFrameSize)
}
inc := binary.BigEndian.Uint32(p[:4]) & 0x7fffffff // mask off high reserved bit
if inc == 0 {
if fh.StreamID == 0 {
countError("frame_windowupdate_zero_inc_conn")
return nil, ConnectionError(ErrCodeProtocol)
}
countError("frame_windowupdate_zero_inc_stream")
return nil, streamError(fh.StreamID, ErrCodeProtocol)
}
return &WindowUpdateFrame{
FrameHeader: fh,
Increment: inc,
}, nil
}

组成的结构体

1
2
3
4
type WindowUpdateFrame struct {
FrameHeader
Increment uint32 // never read with high bit set
}
FrameContinuation

FrameContinuation 通过 parseContinuationFrame 进行数据解析

1
2
3
4
5
6
7
func parseContinuationFrame(_ *frameCache, fh FrameHeader, countError func(string), p []byte) (Frame, error) {
if fh.StreamID == 0 {
countError("frame_continuation_zero_stream")
return nil, connError{ErrCodeProtocol, "CONTINUATION frame with stream ID 0"}
}
return &ContinuationFrame{fh, p}, nil
}

组成的结构体

1
2
3
4
type ContinuationFrame struct {
FrameHeader
headerFragBuf []byte
}

这个将用在帧过大的时候进行数据分帧,这个逻辑将在下面的分帧逻辑进行进一步分析

应用帧

上面其实看的是HTTP2协议中的协议帧,而实际在grpc-go中又做了进一步的区分

帧类型 说明
incomingWindowUpdate 通知发送方更新发送窗口的大小
outgoingWindowUpdate 通知接收方更新接收窗口的大小
incomingSettings 设置帧
outgoingSettings 设置帧
headerFrame 帧的头部信息
registerStream 服务器专用
cleanupStream 针对RST帧
earlyAbortStream
incomingGoAway 为客户端服务,客户端一旦接受此帧,帧发送器状态为draining
dataFrame 数据帧
ping Ping帧
goAway goAway帧
outFlowControlSizeRequest

这里特殊说明一个数据帧,因为在后续的分帧会说到

1
2
3
4
5
6
7
type dataFrame struct {
streamID uint32 //帧所属的流
endStream bool //该帧是否为该流的最后一帧
h []byte //帧的头部,包含了一些控制信息,例如帧长度、类型、标志等
d []byte //帧的数据负载
onEachWrite func() //在每次写出帧的一部分数据时调用的回调函数
}

特殊注意的就是d 字段,表示的数据负载在传输时可能被切割成多个 dataFrame 进行传输。

分帧

当数据过大时,就需要分批发送

TCP 分段(segment):将应用层数据分成多个 TCP 段进行传输,每个 TCP 段都有自己的头部信息

应用层 分包(packet):分包是指将一个应用层数据包分成多个 IP 数据报进行传输

IP数据包 分片(fragmentation):分片是指将一个 IP 数据报分成多个较小的数据报进行传输

在这里插入图片描述

将发送的请求Body超过限制16KB就能看到分帧了

http2_dataframe2

抓包可以看到其实单个帧的大小是 16384,而他们两个区别是

  1. 在第二个中间带上的是0x01 EndStream,表示这个流结束
  2. 因为负载不同,所以他们头部Length也不一样,但是最大就是16384

然后直接看看分帧分别是如何发送与接收的

客户端

接着上述 dataFrame的结构说起,它并不是直接切分成多个,而是一部分一部分的切割发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
//...
df := &dataFrame{
streamID: s.id,
endStream: opts.Last,
}
if hdr != nil || data != nil { // If it's not an empty data frame.
// Add some data to grpc message header so that we can equally
// distribute bytes across frames.
emptyLen := http2MaxFrameLen - len(hdr)
if emptyLen > len(data) {
emptyLen = len(data)
}
hdr = append(hdr, data[:emptyLen]...)
data = data[emptyLen:]
df.h, df.d = hdr, data
//...
}
return t.controlBuf.put(df)
}

分析如下:

  1. hdr 是应用层grpc帧的头 长度为 5B(1B是否压缩 + 4B负载长度)
  2. data 是应用层grpc帧的负载数据
  3. 构建步骤如下:
    1. 首先构建一个数据帧(应用层的),指定流ID以及是否是最后一个流(!cs.desc.ClientStreams)
    2. 然后判断数据是否超过http2MaxFrameLen(16384)
    3. 接着 **从data中截取一部分放入到 数据帧的h,然后将剩余部分放入到d中!!!!!**所以这里的hd并不是指的头与负载
    4. 最后将整个帧放入到帧缓冲器中

经过获取帧并将帧放入到链表之后到数据帧的处理中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (l *loopyWriter) processData() (bool, error) {
//...
dataItem := str.itl.peek().(*dataFrame) //头部数据帧的指针,并不是出栈
//如果数据帧是空的,那么直接发送结束帧并关闭流
if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
//...
return false, nil
}
var (
idx int
buf []byte
)
if len(dataItem.h) != 0 { // data header has not been written out yet.
buf = dataItem.h
} else {
idx = 1
buf = dataItem.d
}
size := http2MaxFrameLen
if len(buf) < size {
size = len(buf)
}
//... 流级别与连接级别流控
//...
//发送最大帧以及流控控制下的size
if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
return false, err
}
buf = buf[size:] //更新剩余部分
if idx == 0 {
dataItem.h = buf
} else {
dataItem.d = buf
}

//如果消息发完那么流头部消息发送完毕,则退出该消息
if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
str.itl.dequeue()
}
//...
return false, nil
}

相当于整个数据一次性放入到流的消息链表中,然后每次从数据中截取一部分给HTTP2进行发送

服务端

服务端则是从结束数据开始看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
if _, err := p.r.Read(p.header[:]); err != nil {
return 0, nil, err
}

pf = payloadFormat(p.header[0])
length := binary.BigEndian.Uint32(p.header[1:])

//...负载长度判断
if _, err := p.r.Read(msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return 0, nil, err
}
return pf, msg, nil
}
  • 首先直接读取头部长度
  • 接着校验负载长度是否异常(不能超过 接收最大长度)
  • 接着读取指定长度

到这里其实消息体已经是完整的应用层数据帧了,接着我们看下这个应用层数据帧是如何缓存的以及怎样触发读取的

分帧得从

由于流使用的是多路复用,所以每个流需要使用缓冲保存分帧数据,当判断已经获取到完整的帧的时候,再从缓冲中获取数据即可。这里用到的结构是 recvBuffer

1
2
3
4
5
6
type recvBuffer struct {
c chan recvMsg //make(chan recvMsg, 1)
mu sync.Mutex
backlog []recvMsg
err error
}

获取到数据帧之后,会将帧存入到 recvBuffer

1
2
3
func (s *Stream) write(m recvMsg) {
s.buf.put(m)
}

然后这里做了一个特殊的设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (b *recvBuffer) put(r recvMsg) {
b.mu.Lock()
if b.err != nil {
b.mu.Unlock()
// An error had occurred earlier, don't accept more
// data or errors.
return
}
b.err = r.err
if len(b.backlog) == 0 {
select {
case b.c <- r:
b.mu.Unlock()
return
default:
}
}
b.backlog = append(b.backlog, r)
b.mu.Unlock()
}

这里有两个操作

  1. 当 backlog 为空,则直接写入到隧道中
  2. 当 backlog 不为空,则追加到backlog后面

总结

  1. 帧分为帧头与负载,帧头长度为9
  2. 帧类型为10种,并结合flags进行使用

参考文档

  1. https://blog.csdn.net/u011582922/article/details/120578941
  2. https://www.jianshu.com/p/e22fef60a7f0