client.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package health
  19. import (
  20. "context"
  21. "fmt"
  22. "io"
  23. "time"
  24. "google.golang.org/grpc"
  25. "google.golang.org/grpc/codes"
  26. "google.golang.org/grpc/connectivity"
  27. healthpb "google.golang.org/grpc/health/grpc_health_v1"
  28. "google.golang.org/grpc/internal"
  29. "google.golang.org/grpc/internal/backoff"
  30. "google.golang.org/grpc/status"
  31. )
  32. var (
  33. backoffStrategy = backoff.DefaultExponential
  34. backoffFunc = func(ctx context.Context, retries int) bool {
  35. d := backoffStrategy.Backoff(retries)
  36. timer := time.NewTimer(d)
  37. select {
  38. case <-timer.C:
  39. return true
  40. case <-ctx.Done():
  41. timer.Stop()
  42. return false
  43. }
  44. }
  45. )
  46. func init() {
  47. internal.HealthCheckFunc = clientHealthCheck
  48. }
  // healthCheckMethod is the full method name handed to newStream when
  // clientHealthCheck opens its health Watch stream.
  const healthCheckMethod = "/grpc.health.v1.Health/Watch"
  50. // This function implements the protocol defined at:
  51. // https://github.com/grpc/grpc/blob/master/doc/health-checking.md
  52. func clientHealthCheck(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State, error), service string) error {
  53. tryCnt := 0
  54. retryConnection:
  55. for {
  56. // Backs off if the connection has failed in some way without receiving a message in the previous retry.
  57. if tryCnt > 0 && !backoffFunc(ctx, tryCnt-1) {
  58. return nil
  59. }
  60. tryCnt++
  61. if ctx.Err() != nil {
  62. return nil
  63. }
  64. setConnectivityState(connectivity.Connecting, nil)
  65. rawS, err := newStream(healthCheckMethod)
  66. if err != nil {
  67. continue retryConnection
  68. }
  69. s, ok := rawS.(grpc.ClientStream)
  70. // Ideally, this should never happen. But if it happens, the server is marked as healthy for LBing purposes.
  71. if !ok {
  72. setConnectivityState(connectivity.Ready, nil)
  73. return fmt.Errorf("newStream returned %v (type %T); want grpc.ClientStream", rawS, rawS)
  74. }
  75. if err = s.SendMsg(&healthpb.HealthCheckRequest{Service: service}); err != nil && err != io.EOF {
  76. // Stream should have been closed, so we can safely continue to create a new stream.
  77. continue retryConnection
  78. }
  79. s.CloseSend()
  80. resp := new(healthpb.HealthCheckResponse)
  81. for {
  82. err = s.RecvMsg(resp)
  83. // Reports healthy for the LBing purposes if health check is not implemented in the server.
  84. if status.Code(err) == codes.Unimplemented {
  85. setConnectivityState(connectivity.Ready, nil)
  86. return err
  87. }
  88. // Reports unhealthy if server's Watch method gives an error other than UNIMPLEMENTED.
  89. if err != nil {
  90. setConnectivityState(connectivity.TransientFailure, fmt.Errorf("connection active but received health check RPC error: %v", err))
  91. continue retryConnection
  92. }
  93. // As a message has been received, removes the need for backoff for the next retry by resetting the try count.
  94. tryCnt = 0
  95. if resp.Status == healthpb.HealthCheckResponse_SERVING {
  96. setConnectivityState(connectivity.Ready, nil)
  97. } else {
  98. setConnectivityState(connectivity.TransientFailure, fmt.Errorf("connection active but health check failed. status=%s", resp.Status))
  99. }
  100. }
  101. }
  102. }