xds_client_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. // +build go1.12
  2. /*
  3. *
  4. * Copyright 2019 gRPC authors.
  5. *
  6. * Licensed under the Apache License, Version 2.0 (the "License");
  7. * you may not use this file except in compliance with the License.
  8. * You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. *
  18. */
  19. package xds
  20. import (
  21. "context"
  22. "errors"
  23. "io"
  24. "net"
  25. "testing"
  26. "time"
  27. "github.com/golang/protobuf/proto"
  28. anypb "github.com/golang/protobuf/ptypes/any"
  29. structpb "github.com/golang/protobuf/ptypes/struct"
  30. wrpb "github.com/golang/protobuf/ptypes/wrappers"
  31. "google.golang.org/grpc"
  32. "google.golang.org/grpc/balancer"
  33. cdspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/cds"
  34. addresspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/core/address"
  35. basepb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/core/base"
  36. discoverypb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/discovery"
  37. edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds"
  38. endpointpb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/endpoint/endpoint"
  39. adspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/discovery/v2/ads"
  40. "google.golang.org/grpc/codes"
  41. "google.golang.org/grpc/status"
  42. )
  43. var (
  44. testServiceName = "test/foo"
  45. testCDSReq = &discoverypb.DiscoveryRequest{
  46. Node: &basepb.Node{
  47. Metadata: &structpb.Struct{
  48. Fields: map[string]*structpb.Value{
  49. grpcHostname: {
  50. Kind: &structpb.Value_StringValue{StringValue: testServiceName},
  51. },
  52. },
  53. },
  54. },
  55. TypeUrl: cdsType,
  56. }
  57. testEDSReq = &discoverypb.DiscoveryRequest{
  58. Node: &basepb.Node{
  59. Metadata: &structpb.Struct{
  60. Fields: map[string]*structpb.Value{
  61. endpointRequired: {
  62. Kind: &structpb.Value_BoolValue{BoolValue: true},
  63. },
  64. },
  65. },
  66. },
  67. ResourceNames: []string{testServiceName},
  68. TypeUrl: edsType,
  69. }
  70. testEDSReqWithoutEndpoints = &discoverypb.DiscoveryRequest{
  71. Node: &basepb.Node{
  72. Metadata: &structpb.Struct{
  73. Fields: map[string]*structpb.Value{
  74. endpointRequired: {
  75. Kind: &structpb.Value_BoolValue{BoolValue: false},
  76. },
  77. },
  78. },
  79. },
  80. ResourceNames: []string{testServiceName},
  81. TypeUrl: edsType,
  82. }
  83. testCluster = &cdspb.Cluster{
  84. Name: testServiceName,
  85. ClusterDiscoveryType: &cdspb.Cluster_Type{Type: cdspb.Cluster_EDS},
  86. LbPolicy: cdspb.Cluster_ROUND_ROBIN,
  87. }
  88. marshaledCluster, _ = proto.Marshal(testCluster)
  89. testCDSResp = &discoverypb.DiscoveryResponse{
  90. Resources: []*anypb.Any{
  91. {
  92. TypeUrl: cdsType,
  93. Value: marshaledCluster,
  94. },
  95. },
  96. TypeUrl: cdsType,
  97. }
  98. testClusterLoadAssignment = &edspb.ClusterLoadAssignment{
  99. ClusterName: testServiceName,
  100. Endpoints: []*endpointpb.LocalityLbEndpoints{
  101. {
  102. Locality: &basepb.Locality{
  103. Region: "asia-east1",
  104. Zone: "1",
  105. SubZone: "sa",
  106. },
  107. LbEndpoints: []*endpointpb.LbEndpoint{
  108. {
  109. HostIdentifier: &endpointpb.LbEndpoint_Endpoint{
  110. Endpoint: &endpointpb.Endpoint{
  111. Address: &addresspb.Address{
  112. Address: &addresspb.Address_SocketAddress{
  113. SocketAddress: &addresspb.SocketAddress{
  114. Address: "1.1.1.1",
  115. PortSpecifier: &addresspb.SocketAddress_PortValue{
  116. PortValue: 10001,
  117. },
  118. ResolverName: "dns",
  119. },
  120. },
  121. },
  122. HealthCheckConfig: nil,
  123. },
  124. },
  125. Metadata: &basepb.Metadata{
  126. FilterMetadata: map[string]*structpb.Struct{
  127. "xx.lb": {
  128. Fields: map[string]*structpb.Value{
  129. "endpoint_name": {
  130. Kind: &structpb.Value_StringValue{
  131. StringValue: "some.endpoint.name",
  132. },
  133. },
  134. },
  135. },
  136. },
  137. },
  138. },
  139. },
  140. LoadBalancingWeight: &wrpb.UInt32Value{
  141. Value: 1,
  142. },
  143. Priority: 0,
  144. },
  145. },
  146. }
  147. marshaledClusterLoadAssignment, _ = proto.Marshal(testClusterLoadAssignment)
  148. testEDSResp = &discoverypb.DiscoveryResponse{
  149. Resources: []*anypb.Any{
  150. {
  151. TypeUrl: edsType,
  152. Value: marshaledClusterLoadAssignment,
  153. },
  154. },
  155. TypeUrl: edsType,
  156. }
  157. testClusterLoadAssignmentWithoutEndpoints = &edspb.ClusterLoadAssignment{
  158. ClusterName: testServiceName,
  159. Endpoints: []*endpointpb.LocalityLbEndpoints{
  160. {
  161. Locality: &basepb.Locality{
  162. SubZone: "sa",
  163. },
  164. LoadBalancingWeight: &wrpb.UInt32Value{
  165. Value: 128,
  166. },
  167. Priority: 0,
  168. },
  169. },
  170. Policy: nil,
  171. }
  172. marshaledClusterLoadAssignmentWithoutEndpoints, _ = proto.Marshal(testClusterLoadAssignmentWithoutEndpoints)
  173. testEDSRespWithoutEndpoints = &discoverypb.DiscoveryResponse{
  174. Resources: []*anypb.Any{
  175. {
  176. TypeUrl: edsType,
  177. Value: marshaledClusterLoadAssignmentWithoutEndpoints,
  178. },
  179. },
  180. TypeUrl: edsType,
  181. }
  182. )
  183. type testTrafficDirector struct {
  184. reqChan chan *request
  185. respChan chan *response
  186. }
  187. type request struct {
  188. req *discoverypb.DiscoveryRequest
  189. err error
  190. }
  191. type response struct {
  192. resp *discoverypb.DiscoveryResponse
  193. err error
  194. }
  195. func (ttd *testTrafficDirector) StreamAggregatedResources(s adspb.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
  196. for {
  197. req, err := s.Recv()
  198. if err != nil {
  199. ttd.reqChan <- &request{
  200. req: nil,
  201. err: err,
  202. }
  203. if err == io.EOF {
  204. return nil
  205. }
  206. return err
  207. }
  208. ttd.reqChan <- &request{
  209. req: req,
  210. err: nil,
  211. }
  212. if req.TypeUrl == edsType {
  213. break
  214. }
  215. }
  216. for {
  217. select {
  218. case resp := <-ttd.respChan:
  219. if resp.err != nil {
  220. return resp.err
  221. }
  222. if err := s.Send(resp.resp); err != nil {
  223. return err
  224. }
  225. case <-s.Context().Done():
  226. return s.Context().Err()
  227. }
  228. }
  229. }
  230. func (ttd *testTrafficDirector) DeltaAggregatedResources(adspb.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
  231. return status.Error(codes.Unimplemented, "")
  232. }
  233. func (ttd *testTrafficDirector) sendResp(resp *response) {
  234. ttd.respChan <- resp
  235. }
  236. func (ttd *testTrafficDirector) getReq() *request {
  237. return <-ttd.reqChan
  238. }
  239. func newTestTrafficDirector() *testTrafficDirector {
  240. return &testTrafficDirector{
  241. reqChan: make(chan *request, 10),
  242. respChan: make(chan *response, 10),
  243. }
  244. }
  245. type testConfig struct {
  246. doCDS bool
  247. expectedRequests []*discoverypb.DiscoveryRequest
  248. responsesToSend []*discoverypb.DiscoveryResponse
  249. expectedADSResponses []proto.Message
  250. adsErr error
  251. svrErr error
  252. }
  253. func setupServer(t *testing.T) (addr string, td *testTrafficDirector, cleanup func()) {
  254. lis, err := net.Listen("tcp", "localhost:0")
  255. if err != nil {
  256. t.Fatalf("listen failed due to: %v", err)
  257. }
  258. svr := grpc.NewServer()
  259. td = newTestTrafficDirector()
  260. adspb.RegisterAggregatedDiscoveryServiceServer(svr, td)
  261. go svr.Serve(lis)
  262. return lis.Addr().String(), td, func() {
  263. svr.Stop()
  264. lis.Close()
  265. }
  266. }
  267. func (s) TestXdsClientResponseHandling(t *testing.T) {
  268. for _, test := range []*testConfig{
  269. {
  270. doCDS: true,
  271. expectedRequests: []*discoverypb.DiscoveryRequest{testCDSReq, testEDSReq},
  272. responsesToSend: []*discoverypb.DiscoveryResponse{testCDSResp, testEDSResp},
  273. expectedADSResponses: []proto.Message{testCluster, testClusterLoadAssignment},
  274. },
  275. {
  276. doCDS: false,
  277. expectedRequests: []*discoverypb.DiscoveryRequest{testEDSReqWithoutEndpoints},
  278. responsesToSend: []*discoverypb.DiscoveryResponse{testEDSRespWithoutEndpoints},
  279. expectedADSResponses: []proto.Message{testClusterLoadAssignmentWithoutEndpoints},
  280. },
  281. } {
  282. testXdsClientResponseHandling(t, test)
  283. }
  284. }
  285. func testXdsClientResponseHandling(t *testing.T, test *testConfig) {
  286. addr, td, cleanup := setupServer(t)
  287. defer cleanup()
  288. adsChan := make(chan proto.Message, 10)
  289. newADS := func(ctx context.Context, i proto.Message) error {
  290. adsChan <- i
  291. return nil
  292. }
  293. client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, func(context.Context) {}, func() {})
  294. defer client.close()
  295. go client.run()
  296. for _, expectedReq := range test.expectedRequests {
  297. req := td.getReq()
  298. if req.err != nil {
  299. t.Fatalf("ads RPC failed with err: %v", req.err)
  300. }
  301. if !proto.Equal(req.req, expectedReq) {
  302. t.Fatalf("got ADS request %T %v, expected: %T %v", req.req, req.req, expectedReq, expectedReq)
  303. }
  304. }
  305. for i, resp := range test.responsesToSend {
  306. td.sendResp(&response{resp: resp})
  307. ads := <-adsChan
  308. if !proto.Equal(ads, test.expectedADSResponses[i]) {
  309. t.Fatalf("received unexpected ads response, got %v, want %v", ads, test.expectedADSResponses[i])
  310. }
  311. }
  312. }
  313. func (s) TestXdsClientLoseContact(t *testing.T) {
  314. for _, test := range []*testConfig{
  315. {
  316. doCDS: true,
  317. responsesToSend: []*discoverypb.DiscoveryResponse{},
  318. },
  319. {
  320. doCDS: false,
  321. responsesToSend: []*discoverypb.DiscoveryResponse{testEDSRespWithoutEndpoints},
  322. },
  323. } {
  324. testXdsClientLoseContactRemoteClose(t, test)
  325. }
  326. for _, test := range []*testConfig{
  327. {
  328. doCDS: false,
  329. responsesToSend: []*discoverypb.DiscoveryResponse{testCDSResp}, // CDS response when in custom mode.
  330. },
  331. {
  332. doCDS: true,
  333. responsesToSend: []*discoverypb.DiscoveryResponse{{}}, // response with 0 resources is an error case.
  334. },
  335. {
  336. doCDS: true,
  337. responsesToSend: []*discoverypb.DiscoveryResponse{testCDSResp},
  338. adsErr: errors.New("some ads parsing error from xdsBalancer"),
  339. },
  340. } {
  341. testXdsClientLoseContactADSRelatedErrorOccur(t, test)
  342. }
  343. }
  344. func testXdsClientLoseContactRemoteClose(t *testing.T, test *testConfig) {
  345. addr, td, cleanup := setupServer(t)
  346. defer cleanup()
  347. adsChan := make(chan proto.Message, 10)
  348. newADS := func(ctx context.Context, i proto.Message) error {
  349. adsChan <- i
  350. return nil
  351. }
  352. contactChan := make(chan *loseContact, 10)
  353. loseContactFunc := func(context.Context) {
  354. contactChan <- &loseContact{}
  355. }
  356. client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {})
  357. defer client.close()
  358. go client.run()
  359. // make sure server side get the request (i.e stream created successfully on client side)
  360. td.getReq()
  361. for _, resp := range test.responsesToSend {
  362. td.sendResp(&response{resp: resp})
  363. // make sure client side receives it
  364. <-adsChan
  365. }
  366. cleanup()
  367. select {
  368. case <-contactChan:
  369. case <-time.After(2 * time.Second):
  370. t.Fatal("time out when expecting lost contact signal")
  371. }
  372. }
  373. func testXdsClientLoseContactADSRelatedErrorOccur(t *testing.T, test *testConfig) {
  374. addr, td, cleanup := setupServer(t)
  375. defer cleanup()
  376. adsChan := make(chan proto.Message, 10)
  377. newADS := func(ctx context.Context, i proto.Message) error {
  378. adsChan <- i
  379. return test.adsErr
  380. }
  381. contactChan := make(chan *loseContact, 10)
  382. loseContactFunc := func(context.Context) {
  383. contactChan <- &loseContact{}
  384. }
  385. client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {})
  386. defer client.close()
  387. go client.run()
  388. // make sure server side get the request (i.e stream created successfully on client side)
  389. td.getReq()
  390. for _, resp := range test.responsesToSend {
  391. td.sendResp(&response{resp: resp})
  392. }
  393. select {
  394. case <-contactChan:
  395. case <-time.After(2 * time.Second):
  396. t.Fatal("time out when expecting lost contact signal")
  397. }
  398. }
  399. func (s) TestXdsClientExponentialRetry(t *testing.T) {
  400. cfg := &testConfig{
  401. svrErr: status.Errorf(codes.Aborted, "abort the stream to trigger retry"),
  402. }
  403. addr, td, cleanup := setupServer(t)
  404. defer cleanup()
  405. adsChan := make(chan proto.Message, 10)
  406. newADS := func(ctx context.Context, i proto.Message) error {
  407. adsChan <- i
  408. return nil
  409. }
  410. contactChan := make(chan *loseContact, 10)
  411. loseContactFunc := func(context.Context) {
  412. contactChan <- &loseContact{}
  413. }
  414. client := newXDSClient(addr, testServiceName, cfg.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {})
  415. defer client.close()
  416. go client.run()
  417. var secondRetry, thirdRetry time.Time
  418. for i := 0; i < 3; i++ {
  419. // make sure server side get the request (i.e stream created successfully on client side)
  420. td.getReq()
  421. td.sendResp(&response{err: cfg.svrErr})
  422. select {
  423. case <-contactChan:
  424. if i == 1 {
  425. secondRetry = time.Now()
  426. }
  427. if i == 2 {
  428. thirdRetry = time.Now()
  429. }
  430. case <-time.After(2 * time.Second):
  431. t.Fatal("time out when expecting lost contact signal")
  432. }
  433. }
  434. if thirdRetry.Sub(secondRetry) < 1*time.Second {
  435. t.Fatalf("interval between second and third retry is %v, expected > 1s", thirdRetry.Sub(secondRetry))
  436. }
  437. }