balancer_switching_test.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "fmt"
  22. "math"
  23. "testing"
  24. "time"
  25. "google.golang.org/grpc/balancer"
  26. "google.golang.org/grpc/balancer/roundrobin"
  27. "google.golang.org/grpc/internal"
  28. "google.golang.org/grpc/resolver"
  29. "google.golang.org/grpc/resolver/manual"
  30. "google.golang.org/grpc/serviceconfig"
  31. )
  32. var _ balancer.Builder = &magicalLB{}
  33. var _ balancer.Balancer = &magicalLB{}
  34. // magicalLB is a ringer for grpclb. It is used to avoid circular dependencies on the grpclb package
  35. type magicalLB struct{}
  36. func (b *magicalLB) Name() string {
  37. return "grpclb"
  38. }
  39. func (b *magicalLB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
  40. return b
  41. }
  42. func (b *magicalLB) ResolverError(error) {}
  43. func (b *magicalLB) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {}
  44. func (b *magicalLB) UpdateClientConnState(balancer.ClientConnState) error {
  45. return nil
  46. }
  47. func (b *magicalLB) Close() {}
  48. func init() {
  49. balancer.Register(&magicalLB{})
  50. }
  51. func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, func()) {
  52. var servers []*server
  53. for i := 0; i < numServers; i++ {
  54. s := newTestServer()
  55. servers = append(servers, s)
  56. go s.start(t, 0, maxStreams)
  57. s.wait(t, 2*time.Second)
  58. }
  59. return servers, func() {
  60. for i := 0; i < numServers; i++ {
  61. servers[i].stop()
  62. }
  63. }
  64. }
  65. func checkPickFirst(cc *ClientConn, servers []*server) error {
  66. var (
  67. req = "port"
  68. reply string
  69. err error
  70. )
  71. connected := false
  72. for i := 0; i < 5000; i++ {
  73. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); errorDesc(err) == servers[0].port {
  74. if connected {
  75. // connected is set to false if peer is not server[0]. So if
  76. // connected is true here, this is the second time we saw
  77. // server[0] in a row. Break because pickfirst is in effect.
  78. break
  79. }
  80. connected = true
  81. } else {
  82. connected = false
  83. }
  84. time.Sleep(time.Millisecond)
  85. }
  86. if !connected {
  87. return fmt.Errorf("pickfirst is not in effect after 5 second, EmptyCall() = _, %v, want _, %v", err, servers[0].port)
  88. }
  89. // The following RPCs should all succeed with the first server.
  90. for i := 0; i < 3; i++ {
  91. err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
  92. if errorDesc(err) != servers[0].port {
  93. return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[0].port, err)
  94. }
  95. }
  96. return nil
  97. }
  98. func checkRoundRobin(cc *ClientConn, servers []*server) error {
  99. var (
  100. req = "port"
  101. reply string
  102. err error
  103. )
  104. // Make sure connections to all servers are up.
  105. for i := 0; i < 2; i++ {
  106. // Do this check twice, otherwise the first RPC's transport may still be
  107. // picked by the closing pickfirst balancer, and the test becomes flaky.
  108. for _, s := range servers {
  109. var up bool
  110. for i := 0; i < 5000; i++ {
  111. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); errorDesc(err) == s.port {
  112. up = true
  113. break
  114. }
  115. time.Sleep(time.Millisecond)
  116. }
  117. if !up {
  118. return fmt.Errorf("server %v is not up within 5 second", s.port)
  119. }
  120. }
  121. }
  122. serverCount := len(servers)
  123. for i := 0; i < 3*serverCount; i++ {
  124. err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
  125. if errorDesc(err) != servers[i%serverCount].port {
  126. return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[i%serverCount].port, err)
  127. }
  128. }
  129. return nil
  130. }
  131. func (s) TestSwitchBalancer(t *testing.T) {
  132. r := manual.NewBuilderWithScheme("whatever")
  133. const numServers = 2
  134. servers, scleanup := startServers(t, numServers, math.MaxInt32)
  135. defer scleanup()
  136. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
  137. if err != nil {
  138. t.Fatalf("failed to dial: %v", err)
  139. }
  140. defer cc.Close()
  141. addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}
  142. r.UpdateState(resolver.State{Addresses: addrs})
  143. // The default balancer is pickfirst.
  144. if err := checkPickFirst(cc, servers); err != nil {
  145. t.Fatalf("check pickfirst returned non-nil error: %v", err)
  146. }
  147. // Switch to roundrobin.
  148. cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs}, nil)
  149. if err := checkRoundRobin(cc, servers); err != nil {
  150. t.Fatalf("check roundrobin returned non-nil error: %v", err)
  151. }
  152. // Switch to pickfirst.
  153. cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs}, nil)
  154. if err := checkPickFirst(cc, servers); err != nil {
  155. t.Fatalf("check pickfirst returned non-nil error: %v", err)
  156. }
  157. }
  158. // Test that balancer specified by dial option will not be overridden.
  159. func (s) TestBalancerDialOption(t *testing.T) {
  160. r := manual.NewBuilderWithScheme("whatever")
  161. const numServers = 2
  162. servers, scleanup := startServers(t, numServers, math.MaxInt32)
  163. defer scleanup()
  164. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name))
  165. if err != nil {
  166. t.Fatalf("failed to dial: %v", err)
  167. }
  168. defer cc.Close()
  169. addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}
  170. r.UpdateState(resolver.State{Addresses: addrs})
  171. // The init balancer is roundrobin.
  172. if err := checkRoundRobin(cc, servers); err != nil {
  173. t.Fatalf("check roundrobin returned non-nil error: %v", err)
  174. }
  175. // Switch to pickfirst.
  176. cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs}, nil)
  177. // Balancer is still roundrobin.
  178. if err := checkRoundRobin(cc, servers); err != nil {
  179. t.Fatalf("check roundrobin returned non-nil error: %v", err)
  180. }
  181. }
  182. // First addr update contains grpclb.
  183. func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) {
  184. r := manual.NewBuilderWithScheme("whatever")
  185. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
  186. if err != nil {
  187. t.Fatalf("failed to dial: %v", err)
  188. }
  189. defer cc.Close()
  190. // ClientConn will switch balancer to grpclb when receives an address of
  191. // type GRPCLB.
  192. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}})
  193. var isGRPCLB bool
  194. for i := 0; i < 5000; i++ {
  195. cc.mu.Lock()
  196. isGRPCLB = cc.curBalancerName == "grpclb"
  197. cc.mu.Unlock()
  198. if isGRPCLB {
  199. break
  200. }
  201. time.Sleep(time.Millisecond)
  202. }
  203. if !isGRPCLB {
  204. t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
  205. }
  206. // New update containing new backend and new grpclb. Should not switch
  207. // balancer.
  208. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}})
  209. for i := 0; i < 200; i++ {
  210. cc.mu.Lock()
  211. isGRPCLB = cc.curBalancerName == "grpclb"
  212. cc.mu.Unlock()
  213. if !isGRPCLB {
  214. break
  215. }
  216. time.Sleep(time.Millisecond)
  217. }
  218. if !isGRPCLB {
  219. t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb")
  220. }
  221. var isPickFirst bool
  222. // Switch balancer to pickfirst.
  223. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
  224. for i := 0; i < 5000; i++ {
  225. cc.mu.Lock()
  226. isPickFirst = cc.curBalancerName == PickFirstBalancerName
  227. cc.mu.Unlock()
  228. if isPickFirst {
  229. break
  230. }
  231. time.Sleep(time.Millisecond)
  232. }
  233. if !isPickFirst {
  234. t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
  235. }
  236. }
  237. // First addr update does not contain grpclb.
  238. func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) {
  239. r := manual.NewBuilderWithScheme("whatever")
  240. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
  241. if err != nil {
  242. t.Fatalf("failed to dial: %v", err)
  243. }
  244. defer cc.Close()
  245. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
  246. var isPickFirst bool
  247. for i := 0; i < 5000; i++ {
  248. cc.mu.Lock()
  249. isPickFirst = cc.curBalancerName == PickFirstBalancerName
  250. cc.mu.Unlock()
  251. if isPickFirst {
  252. break
  253. }
  254. time.Sleep(time.Millisecond)
  255. }
  256. if !isPickFirst {
  257. t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
  258. }
  259. // ClientConn will switch balancer to grpclb when receives an address of
  260. // type GRPCLB.
  261. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}})
  262. var isGRPCLB bool
  263. for i := 0; i < 5000; i++ {
  264. cc.mu.Lock()
  265. isGRPCLB = cc.curBalancerName == "grpclb"
  266. cc.mu.Unlock()
  267. if isGRPCLB {
  268. break
  269. }
  270. time.Sleep(time.Millisecond)
  271. }
  272. if !isGRPCLB {
  273. t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
  274. }
  275. // New update containing new backend and new grpclb. Should not switch
  276. // balancer.
  277. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}})
  278. for i := 0; i < 200; i++ {
  279. cc.mu.Lock()
  280. isGRPCLB = cc.curBalancerName == "grpclb"
  281. cc.mu.Unlock()
  282. if !isGRPCLB {
  283. break
  284. }
  285. time.Sleep(time.Millisecond)
  286. }
  287. if !isGRPCLB {
  288. t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb")
  289. }
  290. // Switch balancer back.
  291. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
  292. for i := 0; i < 5000; i++ {
  293. cc.mu.Lock()
  294. isPickFirst = cc.curBalancerName == PickFirstBalancerName
  295. cc.mu.Unlock()
  296. if isPickFirst {
  297. break
  298. }
  299. time.Sleep(time.Millisecond)
  300. }
  301. if !isPickFirst {
  302. t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
  303. }
  304. }
  305. // Test that if the current balancer is roundrobin, after switching to grpclb,
  306. // when the resolved address doesn't contain grpclb addresses, balancer will be
  307. // switched back to roundrobin.
  308. func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
  309. r := manual.NewBuilderWithScheme("whatever")
  310. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
  311. if err != nil {
  312. t.Fatalf("failed to dial: %v", err)
  313. }
  314. defer cc.Close()
  315. sc := parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)
  316. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
  317. var isRoundRobin bool
  318. for i := 0; i < 5000; i++ {
  319. cc.mu.Lock()
  320. isRoundRobin = cc.curBalancerName == "round_robin"
  321. cc.mu.Unlock()
  322. if isRoundRobin {
  323. break
  324. }
  325. time.Sleep(time.Millisecond)
  326. }
  327. if !isRoundRobin {
  328. t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
  329. }
  330. // ClientConn will switch balancer to grpclb when receives an address of
  331. // type GRPCLB.
  332. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}}, ServiceConfig: sc})
  333. var isGRPCLB bool
  334. for i := 0; i < 5000; i++ {
  335. cc.mu.Lock()
  336. isGRPCLB = cc.curBalancerName == "grpclb"
  337. cc.mu.Unlock()
  338. if isGRPCLB {
  339. break
  340. }
  341. time.Sleep(time.Millisecond)
  342. }
  343. if !isGRPCLB {
  344. t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
  345. }
  346. // Switch balancer back.
  347. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
  348. for i := 0; i < 5000; i++ {
  349. cc.mu.Lock()
  350. isRoundRobin = cc.curBalancerName == "round_robin"
  351. cc.mu.Unlock()
  352. if isRoundRobin {
  353. break
  354. }
  355. time.Sleep(time.Millisecond)
  356. }
  357. if !isRoundRobin {
  358. t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
  359. }
  360. }
  361. // Test that if resolved address list contains grpclb, the balancer option in
  362. // service config won't take effect. But when there's no grpclb address in a new
  363. // resolved address list, balancer will be switched to the new one.
  364. func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
  365. r := manual.NewBuilderWithScheme("whatever")
  366. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
  367. if err != nil {
  368. t.Fatalf("failed to dial: %v", err)
  369. }
  370. defer cc.Close()
  371. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
  372. var isPickFirst bool
  373. for i := 0; i < 5000; i++ {
  374. cc.mu.Lock()
  375. isPickFirst = cc.curBalancerName == PickFirstBalancerName
  376. cc.mu.Unlock()
  377. if isPickFirst {
  378. break
  379. }
  380. time.Sleep(time.Millisecond)
  381. }
  382. if !isPickFirst {
  383. t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
  384. }
  385. // ClientConn will switch balancer to grpclb when receives an address of
  386. // type GRPCLB.
  387. addrs := []resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}}
  388. r.UpdateState(resolver.State{Addresses: addrs})
  389. var isGRPCLB bool
  390. for i := 0; i < 5000; i++ {
  391. cc.mu.Lock()
  392. isGRPCLB = cc.curBalancerName == "grpclb"
  393. cc.mu.Unlock()
  394. if isGRPCLB {
  395. break
  396. }
  397. time.Sleep(time.Millisecond)
  398. }
  399. if !isGRPCLB {
  400. t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
  401. }
  402. sc := parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)
  403. r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc})
  404. var isRoundRobin bool
  405. for i := 0; i < 200; i++ {
  406. cc.mu.Lock()
  407. isRoundRobin = cc.curBalancerName == "round_robin"
  408. cc.mu.Unlock()
  409. if isRoundRobin {
  410. break
  411. }
  412. time.Sleep(time.Millisecond)
  413. }
  414. // Balancer should NOT switch to round_robin because resolved list contains
  415. // grpclb.
  416. if isRoundRobin {
  417. t.Fatalf("within 200 ms, cc.balancer switched to round_robin, want grpclb")
  418. }
  419. // Switch balancer back.
  420. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
  421. for i := 0; i < 5000; i++ {
  422. cc.mu.Lock()
  423. isRoundRobin = cc.curBalancerName == "round_robin"
  424. cc.mu.Unlock()
  425. if isRoundRobin {
  426. break
  427. }
  428. time.Sleep(time.Millisecond)
  429. }
  430. if !isRoundRobin {
  431. t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
  432. }
  433. }
  434. // Test that when switching to grpclb fails because grpclb is not registered,
  435. // the fallback balancer will only get backend addresses, not the grpclb server
  436. // address.
  437. //
  438. // The tests sends 3 server addresses (all backends) as resolved addresses, but
  439. // claim the first one is grpclb server. The all RPCs should all be send to the
  440. // other addresses, not the first one.
  441. func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
  442. internal.BalancerUnregister("grpclb")
  443. defer balancer.Register(&magicalLB{})
  444. r := manual.NewBuilderWithScheme("whatever")
  445. const numServers = 3
  446. servers, scleanup := startServers(t, numServers, math.MaxInt32)
  447. defer scleanup()
  448. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
  449. if err != nil {
  450. t.Fatalf("failed to dial: %v", err)
  451. }
  452. defer cc.Close()
  453. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}}})
  454. // The default balancer is pickfirst.
  455. if err := checkPickFirst(cc, servers[1:]); err != nil {
  456. t.Fatalf("check pickfirst returned non-nil error: %v", err)
  457. }
  458. // Try switching to grpclb by sending servers[0] as grpclb address. It's
  459. // expected that servers[0] will be filtered out, so it will not be used by
  460. // the balancer.
  461. //
  462. // If the filtering failed, servers[0] will be used for RPCs and the RPCs
  463. // will succeed. The following checks will catch this and fail.
  464. addrs := []resolver.Address{
  465. {Addr: servers[0].addr, Type: resolver.GRPCLB},
  466. {Addr: servers[1].addr}, {Addr: servers[2].addr}}
  467. r.UpdateState(resolver.State{Addresses: addrs})
  468. // Still check for pickfirst, but only with server[1] and server[2].
  469. if err := checkPickFirst(cc, servers[1:]); err != nil {
  470. t.Fatalf("check pickfirst returned non-nil error: %v", err)
  471. }
  472. // Switch to roundrobin, and check against server[1] and server[2].
  473. cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs}, nil)
  474. if err := checkRoundRobin(cc, servers[1:]); err != nil {
  475. t.Fatalf("check roundrobin returned non-nil error: %v", err)
  476. }
  477. }
  478. func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
  479. scpr := r.CC.ParseServiceConfig(s)
  480. if scpr.Err != nil {
  481. panic(fmt.Sprintf("Error parsing config %q: %v", s, scpr.Err))
  482. }
  483. return scpr
  484. }