Grpc-03-帧发送器

在客户端跟服务器交互过程中,流具体是通过帧来进行数据的发送。这里学习如何发送帧的,整体如下

帧发送器的整体利处理图

主要流程:

  1. 各种类型的帧通过帧存储器接口(put, executeAndPut)将帧存储到帧缓存器controlBuf里,一个连接里的所有帧共享一个帧缓存器controlBuf
  2. 帧发送器开始发送
    • 帧加载接口获取帧的策略,从帧缓存器controlBuf里加载帧,将帧传递给帧分发器
    • 帧分发器根据帧的类型,分发给不同类型的帧处理器
    • 不同类型的帧处理器接收到帧后,根据设置好的规则进行处理

带着问题看世界

  1. 帧缓冲区可以无限存储帧吗,满了怎么办,什么时候删,大小可以设置吗
  2. 多个流公用一个帧缓冲区吗(显而易见的,因为帧属于不同的流,它们都存入的帧缓冲区中)
  3. 帧缓冲区是如何进行消息发送的

实现原理

在之前的流当中,有帧发送器的逻辑,即帧会在发送之前将帧都存储到帧缓冲器中

帧缓冲器

帧缓存器结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type controlBuffer struct {
ch chan struct{}
done <-chan struct{}
mu sync.Mutex
consumerWaiting bool
list *itemList
err error

transportResponseFrames int
trfChan atomic.Value // chan struct{}
}

type itemNode struct {
it interface{}
next *itemNode
}

type itemList struct {
head *itemNode
tail *itemNode
}

使用 itemList 单向链表存储帧信息

帧进队列
1
2
3
4
5
6
7
8
9
10
//数据加入到链表尾部
func (il *itemList) enqueue(i interface{}) {
n := &itemNode{it: i}
if il.tail == nil { //如果没有尾巴,说明链表为空,新节点为链表
il.head, il.tail = n, n
return
}
il.tail.next = n //将新节点加入到链表尾部
il.tail = n
}

其实就是将帧添加到链表中,这里并没有限制链表长度,超过负载则是单独处理的 throttle()

帧出队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//弹出链表头部数据
func (il *itemList) dequeue() interface{} {
if il.head == nil { //如果head为空,说明链表为空
return nil
}
i := il.head.it
//更新头部与尾部指针
il.head = il.head.next
if il.head == nil { //注意尾部指针
il.tail = nil
}
return i
}

//返回链表头部数据
func (il *itemList) peek() interface{} {
return il.head.it
}

//返回链表,缓存器清空
func (il *itemList) dequeueAll() *itemNode {
h := il.head
il.head, il.tail = nil, nil
return h
}
帧缓存器限制

itemList仅仅单纯用于消息存储,并没有数量限制。但是如果不限制消息数量,可能导致OOM或者消息大面积丢失,其实它单独做了一个限制

当特殊帧达到上限 maxQueuedTransportResponseFrames = 56 之后,会存储一个流控隧道

1
2
3
4
5
6
7
8
9
c.list.enqueue(it)
if it.isTransportResponseFrame() {
c.transportResponseFrames++
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
// We are adding the frame that puts us over the threshold; create
// a throttling channel.
c.trfChan.Store(make(chan struct{}))
}
}

记录条件的帧数量,而不是发送的所有帧都记录进去,符合条件的只有两种帧 incomingSettingsping ,存储了一个隧道,直接通过隧道控制流量

1
2
3
4
5
6
7
8
9
10
11
12
func (c *controlBuffer) throttle() {
ch, _ := c.trfChan.Load().(chan struct{})
if ch != nil {
select {
case <-ch: //要么隧道关闭
case <-c.done: //要么超时
}
}
}
//一共只有两个地方用到
//http_client.go 读取帧的之后 reader
//http_server.go 处理流中帧逻辑 HandleStreams

注意这里用的是等于 c.transportResponseFrames == maxQueuedTransportResponseFrames,每个客户端http_client都有一个 帧缓存器

