Grpc-06-多路复用

多路复用是 HTTP/2中的重要特性,允许同一个TCP连接上同时传输多个HTTP请求和响应

带着问题看世界:

  1. 通过《帧发送器》可知帧是一个一个进行消息发送,多个流的帧如何发送
  2. 服务端是如何区分不同的流的帧
  3. 帧太大一定会进行分包,最大是多少,服务端如何存储分包的帧,保证它的顺序
  4. 服务端如何从分包的数据帧的恢复数据
在这里插入图片描述

发送端

多路复用从 《帧接收器》说起,一个请求入上图所示,由头帧跟多个数据帧组成,现在直接看数据帧的发送逻辑

1
2
3
4
5
6
7
8
9
10
it, err := l.cbuf.get(true)
if err != nil {
return err
}
if err = l.handle(it); err != nil {
return err
}
if _, err = l.processData(); err != nil {
return err
}

上面是从发送缓冲区获取数据进行发送,一共分为三步

  1. 从缓冲区获取帧(这里的帧是应用帧中的数据帧 dataFrame)

  2. 将帧进行流的处理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    func (l *loopyWriter) preprocessData(df *dataFrame) error {
    str, ok := l.estdStreams[df.streamID]
    if !ok {
    return nil
    }
    // If we got data for a stream it means that
    // stream was originated and the headers were sent out.
    str.itl.enqueue(df)
    if str.state == empty {
    str.state = active
    l.activeStreams.enqueue(str)
    }
    return nil
    }

    意思就是:

    1. 根据帧ID streamID 从 已经建立连接的流中 estdStreams 获取到流

      estdStreams 表示所有已建立但未被清除的流(stream)

      在客户端,表示所有已发送头部信息的流

      在服务端,表示所有已接收头部信息的流

      activeStreams 就表示的是已经发送或接收了头帧以及部分数据帧的流

    2. 将帧存入到流的单向链表中

    3. 如果链表为空(表示仅仅是发送了头帧,还没有数据帧),那么将流加入到 activeStreams 数据流中

  3. 进行消息发送

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    func (l *loopyWriter) processData() (bool, error) {
    //...
    //1. 从数据流中拿第一个流的数据帧
    str := l.activeStreams.dequeue() // Remove the first stream.
    if str == nil {
    return true, nil
    }
    dataItem := str.itl.peek().(*dataFrame)
    //dataItem 是业务数据帧,在这里讲真正分解为发送的多个数据帧 dataFrame

    if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // 如果是空的
    //发送结束帧
    if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
    return false, err
    }
    //...根据流的帧后续是否仍然有数据进行处理
    return false, nil
    }
    var (
    buf []byte
    )

    maxSize := http2MaxFrameLen //16KB
    //计算一个帧最大值,默认16KB
    hSize := min(maxSize, len(dataItem.h)) //头部的最大长度
    dSize := min(maxSize-hSize, len(dataItem.d)) //负载的最大长度
    //... 将dataItem 部分数据写入到buf中

    size := hSize + dSize
    //...
    var endStream bool
    //是否是流的最后一帧
    if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size {
    endStream = true
    }
    //...
    if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
    return false, err
    }
    str.bytesOutStanding += size
    l.sendQuota -= uint32(size)
    dataItem.h = dataItem.h[hSize:] //剩余头部长度
    dataItem.d = dataItem.d[dSize:] //剩余负载长度

    //如果数据为空,则从链表中删除这个数据帧
    if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
    str.itl.dequeue()
    }
    //如果流中的数据帧为空,那么标记流为 empty
    //如果流中下一个为头帧,那么发送头帧并清理流
    //如果还有一部分数据未发送完毕,那么将剩下的帧存入 activeStreams 的链表中下次再发
    //...发送处理
    return false, nil
    }

    对应代码具体步骤如下:

    1. 获取第一个活动流activeStream(有Data)
    2. 拿到流中间的数据帧 dataFrame (应用帧)
    3. 如果需要发送的数据是空的,那么发送数据帧并表示结束 endStream == true
    4. 如果有一部分没有发送完毕,那么将剩下的部分作为数据帧存入 activeStreams中下次再计算发送

总结:

  1. 在发送端,由于同一个链接中多个流公用一个帧缓冲区,所以虽然是多路复用,但其实客户端所有的帧还是一个一个发送的。只是可能存在一个流的帧没有发完就会发送另一个流的帧
  2. 一个帧最大为 16KB,剩下部分会作为一个帧重新加入到帧缓冲区链表中,下一次重新计算发送

接收端

接收端可以分为上述三个问题

  1. 如何接收数据帧分帧的
  2. 如何存储数据帧分帧的

帧的读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (fr *Framer) ReadFrame() (Frame, error) {
fr.errDetail = nil
if fr.lastFrame != nil {
fr.lastFrame.invalidate()
}
//1. 利用第三方依赖,直接读取帧的头部
fh, err := readFrameHeader(fr.headerBuf[:], fr.r)
if err != nil {
return nil, err
}
if fh.Length > fr.maxReadSize {
return nil, ErrFrameTooLarge
}
//2. 获取负载长度并读取
payload := fr.getReadBuf(fh.Length)
if _, err := io.ReadFull(fr.r, payload); err != nil {
return nil, err
}
//根据帧的类型,将真转换为程序数据结构
f, err := typeFrameParser(fh.Type)(fr.frameCache, fh, payload)
if err != nil {
if ce, ok := err.(connError); ok {
return nil, fr.connError(ce.Code, ce.Reason)
}
return nil, err
}
//检查帧的顺序
if err := fr.checkFrameOrder(f); err != nil {
return nil, err
}
//如果是头帧,还需要将其中的元数据再解析一次,最终变为 MetaHeadersFrame 帧
if fh.Type == FrameHeaders && fr.ReadMetaHeaders != nil {
return fr.readMetaFrame(f.(*HeadersFrame))
}
return f, nil
}

