resolver_conn_wrapper.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "fmt"
  21. "strings"
  22. "sync"
  23. "time"
  24. "google.golang.org/grpc/balancer"
  25. "google.golang.org/grpc/credentials"
  26. "google.golang.org/grpc/internal/channelz"
  27. "google.golang.org/grpc/internal/grpcsync"
  28. "google.golang.org/grpc/resolver"
  29. "google.golang.org/grpc/serviceconfig"
  30. )
  31. // ccResolverWrapper is a wrapper on top of cc for resolvers.
  32. // It implements resolver.ClientConn interface.
  33. type ccResolverWrapper struct {
  34. cc *ClientConn
  35. resolverMu sync.Mutex
  36. resolver resolver.Resolver
  37. done *grpcsync.Event
  38. curState resolver.State
  39. pollingMu sync.Mutex
  40. polling chan struct{}
  41. }
  42. // newCCResolverWrapper uses the resolver.Builder to build a Resolver and
  43. // returns a ccResolverWrapper object which wraps the newly built resolver.
  44. func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
  45. ccr := &ccResolverWrapper{
  46. cc: cc,
  47. done: grpcsync.NewEvent(),
  48. }
  49. var credsClone credentials.TransportCredentials
  50. if creds := cc.dopts.copts.TransportCredentials; creds != nil {
  51. credsClone = creds.Clone()
  52. }
  53. rbo := resolver.BuildOptions{
  54. DisableServiceConfig: cc.dopts.disableServiceConfig,
  55. DialCreds: credsClone,
  56. CredsBundle: cc.dopts.copts.CredsBundle,
  57. Dialer: cc.dopts.copts.Dialer,
  58. }
  59. var err error
  60. // We need to hold the lock here while we assign to the ccr.resolver field
  61. // to guard against a data race caused by the following code path,
  62. // rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up
  63. // accessing ccr.resolver which is being assigned here.
  64. ccr.resolverMu.Lock()
  65. defer ccr.resolverMu.Unlock()
  66. ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
  67. if err != nil {
  68. return nil, err
  69. }
  70. return ccr, nil
  71. }
  72. func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
  73. ccr.resolverMu.Lock()
  74. if !ccr.done.HasFired() {
  75. ccr.resolver.ResolveNow(o)
  76. }
  77. ccr.resolverMu.Unlock()
  78. }
  79. func (ccr *ccResolverWrapper) close() {
  80. ccr.resolverMu.Lock()
  81. ccr.resolver.Close()
  82. ccr.done.Fire()
  83. ccr.resolverMu.Unlock()
  84. }
  85. // poll begins or ends asynchronous polling of the resolver based on whether
  86. // err is ErrBadResolverState.
  87. func (ccr *ccResolverWrapper) poll(err error) {
  88. ccr.pollingMu.Lock()
  89. defer ccr.pollingMu.Unlock()
  90. if err != balancer.ErrBadResolverState {
  91. // stop polling
  92. if ccr.polling != nil {
  93. close(ccr.polling)
  94. ccr.polling = nil
  95. }
  96. return
  97. }
  98. if ccr.polling != nil {
  99. // already polling
  100. return
  101. }
  102. p := make(chan struct{})
  103. ccr.polling = p
  104. go func() {
  105. for i := 0; ; i++ {
  106. ccr.resolveNow(resolver.ResolveNowOptions{})
  107. t := time.NewTimer(ccr.cc.dopts.resolveNowBackoff(i))
  108. select {
  109. case <-p:
  110. t.Stop()
  111. return
  112. case <-ccr.done.Done():
  113. // Resolver has been closed.
  114. t.Stop()
  115. return
  116. case <-t.C:
  117. select {
  118. case <-p:
  119. return
  120. default:
  121. }
  122. // Timer expired; re-resolve.
  123. }
  124. }
  125. }()
  126. }
  127. func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
  128. if ccr.done.HasFired() {
  129. return
  130. }
  131. channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s)
  132. if channelz.IsOn() {
  133. ccr.addChannelzTraceEvent(s)
  134. }
  135. ccr.curState = s
  136. ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
  137. }
  138. func (ccr *ccResolverWrapper) ReportError(err error) {
  139. if ccr.done.HasFired() {
  140. return
  141. }
  142. channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
  143. ccr.poll(ccr.cc.updateResolverState(resolver.State{}, err))
  144. }
  145. // NewAddress is called by the resolver implementation to send addresses to gRPC.
  146. func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
  147. if ccr.done.HasFired() {
  148. return
  149. }
  150. channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending new addresses to cc: %v", addrs)
  151. if channelz.IsOn() {
  152. ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
  153. }
  154. ccr.curState.Addresses = addrs
  155. ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
  156. }
  157. // NewServiceConfig is called by the resolver implementation to send service
  158. // configs to gRPC.
  159. func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
  160. if ccr.done.HasFired() {
  161. return
  162. }
  163. channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: got new service config: %v", sc)
  164. if ccr.cc.dopts.disableServiceConfig {
  165. channelz.Info(logger, ccr.cc.channelzID, "Service config lookups disabled; ignoring config")
  166. return
  167. }
  168. scpr := parseServiceConfig(sc)
  169. if scpr.Err != nil {
  170. channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
  171. ccr.poll(balancer.ErrBadResolverState)
  172. return
  173. }
  174. if channelz.IsOn() {
  175. ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
  176. }
  177. ccr.curState.ServiceConfig = scpr
  178. ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
  179. }
  180. func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
  181. return parseServiceConfig(scJSON)
  182. }
  183. func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
  184. var updates []string
  185. var oldSC, newSC *ServiceConfig
  186. var oldOK, newOK bool
  187. if ccr.curState.ServiceConfig != nil {
  188. oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
  189. }
  190. if s.ServiceConfig != nil {
  191. newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
  192. }
  193. if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
  194. updates = append(updates, "service config updated")
  195. }
  196. if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
  197. updates = append(updates, "resolver returned an empty address list")
  198. } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
  199. updates = append(updates, "resolver returned new addresses")
  200. }
  201. channelz.AddTraceEvent(logger, ccr.cc.channelzID, 0, &channelz.TraceEventDesc{
  202. Desc: fmt.Sprintf("Resolver state updated: %+v (%v)", s, strings.Join(updates, "; ")),
  203. Severity: channelz.CtINFO,
  204. })
  205. }