server.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package health provides a service that exposes server's health and it must be
  19. // imported to enable support for client-side health checks.
  20. package health
  21. import (
  22. "context"
  23. "sync"
  24. "google.golang.org/grpc/codes"
  25. "google.golang.org/grpc/grpclog"
  26. healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
  27. healthpb "google.golang.org/grpc/health/grpc_health_v1"
  28. "google.golang.org/grpc/status"
  29. )
  30. // Server implements `service Health`.
  31. type Server struct {
  32. healthgrpc.UnimplementedHealthServer
  33. mu sync.RWMutex
  34. // If shutdown is true, it's expected all serving status is NOT_SERVING, and
  35. // will stay in NOT_SERVING.
  36. shutdown bool
  37. // statusMap stores the serving status of the services this Server monitors.
  38. statusMap map[string]healthpb.HealthCheckResponse_ServingStatus
  39. updates map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus
  40. }
  41. // NewServer returns a new Server.
  42. func NewServer() *Server {
  43. return &Server{
  44. statusMap: map[string]healthpb.HealthCheckResponse_ServingStatus{"": healthpb.HealthCheckResponse_SERVING},
  45. updates: make(map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus),
  46. }
  47. }
  48. // Check implements `service Health`.
  49. func (s *Server) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
  50. s.mu.RLock()
  51. defer s.mu.RUnlock()
  52. if servingStatus, ok := s.statusMap[in.Service]; ok {
  53. return &healthpb.HealthCheckResponse{
  54. Status: servingStatus,
  55. }, nil
  56. }
  57. return nil, status.Error(codes.NotFound, "unknown service")
  58. }
  59. // Watch implements `service Health`.
  60. func (s *Server) Watch(in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  61. service := in.Service
  62. // update channel is used for getting service status updates.
  63. update := make(chan healthpb.HealthCheckResponse_ServingStatus, 1)
  64. s.mu.Lock()
  65. // Puts the initial status to the channel.
  66. if servingStatus, ok := s.statusMap[service]; ok {
  67. update <- servingStatus
  68. } else {
  69. update <- healthpb.HealthCheckResponse_SERVICE_UNKNOWN
  70. }
  71. // Registers the update channel to the correct place in the updates map.
  72. if _, ok := s.updates[service]; !ok {
  73. s.updates[service] = make(map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus)
  74. }
  75. s.updates[service][stream] = update
  76. defer func() {
  77. s.mu.Lock()
  78. delete(s.updates[service], stream)
  79. s.mu.Unlock()
  80. }()
  81. s.mu.Unlock()
  82. var lastSentStatus healthpb.HealthCheckResponse_ServingStatus = -1
  83. for {
  84. select {
  85. // Status updated. Sends the up-to-date status to the client.
  86. case servingStatus := <-update:
  87. if lastSentStatus == servingStatus {
  88. continue
  89. }
  90. lastSentStatus = servingStatus
  91. err := stream.Send(&healthpb.HealthCheckResponse{Status: servingStatus})
  92. if err != nil {
  93. return status.Error(codes.Canceled, "Stream has ended.")
  94. }
  95. // Context done. Removes the update channel from the updates map.
  96. case <-stream.Context().Done():
  97. return status.Error(codes.Canceled, "Stream has ended.")
  98. }
  99. }
  100. }
  101. // SetServingStatus is called when need to reset the serving status of a service
  102. // or insert a new service entry into the statusMap.
  103. func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
  104. s.mu.Lock()
  105. defer s.mu.Unlock()
  106. if s.shutdown {
  107. grpclog.Infof("health: status changing for %s to %v is ignored because health service is shutdown", service, servingStatus)
  108. return
  109. }
  110. s.setServingStatusLocked(service, servingStatus)
  111. }
  112. func (s *Server) setServingStatusLocked(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
  113. s.statusMap[service] = servingStatus
  114. for _, update := range s.updates[service] {
  115. // Clears previous updates, that are not sent to the client, from the channel.
  116. // This can happen if the client is not reading and the server gets flow control limited.
  117. select {
  118. case <-update:
  119. default:
  120. }
  121. // Puts the most recent update to the channel.
  122. update <- servingStatus
  123. }
  124. }
  125. // Shutdown sets all serving status to NOT_SERVING, and configures the server to
  126. // ignore all future status changes.
  127. //
  128. // This changes serving status for all services. To set status for a particular
  129. // services, call SetServingStatus().
  130. func (s *Server) Shutdown() {
  131. s.mu.Lock()
  132. defer s.mu.Unlock()
  133. s.shutdown = true
  134. for service := range s.statusMap {
  135. s.setServingStatusLocked(service, healthpb.HealthCheckResponse_NOT_SERVING)
  136. }
  137. }
  138. // Resume sets all serving status to SERVING, and configures the server to
  139. // accept all future status changes.
  140. //
  141. // This changes serving status for all services. To set status for a particular
  142. // services, call SetServingStatus().
  143. func (s *Server) Resume() {
  144. s.mu.Lock()
  145. defer s.mu.Unlock()
  146. s.shutdown = false
  147. for service := range s.statusMap {
  148. s.setServingStatusLocked(service, healthpb.HealthCheckResponse_SERVING)
  149. }
  150. }