xds_resolver.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. /*
  2. * Copyright 2019 gRPC authors.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. *
  16. */
  17. // Package resolver implements the xds resolver, that does LDS and RDS to find
  18. // the cluster to use.
  19. package resolver
  20. import (
  21. "context"
  22. "fmt"
  23. "google.golang.org/grpc"
  24. "google.golang.org/grpc/attributes"
  25. "google.golang.org/grpc/internal/grpclog"
  26. "google.golang.org/grpc/resolver"
  27. xdsinternal "google.golang.org/grpc/xds/internal"
  28. xdsclient "google.golang.org/grpc/xds/internal/client"
  29. "google.golang.org/grpc/xds/internal/client/bootstrap"
  30. )
// xdsScheme is the URL scheme returned by xdsResolverBuilder.Scheme.
//
// The xDS balancer name is xds_experimental while the resolver scheme is
// xds-experimental, since "_" is not a valid character in the URL.
const xdsScheme = "xds-experimental"
  34. // For overriding in unittests.
  35. var (
  36. newXDSClient = func(opts xdsclient.Options) (xdsClientInterface, error) {
  37. return xdsclient.New(opts)
  38. }
  39. newXDSConfig = bootstrap.NewConfig
  40. )
  41. func init() {
  42. resolver.Register(&xdsResolverBuilder{})
  43. }
// xdsResolverBuilder builds xds resolvers. It carries no state of its own; an
// instance is registered with the resolver package from init, and its Build
// and Scheme methods help implement the resolver.Builder interface.
type xdsResolverBuilder struct{}
  45. // Build helps implement the resolver.Builder interface.
  46. //
  47. // The xds bootstrap process is performed (and a new xds client is built) every
  48. // time an xds resolver is built.
  49. func (b *xdsResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, rbo resolver.BuildOptions) (resolver.Resolver, error) {
  50. config, err := newXDSConfig()
  51. if err != nil {
  52. return nil, fmt.Errorf("xds: failed to read bootstrap file: %v", err)
  53. }
  54. r := &xdsResolver{
  55. target: t,
  56. cc: cc,
  57. updateCh: make(chan suWithError, 1),
  58. }
  59. r.logger = grpclog.NewPrefixLogger(loggingPrefix(r))
  60. r.logger.Infof("Creating resolver for target: %+v", t)
  61. if config.Creds == nil {
  62. // TODO: Once we start supporting a mechanism to register credential
  63. // types, a failure to find the credential type mentioned in the
  64. // bootstrap file should result in a failure, and not in using
  65. // credentials from the parent channel (passed through the
  66. // resolver.BuildOptions).
  67. config.Creds = r.defaultDialCreds(config.BalancerName, rbo)
  68. }
  69. var dopts []grpc.DialOption
  70. if rbo.Dialer != nil {
  71. dopts = []grpc.DialOption{grpc.WithContextDialer(rbo.Dialer)}
  72. }
  73. client, err := newXDSClient(xdsclient.Options{Config: *config, DialOpts: dopts, TargetName: t.Endpoint})
  74. if err != nil {
  75. return nil, fmt.Errorf("xds: failed to create xds-client: %v", err)
  76. }
  77. r.client = client
  78. r.ctx, r.cancelCtx = context.WithCancel(context.Background())
  79. cancelWatch := r.client.WatchService(r.target.Endpoint, r.handleServiceUpdate)
  80. r.logger.Infof("Watch started on resource name %v with xds-client %p", r.target.Endpoint, r.client)
  81. r.cancelWatch = func() {
  82. cancelWatch()
  83. r.logger.Infof("Watch cancel on resource name %v with xds-client %p", r.target.Endpoint, r.client)
  84. }
  85. go r.run()
  86. return r, nil
  87. }
  88. // defaultDialCreds builds a DialOption containing the credentials to be used
  89. // while talking to the xDS server (this is done only if the xds bootstrap
  90. // process does not return any credentials to use). If the parent channel
  91. // contains DialCreds, we use it as is. If it contains a CredsBundle, we use
  92. // just the transport credentials from the bundle. If we don't find any
  93. // credentials on the parent channel, we resort to using an insecure channel.
  94. func (r *xdsResolver) defaultDialCreds(balancerName string, rbo resolver.BuildOptions) grpc.DialOption {
  95. switch {
  96. case rbo.DialCreds != nil:
  97. if err := rbo.DialCreds.OverrideServerName(balancerName); err != nil {
  98. r.logger.Errorf("Failed to override server name in credentials: %v, using Insecure", err)
  99. return grpc.WithInsecure()
  100. }
  101. return grpc.WithTransportCredentials(rbo.DialCreds)
  102. case rbo.CredsBundle != nil:
  103. return grpc.WithTransportCredentials(rbo.CredsBundle.TransportCredentials())
  104. default:
  105. r.logger.Warningf("No credentials available, using Insecure")
  106. return grpc.WithInsecure()
  107. }
  108. }
