从前面可知,流单独负责一次调用,并且在服务端使用多路复用实现消息的处理。流其实是通过 ID 标志的虚拟概念,真正传输的其实是帧,帧通过发送器与帧接收器进行发送处理。
带着问题看世界:
帧的类型有哪些,具体结构是怎样的
当消息体过大的时候分帧是如何处理的
当帧的缓冲区是否也会满,什么时候清理
在 grpc-go 中,通过对 HTTP/2 中协议的封装,增加了grpc-go
自己的业务类型。看源码的时候注意不要弄混了。这里说的协议帧 是 HTTP/2 中定义实际传输过程中的帧 是由 net 库实现的;而应用帧则是 grpc-go
根据自己的实际需要由转换了一层
协议帧
每个协议帧都是由头部 与负载 两个部分组成
头部
协议帧指的就是HTTP2中帧的结构,每个帧也分为头部与负载
1 2 3 +--------+--------+--------+--------+--------+--------+--------+--------+ | Length(24 ) |Type(8 ) |Flags(8 )|R| StreamID(31 ) | +--------+--------+--------+--------+--------+--------+--------+--------+
虽然Length 是24位,但这不意味着就能处理 2^24 16M大小的帧,一般是默认只支持2^16 16k以下的帧,而2^16 - 2^24 16M 的帧 需要接收端公布自己可以处理这么大的帧,需要在 SETTINGS_MAX_FRAME_SIZE 帧中告知
将二进制buffer转换为帧头部结构
1 2 3 4 5 6 7 type FrameHeader struct { valid bool Type FrameType Flags Flags Length uint32 StreamID uint32 }
所以头部的整体解析流程就变成了
1 2 3 4 5 6 7 8 9 10 11 12 13 func readFrameHeader (buf []byte , r io.Reader) (FrameHeader, error) { _, err := io.ReadFull(r, buf[:frameHeaderLen]) if err != nil { return FrameHeader{}, err } return FrameHeader{ Length: (uint32 (buf[0 ])<<16 | uint32 (buf[1 ])<<8 | uint32 (buf[2 ])), Type: FrameType(buf[3 ]), Flags: Flags(buf[4 ]), StreamID: binary.BigEndian.Uint32(buf[5 :]) & (1 <<31 - 1 ), valid: true , }, nil }
值得注意的是:
binary.BigEndian.Uint32
这个作用是将 4个字节的二进制数据转换为 uint32类型的数据
1<<31 - 1
用于将高位设置为0,确保能被正确的解析为uint32位的值
最后数据帧的格式如下
帧的类型
其中 FrameType
代表的帧类型,一共有10中
帧类型
说明
值
字符
FrameData
表示数据帧,用于传输 HTTP 报文的实际数据部分
0x0
“DATA”
FrameHeaders
携带请求或响应头部,也可以分为多个帧进行传输
0x1
“HEADERS”
FramePriority
表示某个流的优先级,用于流量控制
0x2
“PRIORITY”
FrameRSTStream
重置某个流,表示这个流将不再使用
0x3
“RST_STREAM”
FrameSettings
表示设置帧,用于在连接和流级别上传输参数设置
0x4
“SETTINGS”
FramePushPromise
允许服务端在客户端还没有请求的情况下发送响应头部,用于服务端推送(server push)功能
0x5
“PUSH_PROMISE”
FramePing
表示 Ping 帧,用于检测连接是否存活
0x6
“PING”
FrameGoAway
表示断开帧,用于通知对端关闭连接
0x7
“GOAWAY”
FrameWindowUpdate
用于流量控制,通知对端窗口大小的变化
0x8
“WINDOW_UPDATE”
FrameContinuation
表示继续帧,将头部帧或推送帧拆分为多个帧进行传输时,用于指示后续帧属于同一个头部块
0x9
“CONTINUATION”
帧的标志位
另外一个标志位Flags则是帧
帧类型
Flags类型
说明
值
FrameData
FlagDataEndStream
标志位用于 DATA
帧,表示这是最后一个 DATA
帧,即流已经结束,接收方不应该再等待更多的 DATA
帧
0x1
FlagDataPadded
数据帧的填充(Padded)标志,表示数据帧后面跟随一个填充字段
0x8
FrameHeaders
FlagHeadersEndStream
首部帧(Headers Frame)的结束流标志,表示发送方已经发送完整个消息
0x1
FlagHeadersEndHeaders
首部帧的结束头(End Headers)标志,表示这个首部块是消息的最后一个首部块
0x4
FlagHeadersPadded
首部帧的填充标志,表示首部帧后面跟随一个填充字段
0x8
FlagHeadersPriority
首部帧的优先级(Priority)标志,表示这个首部块包含了一个优先级信息
0x20
FrameSettings
FlagSettingsAck
设置帧(Settings Frame)的确认(Ack)标志,表示这是一个确认帧
0x1
FramePing
FlagPingAck
Ping帧的确认标志,表示这是一个确认帧
0x1
FrameContinuation
FlagContinuationEndHeaders
连续帧(Continuation Frame)的结束头标志,表示这个连续块是消息的最后一个连续块
0x4
FramePushPromise
FlagPushPromiseEndHeaders
推送帧(Push Promise Frame)的结束头标志,表示这个推送帧包含了一个完整的首部块
0x4
FlagPushPromisePadded
推送帧的填充标志,表示推送帧后面跟随一个填充字段
0x8
有了上述的帧的关键字段说明,接下来来看看各个数据帧的解析
负载
头部定义之后,负载通过头部长度 Length 进行解析,负载解析如下
1 2 3 4 5 6 7 8 9 10 11 payload := fr.getReadBuf(fh.Length) if _, err := io.ReadFull(fr.r, payload); err != nil { return nil , err } f, err := typeFrameParser(fh.Type)(fr.frameCache, fh, payload) if err != nil { if ce, ok := err.(connError); ok { return nil , fr.connError(ce.Code, ce.Reason) } return nil , err }
数据帧的解析是通过类型参数进行解析
1 f, err := typeFrameParser(fh.Type)(fr.frameCache, fh, payload)
其中
fh.Type
是从头部解析出来的帧的类型,用于找到指定类型的数据帧解析器
fr.frameCache
流都具有自己的帧内存缓存池。在需要发送帧时,可以从自己的缓存池中获取内存,并在完成后将其返回以供重用
fh
之前解析出的帧的头部,用来与解析完的负载合并成一个完整的帧
payload
则是帧的负载,用于解析帧的数据部分
需要关注的是
是如何转换成其他类型的
如果有存在分帧,又是如何进行解析的
如果有帧的丢失怎么办
FrameData
数据帧的解析对应的是 parseDataFrame
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func parseDataFrame (fc *frameCache, fh FrameHeader, payload []byte ) (Frame, error) { if fh.StreamID == 0 { return nil , connError{ErrCodeProtocol, "DATA frame with stream ID 0" } } f := fc.getDataFrame() f.FrameHeader = fh var padSize byte if fh.Flags.Has(FlagDataPadded) { var err error payload, padSize, err = readByte(payload) if err != nil { return nil , err } } if int (padSize) > len (payload) { return nil , connError{ErrCodeProtocol, "pad size larger than data payload" } } f.data = payload[:len (payload)-int (padSize)] return f, nil }
当存在追加帧 FlagDataPadded
的时候 使用 readByte
进行解析出去除追加长度的负载
1 2 3 4 5 6 func readByte (p []byte ) (remain []byte , b byte , err error) { if len (p) == 0 { return nil , 0 , io.ErrUnexpectedEOF } return p[1 :], p[0 ], nil }
所以,正常情况只有负载,但是如果数据帧存在追加 FlagDataPadded
,数据帧的负载还有一个长度
1 2 3 4 5 6 7 +--------+--------+--------+--------+ | padSize(32 ) | +--------+--------+--------+--------+ | data ... +--------+--------+--------+--------+ | padding ... +--------+--------+--------+--------+
追加的内容在解析成数据帧的过程中,丢掉了!!!!! 这是因为填充字段仅仅是为了将数据帧填充到规定的长度
最终组成数据帧
1 2 3 4 type DataFrame struct { FrameHeader data []byte }
那么另外一个标志位 FlagDataEndStream
是在应用gRPC使用
服务端收到数据帧结束流之后,设置流的状态并结束继续读取流数据
客户端收到数据帧结束流之后,关闭流(结束读取并清理流)
1 2 3 4 5 6 7 8 9 10 11 if f.StreamEnded() { t.closeStream(s, io.EOF, false , http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers" ), nil , true ) } if f.StreamEnded() { s.compareAndSwapState(streamActive, streamReadDone) s.write(recvMsg{err: io.EOF}) }
其他注意事项
如果数据帧中 StreamID == 0
,那么这个数据帧异常,也就是说数据帧必须绑定一个流
头帧也是由头部与负载组成,通过 parseHeadersFrame
解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 func parseHeadersFrame (_ *frameCache, fh FrameHeader, p []byte ) (_ Frame, err error) { hf := &HeadersFrame{ FrameHeader: fh, } if fh.StreamID == 0 { return nil , connError{ErrCodeProtocol, "HEADERS frame with stream ID 0" } } var padLength uint8 if fh.Flags.Has(FlagHeadersPadded) { if p, padLength, err = readByte(p); err != nil { return } } if fh.Flags.Has(FlagHeadersPriority) { var v uint32 p, v, err = readUint32(p) if err != nil { return nil , err } hf.Priority.StreamDep = v & 0x7fffffff hf.Priority.Exclusive = (v != hf.Priority.StreamDep) p, hf.Priority.Weight, err = readByte(p) if err != nil { return nil , err } } if len (p)-int (padLength) <= 0 { return nil , streamError(fh.StreamID, ErrCodeProtocol) } hf.headerFragBuf = p[:len (p)-int (padLength)] return hf, nil }
整体来看,头帧结构如下所示
1 2 3 4 5 6 7 8 9 10 11 +--------+--------+--------+--------+ | padSize(32 ) | +--------+--------+--------+--------+ | priority(32 ) | +--------+--------+--------+--------+ |weight(8 )| +--------+--------+--------+--------+ | payload | +--------+--------+--------+--------+ | padding | +--------+--------+--------+--------+
这里比数据帧多的第一个标志位就是 优先级 FlagHeadersPriority
,它的作用是表示流的优先级
1 2 3 4 5 type PriorityParam struct { StreamDep uint32 Exclusive bool Weight uint8 }
StreamDep
:一个31位的流标识符,表示此流依赖的流。如果为0,表示没有依赖关系。
Exclusive
:表示此流是否具有互斥性。如果此字段为true,表示此流是在其依赖的流和同级的兄弟流之间互斥的。
Weight
:此流的权重值,是一个0-255的值。根据规范,应该将此字段与StreamDep
一起设置,或者两者都不设置。为了获得1到256之间的权重,请将此值加1
为什么流有依赖关系或者互斥关系???
流之间的依赖关系或者互斥关系可以帮助控制流之间的优先级和竞争关系,从而更有效地利用网络带宽和资源。
例如1:如果一个网页需要加载多个资源,比如图片、脚本和样式表,那么这些资源就可以分别被分配到不同的流中。为了更快地呈现网页,图片这类占用带宽较大的资源可以被赋予更高的优先级,使其在竞争网络资源时更优先被传输,从而减少用户等待时间。
例如2:如果一个客户端同时向服务器发起了多个请求,这些请求可能会在服务器端形成竞争关系,造成某些请求的延迟和等待时间过长。通过为请求之间建立依赖关系和互斥关系,可以更好地控制请求的执行顺序和资源利用,提高服务质量和用户体验
最后组成结构体
1 2 3 4 5 type HeadersFrame struct { FrameHeader Priority PriorityParam headerFragBuf []byte }
另外两个标志位 FlagHeadersEndStream
和 FlagHeadersEndHeaders
同理,是在应用层使用
1 2 3 4 5 6 7 8 switch fh.Type {case FrameHeaders, FrameContinuation: if fh.Flags.Has(FlagHeadersEndHeaders) { fr.lastHeaderStream = 0 } else { fr.lastHeaderStream = fh.StreamID } }
FramePriority
虽然头帧中可以设置流的优先级,但是如果有大量的流需要设置优先级,将会导致头帧变得非常庞大,传输的效率也会受到影响。优先级帧可以独立于其他帧来设置流的优先级。同时,PRIORITY 帧也可以用于修改流的依赖关系,因此它具有比头帧更强的功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func parsePriorityFrame (_ *frameCache, fh FrameHeader, payload []byte ) (Frame, error) { if fh.StreamID == 0 { return nil , connError{ErrCodeProtocol, "PRIORITY frame with stream ID 0" } } if len (payload) != 5 { return nil , connError{ErrCodeFrameSize, fmt.Sprintf("PRIORITY frame payload size was %d; want 5" , len (payload))} } v := binary.BigEndian.Uint32(payload[:4 ]) streamID := v & 0x7fffffff return &PriorityFrame{ FrameHeader: fh, PriorityParam: PriorityParam{ Weight: payload[4 ], StreamDep: streamID, Exclusive: streamID != v, }, }, nil }
组成的优先级帧如下
1 2 3 4 5 +--------+--------+--------+--------+ | StreamDep(32 ) | +--------+--------+--------+--------+ |Weight(8 )| +--------+
FrameRSTStream
重置帧则只需要负载的前4个字节用于标志重置原因
1 2 3 4 5 6 7 8 9 func parseRSTStreamFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) { if len(p) != 4 { return nil, ConnectionError(ErrCodeFrameSize) } if fh.StreamID == 0 { return nil, ConnectionError(ErrCodeProtocol) } return &RSTStreamFrame{fh, ErrCode(binary.BigEndian.Uint32(p[:4]))}, nil }
解析成如下结构
1 2 3 +--------+--------+--------+--------+ | ErrCode(32 ) | +--------+--------+--------+--------+
组成的数据结构
1 2 3 4 type RSTStreamFrame struct { FrameHeader ErrCode ErrCode }
FrameSettings
设置帧是通过 parseSettingsFrame
解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func parseSettingsFrame (_ *frameCache, fh FrameHeader, p []byte ) (Frame, error) { if fh.Flags.Has(FlagSettingsAck) && fh.Length > 0 { return nil , ConnectionError(ErrCodeFrameSize) } if fh.StreamID != 0 { return nil , ConnectionError(ErrCodeProtocol) } if len (p)%6 != 0 { return nil , ConnectionError(ErrCodeFrameSize) } f := &SettingsFrame{FrameHeader: fh, p: p} if v, ok := f.Value(SettingInitialWindowSize); ok && v > (1 <<31 )-1 { return nil , ConnectionError(ErrCodeFlowControl) } return f, nil }
它因为直接使用了KV结构,所以固定每一对的长度是6,那么就能统计出每一对的内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (f *SettingsFrame) Value (id SettingID) (v uint32 , ok bool ) { f.checkValid() for i := 0 ; i < f.NumSettings(); i++ { if s := f.Setting(i); s.ID == id { return s.Val, true } } return 0 , false } func (f *SettingsFrame) Setting (i int ) Setting { buf := f.p return Setting{ ID: SettingID(binary.BigEndian.Uint16(buf[i*6 : i*6 +2 ])), Val: binary.BigEndian.Uint32(buf[i*6 +2 : i*6 +6 ]), } }
一共存在6对
标志
说明
值
SettingHeaderTableSize
客户端和服务器通信时,指定用于HPACK头表大小的值
0x1
SettingEnablePush
服务器指示客户端是否可以推送资源
0x2
SettingMaxConcurrentStreams
指定客户端可以同时使用的最大流数
0x3
SettingInitialWindowSize
指定流控制窗口的初始大小
0x4
SettingMaxFrameSize
指定帧大小的最大值
0x5
SettingMaxHeaderListSize
指定头部列表大小的最大值
0x6
所以对应的结构就是
1 2 3 4 type SettingsFrame struct { FrameHeader p []byte }
初始化窗口大小不能大于 v > (1<<31)-1
FramePushPromise
FramePushPromise
通过 parsePushPromise
进行数据解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func parsePushPromise (_ *frameCache, fh FrameHeader, countError func (string ) , p []byte ) (_ Frame, err error) { pp := &PushPromiseFrame{ FrameHeader: fh, } if pp.StreamID == 0 { countError("frame_pushpromise_zero_stream" ) return nil , ConnectionError(ErrCodeProtocol) } var padLength uint8 if fh.Flags.Has(FlagPushPromisePadded) { if p, padLength, err = readByte(p); err != nil { countError("frame_pushpromise_pad_short" ) return } } p, pp.PromiseID, err = readUint32(p) if err != nil { countError("frame_pushpromise_promiseid_short" ) return } pp.PromiseID = pp.PromiseID & (1 <<31 - 1 ) if int (padLength) > len (p) { return nil , ConnectionError(ErrCodeProtocol) } pp.headerFragBuf = p[:len (p)-int (padLength)] return pp, nil }
所以二进制可以看出
1 2 3 4 5 6 7 8 9 +--------+--------+--------+--------+ | padLength(32 ) | +--------+--------+--------+--------+ | PromiseID(32 ) | +--------+--------+--------+--------+ | payload | +--------+--------+--------+--------+ | padding | +--------+--------+--------+--------+
所以最后组成的结构体
1 2 3 4 5 type PushPromiseFrame struct { FrameHeader PromiseID uint32 headerFragBuf []byte }
FramePing
FramePing
通过 parsePingFrame
进行数据解析
1 2 3 4 5 6 7 8 9 10 11 12 13 func parsePingFrame (_ *frameCache, fh FrameHeader, countError func (string ) , payload []byte ) (Frame, error) { if len (payload) != 8 { countError("frame_ping_length" ) return nil , ConnectionError(ErrCodeFrameSize) } if fh.StreamID != 0 { countError("frame_ping_has_stream" ) return nil , ConnectionError(ErrCodeProtocol) } f := &PingFrame{FrameHeader: fh} copy (f.Data[:], payload) return f, nil }
组成的结构体
1 2 3 4 type PingFrame struct { FrameHeader Data [8 ]byte }
ping帧中很特别的是负载是指定长度的8,通常是时间戳
FrameGoAway
FrameGoAway
通过 parseGoAwayFrame
进行数据解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func parseGoAwayFrame (_ *frameCache, fh FrameHeader, countError func (string ) , p []byte ) (Frame, error) { if fh.StreamID != 0 { countError("frame_goaway_has_stream" ) return nil , ConnectionError(ErrCodeProtocol) } if len (p) < 8 { countError("frame_goaway_short" ) return nil , ConnectionError(ErrCodeFrameSize) } return &GoAwayFrame{ FrameHeader: fh, LastStreamID: binary.BigEndian.Uint32(p[:4 ]) & (1 <<31 - 1 ), ErrCode: ErrCode(binary.BigEndian.Uint32(p[4 :8 ])), debugData: p[8 :], }, nil }
GoAway
帧用于通知对端,当前的连接即将关闭,以及关闭的原因。LastStreamID
表示这个流之后的所有流都将被关闭
组成的结构体
1 2 3 4 5 6 type GoAwayFrame struct { FrameHeader LastStreamID uint32 ErrCode ErrCode debugData []byte }
FrameWindowUpdate
FrameWindowUpdate
通过 parseWindowUpdateFrame
进行数据解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func parseWindowUpdateFrame (_ *frameCache, fh FrameHeader, countError func (string ) , p []byte ) (Frame, error) { if len (p) != 4 { countError("frame_windowupdate_bad_len" ) return nil , ConnectionError(ErrCodeFrameSize) } inc := binary.BigEndian.Uint32(p[:4 ]) & 0x7fffffff if inc == 0 { if fh.StreamID == 0 { countError("frame_windowupdate_zero_inc_conn" ) return nil , ConnectionError(ErrCodeProtocol) } countError("frame_windowupdate_zero_inc_stream" ) return nil , streamError(fh.StreamID, ErrCodeProtocol) } return &WindowUpdateFrame{ FrameHeader: fh, Increment: inc, }, nil }
组成的结构体
1 2 3 4 type WindowUpdateFrame struct { FrameHeader Increment uint32 }
FrameContinuation
FrameContinuation
通过 parseContinuationFrame
进行数据解析
1 2 3 4 5 6 7 func parseContinuationFrame (_ *frameCache, fh FrameHeader, countError func (string ) , p []byte ) (Frame, error) { if fh.StreamID == 0 { countError("frame_continuation_zero_stream" ) return nil , connError{ErrCodeProtocol, "CONTINUATION frame with stream ID 0" } } return &ContinuationFrame{fh, p}, nil }
组成的结构体
1 2 3 4 type ContinuationFrame struct { FrameHeader headerFragBuf []byte }
这个将用在帧过大的时候进行数据分帧,这个逻辑将在下面的分帧逻辑进行进一步分析
应用帧
上面其实看的是HTTP2协议中的协议帧,而实际在grpc-go中又做了进一步的区分
帧类型
说明
incomingWindowUpdate
通知发送方更新发送窗口的大小
outgoingWindowUpdate
通知接收方更新接收窗口的大小
incomingSettings
设置帧
outgoingSettings
设置帧
headerFrame
帧的头部信息
registerStream
服务器专用
cleanupStream
针对RST帧
earlyAbortStream
incomingGoAway
为客户端服务,客户端一旦接受此帧,帧发送器状态为draining
dataFrame
数据帧
ping
Ping帧
goAway
goAway帧
outFlowControlSizeRequest
这里特殊说明一个数据帧,因为在后续的分帧会说到
1 2 3 4 5 6 7 type dataFrame struct { streamID uint32 endStream bool h []byte d []byte onEachWrite func () //在每次写出帧的一部分数据时调用的回调函数 }
特殊注意的就是d
字段,表示的数据负载在传输时可能被切割成多个 dataFrame
进行传输。
分帧
当数据过大时,就需要分批发送
TCP 分段(segment):将应用层数据分成多个 TCP 段进行传输,每个 TCP 段都有自己的头部信息
应用层 分包(packet):分包是指将一个应用层数据包分成多个 IP 数据报进行传输
IP数据包 分片(fragmentation):分片是指将一个 IP 数据报分成多个较小的数据报进行传输
将发送的请求Body超过限制16KB就能看到分帧了
抓包可以看到其实单个帧的大小是 16384,而他们两个区别是
在第二个中间带上的是0x01 EndStream
,表示这个流结束
因为负载不同,所以他们头部Length
也不一样,但是最大就是16384
然后直接看看分帧分别是如何发送与接收的
客户端
接着上述 dataFrame的结构说起,它并不是直接切分成多个,而是一部分一部分的切割发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func (t *http2Client) Write (s *Stream, hdr []byte , data []byte , opts *Options) error { df := &dataFrame{ streamID: s.id, endStream: opts.Last, } if hdr != nil || data != nil { emptyLen := http2MaxFrameLen - len (hdr) if emptyLen > len (data) { emptyLen = len (data) } hdr = append (hdr, data[:emptyLen]...) data = data[emptyLen:] df.h, df.d = hdr, data } return t.controlBuf.put(df) }
分析如下:
hdr
是应用层grpc帧的头 长度为 5B(1B是否压缩 + 4B负载长度)
data 是应用层grpc帧的负载数据
构建步骤如下:
首先构建一个数据帧(应用层的),指定流ID以及是否是最后一个流(!cs.desc.ClientStreams
)
然后判断数据是否超过http2MaxFrameLen(16384)
接着 **从data中截取一部分放入到 数据帧的h
,然后将剩余部分放入到d中!!!!!**所以这里的h
与d
并不是指的头与负载
最后将整个帧放入到帧缓冲器中
经过获取帧并将帧放入到链表之后到数据帧的处理中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 func (l *loopyWriter) processData () (bool , error) { dataItem := str.itl.peek().(*dataFrame) if len (dataItem.h) == 0 && len (dataItem.d) == 0 { return false , nil } var ( idx int buf []byte ) if len (dataItem.h) != 0 { buf = dataItem.h } else { idx = 1 buf = dataItem.d } size := http2MaxFrameLen if len (buf) < size { size = len (buf) } if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil { return false , err } buf = buf[size:] if idx == 0 { dataItem.h = buf } else { dataItem.d = buf } if len (dataItem.h) == 0 && len (dataItem.d) == 0 { str.itl.dequeue() } return false , nil }
相当于整个数据一次性放入到流的消息链表中,然后每次从数据中截取一部分给HTTP2进行发送
服务端
服务端则是从结束数据开始看
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (p *parser) recvMsg (maxReceiveMessageSize int ) (pf payloadFormat, msg []byte , err error) { if _, err := p.r.Read(p.header[:]); err != nil { return 0 , nil , err } pf = payloadFormat(p.header[0 ]) length := binary.BigEndian.Uint32(p.header[1 :]) if _, err := p.r.Read(msg); err != nil { if err == io.EOF { err = io.ErrUnexpectedEOF } return 0 , nil , err } return pf, msg, nil }
首先直接读取头部长度
接着校验负载长度是否异常(不能超过 接收最大长度)
接着读取指定长度
到这里其实消息体已经是完整的应用层数据帧了,接着我们看下这个应用层数据帧是如何缓存的以及怎样触发读取的
分帧得从
由于流使用的是多路复用,所以每个流需要使用缓冲保存分帧数据,当判断已经获取到完整的帧的时候,再从缓冲中获取数据即可。这里用到的结构是 recvBuffer
1 2 3 4 5 6 type recvBuffer struct { c chan recvMsg mu sync.Mutex backlog []recvMsg err error }
获取到数据帧之后,会将帧存入到 recvBuffer
中
1 2 3 func (s *Stream) write (m recvMsg) { s.buf.put(m) }
然后这里做了一个特殊的设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func (b *recvBuffer) put (r recvMsg) { b.mu.Lock() if b.err != nil { b.mu.Unlock() return } b.err = r.err if len (b.backlog) == 0 { select { case b.c <- r: b.mu.Unlock() return default : } } b.backlog = append (b.backlog, r) b.mu.Unlock() }
这里有两个操作
当 backlog 为空,则直接写入到隧道中
当 backlog 不为空,则追加到backlog后面
总结
帧分为帧头与负载,帧头长度为9
帧类型为10种,并结合flags进行使用
参考文档
https://blog.csdn.net/u011582922/article/details/120578941
https://www.jianshu.com/p/e22fef60a7f0