//健康状态更新 gofunc() { // asynchronously inspect dependencies and toggle serving status as needed next := healthpb.HealthCheckResponse_SERVING
for { healthcheck.SetServingStatus(system, next)
if next == healthpb.HealthCheckResponse_SERVING { next = healthpb.HealthCheckResponse_NOT_SERVING } else { next = healthpb.HealthCheckResponse_SERVING }
time.Sleep(*sleep) } }()
if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }
s, ok := rawS.(grpc.ClientStream) if !ok { setConnectivityState(connectivity.Ready, nil) //服务端健康 return fmt.Errorf("newStream returned %v (type %T); want grpc.ClientStream", rawS, rawS) }
if err = s.SendMsg(&healthpb.HealthCheckRequest{Service: service}); err != nil && err != io.EOF { // Stream should have been closed, so we can safely continue to create a new stream. continue retryConnection } s.CloseSend()
resp := new(healthpb.HealthCheckResponse) for { //不断接收服务端响应 err = s.RecvMsg(resp)
// 其他错误 if err != nil { setConnectivityState(connectivity.TransientFailure, fmt.Errorf("connection active but received health check RPC error: %v", err)) continue retryConnection }
//收到消息那么 tryCnt = 0 if resp.Status == healthpb.HealthCheckResponse_SERVING { setConnectivityState(connectivity.Ready, nil) } else { setConnectivityState(connectivity.TransientFailure, fmt.Errorf("connection active but health check failed. status=%s", resp.Status)) } } } }
func(s *Server)setServingStatusLocked(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) { s.statusMap[service] = servingStatus for _, update := range s.updates[service] { // Clears previous updates, that are not sent to the client, from the channel. // This can happen if the client is not reading and the server gets flow control limited. select { case <-update: default: } // Puts the most recent update to the channel. update <- servingStatus } }
// Watch implements `service Health`. func(s *Server)Watch(in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer)error { service := in.Service // update channel is used for getting service status updates. update := make(chan healthpb.HealthCheckResponse_ServingStatus, 1) s.mu.Lock() // Puts the initial status to the channel. if servingStatus, ok := s.statusMap[service]; ok { update <- servingStatus //如果服务状态存在,那么写入隧道中 } else { //如果不存在,则更新未知 update <- healthpb.HealthCheckResponse_SERVICE_UNKNOWN }