Grpc-14-健康检查

前面学习了保持链接,这里看一下健康检查

带着问题看世界

  1. 健康检查的作用
  2. 健康检查的原理
  3. 保持链接与健康检查的区别

首先看一下源码例子中的ReadMe说明,gRPC提供了一个健康检查库,用于向客户端通报系统的健康状况。它通过使用health/v1 API提供服务定义。通过使用健康检查库,客户端可以在遇到问题时优雅地避免使用服务器。大多数语言都提供了现成的实现,这使得它在不同系统之间可以互操作。内置健康检查的优势

  • 健康检查的格式与普通的RPC一样
  • 重用现有的配额等机制,内部对健康检查有完全的控制权

基本使用

内部定义了健康检查的 proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
syntax = "proto3";

package grpc.health.v1;

message HealthCheckRequest {
string service = 1;
}

message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3; // Used only by the Watch method.
}
ServingStatus status = 1;
}

service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);

rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

一共有两种方式进行健康检查

  • Check 探测服务器的健康状态
  • Watch 观察服务端变化,这里注意 是一个服务端流,也就是客户端不断会收到服务端的状态更新

一般,客户端不需要手动执行 Check操作,而是通过配置 healthCheckConfig ,它会在内部自动执行 Watch操作

客户端

1
2
3
4
5
6
7
8
9
10
11
12
// import grpc/health to enable transparent client side checking 
import _ "google.golang.org/grpc/health"

// set up appropriate service config
serviceConfig := grpc.WithDefaultServiceConfig(`{
"loadBalancingPolicy": "round_robin",
"healthCheckConfig": {
"serviceName": ""
}
}`)

conn, err := grpc.Dial(..., serviceConfig)

服务端

启动一个协程,来模拟服务端状态变化,核心逻辑就是设置服务的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
var (
port = flag.Int("port", 50051, "the port to serve on")
sleep = flag.Duration("sleep", time.Second*5, "duration between changes in health")

system = "" // empty string represents the health of the system
)

//...

func main() {
flag.Parse()

lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

s := grpc.NewServer()
healthcheck := health.NewServer()
healthpb.RegisterHealthServer(s, healthcheck) //健康检查
pb.RegisterEchoServer(s, &echoServer{})

//健康状态更新
go func() {
// asynchronously inspect dependencies and toggle serving status as needed
next := healthpb.HealthCheckResponse_SERVING

for {
healthcheck.SetServingStatus(system, next)

if next == healthpb.HealthCheckResponse_SERVING {
next = healthpb.HealthCheckResponse_NOT_SERVING
} else {
next = healthpb.HealthCheckResponse_SERVING
}

time.Sleep(*sleep)
}
}()

if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}

健康服务器可以返回四种状态:UNKNOWNSERVINGNOT_SERVINGSERVICE_UNKNOWN

  • UNKNOWN 表示当前状态尚未知晓。在服务器实例启动时经常会看到这种状态。

  • SERVING 表示系统健康,准备好提供服务请求。

  • NOT_SERVING 表示系统当前无法处理请求。

  • SERVICE_UNKNOWN 表示客户端请求的服务名未被服务器所知。此状态仅由 Watch() 调用报告

实现原理

客户端

客户端默认初始化Watch函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
func init() {
internal.HealthCheckFunc = clientHealthCheck
}

const healthCheckMethod = "/grpc.health.v1.Health/Watch" //指定健康检查方法名称

