// +build go1.12 /* * * Copyright 2019 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package xds import ( "context" "errors" "io" "net" "testing" "time" "github.com/golang/protobuf/proto" anypb "github.com/golang/protobuf/ptypes/any" structpb "github.com/golang/protobuf/ptypes/struct" wrpb "github.com/golang/protobuf/ptypes/wrappers" "google.golang.org/grpc" "google.golang.org/grpc/balancer" cdspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/cds" addresspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/core/address" basepb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/core/base" discoverypb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/discovery" edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds" endpointpb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/endpoint/endpoint" adspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/discovery/v2/ads" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) var ( testServiceName = "test/foo" testCDSReq = &discoverypb.DiscoveryRequest{ Node: &basepb.Node{ Metadata: &structpb.Struct{ Fields: map[string]*structpb.Value{ grpcHostname: { Kind: &structpb.Value_StringValue{StringValue: testServiceName}, }, }, }, }, TypeUrl: cdsType, } testEDSReq = &discoverypb.DiscoveryRequest{ Node: &basepb.Node{ Metadata: &structpb.Struct{ Fields: map[string]*structpb.Value{ endpointRequired: { Kind: &structpb.Value_BoolValue{BoolValue: true}, }, }, }, }, ResourceNames: []string{testServiceName}, TypeUrl: edsType, } testEDSReqWithoutEndpoints = &discoverypb.DiscoveryRequest{ Node: &basepb.Node{ Metadata: &structpb.Struct{ Fields: map[string]*structpb.Value{ endpointRequired: { Kind: &structpb.Value_BoolValue{BoolValue: false}, }, }, }, }, ResourceNames: []string{testServiceName}, TypeUrl: edsType, } testCluster = &cdspb.Cluster{ Name: testServiceName, ClusterDiscoveryType: &cdspb.Cluster_Type{Type: cdspb.Cluster_EDS}, LbPolicy: cdspb.Cluster_ROUND_ROBIN, } marshaledCluster, _ = proto.Marshal(testCluster) testCDSResp = &discoverypb.DiscoveryResponse{ Resources: []*anypb.Any{ { TypeUrl: cdsType, Value: marshaledCluster, }, }, TypeUrl: cdsType, } testClusterLoadAssignment = &edspb.ClusterLoadAssignment{ ClusterName: testServiceName, Endpoints: []*endpointpb.LocalityLbEndpoints{ { Locality: &basepb.Locality{ Region: "asia-east1", Zone: "1", SubZone: "sa", }, LbEndpoints: []*endpointpb.LbEndpoint{ { HostIdentifier: &endpointpb.LbEndpoint_Endpoint{ Endpoint: &endpointpb.Endpoint{ Address: &addresspb.Address{ Address: &addresspb.Address_SocketAddress{ SocketAddress: &addresspb.SocketAddress{ Address: "1.1.1.1", PortSpecifier: &addresspb.SocketAddress_PortValue{ PortValue: 10001, }, ResolverName: "dns", }, }, }, HealthCheckConfig: nil, }, }, Metadata: &basepb.Metadata{ FilterMetadata: map[string]*structpb.Struct{ "xx.lb": { Fields: map[string]*structpb.Value{ "endpoint_name": { Kind: &structpb.Value_StringValue{ StringValue: "some.endpoint.name", }, }, }, }, }, }, }, }, LoadBalancingWeight: &wrpb.UInt32Value{ Value: 1, }, Priority: 0, }, }, } marshaledClusterLoadAssignment, _ = proto.Marshal(testClusterLoadAssignment) testEDSResp = &discoverypb.DiscoveryResponse{ Resources: []*anypb.Any{ { TypeUrl: edsType, Value: marshaledClusterLoadAssignment, }, }, TypeUrl: edsType, } testClusterLoadAssignmentWithoutEndpoints = &edspb.ClusterLoadAssignment{ ClusterName: testServiceName, Endpoints: []*endpointpb.LocalityLbEndpoints{ { Locality: &basepb.Locality{ SubZone: "sa", }, LoadBalancingWeight: &wrpb.UInt32Value{ Value: 128, }, Priority: 0, }, }, Policy: nil, } marshaledClusterLoadAssignmentWithoutEndpoints, _ = proto.Marshal(testClusterLoadAssignmentWithoutEndpoints) testEDSRespWithoutEndpoints = &discoverypb.DiscoveryResponse{ Resources: []*anypb.Any{ { TypeUrl: edsType, Value: marshaledClusterLoadAssignmentWithoutEndpoints, }, }, TypeUrl: edsType, } ) type testTrafficDirector struct { reqChan chan *request respChan chan *response } type request struct { req *discoverypb.DiscoveryRequest err error } type response struct { resp *discoverypb.DiscoveryResponse err error } func (ttd *testTrafficDirector) StreamAggregatedResources(s adspb.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { for { req, err := s.Recv() if err != nil { ttd.reqChan <- &request{ req: nil, err: err, } if err == io.EOF { return nil } return err } ttd.reqChan <- &request{ req: req, err: nil, } if req.TypeUrl == edsType { break } } for { select { case resp := <-ttd.respChan: if resp.err != nil { return resp.err } if err := s.Send(resp.resp); err != nil { return err } case <-s.Context().Done(): return s.Context().Err() } } } func (ttd *testTrafficDirector) DeltaAggregatedResources(adspb.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error { return status.Error(codes.Unimplemented, "") } func (ttd *testTrafficDirector) sendResp(resp *response) { ttd.respChan <- resp } func (ttd *testTrafficDirector) getReq() *request { return <-ttd.reqChan } func newTestTrafficDirector() *testTrafficDirector { return &testTrafficDirector{ reqChan: make(chan *request, 10), respChan: make(chan *response, 10), } } type testConfig struct { doCDS bool expectedRequests []*discoverypb.DiscoveryRequest responsesToSend []*discoverypb.DiscoveryResponse expectedADSResponses []proto.Message adsErr error svrErr error } func setupServer(t *testing.T) (addr string, td *testTrafficDirector, cleanup func()) { lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("listen failed due to: %v", err) } svr := grpc.NewServer() td = newTestTrafficDirector() adspb.RegisterAggregatedDiscoveryServiceServer(svr, td) go svr.Serve(lis) return lis.Addr().String(), td, func() { svr.Stop() lis.Close() } } func (s) TestXdsClientResponseHandling(t *testing.T) { for _, test := range []*testConfig{ { doCDS: true, expectedRequests: []*discoverypb.DiscoveryRequest{testCDSReq, testEDSReq}, responsesToSend: []*discoverypb.DiscoveryResponse{testCDSResp, testEDSResp}, expectedADSResponses: []proto.Message{testCluster, testClusterLoadAssignment}, }, { doCDS: false, expectedRequests: []*discoverypb.DiscoveryRequest{testEDSReqWithoutEndpoints}, responsesToSend: []*discoverypb.DiscoveryResponse{testEDSRespWithoutEndpoints}, expectedADSResponses: []proto.Message{testClusterLoadAssignmentWithoutEndpoints}, }, } { testXdsClientResponseHandling(t, test) } } func testXdsClientResponseHandling(t *testing.T, test *testConfig) { addr, td, cleanup := setupServer(t) defer cleanup() adsChan := make(chan proto.Message, 10) newADS := func(ctx context.Context, i proto.Message) error { adsChan <- i return nil } client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, func(context.Context) {}, func() {}) defer client.close() go client.run() for _, expectedReq := range test.expectedRequests { req := td.getReq() if req.err != nil { t.Fatalf("ads RPC failed with err: %v", req.err) } if !proto.Equal(req.req, expectedReq) { t.Fatalf("got ADS request %T %v, expected: %T %v", req.req, req.req, expectedReq, expectedReq) } } for i, resp := range test.responsesToSend { td.sendResp(&response{resp: resp}) ads := <-adsChan if !proto.Equal(ads, test.expectedADSResponses[i]) { t.Fatalf("received unexpected ads response, got %v, want %v", ads, test.expectedADSResponses[i]) } } } func (s) TestXdsClientLoseContact(t *testing.T) { for _, test := range []*testConfig{ { doCDS: true, responsesToSend: []*discoverypb.DiscoveryResponse{}, }, { doCDS: false, responsesToSend: []*discoverypb.DiscoveryResponse{testEDSRespWithoutEndpoints}, }, } { testXdsClientLoseContactRemoteClose(t, test) } for _, test := range []*testConfig{ { doCDS: false, responsesToSend: []*discoverypb.DiscoveryResponse{testCDSResp}, // CDS response when in custom mode. }, { doCDS: true, responsesToSend: []*discoverypb.DiscoveryResponse{{}}, // response with 0 resources is an error case. }, { doCDS: true, responsesToSend: []*discoverypb.DiscoveryResponse{testCDSResp}, adsErr: errors.New("some ads parsing error from xdsBalancer"), }, } { testXdsClientLoseContactADSRelatedErrorOccur(t, test) } } func testXdsClientLoseContactRemoteClose(t *testing.T, test *testConfig) { addr, td, cleanup := setupServer(t) defer cleanup() adsChan := make(chan proto.Message, 10) newADS := func(ctx context.Context, i proto.Message) error { adsChan <- i return nil } contactChan := make(chan *loseContact, 10) loseContactFunc := func(context.Context) { contactChan <- &loseContact{} } client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {}) defer client.close() go client.run() // make sure server side get the request (i.e stream created successfully on client side) td.getReq() for _, resp := range test.responsesToSend { td.sendResp(&response{resp: resp}) // make sure client side receives it <-adsChan } cleanup() select { case <-contactChan: case <-time.After(2 * time.Second): t.Fatal("time out when expecting lost contact signal") } } func testXdsClientLoseContactADSRelatedErrorOccur(t *testing.T, test *testConfig) { addr, td, cleanup := setupServer(t) defer cleanup() adsChan := make(chan proto.Message, 10) newADS := func(ctx context.Context, i proto.Message) error { adsChan <- i return test.adsErr } contactChan := make(chan *loseContact, 10) loseContactFunc := func(context.Context) { contactChan <- &loseContact{} } client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {}) defer client.close() go client.run() // make sure server side get the request (i.e stream created successfully on client side) td.getReq() for _, resp := range test.responsesToSend { td.sendResp(&response{resp: resp}) } select { case <-contactChan: case <-time.After(2 * time.Second): t.Fatal("time out when expecting lost contact signal") } } func (s) TestXdsClientExponentialRetry(t *testing.T) { cfg := &testConfig{ svrErr: status.Errorf(codes.Aborted, "abort the stream to trigger retry"), } addr, td, cleanup := setupServer(t) defer cleanup() adsChan := make(chan proto.Message, 10) newADS := func(ctx context.Context, i proto.Message) error { adsChan <- i return nil } contactChan := make(chan *loseContact, 10) loseContactFunc := func(context.Context) { contactChan <- &loseContact{} } client := newXDSClient(addr, testServiceName, cfg.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {}) defer client.close() go client.run() var secondRetry, thirdRetry time.Time for i := 0; i < 3; i++ { // make sure server side get the request (i.e stream created successfully on client side) td.getReq() td.sendResp(&response{err: cfg.svrErr}) select { case <-contactChan: if i == 1 { secondRetry = time.Now() } if i == 2 { thirdRetry = time.Now() } case <-time.After(2 * time.Second): t.Fatal("time out when expecting lost contact signal") } } if thirdRetry.Sub(secondRetry) < 1*time.Second { t.Fatalf("interval between second and third retry is %v, expected > 1s", thirdRetry.Sub(secondRetry)) } }