healthcheck_test.go 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package test
  19. import (
  20. "context"
  21. "errors"
  22. "fmt"
  23. "net"
  24. "sync"
  25. "testing"
  26. "time"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/codes"
  29. "google.golang.org/grpc/connectivity"
  30. _ "google.golang.org/grpc/health"
  31. healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
  32. healthpb "google.golang.org/grpc/health/grpc_health_v1"
  33. "google.golang.org/grpc/internal"
  34. "google.golang.org/grpc/internal/channelz"
  35. "google.golang.org/grpc/internal/grpctest"
  36. "google.golang.org/grpc/resolver"
  37. "google.golang.org/grpc/resolver/manual"
  38. "google.golang.org/grpc/status"
  39. testpb "google.golang.org/grpc/test/grpc_testing"
  40. )
  41. var testHealthCheckFunc = internal.HealthCheckFunc
  42. func newTestHealthServer() *testHealthServer {
  43. return newTestHealthServerWithWatchFunc(defaultWatchFunc)
  44. }
  45. func newTestHealthServerWithWatchFunc(f func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error) *testHealthServer {
  46. return &testHealthServer{
  47. watchFunc: f,
  48. update: make(chan struct{}, 1),
  49. status: make(map[string]healthpb.HealthCheckResponse_ServingStatus),
  50. }
  51. }
  52. // defaultWatchFunc will send a HealthCheckResponse to the client whenever SetServingStatus is called.
  53. func defaultWatchFunc(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  54. if in.Service != "foo" {
  55. return status.Error(codes.FailedPrecondition,
  56. "the defaultWatchFunc only handles request with service name to be \"foo\"")
  57. }
  58. var done bool
  59. for {
  60. select {
  61. case <-stream.Context().Done():
  62. done = true
  63. case <-s.update:
  64. }
  65. if done {
  66. break
  67. }
  68. s.mu.Lock()
  69. resp := &healthpb.HealthCheckResponse{
  70. Status: s.status[in.Service],
  71. }
  72. s.mu.Unlock()
  73. stream.SendMsg(resp)
  74. }
  75. return nil
  76. }
  77. type testHealthServer struct {
  78. healthpb.UnimplementedHealthServer
  79. watchFunc func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error
  80. mu sync.Mutex
  81. status map[string]healthpb.HealthCheckResponse_ServingStatus
  82. update chan struct{}
  83. }
  84. func (s *testHealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
  85. return &healthpb.HealthCheckResponse{
  86. Status: healthpb.HealthCheckResponse_SERVING,
  87. }, nil
  88. }
  89. func (s *testHealthServer) Watch(in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  90. return s.watchFunc(s, in, stream)
  91. }
  92. // SetServingStatus is called when need to reset the serving status of a service
  93. // or insert a new service entry into the statusMap.
  94. func (s *testHealthServer) SetServingStatus(service string, status healthpb.HealthCheckResponse_ServingStatus) {
  95. s.mu.Lock()
  96. s.status[service] = status
  97. select {
  98. case <-s.update:
  99. default:
  100. }
  101. s.update <- struct{}{}
  102. s.mu.Unlock()
  103. }
  104. func setupHealthCheckWrapper() (hcEnterChan chan struct{}, hcExitChan chan struct{}, wrapper internal.HealthChecker) {
  105. hcEnterChan = make(chan struct{})
  106. hcExitChan = make(chan struct{})
  107. wrapper = func(ctx context.Context, newStream func(string) (interface{}, error), update func(connectivity.State, error), service string) error {
  108. close(hcEnterChan)
  109. defer close(hcExitChan)
  110. return testHealthCheckFunc(ctx, newStream, update, service)
  111. }
  112. return
  113. }
  114. type svrConfig struct {
  115. specialWatchFunc func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error
  116. }
  117. func setupServer(sc *svrConfig) (s *grpc.Server, lis net.Listener, ts *testHealthServer, deferFunc func(), err error) {
  118. s = grpc.NewServer()
  119. lis, err = net.Listen("tcp", "localhost:0")
  120. if err != nil {
  121. return nil, nil, nil, func() {}, fmt.Errorf("failed to listen due to err %v", err)
  122. }
  123. if sc.specialWatchFunc != nil {
  124. ts = newTestHealthServerWithWatchFunc(sc.specialWatchFunc)
  125. } else {
  126. ts = newTestHealthServer()
  127. }
  128. healthgrpc.RegisterHealthServer(s, ts)
  129. testpb.RegisterTestServiceServer(s, &testServer{})
  130. go s.Serve(lis)
  131. return s, lis, ts, s.Stop, nil
  132. }
  133. type clientConfig struct {
  134. balancerName string
  135. testHealthCheckFuncWrapper internal.HealthChecker
  136. extraDialOption []grpc.DialOption
  137. }
  138. func setupClient(c *clientConfig) (cc *grpc.ClientConn, r *manual.Resolver, deferFunc func(), err error) {
  139. r = manual.NewBuilderWithScheme("whatever")
  140. var opts []grpc.DialOption
  141. opts = append(opts, grpc.WithInsecure(), grpc.WithResolvers(r), grpc.WithBalancerName(c.balancerName))
  142. if c.testHealthCheckFuncWrapper != nil {
  143. opts = append(opts, internal.WithHealthCheckFunc.(func(internal.HealthChecker) grpc.DialOption)(c.testHealthCheckFuncWrapper))
  144. }
  145. opts = append(opts, c.extraDialOption...)
  146. cc, err = grpc.Dial(r.Scheme()+":///test.server", opts...)
  147. if err != nil {
  148. return nil, nil, nil, fmt.Errorf("dial failed due to err: %v", err)
  149. }
  150. return cc, r, func() { cc.Close() }, nil
  151. }
  152. func (s) TestHealthCheckWatchStateChange(t *testing.T) {
  153. _, lis, ts, deferFunc, err := setupServer(&svrConfig{})
  154. defer deferFunc()
  155. if err != nil {
  156. t.Fatal(err)
  157. }
  158. // The table below shows the expected series of addrConn connectivity transitions when server
  159. // updates its health status. As there's only one addrConn corresponds with the ClientConn in this
  160. // test, we use ClientConn's connectivity state as the addrConn connectivity state.
  161. //+------------------------------+-------------------------------------------+
  162. //| Health Check Returned Status | Expected addrConn Connectivity Transition |
  163. //+------------------------------+-------------------------------------------+
  164. //| NOT_SERVING | ->TRANSIENT FAILURE |
  165. //| SERVING | ->READY |
  166. //| SERVICE_UNKNOWN | ->TRANSIENT FAILURE |
  167. //| SERVING | ->READY |
  168. //| UNKNOWN | ->TRANSIENT FAILURE |
  169. //+------------------------------+-------------------------------------------+
  170. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_NOT_SERVING)
  171. cc, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
  172. if err != nil {
  173. t.Fatal(err)
  174. }
  175. defer deferFunc()
  176. r.UpdateState(resolver.State{
  177. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  178. ServiceConfig: parseCfg(r, `{
  179. "healthCheckConfig": {
  180. "serviceName": "foo"
  181. }
  182. }`)})
  183. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  184. defer cancel()
  185. if ok := cc.WaitForStateChange(ctx, connectivity.Idle); !ok {
  186. t.Fatal("ClientConn is still in IDLE state when the context times out.")
  187. }
  188. if ok := cc.WaitForStateChange(ctx, connectivity.Connecting); !ok {
  189. t.Fatal("ClientConn is still in CONNECTING state when the context times out.")
  190. }
  191. if s := cc.GetState(); s != connectivity.TransientFailure {
  192. t.Fatalf("ClientConn is in %v state, want TRANSIENT FAILURE", s)
  193. }
  194. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  195. if ok := cc.WaitForStateChange(ctx, connectivity.TransientFailure); !ok {
  196. t.Fatal("ClientConn is still in TRANSIENT FAILURE state when the context times out.")
  197. }
  198. if s := cc.GetState(); s != connectivity.Ready {
  199. t.Fatalf("ClientConn is in %v state, want READY", s)
  200. }
  201. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
  202. if ok := cc.WaitForStateChange(ctx, connectivity.Ready); !ok {
  203. t.Fatal("ClientConn is still in READY state when the context times out.")
  204. }
  205. if s := cc.GetState(); s != connectivity.TransientFailure {
  206. t.Fatalf("ClientConn is in %v state, want TRANSIENT FAILURE", s)
  207. }
  208. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  209. if ok := cc.WaitForStateChange(ctx, connectivity.TransientFailure); !ok {
  210. t.Fatal("ClientConn is still in TRANSIENT FAILURE state when the context times out.")
  211. }
  212. if s := cc.GetState(); s != connectivity.Ready {
  213. t.Fatalf("ClientConn is in %v state, want READY", s)
  214. }
  215. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_UNKNOWN)
  216. if ok := cc.WaitForStateChange(ctx, connectivity.Ready); !ok {
  217. t.Fatal("ClientConn is still in READY state when the context times out.")
  218. }
  219. if s := cc.GetState(); s != connectivity.TransientFailure {
  220. t.Fatalf("ClientConn is in %v state, want TRANSIENT FAILURE", s)
  221. }
  222. }
  223. // If Watch returns Unimplemented, then the ClientConn should go into READY state.
  224. func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) {
  225. grpctest.TLogger.ExpectError("Subchannel health check is unimplemented at server side, thus health check is disabled")
  226. s := grpc.NewServer()
  227. lis, err := net.Listen("tcp", "localhost:0")
  228. if err != nil {
  229. t.Fatalf("failed to listen due to err: %v", err)
  230. }
  231. go s.Serve(lis)
  232. defer s.Stop()
  233. cc, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
  234. if err != nil {
  235. t.Fatal(err)
  236. }
  237. defer deferFunc()
  238. r.UpdateState(resolver.State{
  239. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  240. ServiceConfig: parseCfg(r, `{
  241. "healthCheckConfig": {
  242. "serviceName": "foo"
  243. }
  244. }`)})
  245. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  246. defer cancel()
  247. if ok := cc.WaitForStateChange(ctx, connectivity.Idle); !ok {
  248. t.Fatal("ClientConn is still in IDLE state when the context times out.")
  249. }
  250. if ok := cc.WaitForStateChange(ctx, connectivity.Connecting); !ok {
  251. t.Fatal("ClientConn is still in CONNECTING state when the context times out.")
  252. }
  253. if s := cc.GetState(); s != connectivity.Ready {
  254. t.Fatalf("ClientConn is in %v state, want READY", s)
  255. }
  256. }
  257. // In the case of a goaway received, the health check stream should be terminated and health check
  258. // function should exit.
  259. func (s) TestHealthCheckWithGoAway(t *testing.T) {
  260. hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  261. s, lis, ts, deferFunc, err := setupServer(&svrConfig{})
  262. defer deferFunc()
  263. if err != nil {
  264. t.Fatal(err)
  265. }
  266. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  267. cc, r, deferFunc, err := setupClient(&clientConfig{
  268. balancerName: "round_robin",
  269. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  270. })
  271. if err != nil {
  272. t.Fatal(err)
  273. }
  274. defer deferFunc()
  275. tc := testpb.NewTestServiceClient(cc)
  276. r.UpdateState(resolver.State{
  277. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  278. ServiceConfig: parseCfg(r, `{
  279. "healthCheckConfig": {
  280. "serviceName": "foo"
  281. }
  282. }`)})
  283. // make some rpcs to make sure connection is working.
  284. if err := verifyResultWithDelay(func() (bool, error) {
  285. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  286. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  287. }
  288. return true, nil
  289. }); err != nil {
  290. t.Fatal(err)
  291. }
  292. // the stream rpc will persist through goaway event.
  293. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  294. defer cancel()
  295. stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
  296. if err != nil {
  297. t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  298. }
  299. respParam := []*testpb.ResponseParameters{{Size: 1}}
  300. payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
  301. if err != nil {
  302. t.Fatal(err)
  303. }
  304. req := &testpb.StreamingOutputCallRequest{
  305. ResponseParameters: respParam,
  306. Payload: payload,
  307. }
  308. if err := stream.Send(req); err != nil {
  309. t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
  310. }
  311. if _, err := stream.Recv(); err != nil {
  312. t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
  313. }
  314. select {
  315. case <-hcExitChan:
  316. t.Fatal("Health check function has exited, which is not expected.")
  317. default:
  318. }
  319. // server sends GoAway
  320. go s.GracefulStop()
  321. select {
  322. case <-hcExitChan:
  323. case <-time.After(5 * time.Second):
  324. select {
  325. case <-hcEnterChan:
  326. default:
  327. t.Fatal("Health check function has not entered after 5s.")
  328. }
  329. t.Fatal("Health check function has not exited after 5s.")
  330. }
  331. // The existing RPC should be still good to proceed.
  332. if err := stream.Send(req); err != nil {
  333. t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
  334. }
  335. if _, err := stream.Recv(); err != nil {
  336. t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
  337. }
  338. }
  339. func (s) TestHealthCheckWithConnClose(t *testing.T) {
  340. hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  341. s, lis, ts, deferFunc, err := setupServer(&svrConfig{})
  342. defer deferFunc()
  343. if err != nil {
  344. t.Fatal(err)
  345. }
  346. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  347. cc, r, deferFunc, err := setupClient(&clientConfig{
  348. balancerName: "round_robin",
  349. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  350. })
  351. if err != nil {
  352. t.Fatal(err)
  353. }
  354. defer deferFunc()
  355. tc := testpb.NewTestServiceClient(cc)
  356. r.UpdateState(resolver.State{
  357. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  358. ServiceConfig: parseCfg(r, `{
  359. "healthCheckConfig": {
  360. "serviceName": "foo"
  361. }
  362. }`)})
  363. // make some rpcs to make sure connection is working.
  364. if err := verifyResultWithDelay(func() (bool, error) {
  365. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  366. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  367. }
  368. return true, nil
  369. }); err != nil {
  370. t.Fatal(err)
  371. }
  372. select {
  373. case <-hcExitChan:
  374. t.Fatal("Health check function has exited, which is not expected.")
  375. default:
  376. }
  377. // server closes the connection
  378. s.Stop()
  379. select {
  380. case <-hcExitChan:
  381. case <-time.After(5 * time.Second):
  382. select {
  383. case <-hcEnterChan:
  384. default:
  385. t.Fatal("Health check function has not entered after 5s.")
  386. }
  387. t.Fatal("Health check function has not exited after 5s.")
  388. }
  389. }
  390. // addrConn drain happens when addrConn gets torn down due to its address being no longer in the
  391. // address list returned by the resolver.
  392. func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) {
  393. hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  394. _, lis, ts, deferFunc, err := setupServer(&svrConfig{})
  395. defer deferFunc()
  396. if err != nil {
  397. t.Fatal(err)
  398. }
  399. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  400. cc, r, deferFunc, err := setupClient(&clientConfig{
  401. balancerName: "round_robin",
  402. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  403. })
  404. if err != nil {
  405. t.Fatal(err)
  406. }
  407. defer deferFunc()
  408. tc := testpb.NewTestServiceClient(cc)
  409. sc := parseCfg(r, `{
  410. "healthCheckConfig": {
  411. "serviceName": "foo"
  412. }
  413. }`)
  414. r.UpdateState(resolver.State{
  415. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  416. ServiceConfig: sc,
  417. })
  418. // make some rpcs to make sure connection is working.
  419. if err := verifyResultWithDelay(func() (bool, error) {
  420. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  421. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  422. }
  423. return true, nil
  424. }); err != nil {
  425. t.Fatal(err)
  426. }
  427. // the stream rpc will persist through goaway event.
  428. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  429. defer cancel()
  430. stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
  431. if err != nil {
  432. t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  433. }
  434. respParam := []*testpb.ResponseParameters{{Size: 1}}
  435. payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
  436. if err != nil {
  437. t.Fatal(err)
  438. }
  439. req := &testpb.StreamingOutputCallRequest{
  440. ResponseParameters: respParam,
  441. Payload: payload,
  442. }
  443. if err := stream.Send(req); err != nil {
  444. t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
  445. }
  446. if _, err := stream.Recv(); err != nil {
  447. t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
  448. }
  449. select {
  450. case <-hcExitChan:
  451. t.Fatal("Health check function has exited, which is not expected.")
  452. default:
  453. }
  454. // trigger teardown of the ac
  455. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}, ServiceConfig: sc})
  456. select {
  457. case <-hcExitChan:
  458. case <-time.After(5 * time.Second):
  459. select {
  460. case <-hcEnterChan:
  461. default:
  462. t.Fatal("Health check function has not entered after 5s.")
  463. }
  464. t.Fatal("Health check function has not exited after 5s.")
  465. }
  466. // The existing RPC should be still good to proceed.
  467. if err := stream.Send(req); err != nil {
  468. t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
  469. }
  470. if _, err := stream.Recv(); err != nil {
  471. t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
  472. }
  473. }
  474. // ClientConn close will lead to its addrConns being torn down.
  475. func (s) TestHealthCheckWithClientConnClose(t *testing.T) {
  476. hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  477. _, lis, ts, deferFunc, err := setupServer(&svrConfig{})
  478. defer deferFunc()
  479. if err != nil {
  480. t.Fatal(err)
  481. }
  482. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  483. cc, r, deferFunc, err := setupClient(&clientConfig{
  484. balancerName: "round_robin",
  485. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  486. })
  487. if err != nil {
  488. t.Fatal(err)
  489. }
  490. defer deferFunc()
  491. tc := testpb.NewTestServiceClient(cc)
  492. r.UpdateState(resolver.State{
  493. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  494. ServiceConfig: parseCfg(r, `{
  495. "healthCheckConfig": {
  496. "serviceName": "foo"
  497. }
  498. }`)})
  499. // make some rpcs to make sure connection is working.
  500. if err := verifyResultWithDelay(func() (bool, error) {
  501. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  502. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  503. }
  504. return true, nil
  505. }); err != nil {
  506. t.Fatal(err)
  507. }
  508. select {
  509. case <-hcExitChan:
  510. t.Fatal("Health check function has exited, which is not expected.")
  511. default:
  512. }
  513. // trigger addrConn teardown
  514. cc.Close()
  515. select {
  516. case <-hcExitChan:
  517. case <-time.After(5 * time.Second):
  518. select {
  519. case <-hcEnterChan:
  520. default:
  521. t.Fatal("Health check function has not entered after 5s.")
  522. }
  523. t.Fatal("Health check function has not exited after 5s.")
  524. }
  525. }
  526. // This test is to test the logic in the createTransport after the health check function returns which
  527. // closes the skipReset channel(since it has not been closed inside health check func) to unblock
  528. // onGoAway/onClose goroutine.
  529. func (s) TestHealthCheckWithoutSetConnectivityStateCalledAddrConnShutDown(t *testing.T) {
  530. hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  531. _, lis, ts, deferFunc, err := setupServer(&svrConfig{
  532. specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  533. if in.Service != "delay" {
  534. return status.Error(codes.FailedPrecondition,
  535. "this special Watch function only handles request with service name to be \"delay\"")
  536. }
  537. // Do nothing to mock a delay of health check response from server side.
  538. // This case is to help with the test that covers the condition that setConnectivityState is not
  539. // called inside HealthCheckFunc before the func returns.
  540. select {
  541. case <-stream.Context().Done():
  542. case <-time.After(5 * time.Second):
  543. }
  544. return nil
  545. },
  546. })
  547. defer deferFunc()
  548. if err != nil {
  549. t.Fatal(err)
  550. }
  551. ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING)
  552. _, r, deferFunc, err := setupClient(&clientConfig{
  553. balancerName: "round_robin",
  554. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  555. })
  556. if err != nil {
  557. t.Fatal(err)
  558. }
  559. defer deferFunc()
  560. // The serviceName "delay" is specially handled at server side, where response will not be sent
  561. // back to client immediately upon receiving the request (client should receive no response until
  562. // test ends).
  563. sc := parseCfg(r, `{
  564. "healthCheckConfig": {
  565. "serviceName": "delay"
  566. }
  567. }`)
  568. r.UpdateState(resolver.State{
  569. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  570. ServiceConfig: sc,
  571. })
  572. select {
  573. case <-hcExitChan:
  574. t.Fatal("Health check function has exited, which is not expected.")
  575. default:
  576. }
  577. select {
  578. case <-hcEnterChan:
  579. case <-time.After(5 * time.Second):
  580. t.Fatal("Health check function has not been invoked after 5s.")
  581. }
  582. // trigger teardown of the ac, ac in SHUTDOWN state
  583. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}, ServiceConfig: sc})
  584. // The health check func should exit without calling the setConnectivityState func, as server hasn't sent
  585. // any response.
  586. select {
  587. case <-hcExitChan:
  588. case <-time.After(5 * time.Second):
  589. t.Fatal("Health check function has not exited after 5s.")
  590. }
  591. // The deferred leakcheck will check whether there's leaked goroutine, which is an indication
  592. // whether we closes the skipReset channel to unblock onGoAway/onClose goroutine.
  593. }
  594. // This test is to test the logic in the createTransport after the health check function returns which
  595. // closes the allowedToReset channel(since it has not been closed inside health check func) to unblock
  596. // onGoAway/onClose goroutine.
  597. func (s) TestHealthCheckWithoutSetConnectivityStateCalled(t *testing.T) {
  598. hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  599. s, lis, ts, deferFunc, err := setupServer(&svrConfig{
  600. specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  601. if in.Service != "delay" {
  602. return status.Error(codes.FailedPrecondition,
  603. "this special Watch function only handles request with service name to be \"delay\"")
  604. }
  605. // Do nothing to mock a delay of health check response from server side.
  606. // This case is to help with the test that covers the condition that setConnectivityState is not
  607. // called inside HealthCheckFunc before the func returns.
  608. select {
  609. case <-stream.Context().Done():
  610. case <-time.After(5 * time.Second):
  611. }
  612. return nil
  613. },
  614. })
  615. defer deferFunc()
  616. if err != nil {
  617. t.Fatal(err)
  618. }
  619. ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING)
  620. _, r, deferFunc, err := setupClient(&clientConfig{
  621. balancerName: "round_robin",
  622. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  623. })
  624. if err != nil {
  625. t.Fatal(err)
  626. }
  627. defer deferFunc()
  628. // The serviceName "delay" is specially handled at server side, where response will not be sent
  629. // back to client immediately upon receiving the request (client should receive no response until
  630. // test ends).
  631. r.UpdateState(resolver.State{
  632. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  633. ServiceConfig: parseCfg(r, `{
  634. "healthCheckConfig": {
  635. "serviceName": "delay"
  636. }
  637. }`)})
  638. select {
  639. case <-hcExitChan:
  640. t.Fatal("Health check function has exited, which is not expected.")
  641. default:
  642. }
  643. select {
  644. case <-hcEnterChan:
  645. case <-time.After(5 * time.Second):
  646. t.Fatal("Health check function has not been invoked after 5s.")
  647. }
  648. // trigger transport being closed
  649. s.Stop()
  650. // The health check func should exit without calling the setConnectivityState func, as server hasn't sent
  651. // any response.
  652. select {
  653. case <-hcExitChan:
  654. case <-time.After(5 * time.Second):
  655. t.Fatal("Health check function has not exited after 5s.")
  656. }
  657. // The deferred leakcheck will check whether there's leaked goroutine, which is an indication
  658. // whether we closes the allowedToReset channel to unblock onGoAway/onClose goroutine.
  659. }
  660. func testHealthCheckDisableWithDialOption(t *testing.T, addr string) {
  661. hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  662. cc, r, deferFunc, err := setupClient(&clientConfig{
  663. balancerName: "round_robin",
  664. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  665. extraDialOption: []grpc.DialOption{grpc.WithDisableHealthCheck()},
  666. })
  667. if err != nil {
  668. t.Fatal(err)
  669. }
  670. defer deferFunc()
  671. tc := testpb.NewTestServiceClient(cc)
  672. r.UpdateState(resolver.State{
  673. Addresses: []resolver.Address{{Addr: addr}},
  674. ServiceConfig: parseCfg(r, `{
  675. "healthCheckConfig": {
  676. "serviceName": "foo"
  677. }
  678. }`)})
  679. // send some rpcs to make sure transport has been created and is ready for use.
  680. if err := verifyResultWithDelay(func() (bool, error) {
  681. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  682. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  683. }
  684. return true, nil
  685. }); err != nil {
  686. t.Fatal(err)
  687. }
  688. select {
  689. case <-hcEnterChan:
  690. t.Fatal("Health check function has exited, which is not expected.")
  691. default:
  692. }
  693. }
  694. func testHealthCheckDisableWithBalancer(t *testing.T, addr string) {
  695. hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  696. cc, r, deferFunc, err := setupClient(&clientConfig{
  697. balancerName: "pick_first",
  698. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  699. })
  700. if err != nil {
  701. t.Fatal(err)
  702. }
  703. defer deferFunc()
  704. tc := testpb.NewTestServiceClient(cc)
  705. r.UpdateState(resolver.State{
  706. Addresses: []resolver.Address{{Addr: addr}},
  707. ServiceConfig: parseCfg(r, `{
  708. "healthCheckConfig": {
  709. "serviceName": "foo"
  710. }
  711. }`)})
  712. // send some rpcs to make sure transport has been created and is ready for use.
  713. if err := verifyResultWithDelay(func() (bool, error) {
  714. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  715. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  716. }
  717. return true, nil
  718. }); err != nil {
  719. t.Fatal(err)
  720. }
  721. select {
  722. case <-hcEnterChan:
  723. t.Fatal("Health check function has started, which is not expected.")
  724. default:
  725. }
  726. }
  727. func testHealthCheckDisableWithServiceConfig(t *testing.T, addr string) {
  728. hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  729. cc, r, deferFunc, err := setupClient(&clientConfig{
  730. balancerName: "round_robin",
  731. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  732. })
  733. if err != nil {
  734. t.Fatal(err)
  735. }
  736. defer deferFunc()
  737. tc := testpb.NewTestServiceClient(cc)
  738. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: addr}}})
  739. // send some rpcs to make sure transport has been created and is ready for use.
  740. if err := verifyResultWithDelay(func() (bool, error) {
  741. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  742. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  743. }
  744. return true, nil
  745. }); err != nil {
  746. t.Fatal(err)
  747. }
  748. select {
  749. case <-hcEnterChan:
  750. t.Fatal("Health check function has started, which is not expected.")
  751. default:
  752. }
  753. }
  754. func (s) TestHealthCheckDisable(t *testing.T) {
  755. _, lis, ts, deferFunc, err := setupServer(&svrConfig{})
  756. defer deferFunc()
  757. if err != nil {
  758. t.Fatal(err)
  759. }
  760. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  761. // test client side disabling configuration.
  762. testHealthCheckDisableWithDialOption(t, lis.Addr().String())
  763. testHealthCheckDisableWithBalancer(t, lis.Addr().String())
  764. testHealthCheckDisableWithServiceConfig(t, lis.Addr().String())
  765. }
  766. func (s) TestHealthCheckChannelzCountingCallSuccess(t *testing.T) {
  767. _, lis, _, deferFunc, err := setupServer(&svrConfig{
  768. specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  769. if in.Service != "channelzSuccess" {
  770. return status.Error(codes.FailedPrecondition,
  771. "this special Watch function only handles request with service name to be \"channelzSuccess\"")
  772. }
  773. return status.Error(codes.OK, "fake success")
  774. },
  775. })
  776. defer deferFunc()
  777. if err != nil {
  778. t.Fatal(err)
  779. }
  780. _, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
  781. if err != nil {
  782. t.Fatal(err)
  783. }
  784. defer deferFunc()
  785. r.UpdateState(resolver.State{
  786. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  787. ServiceConfig: parseCfg(r, `{
  788. "healthCheckConfig": {
  789. "serviceName": "channelzSuccess"
  790. }
  791. }`)})
  792. if err := verifyResultWithDelay(func() (bool, error) {
  793. cm, _ := channelz.GetTopChannels(0, 0)
  794. if len(cm) == 0 {
  795. return false, errors.New("channelz.GetTopChannels return 0 top channel")
  796. }
  797. if len(cm[0].SubChans) == 0 {
  798. return false, errors.New("there is 0 subchannel")
  799. }
  800. var id int64
  801. for k := range cm[0].SubChans {
  802. id = k
  803. break
  804. }
  805. scm := channelz.GetSubChannel(id)
  806. if scm == nil || scm.ChannelData == nil {
  807. return false, errors.New("nil subchannel metric or nil subchannel metric ChannelData returned")
  808. }
  809. // exponential backoff retry may result in more than one health check call.
  810. if scm.ChannelData.CallsStarted > 0 && scm.ChannelData.CallsSucceeded > 0 && scm.ChannelData.CallsFailed == 0 {
  811. return true, nil
  812. }
  813. return false, fmt.Errorf("got %d CallsStarted, %d CallsSucceeded, want >0 >0", scm.ChannelData.CallsStarted, scm.ChannelData.CallsSucceeded)
  814. }); err != nil {
  815. t.Fatal(err)
  816. }
  817. }
  818. func (s) TestHealthCheckChannelzCountingCallFailure(t *testing.T) {
  819. _, lis, _, deferFunc, err := setupServer(&svrConfig{
  820. specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  821. if in.Service != "channelzFailure" {
  822. return status.Error(codes.FailedPrecondition,
  823. "this special Watch function only handles request with service name to be \"channelzFailure\"")
  824. }
  825. return status.Error(codes.Internal, "fake failure")
  826. },
  827. })
  828. if err != nil {
  829. t.Fatal(err)
  830. }
  831. defer deferFunc()
  832. _, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
  833. if err != nil {
  834. t.Fatal(err)
  835. }
  836. defer deferFunc()
  837. r.UpdateState(resolver.State{
  838. Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
  839. ServiceConfig: parseCfg(r, `{
  840. "healthCheckConfig": {
  841. "serviceName": "channelzFailure"
  842. }
  843. }`)})
  844. if err := verifyResultWithDelay(func() (bool, error) {
  845. cm, _ := channelz.GetTopChannels(0, 0)
  846. if len(cm) == 0 {
  847. return false, errors.New("channelz.GetTopChannels return 0 top channel")
  848. }
  849. if len(cm[0].SubChans) == 0 {
  850. return false, errors.New("there is 0 subchannel")
  851. }
  852. var id int64
  853. for k := range cm[0].SubChans {
  854. id = k
  855. break
  856. }
  857. scm := channelz.GetSubChannel(id)
  858. if scm == nil || scm.ChannelData == nil {
  859. return false, errors.New("nil subchannel metric or nil subchannel metric ChannelData returned")
  860. }
  861. // exponential backoff retry may result in more than one health check call.
  862. if scm.ChannelData.CallsStarted > 0 && scm.ChannelData.CallsFailed > 0 && scm.ChannelData.CallsSucceeded == 0 {
  863. return true, nil
  864. }
  865. return false, fmt.Errorf("got %d CallsStarted, %d CallsFailed, want >0, >0", scm.ChannelData.CallsStarted, scm.ChannelData.CallsFailed)
  866. }); err != nil {
  867. t.Fatal(err)
  868. }
  869. }