healthcheck_test.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package test
  19. import (
  20. "context"
  21. "errors"
  22. "fmt"
  23. "net"
  24. "sync"
  25. "testing"
  26. "time"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/codes"
  29. "google.golang.org/grpc/connectivity"
  30. _ "google.golang.org/grpc/health"
  31. healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
  32. healthpb "google.golang.org/grpc/health/grpc_health_v1"
  33. "google.golang.org/grpc/internal"
  34. "google.golang.org/grpc/internal/channelz"
  35. "google.golang.org/grpc/resolver"
  36. "google.golang.org/grpc/resolver/manual"
  37. "google.golang.org/grpc/status"
  38. testpb "google.golang.org/grpc/test/grpc_testing"
  39. )
// testHealthCheckFunc holds the value of internal.HealthCheckFunc as seen when
// this package's variables are initialized. The wrapper produced by
// setupHealthCheckWrapper delegates to it.
// NOTE(review): presumably populated by the blank import of grpc/health — confirm.
var testHealthCheckFunc = internal.HealthCheckFunc
  41. func newTestHealthServer() *testHealthServer {
  42. return newTestHealthServerWithWatchFunc(defaultWatchFunc)
  43. }
  44. func newTestHealthServerWithWatchFunc(f func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error) *testHealthServer {
  45. return &testHealthServer{
  46. watchFunc: f,
  47. update: make(chan struct{}, 1),
  48. status: make(map[string]healthpb.HealthCheckResponse_ServingStatus),
  49. }
  50. }
  51. // defaultWatchFunc will send a HealthCheckResponse to the client whenever SetServingStatus is called.
  52. func defaultWatchFunc(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  53. if in.Service != "foo" {
  54. return status.Error(codes.FailedPrecondition,
  55. "the defaultWatchFunc only handles request with service name to be \"foo\"")
  56. }
  57. var done bool
  58. for {
  59. select {
  60. case <-stream.Context().Done():
  61. done = true
  62. case <-s.update:
  63. }
  64. if done {
  65. break
  66. }
  67. s.mu.Lock()
  68. resp := &healthpb.HealthCheckResponse{
  69. Status: s.status[in.Service],
  70. }
  71. s.mu.Unlock()
  72. stream.SendMsg(resp)
  73. }
  74. return nil
  75. }
  76. type testHealthServer struct {
  77. watchFunc func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error
  78. mu sync.Mutex
  79. status map[string]healthpb.HealthCheckResponse_ServingStatus
  80. update chan struct{}
  81. }
  82. func (s *testHealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
  83. return &healthpb.HealthCheckResponse{
  84. Status: healthpb.HealthCheckResponse_SERVING,
  85. }, nil
  86. }
  87. func (s *testHealthServer) Watch(in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  88. return s.watchFunc(s, in, stream)
  89. }
  90. // SetServingStatus is called when need to reset the serving status of a service
  91. // or insert a new service entry into the statusMap.
  92. func (s *testHealthServer) SetServingStatus(service string, status healthpb.HealthCheckResponse_ServingStatus) {
  93. s.mu.Lock()
  94. s.status[service] = status
  95. select {
  96. case <-s.update:
  97. default:
  98. }
  99. s.update <- struct{}{}
  100. s.mu.Unlock()
  101. }
  102. func setupHealthCheckWrapper() (hcEnterChan chan struct{}, hcExitChan chan struct{}, wrapper func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error) {
  103. hcEnterChan = make(chan struct{})
  104. hcExitChan = make(chan struct{})
  105. wrapper = func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error {
  106. close(hcEnterChan)
  107. defer close(hcExitChan)
  108. return testHealthCheckFunc(ctx, newStream, update, service)
  109. }
  110. return
  111. }
