balancer_switching_test.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "fmt"
  22. "math"
  23. "testing"
  24. "time"
  25. "google.golang.org/grpc/balancer"
  26. "google.golang.org/grpc/balancer/roundrobin"
  27. "google.golang.org/grpc/connectivity"
  28. _ "google.golang.org/grpc/grpclog/glogger"
  29. "google.golang.org/grpc/internal"
  30. "google.golang.org/grpc/resolver"
  31. "google.golang.org/grpc/resolver/manual"
  32. )
  33. var _ balancer.Builder = &magicalLB{}
  34. var _ balancer.Balancer = &magicalLB{}
  35. // magicalLB is a ringer for grpclb. It is used to avoid circular dependencies on the grpclb package
  36. type magicalLB struct{}
  37. func (b *magicalLB) Name() string {
  38. return "grpclb"
  39. }
  40. func (b *magicalLB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
  41. return b
  42. }
  43. func (b *magicalLB) HandleSubConnStateChange(balancer.SubConn, connectivity.State) {}
  44. func (b *magicalLB) HandleResolvedAddrs([]resolver.Address, error) {}
  45. func (b *magicalLB) Close() {}
  46. func init() {
  47. balancer.Register(&magicalLB{})
  48. }
  49. func checkPickFirst(cc *ClientConn, servers []*server) error {
  50. var (
  51. req = "port"
  52. reply string
  53. err error
  54. )
  55. connected := false
  56. for i := 0; i < 5000; i++ {
  57. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); errorDesc(err) == servers[0].port {
  58. if connected {
  59. // connected is set to false if peer is not server[0]. So if
  60. // connected is true here, this is the second time we saw
  61. // server[0] in a row. Break because pickfirst is in effect.
  62. break
  63. }
  64. connected = true
  65. } else {
  66. connected = false
  67. }
  68. time.Sleep(time.Millisecond)
  69. }
  70. if !connected {
  71. return fmt.Errorf("pickfirst is not in effect after 5 second, EmptyCall() = _, %v, want _, %v", err, servers[0].port)
  72. }
  73. // The following RPCs should all succeed with the first server.
  74. for i := 0; i < 3; i++ {
  75. err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
  76. if errorDesc(err) != servers[0].port {
  77. return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[0].port, err)
  78. }
  79. }
  80. return nil
  81. }
  82. func checkRoundRobin(cc *ClientConn, servers []*server) error {
  83. var (
  84. req = "port"
  85. reply string
  86. err error
  87. )
  88. // Make sure connections to all servers are up.
  89. for i := 0; i < 2; i++ {
  90. // Do this check twice, otherwise the first RPC's transport may still be
  91. // picked by the closing pickfirst balancer, and the test becomes flaky.
  92. for _, s := range servers {
  93. var up bool
  94. for i := 0; i < 5000; i++ {
  95. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); errorDesc(err) == s.port {
  96. up = true
  97. break
  98. }
  99. time.Sleep(time.Millisecond)
  100. }
  101. if !up {
  102. return fmt.Errorf("server %v is not up within 5 second", s.port)
  103. }
  104. }
  105. }
  106. serverCount := len(servers)
  107. for i := 0; i < 3*serverCount; i++ {
  108. err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
  109. if errorDesc(err) != servers[i%serverCount].port {
  110. return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[i%serverCount].port, err)
  111. }
  112. }
  113. return nil
  114. }
  115. func (s) TestSwitchBalancer(t *testing.T) {
  116. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  117. defer rcleanup()
  118. const numServers = 2
  119. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  120. defer scleanup()
  121. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  122. if err != nil {
  123. t.Fatalf("failed to dial: %v", err)
  124. }
  125. defer cc.Close()
  126. addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}
  127. r.UpdateState(resolver.State{Addresses: addrs})
  128. // The default balancer is pickfirst.
  129. if err := checkPickFirst(cc, servers); err != nil {
  130. t.Fatalf("check pickfirst returned non-nil error: %v", err)
  131. }
  132. // Switch to roundrobin.
  133. cc.updateResolverState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`, Addresses: addrs})
  134. if err := checkRoundRobin(cc, servers); err != nil {
  135. t.Fatalf("check roundrobin returned non-nil error: %v", err)
  136. }
  137. // Switch to pickfirst.
  138. cc.updateResolverState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "pick_first"}`, Addresses: addrs})
  139. if err := checkPickFirst(cc, servers); err != nil {
  140. t.Fatalf("check pickfirst returned non-nil error: %v", err)
  141. }
  142. }
  143. // Test that balancer specified by dial option will not be overridden.
  144. func (s) TestBalancerDialOption(t *testing.T) {
  145. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  146. defer rcleanup()
  147. const numServers = 2
  148. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  149. defer scleanup()
  150. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name))
  151. if err != nil {
  152. t.Fatalf("failed to dial: %v", err)
  153. }
  154. defer cc.Close()
  155. addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}
  156. r.UpdateState(resolver.State{Addresses: addrs})
  157. // The init balancer is roundrobin.
  158. if err := checkRoundRobin(cc, servers); err != nil {
  159. t.Fatalf("check roundrobin returned non-nil error: %v", err)
  160. }
  161. // Switch to pickfirst.
  162. cc.updateResolverState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "pick_first"}`, Addresses: addrs})
  163. // Balancer is still roundrobin.
  164. if err := checkRoundRobin(cc, servers); err != nil {
  165. t.Fatalf("check roundrobin returned non-nil error: %v", err)
  166. }
  167. }
  168. // First addr update contains grpclb.
  169. func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) {
  170. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  171. defer rcleanup()
  172. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  173. if err != nil {
  174. t.Fatalf("failed to dial: %v", err)
  175. }
  176. defer cc.Close()
  177. // ClientConn will switch balancer to grpclb when receives an address of
  178. // type GRPCLB.
  179. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}})
  180. var isGRPCLB bool
  181. for i := 0; i < 5000; i++ {
  182. cc.mu.Lock()
  183. isGRPCLB = cc.curBalancerName == "grpclb"
  184. cc.mu.Unlock()
  185. if isGRPCLB {
  186. break
  187. }
  188. time.Sleep(time.Millisecond)
  189. }
  190. if !isGRPCLB {
  191. t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
  192. }
  193. // New update containing new backend and new grpclb. Should not switch
  194. // balancer.
  195. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}})
  196. for i := 0; i < 200; i++ {
  197. cc.mu.Lock()
  198. isGRPCLB = cc.curBalancerName == "grpclb"
  199. cc.mu.Unlock()
  200. if !isGRPCLB {
  201. break
  202. }
  203. time.Sleep(time.Millisecond)
  204. }
  205. if !isGRPCLB {
  206. t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb")
  207. }
  208. var isPickFirst bool
  209. // Switch balancer to pickfirst.
  210. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
  211. for i := 0; i < 5000; i++ {
  212. cc.mu.Lock()
  213. isPickFirst = cc.curBalancerName == PickFirstBalancerName
  214. cc.mu.Unlock()
  215. if isPickFirst {
  216. break
  217. }
  218. time.Sleep(time.Millisecond)
  219. }
  220. if !isPickFirst {
  221. t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
  222. }
  223. }
  224. // First addr update does not contain grpclb.
  225. func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) {
  226. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  227. defer rcleanup()
  228. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  229. if err != nil {
  230. t.Fatalf("failed to dial: %v", err)
  231. }
  232. defer cc.Close()
  233. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
  234. var isPickFirst bool
  235. for i := 0; i < 5000; i++ {
  236. cc.mu.Lock()
  237. isPickFirst = cc.curBalancerName == PickFirstBalancerName
  238. cc.mu.Unlock()
  239. if isPickFirst {
  240. break
  241. }
  242. time.Sleep(time.Millisecond)
  243. }
  244. if !isPickFirst {
  245. t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
  246. }
  247. // ClientConn will switch balancer to grpclb when receives an address of
  248. // type GRPCLB.
  249. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}})
  250. var isGRPCLB bool
  251. for i := 0; i < 5000; i++ {
  252. cc.mu.Lock()
  253. isGRPCLB = cc.curBalancerName == "grpclb"
  254. cc.mu.Unlock()
  255. if isGRPCLB {
  256. break
  257. }
  258. time.Sleep(time.Millisecond)
  259. }
  260. if !isGRPCLB {
  261. t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
  262. }
  263. // New update containing new backend and new grpclb. Should not switch
  264. // balancer.
  265. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}})
  266. for i := 0; i < 200; i++ {
  267. cc.mu.Lock()
  268. isGRPCLB = cc.curBalancerName == "grpclb"
  269. cc.mu.Unlock()
  270. if !isGRPCLB {
  271. break
  272. }
  273. time.Sleep(time.Millisecond)
  274. }
  275. if !isGRPCLB {
  276. t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb")
  277. }
  278. // Switch balancer back.
  279. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
  280. for i := 0; i < 5000; i++ {
  281. cc.mu.Lock()
  282. isPickFirst = cc.curBalancerName == PickFirstBalancerName
  283. cc.mu.Unlock()
  284. if isPickFirst {
  285. break
  286. }
  287. time.Sleep(time.Millisecond)
  288. }
  289. if !isPickFirst {
  290. t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
  291. }
  292. }
  293. // Test that if the current balancer is roundrobin, after switching to grpclb,
  294. // when the resolved address doesn't contain grpclb addresses, balancer will be
  295. // switched back to roundrobin.
  296. func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
  297. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  298. defer rcleanup()
  299. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  300. if err != nil {
  301. t.Fatalf("failed to dial: %v", err)
  302. }
  303. defer cc.Close()
  304. sc := `{"loadBalancingPolicy": "round_robin"}`
  305. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
  306. var isRoundRobin bool
  307. for i := 0; i < 5000; i++ {
  308. cc.mu.Lock()
  309. isRoundRobin = cc.curBalancerName == "round_robin"
  310. cc.mu.Unlock()
  311. if isRoundRobin {
  312. break
  313. }
  314. time.Sleep(time.Millisecond)
  315. }
  316. if !isRoundRobin {
  317. t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
  318. }
  319. // ClientConn will switch balancer to grpclb when receives an address of
  320. // type GRPCLB.
  321. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}}, ServiceConfig: sc})
  322. var isGRPCLB bool
  323. for i := 0; i < 5000; i++ {
  324. cc.mu.Lock()
  325. isGRPCLB = cc.curBalancerName == "grpclb"
  326. cc.mu.Unlock()
  327. if isGRPCLB {
  328. break
  329. }
  330. time.Sleep(time.Millisecond)
  331. }
  332. if !isGRPCLB {
  333. t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
  334. }
  335. // Switch balancer back.
  336. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
  337. for i := 0; i < 5000; i++ {
  338. cc.mu.Lock()
  339. isRoundRobin = cc.curBalancerName == "round_robin"
  340. cc.mu.Unlock()
  341. if isRoundRobin {
  342. break
  343. }
  344. time.Sleep(time.Millisecond)
  345. }
  346. if !isRoundRobin {
  347. t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
  348. }
  349. }
  350. // Test that if resolved address list contains grpclb, the balancer option in
  351. // service config won't take effect. But when there's no grpclb address in a new
  352. // resolved address list, balancer will be switched to the new one.
  353. func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
  354. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  355. defer rcleanup()
  356. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  357. if err != nil {
  358. t.Fatalf("failed to dial: %v", err)
  359. }
  360. defer cc.Close()
  361. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
  362. var isPickFirst bool
  363. for i := 0; i < 5000; i++ {
  364. cc.mu.Lock()
  365. isPickFirst = cc.curBalancerName == PickFirstBalancerName
  366. cc.mu.Unlock()
  367. if isPickFirst {
  368. break
  369. }
  370. time.Sleep(time.Millisecond)
  371. }
  372. if !isPickFirst {
  373. t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
  374. }
  375. // ClientConn will switch balancer to grpclb when receives an address of
  376. // type GRPCLB.
  377. addrs := []resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}}
  378. r.UpdateState(resolver.State{Addresses: addrs})
  379. var isGRPCLB bool
  380. for i := 0; i < 5000; i++ {
  381. cc.mu.Lock()
  382. isGRPCLB = cc.curBalancerName == "grpclb"
  383. cc.mu.Unlock()
  384. if isGRPCLB {
  385. break
  386. }
  387. time.Sleep(time.Millisecond)
  388. }
  389. if !isGRPCLB {
  390. t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
  391. }
  392. sc := `{"loadBalancingPolicy": "round_robin"}`
  393. r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc})
  394. var isRoundRobin bool
  395. for i := 0; i < 200; i++ {
  396. cc.mu.Lock()
  397. isRoundRobin = cc.curBalancerName == "round_robin"
  398. cc.mu.Unlock()
  399. if isRoundRobin {
  400. break
  401. }
  402. time.Sleep(time.Millisecond)
  403. }
  404. // Balancer should NOT switch to round_robin because resolved list contains
  405. // grpclb.
  406. if isRoundRobin {
  407. t.Fatalf("within 200 ms, cc.balancer switched to round_robin, want grpclb")
  408. }
  409. // Switch balancer back.
  410. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
  411. for i := 0; i < 5000; i++ {
  412. cc.mu.Lock()
  413. isRoundRobin = cc.curBalancerName == "round_robin"
  414. cc.mu.Unlock()
  415. if isRoundRobin {
  416. break
  417. }
  418. time.Sleep(time.Millisecond)
  419. }
  420. if !isRoundRobin {
  421. t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
  422. }
  423. }
  424. // Test that when switching to grpclb fails because grpclb is not registered,
  425. // the fallback balancer will only get backend addresses, not the grpclb server
  426. // address.
  427. //
  428. // The tests sends 3 server addresses (all backends) as resolved addresses, but
  429. // claim the first one is grpclb server. The all RPCs should all be send to the
  430. // other addresses, not the first one.
  431. func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
  432. internal.BalancerUnregister("grpclb")
  433. defer balancer.Register(&magicalLB{})
  434. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  435. defer rcleanup()
  436. const numServers = 3
  437. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  438. defer scleanup()
  439. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  440. if err != nil {
  441. t.Fatalf("failed to dial: %v", err)
  442. }
  443. defer cc.Close()
  444. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}}})
  445. // The default balancer is pickfirst.
  446. if err := checkPickFirst(cc, servers[1:]); err != nil {
  447. t.Fatalf("check pickfirst returned non-nil error: %v", err)
  448. }
  449. // Try switching to grpclb by sending servers[0] as grpclb address. It's
  450. // expected that servers[0] will be filtered out, so it will not be used by
  451. // the balancer.
  452. //
  453. // If the filtering failed, servers[0] will be used for RPCs and the RPCs
  454. // will succeed. The following checks will catch this and fail.
  455. addrs := []resolver.Address{
  456. {Addr: servers[0].addr, Type: resolver.GRPCLB},
  457. {Addr: servers[1].addr}, {Addr: servers[2].addr}}
  458. r.UpdateState(resolver.State{Addresses: addrs})
  459. // Still check for pickfirst, but only with server[1] and server[2].
  460. if err := checkPickFirst(cc, servers[1:]); err != nil {
  461. t.Fatalf("check pickfirst returned non-nil error: %v", err)
  462. }
  463. // Switch to roundrobin, and check against server[1] and server[2].
  464. cc.updateResolverState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`, Addresses: addrs})
  465. if err := checkRoundRobin(cc, servers[1:]); err != nil {
  466. t.Fatalf("check roundrobin returned non-nil error: %v", err)
  467. }
  468. }