在客户端跟服务器交互过程中,流具体是通过帧来进行数据的发送。这里学习如何发送帧的,整体如下
主要流程:
各种类型的帧通过帧存储器接口(put, executeAndPut
)将帧存储到帧缓存器controlBuf
里,一个连接里的所有帧共享一个帧缓存器controlBuf
帧发送器开始发送
帧加载接口获取帧的策略,从帧缓存器controlBuf里加载帧,将帧传递给帧分发器
帧分发器根据帧的类型,分发给不同类型的帧处理器
不同类型的帧处理器接收到帧后,根据设置好的规则进行处理
带着问题看世界
帧缓冲区可以无限存储帧吗,满了怎么办,什么时候删,大小可以设置吗
多个流公用一个帧缓冲区吗(显而易见的,因为帧属于不同的流,它们都存入的帧缓冲区中)
帧缓冲区是如何进行消息发送的
实现原理
在之前的流当中,有帧发送器的逻辑,即帧会在发送之前将帧都存储到帧缓冲器中
帧缓冲器
帧缓存器结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 type controlBuffer struct { ch chan struct {} done <-chan struct {} mu sync.Mutex consumerWaiting bool list *itemList err error transportResponseFrames int trfChan atomic.Value } type itemNode struct { it interface {} next *itemNode } type itemList struct { head *itemNode tail *itemNode }
使用 itemList
单向链表存储帧信息
帧进队列
1 2 3 4 5 6 7 8 9 10 func (il *itemList) enqueue (i interface {}) { n := &itemNode{it: i} if il.tail == nil { il.head, il.tail = n, n return } il.tail.next = n il.tail = n }
其实就是将帧添加到链表中,这里并没有限制链表长度,超过负载则是单独处理的 throttle()
帧出队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func (il *itemList) dequeue () interface {} { if il.head == nil { return nil } i := il.head.it il.head = il.head.next if il.head == nil { il.tail = nil } return i } func (il *itemList) peek () interface {} { return il.head.it } func (il *itemList) dequeueAll () *itemNode { h := il.head il.head, il.tail = nil , nil return h }
帧缓存器限制
itemList
仅仅单纯用于消息存储,并没有数量限制。但是如果不限制消息数量,可能导致OOM或者消息大面积丢失,其实它单独做了一个限制
当特殊帧达到上限 maxQueuedTransportResponseFrames = 56
之后,会存储一个流控隧道
1 2 3 4 5 6 7 8 9 c.list.enqueue(it) if it.isTransportResponseFrame() { c.transportResponseFrames++ if c.transportResponseFrames == maxQueuedTransportResponseFrames { c.trfChan.Store(make (chan struct {})) } }
记录条件的帧数量,而不是发送的所有帧都记录进去 ,符合条件的只有两种帧 incomingSettings
和 ping
,存储了一个隧道,直接通过隧道控制流量
1 2 3 4 5 6 7 8 9 10 11 12 func (c *controlBuffer) throttle () { ch, _ := c.trfChan.Load().(chan struct {}) if ch != nil { select { case <-ch: case <-c.done: } } }
注意这里用的是等于 c.transportResponseFrames == maxQueuedTransportResponseFrames
,每个客户端http_client
都有一个 帧缓存器
帧存储与获取
生产生通过一个隧道通知消费者消息的处理,否则消费者阻塞等待
帧获取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (c *controlBuffer) get (block bool ) (interface {}, error) { for { if !c.list.isEmpty() { } if !block { c.mu.Unlock() return nil , nil } c.consumerWaiting = true c.mu.Unlock() select { case <-c.ch: case <-c.done: return nil , ErrConnClosing } } }
帧存储
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (c *controlBuffer) executeAndPut (f func (it interface {}) bool , it cbItem ) (bool , error) { var wakeUp bool if c.consumerWaiting { wakeUp = true c.consumerWaiting = false } c.list.enqueue(it) if wakeUp { select { case c.ch <- struct {}{}: default : } } return true , nil }
帧分发器
将帧从帧缓冲器 获取出来进行分发,就需要用到帧分发器 ,执行逻辑帧逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 func (l *loopyWriter) run () (err error) { for { it, err := l.cbuf.get(true ) if err != nil { return err } if err = l.handle(it); err != nil { return err } if _, err = l.processData(); err != nil { return err } gosched := true hasdata: for { it, err := l.cbuf.get(false ) if err != nil { return err } if it != nil { if err = l.handle(it); err != nil { return err } if _, err = l.processData(); err != nil { return err } continue hasdata } isEmpty, err := l.processData() if err != nil { return err } if !isEmpty { continue hasdata } if gosched { gosched = false if l.framer.writer.offset < minBatchSize { runtime.Gosched() continue hasdata } } l.framer.writer.Flush() break hasdata } } }
注意:
一开始阻塞式获取, 后来非阻塞式获取
一开始阻塞式获取,因为初始化的时候一般没有消息,那么这里就一直等到有消息了
processData
的作用是将数据写入到连接中,并且如果还有数据则继续写入
有一段让出CPU的操作,意思是:在数据量不足最小批处理大小时,让出CPU时间片,以等待更多数据到达,以尽可能地将数据一次性写入连接中,从而提高性能。
具体来说,当 l.framer.writer.offset(写入缓冲区中的数据量)小于 minBatchSize(最小批处理大小)时,表示当前缓冲区中的数据量不足以一次性写入连接中,这时候我们不应该立即将数据写入连接中,而是应该等待更多的数据到达,以尽可能地将数据一次性写入连接中,从而减少系统调用的次数,提高性能。因此,我们使用 runtime.Gosched() 方法让出CPU时间片,等待更多的数据到达,然后继续循环,直到数据量达到最小批处理大小时,再将数据一次性写入连接中,以提高性能
1 2 3 4 5 6 7 if gosched { gosched = false if l.framer.writer.offset < minBatchSize { runtime.Gosched() continue hasdata } }
总结
首先回答一开始的问题
问题1 :帧缓冲区可以无限存储帧吗,满了怎么办,什么时候删,大小可以设置吗
答:理论上可以无限存储(链表),通过ping与窗口更新帧来控制来控制负载
问题2 :多个流公用一个帧缓冲区吗
答:因为帧属于不同的流,它们都公用同一个帧缓冲区中
问题3 :帧缓冲区是如何进行消息发送的
答:利用生产者消费者模式进行消息存储与发送
参考文档
https://blog.csdn.net/u011582922/article/details/117979181
https://juejin.cn/post/7092975446257565726