func clientHealthCheck(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State, error), service string) error {
tryCnt := 0

retryConnection:
for {
//退避重试
if tryCnt > 0 && !backoffFunc(ctx, tryCnt-1) {
return nil
}
tryCnt++

if ctx.Err() != nil {
return nil
}
setConnectivityState(connectivity.Connecting, nil) //设置状态
rawS, err := newStream(healthCheckMethod) //构建流链接
if err != nil {
continue retryConnection
}

s, ok := rawS.(grpc.ClientStream)
if !ok {
setConnectivityState(connectivity.Ready, nil) //服务端健康
return fmt.Errorf("newStream returned %v (type %T); want grpc.ClientStream", rawS, rawS)
}

if err = s.SendMsg(&healthpb.HealthCheckRequest{Service: service}); err != nil && err != io.EOF {
// Stream should have been closed, so we can safely continue to create a new stream.
continue retryConnection
}
s.CloseSend()

resp := new(healthpb.HealthCheckResponse)
for { //不断接收服务端响应
err = s.RecvMsg(resp)

//
if status.Code(err) == codes.Unimplemented { //表示服务端未支持健康检查
setConnectivityState(connectivity.Ready, nil)
return err
}

// 其他错误
if err != nil {
setConnectivityState(connectivity.TransientFailure, fmt.Errorf("connection active but received health check RPC error: %v", err))
continue retryConnection
}

//收到消息那么
tryCnt = 0
if resp.Status == healthpb.HealthCheckResponse_SERVING {
setConnectivityState(connectivity.Ready, nil)
} else {
setConnectivityState(connectivity.TransientFailure, fmt.Errorf("connection active but health check failed. status=%s", resp.Status))
}
}
}
}

根据策略启动健康检查,针对的是同一个域名对应的多个子地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//开启健康检查
func (ac *addrConn) startHealthCheck(ctx context.Context) {
var healthcheckManagingState bool
defer func() {
if !healthcheckManagingState {
ac.updateConnectivityState(connectivity.Ready, nil)
}
}()

//... 判断健康检查是否开启
currentTr := ac.transport
newStream := func(method string) (interface{}, error) {
ac.mu.Lock()
if ac.transport != currentTr {
ac.mu.Unlock()
return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
}
ac.mu.Unlock()
return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
}
//更新连接状态
setConnectivityState := func(s connectivity.State, lastErr error) {
ac.mu.Lock()
defer ac.mu.Unlock()
if ac.transport != currentTr {
return
}
ac.updateConnectivityState(s, lastErr)
}
// 独立协程去检查健康状态
go func() {
err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
//....
}()
}

这里可以通过健康检查更新连接状态,但不是唯一更新连接状态的方式

服务端

维护

首先需要明白服务端是如何维护状态的 SetServingStatus 其实还是需要用户自定义状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
//....
s.setServingStatusLocked(service, servingStatus)
}

func (s *Server) setServingStatusLocked(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
s.statusMap[service] = servingStatus
for _, update := range s.updates[service] {
// Clears previous updates, that are not sent to the client, from the channel.
// This can happen if the client is not reading and the server gets flow control limited.
select {
case <-update:
default:
}
// Puts the most recent update to the channel.
update <- servingStatus
}
}

statusMap map[string]healthpb.HealthCheckResponse_ServingStatus
updates map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus

这里的状态更新有两种

  1. 更新 statusMap[service] 对应的服务状态
  2. 将状态写入到服务对应的隧道中,隧道中的状态则是通过Watch函数不断地通知客户端(不同的状态才会通知)

这里有两种特殊的情况可以直接使用内置函数,就是服务恢复正常与停止服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (s *Server) Shutdown() {
s.mu.Lock()
defer s.mu.Unlock()
s.shutdown = true
for service := range s.statusMap {
s.setServingStatusLocked(service, healthpb.HealthCheckResponse_NOT_SERVING)
}
}

func (s *Server) Resume() {
s.mu.Lock()
defer s.mu.Unlock()
s.shutdown = false
for service := range s.statusMap {
s.setServingStatusLocked(service, healthpb.HealthCheckResponse_SERVING)
}
}

这里会统一更新服务的状态,而不需要单独设置某个服务的状态更新,一般用于服务启动与恢复。

接着就是接收客户端的健康检查请求

Check

当客户端请求服务状态的时候,直接从状态服务中返回结果 statusMap[in.Service]