帧转换器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var frameParsers = map[FrameType]frameParser{
FrameData: parseDataFrame,
FrameHeaders: parseHeadersFrame,
FramePriority: parsePriorityFrame,
FrameRSTStream: parseRSTStreamFrame,
FrameSettings: parseSettingsFrame,
FramePushPromise: parsePushPromise,
FramePing: parsePingFrame,
FrameGoAway: parseGoAwayFrame,
FrameWindowUpdate: parseWindowUpdateFrame,
FrameContinuation: parseContinuationFrame,
}

func typeFrameParser(t FrameType) frameParser {
if f := frameParsers[t]; f != nil {
return f
}
return parseUnknownFrame
}

数据帧处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (t *http2Server) handleData(f *http2.DataFrame) {
size := f.Header().Length
//...流控
// Select the right stream to dispatch.
s, ok := t.getStream(f)
//...
if size > 0 { //负载大于0
//... 流控

if len(f.Data()) > 0 {
buffer := t.bufferPool.get()
buffer.Reset()
buffer.Write(f.Data())
s.write(recvMsg{buffer: buffer})
}
}
//如果流结束了,
if f.StreamEnded() {
// Received the end of stream from the client.
s.compareAndSwapState(streamActive, streamReadDone)
s.write(recvMsg{err: io.EOF})
}
}

流程:

  1. 如果是数据帧则获取流的大小以及负载
  2. 将负载封装成recvMsg写入到流中
  3. 如果对方告知流结束了,那么将流的状态从 streamActive 改为 streamReadDone
  4. 封装帧结束消息 recvMsg{err: io.EOF}

核心就是讲数据写入流中的 s.write(recvMsg{buffer: buffer})

帧缓冲区

与发送端所有流的帧都放入到一个帧缓冲区不一样,这里的缓冲区是一个缓冲区切片,用于接收同一个流中的分帧数据

1
2
3
4
5
6
type recvBuffer struct {
c chan recvMsg //缓冲区为1 的 channel
mu sync.Mutex //原子锁
backlog []recvMsg //缓冲切片
err error //错误信息
}
帧的存储

如果 切片 里面没有没有数据,就直接存入隧道,说明切面的消息已经都消费了

如果 切片 里面有数据,那么就存入到切片的后面,保证消息的顺序消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (b *recvBuffer) put(r recvMsg) {
b.mu.Lock()
if b.err != nil {
b.mu.Unlock()
return
}
b.err = r.err
if len(b.backlog) == 0 { //切片为空,则直接存入隧道
select {
case b.c <- r:
b.mu.Unlock()
return
default:
}
}
b.backlog = append(b.backlog, r) //切片不为空则直接存入切片尾部
b.mu.Unlock()
}
帧的消费

如果切片长度大于0,说明切片内部有消息,则直接将切片第一条数据传入隧道,这样每次读取隧道中的数据即可,也能缓冲一部分数据

1
2
3
4
5
6
7
8
9
10
11
12
func (b *recvBuffer) load() {
b.mu.Lock()
if len(b.backlog) > 0 {
select {
case b.c <- b.backlog[0]:
b.backlog[0] = recvMsg{}
b.backlog = b.backlog[1:]
default:
}
}
b.mu.Unlock()
}

虽然帧被放入到了同一个流的 recvBuffer中,但还是没有说明:如果一个帧的分帧是前后发送的,但是接收的顺序是乱序的时候,即使顺序消费也无法保证数据帧能够正常的解析

答:这里可以这样想,由于TCP是有序的,那么所有的TCP包都会按照发送顺序在接收端组装完成。也就是说只要发送顺序一定,那么接收端的顺序与发送端的是一样的。又因为同一个连接共用一个帧缓冲器,也就是说同一个流中的帧都是顺序发送的,所以接收端收到的帧的顺序也是不会乱序的。

总结

  1. 多路复用可以在同一个TCP连接上同时传输多个HTTP请求和响应,避免建立和关闭连接的开销

  2. 和长连接的区别是: 多路复用 可以避免 队头阻塞(Head-of-Line Blocking) 问题。如果某个请求在传输过程过程中出现阻塞,那么后续的请求也会被阻塞

  3. 和分包的区别是:HTTP分包是指将一个HTTP消息分为多个TCP数据包(Packet)进行传输,而多路复用是在同一个TCP连接上同时传输多个HTTP请求和响应

  4. HTTP2 帧的最大传输字节是 16KB,TCP最大传输MSS是 1460B,IP层最大传输单元 MTU 1500B

    HTTP2 的帧有一个固定9B的头部,用于描述帧的类型,长度,标志等信息。其中保存林该帧的有效载荷的长度,最大长度 2^24 - 1 大约为 16KB

  5. 多路复使用相同的 StreamID 来标识属于同一个流,使用recvBuffer进行帧的缓存(多个数据帧),使用独立协程或者生产者消费者模式进行帧处理

参考文档

  1. https://blog.csdn.net/u011582922/article/details/120426690