Grpc-04-帧接收器

看完帧的发送器,看看帧是如何接收的。

带着问题看世界:

  1. 上一节中,一个连接对应一个帧发送器,那么一个连接有几个帧接收器。
  2. 服务端又是如何存储帧的

实现原理

帧缓存器阀门

跟帧发送器一样,帧接收器收到帧之后,也会根据不同类型进行分发,不同的是它并没有缓存的逻辑。

数据帧处理流程

开启读取

在构建 httpClient 客户端连接的时候会同时使用独立协程进行帧的读取

1
go t.reader()

每个连接(具体连接)会创建一个 Framer , 这个 Framer 就是实际上负责发送和接收 HTTP2 frame 的接口. 每一个 client 都会对应一个 Framer 来处理来自该 client 的所有 frame, 不管这些 frame 是不是属于一个 stream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type framer struct {
writer *bufWriter //buf输入
fr *http2.Framer //http帧处理器
}

type bufWriter struct {
buf []byte
offset int
batchSize int
conn net.Conn
err error

onFlush func()
}

读取帧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (t *http2Client) reader() {
defer close(t.readerDone)
frame, err := t.framer.fr.ReadFrame() //检查帧的读取是否正常
//... 读取失败则关闭连接,触发重建连接
t.conn.SetReadDeadline(time.Time{}) //设置超时时间为0
//... 更新最近读取帧的时间
//... 这个帧必须是设置帧并更新设置

//循环读取帧
for {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
if t.keepaliveEnabled {
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
}
if err != nil {
//如果返回流错误则关闭流,并解析错误信息。仅仅当服务器返回错误信息才会返回
if se, ok := err.(http2.StreamError); ok {
//...
}
continue
} else { //否则关闭连接
// Transport error.
t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
return
}
}

//... 根据帧的类型进行处理
}
}
}

这里有一个设置超时时间的逻辑需要注意

1
t.conn.SetReadDeadline(time.Time{})  //设置成 读取没有超时时间

为什么这样设置就不会有超时时间可以看 SetReadDeadline 源码,当设置 零值的时候读取将不超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// SetReadDeadline sets the deadline for future Read calls
// and any currently-blocked Read call.
// A zero value for t means Read will not time out.
SetReadDeadline(t time.Time) error

func (c *conn) SetReadDeadline(t time.Time) error {
p := c.Reader.(*pipe)
p.mu.Lock()
defer p.mu.Unlock()
p.rtimer.Stop()
p.rtimedout = false
if !t.IsZero() {
p.rtimer = time.AfterFunc(time.Until(t), func() {
p.mu.Lock()
defer p.mu.Unlock()
p.rtimedout = true
p.rwait.Broadcast()
})
}
return nil
}

那么为什么 time.Time{} 被认为是 0,因为 time.Time{}0001-01-01 00:00:00 +0000 UTC,而 IsZero的判断标准就是从起始时间开始的秒与毫秒

1
2
3
func (t Time) IsZero() bool {
return t.sec() == 0 && t.nsec() == 0
}

帧处理

读取到的帧会根据不同类型直接处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
t.operateHeaders(frame)
case *http2.DataFrame:
t.handleData(frame)
case *http2.RSTStreamFrame:
t.handleRSTStream(frame)
case *http2.SettingsFrame:
t.handleSettings(frame, false)
case *http2.PingFrame:
t.handlePing(frame)
case *http2.GoAwayFrame:
t.handleGoAway(frame)
case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame)
default:
if logger.V(logLevel) {
logger.Errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
}
}

读取帧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (fr *Framer) ReadFrame() (Frame, error) {
fr.errDetail = nil
if fr.lastFrame != nil {
fr.lastFrame.invalidate()
}
fh, err := readFrameHeader(fr.headerBuf[:], fr.r)
if err != nil {
return nil, err
}
//超过最大读取格式,那么将报错
if fh.Length > fr.maxReadSize {
return nil, ErrFrameTooLarge
}
payload := fr.getReadBuf(fh.Length)
if _, err := io.ReadFull(fr.r, payload); err != nil {
return nil, err
}
f, err := typeFrameParser(fh.Type)(fr.frameCache, fh, fr.countError, payload)
if err != nil {
if ce, ok := err.(connError); ok {
return nil, fr.connError(ce.Code, ce.Reason)
}
return nil, err
}
if err := fr.checkFrameOrder(f); err != nil {
return nil, err
}
if fr.logReads {
fr.debugReadLoggerf("http2: Framer %p: read %v", fr, summarizeFrame(f))
}
if fh.Type == FrameHeaders && fr.ReadMetaHeaders != nil {
return fr.readMetaFrame(f.(*HeadersFrame))
}
return f, nil
}

参考文档

  1. https://blog.csdn.net/u011582922/article/details/117979181
  2. https://juejin.cn/post/7092975446257565726