Grpc-13-保持链接

在学习的过程中看到了grpc-go中的两个概念 保持链接心跳检查,这里先学习一下保持链接

带着问题看世界

  1. 保持链接的作用
  2. 保持链接的原理
  3. 它与TCP的 keepalive 有什么区别

首先来看一下官方的介绍

  • 保持连接功能可以检测TCP层面的连接故障。在某些情况下,TCP连接会丢失数据包(包括FIN),需要等待系统TCP超时时间(可能为30分钟)才能检测到故障。使用保持连接功能可以让gRPC更早地检测到这种故障

  • 另外是保持连接活动,比如在L4代理中配置为关闭“空闲连接”的情况下,发送包活消息可以是连接不处于“空闲”状态

这里补充一下TCP的 keepalive 的原理:

当一方发送一个数据包后,如果对方没有回应,那么TCP协议会按照指数退避的算法重新发送数据包,最多会重发12次。如果在这12次内仍然没有收到对方的响应,则会将连接标记为“超时”,并关闭连接。这个过程通常需要30分钟左右

可以手动修改TCP默认的超时时间

sudo sysctl -w net.ipv4.tcp_keepalive_time=300

基本使用

客户端

1
2
3
4
5
6
7
8
9
10
11
var kacp = keepalive.ClientParameters{
Time: 10 * time.Second, // send pings every 10 seconds if there is no activity
Timeout: time.Second, // wait 1 second for ping ack before considering the connection dead
PermitWithoutStream: true, // send pings even without active streams
}

func main() {

conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithKeepaliveParams(kacp))
//...
}

客户端的保持心跳参数一共有三个

  1. Time:在没有活动的情况下,发送 ping 的时间间隔(不能少于10s)
  2. Timeout:等待ping确认的超时时间
  3. PermitWithoutStream:及时没有活动的流也要发送ping请求

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var kaep = keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
PermitWithoutStream: true, // Allow pings even when there are no active streams
}

var kasp = keepalive.ServerParameters{
MaxConnectionIdle: 15 * time.Second, // If a client is idle for 15 seconds, send a GOAWAY
MaxConnectionAge: 30 * time.Second, // If any connection is alive for more than 30 seconds, send a GOAWAY
MaxConnectionAgeGrace: 5 * time.Second, // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
Time: 5 * time.Second, // Ping the client if it is idle for 5 seconds to ensure the connection is still active
Timeout: 1 * time.Second, // Wait 1 second for the ping ack before assuming the connection is dead
}

func main() {
s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
//...
}

这里一共有两个保活配置 EnforcementPolicyServerParameters

EnforcementPolicy 定义了服务端如何执行保活策略

  • MinTime:如果客户端每隔不到5秒就发送一个ping请求,服务器就终止连接
  • PermitWithoutStream:表示即使没有活动的流,也允许ping请求

ServerParameters 定义了保活

  • MaxConnectionIdle:如果客户端闲置超过 15 秒,则发送 GOAWAY
  • MaxConnectionAge: 如果任何连接存在时间超过 30 秒,则发送 GOAWAY
  • MaxConnectionAgeGrace:允许在强制关闭连接之前等待 5 秒钟以完成待处理的 RPC
  • Time:如果客户端闲置超过 5 秒钟,则发送 ping 以确保连接仍处于活动状态
  • Timeout:等待 1 秒钟以获取 ping 的响应,在此之后假定连接已断开

MaxConnectionAge是指整个连接(connection)的最长存在时间,不是单个流(stream)的最长存在时间。当一个连接的时间超过了 MaxConnectionAge 指定的时间,服务器会发送一个 GOAWAY 帧,表示不再接受来自该连接的新流。任何新的流的创建请求都会被拒绝,并且服务器会等待 MaxConnectionAgeGrace 指定的一段时间,让尚未完成的 RPC 请求完成。在这段时间内,服务器不会发送任何新的数据帧,但仍会响应已有的流。

实现原理

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
//...
kp := opts.KeepaliveParams
// Validate keepalive parameters.
if kp.Time == 0 {
kp.Time = defaultClientKeepaliveTime
}
if kp.Timeout == 0 {
kp.Timeout = defaultClientKeepaliveTimeout
}
keepaliveEnabled := false
if kp.Time != infinity {
if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
}
keepaliveEnabled = true
}
//...

if t.keepaliveEnabled { //如果开启包活
t.kpDormancyCond = sync.NewCond(&t.mu)
go t.keepalive()
}
//...
}

这是在构建链接的时候设置的超时时间 SetTCPUserTimeout,指定发送数据后多久没有收到确认信号就会超时。

  • 这是设置是设置网络层的,并不会影响gRPC应用层,因为gRPC超时会重试
  • 并不是所有的操作系统都支持 syscall.SetTCPUserTimeout(可能老的操作系统不支持,Linux、Windows、MacOS都支持)