1
2
3
4
5
6
7
8
9
10
func (s *Server) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
s.mu.RLock()
defer s.mu.RUnlock()
if servingStatus, ok := s.statusMap[in.Service]; ok {
return &healthpb.HealthCheckResponse{
Status: servingStatus,
}, nil
}
return nil, status.Error(codes.NotFound, "unknown service")
}
Watch

watch 其实就是服务端不断地将状态的变化通知给客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Watch implements `service Health`.
func (s *Server) Watch(in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
service := in.Service
// update channel is used for getting service status updates.
update := make(chan healthpb.HealthCheckResponse_ServingStatus, 1)
s.mu.Lock()
// Puts the initial status to the channel.
if servingStatus, ok := s.statusMap[service]; ok {
update <- servingStatus //如果服务状态存在,那么写入隧道中
} else { //如果不存在,则更新未知
update <- healthpb.HealthCheckResponse_SERVICE_UNKNOWN
}

// 注册状态
if _, ok := s.updates[service]; !ok {
s.updates[service] = make(map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus)
}
s.updates[service][stream] = update
defer func() {
s.mu.Lock()
delete(s.updates[service], stream)
s.mu.Unlock()
}()
s.mu.Unlock()

var lastSentStatus healthpb.HealthCheckResponse_ServingStatus = -1
for {
select {

case servingStatus := <-update: //监听 update
if lastSentStatus == servingStatus { //与上一次的状态相同,那么就不会响应
continue
}
//否则发送状态
lastSentStatus = servingStatus
err := stream.Send(&healthpb.HealthCheckResponse{Status: servingStatus})
if err != nil {
return status.Error(codes.Canceled, "Stream has ended.")
}
case <-stream.Context().Done():
return status.Error(codes.Canceled, "Stream has ended.")
}
}
}

总结

  1. 心跳检查是为了客户端可以根据服务端状态进行自定义操作

  2. 心跳检查启动的时候启动了退避算法,gRPC默认的退避算法是

    1
    2
    3
    4
    5
    6
    var DefaultConfig = Config{
    BaseDelay: 1.0 * time.Second,
    Multiplier: 1.6,
    Jitter: 0.2,
    MaxDelay: 120 * time.Second,
    }
  3. Watch中如果与上一次的状态没有变化,则不会通知客户端

  4. 健康检查针对的是具有相同地址的多个服务,一个服务异常整体连接在不会断开,而是更新连接状态

    • 针对不能提供服务的服务器端,客户端会将链接状态更新为TransientFailure,但是链接不会断的;当该连接的状态重新更新为Ready时,还可以继续创建流,传输数据。
    • 针对可以对外提供服务的服务器端,客户端会将链接状态更新为Ready,生成Picker,即将此链接缓存到平衡器里,并且将链接状态更新为Ready,接下来,就可以创建流,传输数据了
  5. 健康检查可以应用会更新连接状态,且不能自定义 withHealthCheckFunc

  6. 如果在传输过程中收到服务端通知,由于数据发送存在重试机制,所以还是可以从平衡器中选择 Ready 的连接重新传输数据

  7. 健康检查与保持链接的区别

    1. keepalivehealthcheck都是用于确保通信的可用性和健康状态的机制。
    2. keepalive是一种保持连接活动的机制,用于检测连接是否处于空闲状态,并在需要时发送ping帧以防止连接关闭。keepalive机制确保长时间的空闲连接不会被关闭,以避免重新建立连接的开销
    3. healthcheck是一种检查服务可用性的机制,用于检测服务器是否可用,以及在服务器不可用时采取相应的措施。healthcheck机制通过发送特定的RPC请求来检查服务器的可用性,并根据响应的状态码和错误信息来确定服务器的状态。

参考链接

  1. https://github.com/grpc/grpc/blob/master/doc/health-checking.md
  2. https://blog.csdn.net/u011582922/article/details/120052706