var kacp = keepalive.ClientParameters{ Time: 10 * time.Second, // send pings every 10 seconds if there is no activity Timeout: time.Second, // wait 1 second for ping ack before considering the connection dead PermitWithoutStream: true, // send pings even without active streams }
var kaep = keepalive.EnforcementPolicy{ MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection PermitWithoutStream: true, // Allow pings even when there are no active streams }
var kasp = keepalive.ServerParameters{ MaxConnectionIdle: 15 * time.Second, // If a client is idle for 15 seconds, send a GOAWAY MaxConnectionAge: 30 * time.Second, // If any connection is alive for more than 30 seconds, send a GOAWAY MaxConnectionAgeGrace: 5 * time.Second, // Allow 5 seconds for pending RPCs to complete before forcibly closing connections Time: 5 * time.Second, // Ping the client if it is idle for 5 seconds to ensure the connection is still active Timeout: 1 * time.Second, // Wait 1 second for the ping ack before assuming the connection is dead }
funcmain() { s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp)) //... }
func(t *http2Server)keepalive() { p := &ping{} outstandingPing := false//是否有等待响应帧 kpTimeoutLeft := time.Duration(0) prevNano := time.Now().UnixNano() // Initialize the different timers to their default values. idleTimer := time.NewTimer(t.kp.MaxConnectionIdle) ageTimer := time.NewTimer(t.kp.MaxConnectionAge) kpTimer := time.NewTimer(t.kp.Time) deferfunc() { idleTimer.Stop() ageTimer.Stop() kpTimer.Stop() }()
for { select { case <-idleTimer.C: //闲置定时器 t.mu.Lock() idle := t.idle if idle.IsZero() { //未闲置则重置最大闲置时间 t.mu.Unlock() idleTimer.Reset(t.kp.MaxConnectionIdle) continue } val := t.kp.MaxConnectionIdle - time.Since(idle) t.mu.Unlock() if val <= 0 { t.Drain() //优雅关闭连接 return } idleTimer.Reset(val) //重置闲置连接 case <-ageTimer.C: //最长连接 t.Drain() ageTimer.Reset(t.kp.MaxConnectionAgeGrace) select { case <-ageTimer.C: // Close the connection after grace period. if logger.V(logLevel) { logger.Infof("transport: closing server transport due to maximum connection age.") } t.Close() case <-t.done: } return case <-kpTimer.C: //保活定时器 lastRead := atomic.LoadInt64(&t.lastRead) if lastRead > prevNano { // There has been read activity since the last time we were // here. Setup the timer to fire at kp.Time seconds from // lastRead time and continue. outstandingPing = false kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano())) prevNano = lastRead continue } if outstandingPing && kpTimeoutLeft <= 0 { if logger.V(logLevel) { logger.Infof("transport: closing server transport due to idleness.") } t.Close() return } if !outstandingPing { //如果没有等待保活则发送保活帧 if channelz.IsOn() { atomic.AddInt64(&t.czData.kpCount, 1) } t.controlBuf.put(p) kpTimeoutLeft = t.kp.Timeout outstandingPing = true } sleepDuration := minTime(t.kp.Time, kpTimeoutLeft) kpTimeoutLeft -= sleepDuration kpTimer.Reset(sleepDuration) case <-t.done: return } } }