如果开启保活,通过开启独立协程发送ping帧确保链接是活的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}}
outstandingPing := false //是否有等待响应的ping帧
timeoutLeft := time.Duration(0) //记录发送ping之后剩余时间
prevNano := time.Now().UnixNano() //记录活动时间
timer := time.NewTimer(t.kp.Time) //开启定时器
for {
select {
case <-timer.C:
lastRead := atomic.LoadInt64(&t.lastRead)
if lastRead > prevNano { //自从上次定时器已经读取过了
outstandingPing = false
// Next timer should fire at kp.Time seconds from lastRead time.
//重新计算下次触发定时器时间(从上次读取时间开始算)
timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
prevNano = lastRead
continue
}
//outstandingPing 有等待响应的ping帧,且已经超时。表示链接超时关闭
if outstandingPing && timeoutLeft <= 0 {
t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
return
}
t.mu.Lock()
if t.state == closing { //如果链接的状态是关闭中,那么也退出
t.mu.Unlock()
return
}
//如果没有活动流,且没有活动流不允许发送ping帧
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
outstandingPing = false
t.kpDormant = true
t.kpDormancyCond.Wait() //在初始化流或者关闭客户端的时候 会触发 t.kpDormancyCond.Signal()
}
t.kpDormant = false
t.mu.Unlock()

//没有等待响应的ping帧,就发送ping帧
if !outstandingPing {
//...
t.controlBuf.put(p) //发送ping帧
timeoutLeft = t.kp.Timeout
outstandingPing = true
}

sleepDuration := minTime(t.kp.Time, timeoutLeft)
timeoutLeft -= sleepDuration
timer.Reset(sleepDuration) //计算定时器下一次触发时间
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
}
}
}

其实就是根据 ping帧的状态以及链接是否有活动来计算发送ping帧时间。

有几个注意点

  1. 如何判断有ping帧响应的

    其实它并没有判断是有有相应帧,而是是否在指定时间内是否读取到消息即可

  2. 如果没有活动流又不允许非活动流发送ping,那么它是如何处理的

    1
    2
    3
    4
    5
    6
    //如果没有活动流,且没有活动流不允许发送ping帧
    if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
    outstandingPing = false
    t.kpDormant = true
    t.kpDormancyCond.Wait() //在初始化流或者关闭客户端的时候 会触发 t.kpDormancyCond.Signal()
    }

    而出发 kpDormancyCond.Signal()的位置是 构建流或者关闭连接

服务端

服务端同样也是使用独立协程来运行保活逻辑

  1. 超过 最长闲置时间 MaxConnectionIdle 优雅的关闭连接
  2. 超过 最大连接时间 MaxConnectionAge 优雅的关闭连接
  3. 等待 MaxConnectionAgeGrace 后强制关闭连接。
  4. Time 的频率发送 ping 确保连接存活,并在 Timeout 的额外时间内关闭无响应的连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
func (t *http2Server) keepalive() {
p := &ping{}
outstandingPing := false //是否有等待响应帧
kpTimeoutLeft := time.Duration(0)
prevNano := time.Now().UnixNano()
// Initialize the different timers to their default values.
idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
kpTimer := time.NewTimer(t.kp.Time)
defer func() {
idleTimer.Stop()
ageTimer.Stop()
kpTimer.Stop()
}()

for {
select {
case <-idleTimer.C: //闲置定时器
t.mu.Lock()
idle := t.idle
if idle.IsZero() { //未闲置则重置最大闲置时间
t.mu.Unlock()
idleTimer.Reset(t.kp.MaxConnectionIdle)
continue
}
val := t.kp.MaxConnectionIdle - time.Since(idle)
t.mu.Unlock()
if val <= 0 {
t.Drain() //优雅关闭连接
return
}
idleTimer.Reset(val) //重置闲置连接
case <-ageTimer.C: //最长连接
t.Drain()
ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
select {
case <-ageTimer.C:
// Close the connection after grace period.
if logger.V(logLevel) {
logger.Infof("transport: closing server transport due to maximum connection age.")
}
t.Close()
case <-t.done:
}
return
case <-kpTimer.C: //保活定时器
lastRead := atomic.LoadInt64(&t.lastRead)
if lastRead > prevNano {
// There has been read activity since the last time we were
// here. Setup the timer to fire at kp.Time seconds from
// lastRead time and continue.
outstandingPing = false
kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
prevNano = lastRead
continue
}
if outstandingPing && kpTimeoutLeft <= 0 {
if logger.V(logLevel) {
logger.Infof("transport: closing server transport due to idleness.")
}
t.Close()
return
}
if !outstandingPing { //如果没有等待保活则发送保活帧
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
t.controlBuf.put(p)
kpTimeoutLeft = t.kp.Timeout
outstandingPing = true
}
sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
kpTimeoutLeft -= sleepDuration
kpTimer.Reset(sleepDuration)
case <-t.done:
return
}
}
}

注意点:

  1. 连接什么时候闲置的?

    1
    2
    3
    if len(t.activeStreams) == 1 {
    t.idle = time.Time{}
    }

    在处理头帧的时候(构建了一个活跃流),如果 len(t.activeStreams) == 1 ,表示刚新建一个流且仅有一个活跃流,那么这个时候连接是闲置的

  2. 服务端也会主动发送ping帧

总结

  1. TCP与gRPC的区别

    TCP的Keepalive是一种机制,它允许在网络连接空闲时发送探测包(keepalive包)来维护连接的状态。这些探测包不包含有效负载,只是一个空的TCP报文段,主要用于检测连接是否仍然活着。

    当TCP连接上没有传输数据时会进入空闲状态,即使连接已经中断或不可用,导致不必要的延迟和资源浪费。Keepalive机制可以在连接空闲时周期性地发送探测包,以检测连接是否仍然活着。如果远程端点没有响应这些探测包,则可以视为连接已经断开,并且可以关闭连接。

    在gRPC中,使用Keepalive可以检测底层TCP连接是否失效,并在检测到连接问题时及时重新建立连接,从而提高网络连接的可靠性和性能。

  2. 客户端与服务端都会根据配置向对端发送ping帧,只要在指定时间内读取到帧,那么就不会主动发送ping帧

参考链接

  1. https://blog.csdn.net/u011582922/article/details/120279303