123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269 |
- // +build go1.12
- /*
- *
- * Copyright 2019 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package xds
- import (
- "context"
- "sync"
- "time"
- "github.com/golang/protobuf/proto"
- "github.com/golang/protobuf/ptypes"
- structpb "github.com/golang/protobuf/ptypes/struct"
- "google.golang.org/grpc"
- "google.golang.org/grpc/balancer"
- cdspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/cds"
- basepb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/core/base"
- discoverypb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/discovery"
- edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds"
- adspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/discovery/v2/ads"
- "google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/internal/backoff"
- "google.golang.org/grpc/internal/channelz"
- )
// Resource type URLs placed in the TypeUrl field of outgoing discovery
// requests and compared against the TypeUrl of incoming ADS responses.
const (
	cdsType = "type.googleapis.com/envoy.api.v2.Cluster"
	edsType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
)

// Keys used in the Node metadata of outgoing discovery requests.
const (
	// grpcHostname is the metadata key under which the CDS request carries
	// the service name.
	grpcHostname = "com.googleapis.trafficdirector.grpc_hostname"
	// endpointRequired is the metadata key under which the EDS request
	// carries the enableCDS flag.
	endpointRequired = "endpoints_required"
)
- var (
- defaultBackoffConfig = backoff.Exponential{
- MaxDelay: 120 * time.Second,
- }
- )
- // client is responsible for connecting to the specified traffic director, passing the received
- // ADS response from the traffic director, and sending notification when communication with the
- // traffic director is lost.
- type client struct {
- ctx context.Context
- cancel context.CancelFunc
- cli adspb.AggregatedDiscoveryServiceClient
- opts balancer.BuildOptions
- balancerName string // the traffic director name
- serviceName string // the user dial target name
- enableCDS bool
- newADS func(ctx context.Context, resp proto.Message) error
- loseContact func(ctx context.Context)
- cleanup func()
- backoff backoff.Strategy
- mu sync.Mutex
- cc *grpc.ClientConn
- }
- func (c *client) run() {
- c.dial()
- c.makeADSCall()
- }
- func (c *client) close() {
- c.cancel()
- c.mu.Lock()
- if c.cc != nil {
- c.cc.Close()
- }
- c.mu.Unlock()
- c.cleanup()
- }
- func (c *client) dial() {
- var dopts []grpc.DialOption
- if creds := c.opts.DialCreds; creds != nil {
- if err := creds.OverrideServerName(c.balancerName); err == nil {
- dopts = append(dopts, grpc.WithTransportCredentials(creds))
- } else {
- grpclog.Warningf("xds: failed to override the server name in the credentials: %v, using Insecure", err)
- dopts = append(dopts, grpc.WithInsecure())
- }
- } else {
- dopts = append(dopts, grpc.WithInsecure())
- }
- if c.opts.Dialer != nil {
- dopts = append(dopts, grpc.WithContextDialer(c.opts.Dialer))
- }
- // Explicitly set pickfirst as the balancer.
- dopts = append(dopts, grpc.WithBalancerName(grpc.PickFirstBalancerName))
- if channelz.IsOn() {
- dopts = append(dopts, grpc.WithChannelzParentID(c.opts.ChannelzParentID))
- }
- cc, err := grpc.DialContext(c.ctx, c.balancerName, dopts...)
- // Since this is a non-blocking dial, so if it fails, it due to some serious error (not network
- // related) error.
- if err != nil {
- grpclog.Fatalf("xds: failed to dial: %v", err)
- }
- c.mu.Lock()
- select {
- case <-c.ctx.Done():
- cc.Close()
- default:
- // only assign c.cc when xds client has not been closed, to prevent ClientConn leak.
- c.cc = cc
- }
- c.mu.Unlock()
- }
- func (c *client) newCDSRequest() *discoverypb.DiscoveryRequest {
- cdsReq := &discoverypb.DiscoveryRequest{
- Node: &basepb.Node{
- Metadata: &structpb.Struct{
- Fields: map[string]*structpb.Value{
- grpcHostname: {
- Kind: &structpb.Value_StringValue{StringValue: c.serviceName},
- },
- },
- },
- },
- TypeUrl: cdsType,
- }
- return cdsReq
- }
- func (c *client) newEDSRequest() *discoverypb.DiscoveryRequest {
- edsReq := &discoverypb.DiscoveryRequest{
- Node: &basepb.Node{
- Metadata: &structpb.Struct{
- Fields: map[string]*structpb.Value{
- endpointRequired: {
- Kind: &structpb.Value_BoolValue{BoolValue: c.enableCDS},
- },
- },
- },
- },
- ResourceNames: []string{c.serviceName},
- TypeUrl: edsType,
- }
- return edsReq
- }
- func (c *client) makeADSCall() {
- c.cli = adspb.NewAggregatedDiscoveryServiceClient(c.cc)
- retryCount := 0
- var doRetry bool
- for {
- select {
- case <-c.ctx.Done():
- return
- default:
- }
- if doRetry {
- backoffTimer := time.NewTimer(c.backoff.Backoff(retryCount))
- select {
- case <-backoffTimer.C:
- case <-c.ctx.Done():
- backoffTimer.Stop()
- return
- }
- retryCount++
- }
- firstRespReceived := c.adsCallAttempt()
- if firstRespReceived {
- retryCount = 0
- doRetry = false
- } else {
- doRetry = true
- }
- c.loseContact(c.ctx)
- }
- }
- func (c *client) adsCallAttempt() (firstRespReceived bool) {
- firstRespReceived = false
- ctx, cancel := context.WithCancel(c.ctx)
- defer cancel()
- st, err := c.cli.StreamAggregatedResources(ctx, grpc.WaitForReady(true))
- if err != nil {
- grpclog.Infof("xds: failed to initial ADS streaming RPC due to %v", err)
- return
- }
- if c.enableCDS {
- if err := st.Send(c.newCDSRequest()); err != nil {
- // current stream is broken, start a new one.
- return
- }
- }
- if err := st.Send(c.newEDSRequest()); err != nil {
- // current stream is broken, start a new one.
- return
- }
- expectCDS := c.enableCDS
- for {
- resp, err := st.Recv()
- if err != nil {
- // current stream is broken, start a new one.
- return
- }
- firstRespReceived = true
- resources := resp.GetResources()
- if len(resources) < 1 {
- grpclog.Warning("xds: ADS response contains 0 resource info.")
- // start a new call as server misbehaves by sending a ADS response with 0 resource info.
- return
- }
- if resp.GetTypeUrl() == cdsType && !c.enableCDS {
- grpclog.Warning("xds: received CDS response in custom plugin mode.")
- // start a new call as we receive CDS response when in EDS-only mode.
- return
- }
- var adsResp ptypes.DynamicAny
- if err := ptypes.UnmarshalAny(resources[0], &adsResp); err != nil {
- grpclog.Warningf("xds: failed to unmarshal resources due to %v.", err)
- return
- }
- switch adsResp.Message.(type) {
- case *cdspb.Cluster:
- expectCDS = false
- case *edspb.ClusterLoadAssignment:
- if expectCDS {
- grpclog.Warningf("xds: expecting CDS response, got EDS response instead.")
- return
- }
- }
- if err := c.newADS(c.ctx, adsResp.Message); err != nil {
- grpclog.Warningf("xds: processing new ADS message failed due to %v.", err)
- return
- }
- }
- }
- func newXDSClient(balancerName string, serviceName string, enableCDS bool, opts balancer.BuildOptions, newADS func(context.Context, proto.Message) error, loseContact func(ctx context.Context), exitCleanup func()) *client {
- c := &client{
- balancerName: balancerName,
- serviceName: serviceName,
- enableCDS: enableCDS,
- opts: opts,
- newADS: newADS,
- loseContact: loseContact,
- cleanup: exitCleanup,
- backoff: defaultBackoffConfig,
- }
- c.ctx, c.cancel = context.WithCancel(context.Background())
- return c
- }
|