xds_client.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. // +build go1.12
  2. /*
  3. *
  4. * Copyright 2019 gRPC authors.
  5. *
  6. * Licensed under the Apache License, Version 2.0 (the "License");
  7. * you may not use this file except in compliance with the License.
  8. * You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. *
  18. */
  19. package xds
  20. import (
  21. "context"
  22. "sync"
  23. "time"
  24. "github.com/golang/protobuf/proto"
  25. "github.com/golang/protobuf/ptypes"
  26. structpb "github.com/golang/protobuf/ptypes/struct"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/balancer"
  29. cdspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/cds"
  30. basepb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/core/base"
  31. discoverypb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/discovery"
  32. edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds"
  33. adspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/discovery/v2/ads"
  34. "google.golang.org/grpc/grpclog"
  35. "google.golang.org/grpc/internal/backoff"
  36. "google.golang.org/grpc/internal/channelz"
  37. )
  38. const (
  39. grpcHostname = "com.googleapis.trafficdirector.grpc_hostname"
  40. cdsType = "type.googleapis.com/envoy.api.v2.Cluster"
  41. edsType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
  42. endpointRequired = "endpoints_required"
  43. )
  44. var (
  45. defaultBackoffConfig = backoff.Exponential{
  46. MaxDelay: 120 * time.Second,
  47. }
  48. )
  49. // client is responsible for connecting to the specified traffic director, passing the received
  50. // ADS response from the traffic director, and sending notification when communication with the
  51. // traffic director is lost.
  52. type client struct {
  53. ctx context.Context
  54. cancel context.CancelFunc
  55. cli adspb.AggregatedDiscoveryServiceClient
  56. opts balancer.BuildOptions
  57. balancerName string // the traffic director name
  58. serviceName string // the user dial target name
  59. enableCDS bool
  60. newADS func(ctx context.Context, resp proto.Message) error
  61. loseContact func(ctx context.Context)
  62. cleanup func()
  63. backoff backoff.Strategy
  64. mu sync.Mutex
  65. cc *grpc.ClientConn
  66. }
  67. func (c *client) run() {
  68. c.dial()
  69. c.makeADSCall()
  70. }
  71. func (c *client) close() {
  72. c.cancel()
  73. c.mu.Lock()
  74. if c.cc != nil {
  75. c.cc.Close()
  76. }
  77. c.mu.Unlock()
  78. c.cleanup()
  79. }
  80. func (c *client) dial() {
  81. var dopts []grpc.DialOption
  82. if creds := c.opts.DialCreds; creds != nil {
  83. if err := creds.OverrideServerName(c.balancerName); err == nil {
  84. dopts = append(dopts, grpc.WithTransportCredentials(creds))
  85. } else {
  86. grpclog.Warningf("xds: failed to override the server name in the credentials: %v, using Insecure", err)
  87. dopts = append(dopts, grpc.WithInsecure())
  88. }
  89. } else {
  90. dopts = append(dopts, grpc.WithInsecure())
  91. }
  92. if c.opts.Dialer != nil {
  93. dopts = append(dopts, grpc.WithContextDialer(c.opts.Dialer))
  94. }
  95. // Explicitly set pickfirst as the balancer.
  96. dopts = append(dopts, grpc.WithBalancerName(grpc.PickFirstBalancerName))
  97. if channelz.IsOn() {
  98. dopts = append(dopts, grpc.WithChannelzParentID(c.opts.ChannelzParentID))
  99. }
  100. cc, err := grpc.DialContext(c.ctx, c.balancerName, dopts...)
  101. // Since this is a non-blocking dial, so if it fails, it due to some serious error (not network
  102. // related) error.
  103. if err != nil {
  104. grpclog.Fatalf("xds: failed to dial: %v", err)
  105. }
  106. c.mu.Lock()
  107. select {
  108. case <-c.ctx.Done():
  109. cc.Close()
  110. default:
  111. // only assign c.cc when xds client has not been closed, to prevent ClientConn leak.
  112. c.cc = cc
  113. }
  114. c.mu.Unlock()
  115. }
  116. func (c *client) newCDSRequest() *discoverypb.DiscoveryRequest {
  117. cdsReq := &discoverypb.DiscoveryRequest{
  118. Node: &basepb.Node{
  119. Metadata: &structpb.Struct{
  120. Fields: map[string]*structpb.Value{
  121. grpcHostname: {
  122. Kind: &structpb.Value_StringValue{StringValue: c.serviceName},
  123. },
  124. },
  125. },
  126. },
  127. TypeUrl: cdsType,
  128. }
  129. return cdsReq
  130. }
  131. func (c *client) newEDSRequest() *discoverypb.DiscoveryRequest {
  132. edsReq := &discoverypb.DiscoveryRequest{
  133. Node: &basepb.Node{
  134. Metadata: &structpb.Struct{
  135. Fields: map[string]*structpb.Value{
  136. endpointRequired: {
  137. Kind: &structpb.Value_BoolValue{BoolValue: c.enableCDS},
  138. },
  139. },
  140. },
  141. },
  142. ResourceNames: []string{c.serviceName},
  143. TypeUrl: edsType,
  144. }
  145. return edsReq
  146. }
  147. func (c *client) makeADSCall() {
  148. c.cli = adspb.NewAggregatedDiscoveryServiceClient(c.cc)
  149. retryCount := 0
  150. var doRetry bool
  151. for {
  152. select {
  153. case <-c.ctx.Done():
  154. return
  155. default:
  156. }
  157. if doRetry {
  158. backoffTimer := time.NewTimer(c.backoff.Backoff(retryCount))
  159. select {
  160. case <-backoffTimer.C:
  161. case <-c.ctx.Done():
  162. backoffTimer.Stop()
  163. return
  164. }
  165. retryCount++
  166. }
  167. firstRespReceived := c.adsCallAttempt()
  168. if firstRespReceived {
  169. retryCount = 0
  170. doRetry = false
  171. } else {
  172. doRetry = true
  173. }
  174. c.loseContact(c.ctx)
  175. }
  176. }
  177. func (c *client) adsCallAttempt() (firstRespReceived bool) {
  178. firstRespReceived = false
  179. ctx, cancel := context.WithCancel(c.ctx)
  180. defer cancel()
  181. st, err := c.cli.StreamAggregatedResources(ctx, grpc.WaitForReady(true))
  182. if err != nil {
  183. grpclog.Infof("xds: failed to initial ADS streaming RPC due to %v", err)
  184. return
  185. }
  186. if c.enableCDS {
  187. if err := st.Send(c.newCDSRequest()); err != nil {
  188. // current stream is broken, start a new one.
  189. return
  190. }
  191. }
  192. if err := st.Send(c.newEDSRequest()); err != nil {
  193. // current stream is broken, start a new one.
  194. return
  195. }
  196. expectCDS := c.enableCDS
  197. for {
  198. resp, err := st.Recv()
  199. if err != nil {
  200. // current stream is broken, start a new one.
  201. return
  202. }
  203. firstRespReceived = true
  204. resources := resp.GetResources()
  205. if len(resources) < 1 {
  206. grpclog.Warning("xds: ADS response contains 0 resource info.")
  207. // start a new call as server misbehaves by sending a ADS response with 0 resource info.
  208. return
  209. }
  210. if resp.GetTypeUrl() == cdsType && !c.enableCDS {
  211. grpclog.Warning("xds: received CDS response in custom plugin mode.")
  212. // start a new call as we receive CDS response when in EDS-only mode.
  213. return
  214. }
  215. var adsResp ptypes.DynamicAny
  216. if err := ptypes.UnmarshalAny(resources[0], &adsResp); err != nil {
  217. grpclog.Warningf("xds: failed to unmarshal resources due to %v.", err)
  218. return
  219. }
  220. switch adsResp.Message.(type) {
  221. case *cdspb.Cluster:
  222. expectCDS = false
  223. case *edspb.ClusterLoadAssignment:
  224. if expectCDS {
  225. grpclog.Warningf("xds: expecting CDS response, got EDS response instead.")
  226. return
  227. }
  228. }
  229. if err := c.newADS(c.ctx, adsResp.Message); err != nil {
  230. grpclog.Warningf("xds: processing new ADS message failed due to %v.", err)
  231. return
  232. }
  233. }
  234. }
  235. func newXDSClient(balancerName string, serviceName string, enableCDS bool, opts balancer.BuildOptions, newADS func(context.Context, proto.Message) error, loseContact func(ctx context.Context), exitCleanup func()) *client {
  236. c := &client{
  237. balancerName: balancerName,
  238. serviceName: serviceName,
  239. enableCDS: enableCDS,
  240. opts: opts,
  241. newADS: newADS,
  242. loseContact: loseContact,
  243. cleanup: exitCleanup,
  244. backoff: defaultBackoffConfig,
  245. }
  246. c.ctx, c.cancel = context.WithCancel(context.Background())
  247. return c
  248. }