帧存储与获取

grpc 帧存储器和帧加载器

生产生通过一个隧道通知消费者消息的处理,否则消费者阻塞等待

帧获取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (c *controlBuffer) get(block bool) (interface{}, error) {
for {
//...
if !c.list.isEmpty() {
//...
}
if !block { //如果不阻塞,直接退出
c.mu.Unlock()
return nil, nil
}
c.consumerWaiting = true //否则等待消息
c.mu.Unlock()
select {
case <-c.ch:
case <-c.done:
return nil, ErrConnClosing
}
}
}
帧存储
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
var wakeUp bool
//...
if c.consumerWaiting { //如果消费端还在等待
wakeUp = true
c.consumerWaiting = false
}
c.list.enqueue(it)
//...
if wakeUp {
select {
case c.ch <- struct{}{}: //通知消费者有消息来了
default:
}
}
return true, nil
}

帧分发器

将帧从帧缓冲器获取出来进行分发,就需要用到帧分发器,执行逻辑帧逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func (l *loopyWriter) run() (err error) {
//defer 打印链接 ErrConnClosing
for {
it, err := l.cbuf.get(true) //阻塞式获取
if err != nil {
return err
}
if err = l.handle(it); err != nil {
return err
}
if _, err = l.processData(); err != nil {
return err
}
gosched := true
hasdata:
for {
it, err := l.cbuf.get(false) //非阻塞式获取
if err != nil {
return err
}
if it != nil { //拿到了消息
if err = l.handle(it); err != nil {
return err
}
if _, err = l.processData(); err != nil {
return err
}
continue hasdata
}
isEmpty, err := l.processData()
if err != nil {
return err
}
if !isEmpty {
continue hasdata
}
if gosched {
gosched = false
if l.framer.writer.offset < minBatchSize { //如果
runtime.Gosched() //让出CPU时间片
continue hasdata
}
}
l.framer.writer.Flush() //向连接中写入数据
break hasdata

}
}
}

注意:

  1. 一开始阻塞式获取, 后来非阻塞式获取

    • 一开始阻塞式获取,因为初始化的时候一般没有消息,那么这里就一直等到有消息了
    • processData 的作用是将数据写入到连接中,并且如果还有数据则继续写入
  2. 有一段让出CPU的操作,意思是:在数据量不足最小批处理大小时,让出CPU时间片,以等待更多数据到达,以尽可能地将数据一次性写入连接中,从而提高性能。

    具体来说,当 l.framer.writer.offset(写入缓冲区中的数据量)小于 minBatchSize(最小批处理大小)时,表示当前缓冲区中的数据量不足以一次性写入连接中,这时候我们不应该立即将数据写入连接中,而是应该等待更多的数据到达,以尽可能地将数据一次性写入连接中,从而减少系统调用的次数,提高性能。因此,我们使用 runtime.Gosched() 方法让出CPU时间片,等待更多的数据到达,然后继续循环,直到数据量达到最小批处理大小时,再将数据一次性写入连接中,以提高性能

    1
    2
    3
    4
    5
    6
    7
    if gosched {
    gosched = false
    if l.framer.writer.offset < minBatchSize { //如果
    runtime.Gosched() //让出CPU时间片
    continue hasdata
    }
    }

总结

首先回答一开始的问题

问题1:帧缓冲区可以无限存储帧吗,满了怎么办,什么时候删,大小可以设置吗

答:理论上可以无限存储(链表),通过ping与窗口更新帧来控制来控制负载

问题2:多个流公用一个帧缓冲区吗

答:因为帧属于不同的流,它们都公用同一个帧缓冲区中

问题3:帧缓冲区是如何进行消息发送的

答:利用生产者消费者模式进行消息存储与发送

参考文档

  1. https://blog.csdn.net/u011582922/article/details/117979181
  2. https://juejin.cn/post/7092975446257565726