// svrConfig carries the options setupServer uses when building the test
// health server.
type svrConfig struct {
	// specialWatchFunc, when non-nil, replaces defaultWatchFunc as the Watch
	// handler of the health server created by setupServer.
	specialWatchFunc func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error
}
  115. func setupServer(sc *svrConfig) (s *grpc.Server, lis net.Listener, ts *testHealthServer, deferFunc func(), err error) {
  116. s = grpc.NewServer()
  117. lis, err = net.Listen("tcp", "localhost:0")
  118. if err != nil {
  119. return nil, nil, nil, func() {}, fmt.Errorf("failed to listen due to err %v", err)
  120. }
  121. if sc.specialWatchFunc != nil {
  122. ts = newTestHealthServerWithWatchFunc(sc.specialWatchFunc)
  123. } else {
  124. ts = newTestHealthServer()
  125. }
  126. healthgrpc.RegisterHealthServer(s, ts)
  127. testpb.RegisterTestServiceServer(s, &testServer{})
  128. go s.Serve(lis)
  129. return s, lis, ts, s.Stop, nil
  130. }
// clientConfig carries the options setupClient uses when dialing the test
// ClientConn.
type clientConfig struct {
	// balancerName is passed to grpc.WithBalancerName.
	balancerName string
	// testHealthCheckFuncWrapper, when non-nil, is installed as the client's
	// health check function through internal.WithHealthCheckFunc.
	testHealthCheckFuncWrapper func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error
	// extraDialOption is appended after the dial options setupClient builds
	// itself.
	extraDialOption []grpc.DialOption
}
  136. func setupClient(c *clientConfig) (cc *grpc.ClientConn, r *manual.Resolver, deferFunc func(), err error) {
  137. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  138. var opts []grpc.DialOption
  139. opts = append(opts, grpc.WithInsecure(), grpc.WithBalancerName(c.balancerName))
  140. if c.testHealthCheckFuncWrapper != nil {
  141. opts = append(opts, internal.WithHealthCheckFunc.(func(internal.HealthChecker) grpc.DialOption)(c.testHealthCheckFuncWrapper))
  142. }
  143. opts = append(opts, c.extraDialOption...)
  144. cc, err = grpc.Dial(r.Scheme()+":///test.server", opts...)
  145. if err != nil {
  146. rcleanup()
  147. return nil, nil, nil, fmt.Errorf("dial failed due to err: %v", err)
  148. }
  149. return cc, r, func() { cc.Close(); rcleanup() }, nil
  150. }
  151. func (s) TestHealthCheckWatchStateChange(t *testing.T) {
  152. _, lis, ts, deferFunc, err := setupServer(&svrConfig{})
  153. defer deferFunc()
  154. if err != nil {
  155. t.Fatal(err)
  156. }
  157. // The table below shows the expected series of addrConn connectivity transitions when server
  158. // updates its health status. As there's only one addrConn corresponds with the ClientConn in this
  159. // test, we use ClientConn's connectivity state as the addrConn connectivity state.
  160. //+------------------------------+-------------------------------------------+
  161. //| Health Check Returned Status | Expected addrConn Connectivity Transition |
  162. //+------------------------------+-------------------------------------------+
  163. //| NOT_SERVING | ->TRANSIENT FAILURE |
  164. //| SERVING | ->READY |
  165. //| SERVICE_UNKNOWN | ->TRANSIENT FAILURE |
  166. //| SERVING | ->READY |
  167. //| UNKNOWN | ->TRANSIENT FAILURE |
  168. //+------------------------------+-------------------------------------------+
  169. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_NOT_SERVING)
  170. cc, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
  171. if err != nil {
  172. t.Fatal(err)
  173. }
  174. defer deferFunc()
  175. r.UpdateState(resolver.State{
  176. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  177. ServiceConfig: `{
  178. "healthCheckConfig": {
  179. "serviceName": "foo"
  180. }
  181. }`})
  182. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  183. defer cancel()
  184. if ok := cc.WaitForStateChange(ctx, connectivity.Idle); !ok {
  185. t.Fatal("ClientConn is still in IDLE state when the context times out.")
  186. }
  187. if ok := cc.WaitForStateChange(ctx, connectivity.Connecting); !ok {
  188. t.Fatal("ClientConn is still in CONNECTING state when the context times out.")
  189. }
  190. if s := cc.GetState(); s != connectivity.TransientFailure {
  191. t.Fatalf("ClientConn is in %v state, want TRANSIENT FAILURE", s)
  192. }
  193. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  194. if ok := cc.WaitForStateChange(ctx, connectivity.TransientFailure); !ok {
  195. t.Fatal("ClientConn is still in TRANSIENT FAILURE state when the context times out.")
  196. }
  197. if s := cc.GetState(); s != connectivity.Ready {
  198. t.Fatalf("ClientConn is in %v state, want READY", s)
  199. }
  200. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
  201. if ok := cc.WaitForStateChange(ctx, connectivity.Ready); !ok {
  202. t.Fatal("ClientConn is still in READY state when the context times out.")
  203. }
  204. if s := cc.GetState(); s != connectivity.TransientFailure {
  205. t.Fatalf("ClientConn is in %v state, want TRANSIENT FAILURE", s)
  206. }
  207. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  208. if ok := cc.WaitForStateChange(ctx, connectivity.TransientFailure); !ok {
  209. t.Fatal("ClientConn is still in TRANSIENT FAILURE state when the context times out.")
  210. }
  211. if s := cc.GetState(); s != connectivity.Ready {
  212. t.Fatalf("ClientConn is in %v state, want READY", s)
  213. }
  214. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_UNKNOWN)
  215. if ok := cc.WaitForStateChange(ctx, connectivity.Ready); !ok {
  216. t.Fatal("ClientConn is still in READY state when the context times out.")
  217. }
  218. if s := cc.GetState(); s != connectivity.TransientFailure {
  219. t.Fatalf("ClientConn is in %v state, want TRANSIENT FAILURE", s)
  220. }
  221. }
  222. // If Watch returns Unimplemented, then the ClientConn should go into READY state.
  223. func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) {
  224. s := grpc.NewServer()
  225. lis, err := net.Listen("tcp", "localhost:0")
  226. if err != nil {
  227. t.Fatalf("failed to listen due to err: %v", err)
  228. }
  229. go s.Serve(lis)
  230. defer s.Stop()
  231. cc, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
  232. if err != nil {
  233. t.Fatal(err)
  234. }
  235. defer deferFunc()
  236. r.UpdateState(resolver.State{
  237. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  238. ServiceConfig: `{
  239. "healthCheckConfig": {
  240. "serviceName": "foo"
  241. }
  242. }`})
  243. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  244. defer cancel()
  245. if ok := cc.WaitForStateChange(ctx, connectivity.Idle); !ok {
  246. t.Fatal("ClientConn is still in IDLE state when the context times out.")
  247. }
  248. if ok := cc.WaitForStateChange(ctx, connectivity.Connecting); !ok {
  249. t.Fatal("ClientConn is still in CONNECTING state when the context times out.")
  250. }
  251. if s := cc.GetState(); s != connectivity.Ready {
  252. t.Fatalf("ClientConn is in %v state, want READY", s)
  253. }
  254. }
  255. // In the case of a goaway received, the health check stream should be terminated and health check
  256. // function should exit.
  257. func (s) TestHealthCheckWithGoAway(t *testing.T) {
  258. hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  259. s, lis, ts, deferFunc, err := setupServer(&svrConfig{})
  260. defer deferFunc()
  261. if err != nil {
  262. t.Fatal(err)
  263. }
  264. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  265. cc, r, deferFunc, err := setupClient(&clientConfig{
  266. balancerName: "round_robin",
  267. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  268. })
  269. if err != nil {
  270. t.Fatal(err)
  271. }
  272. defer deferFunc()
  273. tc := testpb.NewTestServiceClient(cc)
  274. r.UpdateState(resolver.State{
  275. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  276. ServiceConfig: `{
  277. "healthCheckConfig": {
  278. "serviceName": "foo"
  279. }
  280. }`})
  281. // make some rpcs to make sure connection is working.
  282. if err := verifyResultWithDelay(func() (bool, error) {
  283. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  284. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  285. }
  286. return true, nil
  287. }); err != nil {
  288. t.Fatal(err)
  289. }
  290. // the stream rpc will persist through goaway event.
  291. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  292. defer cancel()
  293. stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
  294. if err != nil {
  295. t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  296. }
  297. respParam := []*testpb.ResponseParameters{{Size: 1}}
  298. payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
  299. if err != nil {
  300. t.Fatal(err)
  301. }
  302. req := &testpb.StreamingOutputCallRequest{
  303. ResponseParameters: respParam,
  304. Payload: payload,
  305. }
  306. if err := stream.Send(req); err != nil {
  307. t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
  308. }
  309. if _, err := stream.Recv(); err != nil {
  310. t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
  311. }
  312. select {
  313. case <-hcExitChan:
  314. t.Fatal("Health check function has exited, which is not expected.")
  315. default:
  316. }
  317. // server sends GoAway
  318. go s.GracefulStop()
  319. select {
  320. case <-hcExitChan:
  321. case <-time.After(5 * time.Second):
  322. select {
  323. case <-hcEnterChan:
  324. default:
  325. t.Fatal("Health check function has not entered after 5s.")
  326. }
  327. t.Fatal("Health check function has not exited after 5s.")
  328. }
  329. // The existing RPC should be still good to proceed.
  330. if err := stream.Send(req); err != nil {
  331. t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
  332. }
  333. if _, err := stream.Recv(); err != nil {
  334. t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
  335. }
  336. }
  337. func (s) TestHealthCheckWithConnClose(t *testing.T) {
  338. hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  339. s, lis, ts, deferFunc, err := setupServer(&svrConfig{})
  340. defer deferFunc()
  341. if err != nil {
  342. t.Fatal(err)
  343. }
  344. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  345. cc, r, deferFunc, err := setupClient(&clientConfig{
  346. balancerName: "round_robin",
  347. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  348. })
  349. if err != nil {
  350. t.Fatal(err)
  351. }
  352. defer deferFunc()
  353. tc := testpb.NewTestServiceClient(cc)
  354. r.UpdateState(resolver.State{
  355. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  356. ServiceConfig: `{
  357. "healthCheckConfig": {
  358. "serviceName": "foo"
  359. }
  360. }`})
  361. // make some rpcs to make sure connection is working.
  362. if err := verifyResultWithDelay(func() (bool, error) {
  363. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  364. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  365. }
  366. return true, nil
  367. }); err != nil {
  368. t.Fatal(err)
  369. }
  370. select {
  371. case <-hcExitChan:
  372. t.Fatal("Health check function has exited, which is not expected.")
  373. default:
  374. }
  375. // server closes the connection
  376. s.Stop()
  377. select {
  378. case <-hcExitChan:
  379. case <-time.After(5 * time.Second):
  380. select {
  381. case <-hcEnterChan:
  382. default:
  383. t.Fatal("Health check function has not entered after 5s.")
  384. }
  385. t.Fatal("Health check function has not exited after 5s.")
  386. }
  387. }
  388. // addrConn drain happens when addrConn gets torn down due to its address being no longer in the
  389. // address list returned by the resolver.
  390. func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) {
  391. hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  392. _, lis, ts, deferFunc, err := setupServer(&svrConfig{})
  393. defer deferFunc()
  394. if err != nil {
  395. t.Fatal(err)
  396. }
  397. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  398. cc, r, deferFunc, err := setupClient(&clientConfig{
  399. balancerName: "round_robin",
  400. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  401. })
  402. if err != nil {
  403. t.Fatal(err)
  404. }
  405. defer deferFunc()
  406. tc := testpb.NewTestServiceClient(cc)
  407. sc := `{
  408. "healthCheckConfig": {
  409. "serviceName": "foo"
  410. }
  411. }`
  412. r.UpdateState(resolver.State{
  413. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  414. ServiceConfig: sc,
  415. })
  416. // make some rpcs to make sure connection is working.
  417. if err := verifyResultWithDelay(func() (bool, error) {
  418. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  419. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  420. }
  421. return true, nil
  422. }); err != nil {
  423. t.Fatal(err)
  424. }
  425. // the stream rpc will persist through goaway event.
  426. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  427. defer cancel()
  428. stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
  429. if err != nil {
  430. t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  431. }
  432. respParam := []*testpb.ResponseParameters{{Size: 1}}
  433. payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
  434. if err != nil {
  435. t.Fatal(err)
  436. }
  437. req := &testpb.StreamingOutputCallRequest{
  438. ResponseParameters: respParam,
  439. Payload: payload,
  440. }
  441. if err := stream.Send(req); err != nil {
  442. t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
  443. }
  444. if _, err := stream.Recv(); err != nil {
  445. t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
  446. }
  447. select {
  448. case <-hcExitChan:
  449. t.Fatal("Health check function has exited, which is not expected.")
  450. default:
  451. }
  452. // trigger teardown of the ac
  453. r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: sc})
  454. select {
  455. case <-hcExitChan:
  456. case <-time.After(5 * time.Second):
  457. select {
  458. case <-hcEnterChan:
  459. default:
  460. t.Fatal("Health check function has not entered after 5s.")
  461. }
  462. t.Fatal("Health check function has not exited after 5s.")
  463. }
  464. // The existing RPC should be still good to proceed.
  465. if err := stream.Send(req); err != nil {
  466. t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
  467. }
  468. if _, err := stream.Recv(); err != nil {
  469. t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
  470. }
  471. }
  472. // ClientConn close will lead to its addrConns being torn down.
  473. func (s) TestHealthCheckWithClientConnClose(t *testing.T) {
  474. hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  475. _, lis, ts, deferFunc, err := setupServer(&svrConfig{})
  476. defer deferFunc()
  477. if err != nil {
  478. t.Fatal(err)
  479. }
  480. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  481. cc, r, deferFunc, err := setupClient(&clientConfig{
  482. balancerName: "round_robin",
  483. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  484. })
  485. if err != nil {
  486. t.Fatal(err)
  487. }
  488. defer deferFunc()
  489. tc := testpb.NewTestServiceClient(cc)
  490. r.UpdateState(resolver.State{
  491. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  492. ServiceConfig: `{
  493. "healthCheckConfig": {
  494. "serviceName": "foo"
  495. }
  496. }`})
  497. // make some rpcs to make sure connection is working.
  498. if err := verifyResultWithDelay(func() (bool, error) {
  499. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  500. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  501. }
  502. return true, nil
  503. }); err != nil {
  504. t.Fatal(err)
  505. }
  506. select {
  507. case <-hcExitChan:
  508. t.Fatal("Health check function has exited, which is not expected.")
  509. default:
  510. }
  511. // trigger addrConn teardown
  512. cc.Close()
  513. select {
  514. case <-hcExitChan:
  515. case <-time.After(5 * time.Second):
  516. select {
  517. case <-hcEnterChan:
  518. default:
  519. t.Fatal("Health check function has not entered after 5s.")
  520. }
  521. t.Fatal("Health check function has not exited after 5s.")
  522. }
  523. }
  524. // This test is to test the logic in the createTransport after the health check function returns which
  525. // closes the skipReset channel(since it has not been closed inside health check func) to unblock
  526. // onGoAway/onClose goroutine.
  527. func (s) TestHealthCheckWithoutReportHealthCalledAddrConnShutDown(t *testing.T) {
  528. hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  529. _, lis, ts, deferFunc, err := setupServer(&svrConfig{
  530. specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  531. if in.Service != "delay" {
  532. return status.Error(codes.FailedPrecondition,
  533. "this special Watch function only handles request with service name to be \"delay\"")
  534. }
  535. // Do nothing to mock a delay of health check response from server side.
  536. // This case is to help with the test that covers the condition that reportHealth is not
  537. // called inside HealthCheckFunc before the func returns.
  538. select {
  539. case <-stream.Context().Done():
  540. case <-time.After(5 * time.Second):
  541. }
  542. return nil
  543. },
  544. })
  545. defer deferFunc()
  546. if err != nil {
  547. t.Fatal(err)
  548. }
  549. ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING)
  550. _, r, deferFunc, err := setupClient(&clientConfig{
  551. balancerName: "round_robin",
  552. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  553. })
  554. if err != nil {
  555. t.Fatal(err)
  556. }
  557. defer deferFunc()
  558. // The serviceName "delay" is specially handled at server side, where response will not be sent
  559. // back to client immediately upon receiving the request (client should receive no response until
  560. // test ends).
  561. sc := `{
  562. "healthCheckConfig": {
  563. "serviceName": "delay"
  564. }
  565. }`
  566. r.UpdateState(resolver.State{
  567. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  568. ServiceConfig: sc,
  569. })
  570. select {
  571. case <-hcExitChan:
  572. t.Fatal("Health check function has exited, which is not expected.")
  573. default:
  574. }
  575. select {
  576. case <-hcEnterChan:
  577. case <-time.After(5 * time.Second):
  578. t.Fatal("Health check function has not been invoked after 5s.")
  579. }
  580. // trigger teardown of the ac, ac in SHUTDOWN state
  581. r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: sc})
  582. // The health check func should exit without calling the reportHealth func, as server hasn't sent
  583. // any response.
  584. select {
  585. case <-hcExitChan:
  586. case <-time.After(5 * time.Second):
  587. t.Fatal("Health check function has not exited after 5s.")
  588. }
  589. // The deferred leakcheck will check whether there's leaked goroutine, which is an indication
  590. // whether we closes the skipReset channel to unblock onGoAway/onClose goroutine.
  591. }
  592. // This test is to test the logic in the createTransport after the health check function returns which
  593. // closes the allowedToReset channel(since it has not been closed inside health check func) to unblock
  594. // onGoAway/onClose goroutine.
  595. func (s) TestHealthCheckWithoutReportHealthCalled(t *testing.T) {
  596. hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  597. s, lis, ts, deferFunc, err := setupServer(&svrConfig{
  598. specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  599. if in.Service != "delay" {
  600. return status.Error(codes.FailedPrecondition,
  601. "this special Watch function only handles request with service name to be \"delay\"")
  602. }
  603. // Do nothing to mock a delay of health check response from server side.
  604. // This case is to help with the test that covers the condition that reportHealth is not
  605. // called inside HealthCheckFunc before the func returns.
  606. select {
  607. case <-stream.Context().Done():
  608. case <-time.After(5 * time.Second):
  609. }
  610. return nil
  611. },
  612. })
  613. defer deferFunc()
  614. if err != nil {
  615. t.Fatal(err)
  616. }
  617. ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING)
  618. _, r, deferFunc, err := setupClient(&clientConfig{
  619. balancerName: "round_robin",
  620. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  621. })
  622. if err != nil {
  623. t.Fatal(err)
  624. }
  625. defer deferFunc()
  626. // The serviceName "delay" is specially handled at server side, where response will not be sent
  627. // back to client immediately upon receiving the request (client should receive no response until
  628. // test ends).
  629. r.UpdateState(resolver.State{
  630. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  631. ServiceConfig: `{
  632. "healthCheckConfig": {
  633. "serviceName": "delay"
  634. }
  635. }`})
  636. select {
  637. case <-hcExitChan:
  638. t.Fatal("Health check function has exited, which is not expected.")
  639. default:
  640. }
  641. select {
  642. case <-hcEnterChan:
  643. case <-time.After(5 * time.Second):
  644. t.Fatal("Health check function has not been invoked after 5s.")
  645. }
  646. // trigger transport being closed
  647. s.Stop()
  648. // The health check func should exit without calling the reportHealth func, as server hasn't sent
  649. // any response.
  650. select {
  651. case <-hcExitChan:
  652. case <-time.After(5 * time.Second):
  653. t.Fatal("Health check function has not exited after 5s.")
  654. }
  655. // The deferred leakcheck will check whether there's leaked goroutine, which is an indication
  656. // whether we closes the allowedToReset channel to unblock onGoAway/onClose goroutine.
  657. }
  658. func testHealthCheckDisableWithDialOption(t *testing.T, addr string) {
  659. hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  660. cc, r, deferFunc, err := setupClient(&clientConfig{
  661. balancerName: "round_robin",
  662. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  663. extraDialOption: []grpc.DialOption{grpc.WithDisableHealthCheck()},
  664. })
  665. if err != nil {
  666. t.Fatal(err)
  667. }
  668. defer deferFunc()
  669. tc := testpb.NewTestServiceClient(cc)
  670. r.UpdateState(resolver.State{
  671. Addresses: []resolver.Address{{Addr: addr}},
  672. ServiceConfig: `{
  673. "healthCheckConfig": {
  674. "serviceName": "foo"
  675. }
  676. }`})
  677. // send some rpcs to make sure transport has been created and is ready for use.
  678. if err := verifyResultWithDelay(func() (bool, error) {
  679. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  680. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  681. }
  682. return true, nil
  683. }); err != nil {
  684. t.Fatal(err)
  685. }
  686. select {
  687. case <-hcEnterChan:
  688. t.Fatal("Health check function has exited, which is not expected.")
  689. default:
  690. }
  691. }
  692. func testHealthCheckDisableWithBalancer(t *testing.T, addr string) {
  693. hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  694. cc, r, deferFunc, err := setupClient(&clientConfig{
  695. balancerName: "pick_first",
  696. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  697. })
  698. if err != nil {
  699. t.Fatal(err)
  700. }
  701. defer deferFunc()
  702. tc := testpb.NewTestServiceClient(cc)
  703. r.UpdateState(resolver.State{
  704. Addresses: []resolver.Address{{Addr: addr}},
  705. ServiceConfig: `{
  706. "healthCheckConfig": {
  707. "serviceName": "foo"
  708. }
  709. }`})
  710. // send some rpcs to make sure transport has been created and is ready for use.
  711. if err := verifyResultWithDelay(func() (bool, error) {
  712. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  713. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  714. }
  715. return true, nil
  716. }); err != nil {
  717. t.Fatal(err)
  718. }
  719. select {
  720. case <-hcEnterChan:
  721. t.Fatal("Health check function has started, which is not expected.")
  722. default:
  723. }
  724. }
  725. func testHealthCheckDisableWithServiceConfig(t *testing.T, addr string) {
  726. hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  727. cc, r, deferFunc, err := setupClient(&clientConfig{
  728. balancerName: "round_robin",
  729. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  730. })
  731. if err != nil {
  732. t.Fatal(err)
  733. }
  734. defer deferFunc()
  735. tc := testpb.NewTestServiceClient(cc)
  736. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: addr}}})
  737. // send some rpcs to make sure transport has been created and is ready for use.
  738. if err := verifyResultWithDelay(func() (bool, error) {
  739. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  740. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  741. }
  742. return true, nil
  743. }); err != nil {
  744. t.Fatal(err)
  745. }
  746. select {
  747. case <-hcEnterChan:
  748. t.Fatal("Health check function has started, which is not expected.")
  749. default:
  750. }
  751. }
  752. func (s) TestHealthCheckDisable(t *testing.T) {
  753. _, lis, ts, deferFunc, err := setupServer(&svrConfig{})
  754. defer deferFunc()
  755. if err != nil {
  756. t.Fatal(err)
  757. }
  758. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  759. // test client side disabling configuration.
  760. testHealthCheckDisableWithDialOption(t, lis.Addr().String())
  761. testHealthCheckDisableWithBalancer(t, lis.Addr().String())
  762. testHealthCheckDisableWithServiceConfig(t, lis.Addr().String())
  763. }
  764. func (s) TestHealthCheckChannelzCountingCallSuccess(t *testing.T) {
  765. _, lis, _, deferFunc, err := setupServer(&svrConfig{
  766. specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  767. if in.Service != "channelzSuccess" {
  768. return status.Error(codes.FailedPrecondition,
  769. "this special Watch function only handles request with service name to be \"channelzSuccess\"")
  770. }
  771. return status.Error(codes.OK, "fake success")
  772. },
  773. })
  774. defer deferFunc()
  775. if err != nil {
  776. t.Fatal(err)
  777. }
  778. _, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
  779. if err != nil {
  780. t.Fatal(err)
  781. }
  782. defer deferFunc()
  783. r.UpdateState(resolver.State{
  784. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  785. ServiceConfig: `{
  786. "healthCheckConfig": {
  787. "serviceName": "channelzSuccess"
  788. }
  789. }`})
  790. if err := verifyResultWithDelay(func() (bool, error) {
  791. cm, _ := channelz.GetTopChannels(0, 0)
  792. if len(cm) == 0 {
  793. return false, errors.New("channelz.GetTopChannels return 0 top channel")
  794. }
  795. if len(cm[0].SubChans) == 0 {
  796. return false, errors.New("there is 0 subchannel")
  797. }
  798. var id int64
  799. for k := range cm[0].SubChans {
  800. id = k
  801. break
  802. }
  803. scm := channelz.GetSubChannel(id)
  804. if scm == nil || scm.ChannelData == nil {
  805. return false, errors.New("nil subchannel metric or nil subchannel metric ChannelData returned")
  806. }
  807. // exponential backoff retry may result in more than one health check call.
  808. if scm.ChannelData.CallsStarted > 0 && scm.ChannelData.CallsSucceeded > 0 && scm.ChannelData.CallsFailed == 0 {
  809. return true, nil
  810. }
  811. return false, fmt.Errorf("got %d CallsStarted, %d CallsSucceeded, want >0 >0", scm.ChannelData.CallsStarted, scm.ChannelData.CallsSucceeded)
  812. }); err != nil {
  813. t.Fatal(err)
  814. }
  815. }
  816. func (s) TestHealthCheckChannelzCountingCallFailure(t *testing.T) {
  817. _, lis, _, deferFunc, err := setupServer(&svrConfig{
  818. specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  819. if in.Service != "channelzFailure" {
  820. return status.Error(codes.FailedPrecondition,
  821. "this special Watch function only handles request with service name to be \"channelzFailure\"")
  822. }
  823. return status.Error(codes.Internal, "fake failure")
  824. },
  825. })
  826. if err != nil {
  827. t.Fatal(err)
  828. }
  829. defer deferFunc()
  830. _, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
  831. if err != nil {
  832. t.Fatal(err)
  833. }
  834. defer deferFunc()
  835. r.UpdateState(resolver.State{
  836. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  837. ServiceConfig: `{
  838. "healthCheckConfig": {
  839. "serviceName": "channelzFailure"
  840. }
  841. }`})
  842. if err := verifyResultWithDelay(func() (bool, error) {
  843. cm, _ := channelz.GetTopChannels(0, 0)
  844. if len(cm) == 0 {
  845. return false, errors.New("channelz.GetTopChannels return 0 top channel")
  846. }
  847. if len(cm[0].SubChans) == 0 {
  848. return false, errors.New("there is 0 subchannel")
  849. }
  850. var id int64
  851. for k := range cm[0].SubChans {
  852. id = k
  853. break
  854. }
  855. scm := channelz.GetSubChannel(id)
  856. if scm == nil || scm.ChannelData == nil {
  857. return false, errors.New("nil subchannel metric or nil subchannel metric ChannelData returned")
  858. }
  859. // exponential backoff retry may result in more than one health check call.
  860. if scm.ChannelData.CallsStarted > 0 && scm.ChannelData.CallsFailed > 0 && scm.ChannelData.CallsSucceeded == 0 {
  861. return true, nil
  862. }
  863. return false, fmt.Errorf("got %d CallsStarted, %d CallsFailed, want >0, >0", scm.ChannelData.CallsStarted, scm.ChannelData.CallsFailed)
  864. }); err != nil {
  865. t.Fatal(err)
  866. }
  867. }