// Scheme helps implement the resolver.Builder interface. It returns the URL
// scheme handled by this resolver, xdsScheme.
func (*xdsResolverBuilder) Scheme() string {
	return xdsScheme
}
// xdsClientInterface contains methods from xdsclient.Client which are used by
// the resolver. This will be faked out in unittests.
type xdsClientInterface interface {
	// WatchService registers a callback for updates (or errors) on the given
	// service name. The returned function cancels the watch.
	WatchService(string, func(xdsclient.ServiceUpdate, error)) func()
	// Close is invoked when the resolver is closed.
	Close()
}
// suWithError wraps the ServiceUpdate and error received through a watch API
// callback, so that they can be pushed onto the update channel as a single
// entity.
type suWithError struct {
	su  xdsclient.ServiceUpdate
	err error
}
// xdsResolver implements the resolver.Resolver interface.
//
// It registers a watcher for ServiceConfig updates with the xdsClient object
// (which performs LDS/RDS queries for the same), and passes the received
// updates to the ClientConn.
type xdsResolver struct {
	// ctx is created in Build and cancelled (through cancelCtx) by Close. The
	// run goroutine and handleServiceUpdate consult it to detect that the
	// resolver has been closed.
	ctx       context.Context
	cancelCtx context.CancelFunc
	// target is the target this resolver was built for; target.Endpoint is
	// the resource name watched on the xdsClient.
	target resolver.Target
	// cc is the ClientConn to which state updates and errors are reported.
	cc resolver.ClientConn
	// logger is the prefix logger created in Build for this resolver.
	logger *grpclog.PrefixLogger
	// The underlying xdsClient which performs all xDS requests and responses.
	client xdsClientInterface
	// A channel for the watch API callback to write service updates on to. The
	// updates are read by the run goroutine and passed on to the ClientConn.
	updateCh chan suWithError
	// cancelWatch is the function to cancel the watcher.
	cancelWatch func()
}
// jsonFormatSC is the template for the service config JSON which the run
// goroutine generates for every service update. Its single %s verb is filled
// with the cluster name from the update, as the "Cluster" of a
// cds_experimental load balancing config. The name is inserted verbatim,
// without any JSON escaping.
const jsonFormatSC = `{
"loadBalancingConfig":[
{
"cds_experimental":{
"Cluster": "%s"
}
}
]
}`
  153. // run is a long running goroutine which blocks on receiving service updates
  154. // and passes it on the ClientConn.
  155. func (r *xdsResolver) run() {
  156. for {
  157. select {
  158. case <-r.ctx.Done():
  159. case update := <-r.updateCh:
  160. if update.err != nil {
  161. r.logger.Warningf("Watch error on resource %v from xds-client %p, %v", r.target.Endpoint, r.client, update.err)
  162. r.cc.ReportError(update.err)
  163. continue
  164. }
  165. sc := fmt.Sprintf(jsonFormatSC, update.su.Cluster)
  166. r.logger.Infof("Received update on resource %v from xds-client %p, generated service config: %v", r.target.Endpoint, r.client, sc)
  167. r.cc.UpdateState(resolver.State{
  168. ServiceConfig: r.cc.ParseServiceConfig(sc),
  169. Attributes: attributes.New(xdsinternal.XDSClientID, r.client),
  170. })
  171. }
  172. }
  173. }
  174. // handleServiceUpdate is the callback which handles service updates. It writes
  175. // the received update to the update channel, which is picked by the run
  176. // goroutine.
  177. func (r *xdsResolver) handleServiceUpdate(su xdsclient.ServiceUpdate, err error) {
  178. if r.ctx.Err() != nil {
  179. // Do not pass updates to the ClientConn once the resolver is closed.
  180. return
  181. }
  182. r.updateCh <- suWithError{su, err}
  183. }
// ResolveNow is a no-op at this point: the method has an empty body and its
// options argument is ignored.
func (*xdsResolver) ResolveNow(o resolver.ResolveNowOptions) {}
// Close closes the resolver, and also closes the underlying xdsClient.
//
// It cancels the service watch registered in Build, closes the xdsClient, and
// then cancels the resolver's context, which handleServiceUpdate checks before
// forwarding any update.
func (r *xdsResolver) Close() {
	r.cancelWatch()
	r.client.Close()
	r.cancelCtx()
	r.logger.Infof("Shutdown")
}