xds_test.go 21 KB


  1. // +build go1.12
  2. /*
  3. *
  4. * Copyright 2019 gRPC authors.
  5. *
  6. * Licensed under the Apache License, Version 2.0 (the "License");
  7. * you may not use this file except in compliance with the License.
  8. * You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. *
  18. */
  19. package xds
  20. import (
  21. "encoding/json"
  22. "reflect"
  23. "sync"
  24. "testing"
  25. "time"
  26. "github.com/golang/protobuf/proto"
  27. "google.golang.org/grpc/balancer"
  28. discoverypb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/discovery"
  29. edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds"
  30. "google.golang.org/grpc/connectivity"
  31. "google.golang.org/grpc/internal/grpctest"
  32. "google.golang.org/grpc/internal/leakcheck"
  33. "google.golang.org/grpc/resolver"
  34. )
  35. var lbABuilder *balancerABuilder
  36. func init() {
  37. lbABuilder = &balancerABuilder{}
  38. balancer.Register(lbABuilder)
  39. balancer.Register(&balancerBBuilder{})
  40. }
  41. type s struct{}
  42. func (s) Teardown(t *testing.T) {
  43. leakcheck.Check(t)
  44. }
  45. func Test(t *testing.T) {
  46. grpctest.RunSubTests(t, s{})
  47. }
  48. type lbPolicy string
  49. const (
  50. fakeBalancerA lbPolicy = "fake_balancer_A"
  51. fakeBalancerB lbPolicy = "fake_balancer_B"
  52. fakeBalancerC lbPolicy = "fake_balancer_C"
  53. )
  54. var (
  55. testBalancerNameFooBar = "foo.bar"
  56. testBalancerConfigFooBar, _ = json.Marshal(&testBalancerConfig{
  57. BalancerName: testBalancerNameFooBar,
  58. ChildPolicy: []lbPolicy{fakeBalancerA},
  59. FallbackPolicy: []lbPolicy{fakeBalancerA},
  60. })
  61. specialAddrForBalancerA = resolver.Address{Addr: "this.is.balancer.A"}
  62. specialAddrForBalancerB = resolver.Address{Addr: "this.is.balancer.B"}
  63. // mu protects the access of latestFakeEdsBalancer
  64. mu sync.Mutex
  65. latestFakeEdsBalancer *fakeEDSBalancer
  66. )
  67. type testBalancerConfig struct {
  68. BalancerName string `json:"balancerName,omitempty"`
  69. ChildPolicy []lbPolicy `json:"childPolicy,omitempty"`
  70. FallbackPolicy []lbPolicy `json:"fallbackPolicy,omitempty"`
  71. }
  72. func (l *lbPolicy) UnmarshalJSON(b []byte) error {
  73. // no need to implement, not used.
  74. return nil
  75. }
  76. func (l lbPolicy) MarshalJSON() ([]byte, error) {
  77. m := make(map[string]struct{})
  78. m[string(l)] = struct{}{}
  79. return json.Marshal(m)
  80. }
  81. type balancerABuilder struct {
  82. mu sync.Mutex
  83. lastBalancer *balancerA
  84. }
  85. func (b *balancerABuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
  86. b.mu.Lock()
  87. b.lastBalancer = &balancerA{cc: cc, subconnStateChange: make(chan *scStateChange, 10)}
  88. b.mu.Unlock()
  89. return b.lastBalancer
  90. }
  91. func (b *balancerABuilder) Name() string {
  92. return string(fakeBalancerA)
  93. }
  94. func (b *balancerABuilder) getLastBalancer() *balancerA {
  95. b.mu.Lock()
  96. defer b.mu.Unlock()
  97. return b.lastBalancer
  98. }
  99. func (b *balancerABuilder) clearLastBalancer() {
  100. b.mu.Lock()
  101. defer b.mu.Unlock()
  102. b.lastBalancer = nil
  103. }
  104. type balancerBBuilder struct{}
  105. func (b *balancerBBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
  106. return &balancerB{cc: cc}
  107. }
  108. func (*balancerBBuilder) Name() string {
  109. return string(fakeBalancerB)
  110. }
  111. type balancerA struct {
  112. cc balancer.ClientConn
  113. subconnStateChange chan *scStateChange
  114. }
  115. func (b *balancerA) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
  116. b.subconnStateChange <- &scStateChange{sc: sc, state: state}
  117. }
  118. func (b *balancerA) HandleResolvedAddrs(addrs []resolver.Address, err error) {
  119. _, _ = b.cc.NewSubConn(append(addrs, specialAddrForBalancerA), balancer.NewSubConnOptions{})
  120. }
  121. func (b *balancerA) Close() {}
  122. type balancerB struct {
  123. cc balancer.ClientConn
  124. }
  125. func (balancerB) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
  126. panic("implement me")
  127. }
  128. func (b *balancerB) HandleResolvedAddrs(addrs []resolver.Address, err error) {
  129. _, _ = b.cc.NewSubConn(append(addrs, specialAddrForBalancerB), balancer.NewSubConnOptions{})
  130. }
  131. func (balancerB) Close() {}
  132. func newTestClientConn() *testClientConn {
  133. return &testClientConn{
  134. newSubConns: make(chan []resolver.Address, 10),
  135. }
  136. }
  137. type testClientConn struct {
  138. newSubConns chan []resolver.Address
  139. }
  140. func (t *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
  141. t.newSubConns <- addrs
  142. return nil, nil
  143. }
  144. func (testClientConn) RemoveSubConn(balancer.SubConn) {
  145. }
  146. func (testClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
  147. }
  148. func (testClientConn) ResolveNow(resolver.ResolveNowOption) {}
  149. func (testClientConn) Target() string {
  150. return testServiceName
  151. }
  152. type scStateChange struct {
  153. sc balancer.SubConn
  154. state connectivity.State
  155. }
  156. type fakeEDSBalancer struct {
  157. cc balancer.ClientConn
  158. edsChan chan *edspb.ClusterLoadAssignment
  159. childPolicy chan *loadBalancingConfig
  160. fallbackPolicy chan *loadBalancingConfig
  161. subconnStateChange chan *scStateChange
  162. }
  163. func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
  164. f.subconnStateChange <- &scStateChange{sc: sc, state: state}
  165. }
  166. func (f *fakeEDSBalancer) Close() {
  167. mu.Lock()
  168. defer mu.Unlock()
  169. latestFakeEdsBalancer = nil
  170. }
  171. func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *edspb.ClusterLoadAssignment) {
  172. f.edsChan <- edsResp
  173. }
  174. func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) {
  175. f.childPolicy <- &loadBalancingConfig{
  176. Name: name,
  177. Config: config,
  178. }
  179. }
  180. func newFakeEDSBalancer(cc balancer.ClientConn) edsBalancerInterface {
  181. lb := &fakeEDSBalancer{
  182. cc: cc,
  183. edsChan: make(chan *edspb.ClusterLoadAssignment, 10),
  184. childPolicy: make(chan *loadBalancingConfig, 10),
  185. fallbackPolicy: make(chan *loadBalancingConfig, 10),
  186. subconnStateChange: make(chan *scStateChange, 10),
  187. }
  188. mu.Lock()
  189. latestFakeEdsBalancer = lb
  190. mu.Unlock()
  191. return lb
  192. }
  193. func getLatestEdsBalancer() *fakeEDSBalancer {
  194. mu.Lock()
  195. defer mu.Unlock()
  196. return latestFakeEdsBalancer
  197. }
  198. type fakeSubConn struct{}
  199. func (*fakeSubConn) UpdateAddresses([]resolver.Address) {
  200. panic("implement me")
  201. }
  202. func (*fakeSubConn) Connect() {
  203. panic("implement me")
  204. }
  205. func (s) TestXdsBalanceHandleResolvedAddrs(t *testing.T) {
  206. startupTimeout = 500 * time.Millisecond
  207. defer func() { startupTimeout = defaultTimeout }()
  208. builder := balancer.Get("xds")
  209. cc := newTestClientConn()
  210. lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
  211. if !ok {
  212. t.Fatalf("unable to type assert to *xdsBalancer")
  213. }
  214. defer lb.Close()
  215. if err := lb.HandleBalancerConfig(json.RawMessage(testBalancerConfigFooBar)); err != nil {
  216. t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(testBalancerConfigFooBar), err)
  217. }
  218. addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
  219. for i := 0; i < 3; i++ {
  220. lb.HandleResolvedAddrs(addrs, nil)
  221. select {
  222. case nsc := <-cc.newSubConns:
  223. if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) {
  224. t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA))
  225. }
  226. case <-time.After(2 * time.Second):
  227. t.Fatalf("timeout when geting new subconn result")
  228. }
  229. addrs = addrs[:2-i]
  230. }
  231. }
  232. func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) {
  233. startupTimeout = 500 * time.Millisecond
  234. originalNewEDSBalancer := newEDSBalancer
  235. newEDSBalancer = newFakeEDSBalancer
  236. defer func() {
  237. startupTimeout = defaultTimeout
  238. newEDSBalancer = originalNewEDSBalancer
  239. }()
  240. builder := balancer.Get("xds")
  241. cc := newTestClientConn()
  242. lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
  243. if !ok {
  244. t.Fatalf("unable to type assert to *xdsBalancer")
  245. }
  246. defer lb.Close()
  247. if err := lb.HandleBalancerConfig(json.RawMessage(testBalancerConfigFooBar)); err != nil {
  248. t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(testBalancerConfigFooBar), err)
  249. }
  250. addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
  251. lb.HandleResolvedAddrs(addrs, nil)
  252. // verify fallback takes over
  253. select {
  254. case nsc := <-cc.newSubConns:
  255. if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) {
  256. t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA))
  257. }
  258. case <-time.After(2 * time.Second):
  259. t.Fatalf("timeout when geting new subconn result")
  260. }
  261. var cleanups []func()
  262. defer func() {
  263. for _, cleanup := range cleanups {
  264. cleanup()
  265. }
  266. }()
  267. // In the first iteration, an eds balancer takes over fallback balancer
  268. // In the second iteration, a new xds client takes over previous one.
  269. for i := 0; i < 2; i++ {
  270. addr, td, cleanup := setupServer(t)
  271. cleanups = append(cleanups, cleanup)
  272. workingBalancerConfig, _ := json.Marshal(&testBalancerConfig{
  273. BalancerName: addr,
  274. ChildPolicy: []lbPolicy{fakeBalancerA},
  275. FallbackPolicy: []lbPolicy{fakeBalancerA},
  276. })
  277. if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
  278. t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
  279. }
  280. td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
  281. var j int
  282. for j = 0; j < 10; j++ {
  283. if edsLB := getLatestEdsBalancer(); edsLB != nil { // edsLB won't change between the two iterations
  284. select {
  285. case gotEDS := <-edsLB.edsChan:
  286. if !proto.Equal(gotEDS, testClusterLoadAssignmentWithoutEndpoints) {
  287. t.Fatalf("edsBalancer got eds: %v, want %v", gotEDS, testClusterLoadAssignmentWithoutEndpoints)
  288. }
  289. case <-time.After(time.Second):
  290. t.Fatal("haven't got EDS update after 1s")
  291. }
  292. break
  293. }
  294. time.Sleep(100 * time.Millisecond)
  295. }
  296. if j == 10 {
  297. t.Fatal("edsBalancer instance has not been created or updated after 1s")
  298. }
  299. }
  300. }
  301. // switch child policy, lb stays the same
  302. // cds->eds or eds -> cds, restart xdsClient, lb stays the same
  303. func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) {
  304. originalNewEDSBalancer := newEDSBalancer
  305. newEDSBalancer = newFakeEDSBalancer
  306. defer func() {
  307. newEDSBalancer = originalNewEDSBalancer
  308. }()
  309. builder := balancer.Get("xds")
  310. cc := newTestClientConn()
  311. lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
  312. if !ok {
  313. t.Fatalf("unable to type assert to *xdsBalancer")
  314. }
  315. defer lb.Close()
  316. var cleanups []func()
  317. defer func() {
  318. for _, cleanup := range cleanups {
  319. cleanup()
  320. }
  321. }()
  322. for _, test := range []struct {
  323. cfg *testBalancerConfig
  324. responseToSend *discoverypb.DiscoveryResponse
  325. expectedChildPolicy *loadBalancingConfig
  326. }{
  327. {
  328. cfg: &testBalancerConfig{
  329. ChildPolicy: []lbPolicy{fakeBalancerA},
  330. },
  331. responseToSend: testEDSRespWithoutEndpoints,
  332. expectedChildPolicy: &loadBalancingConfig{
  333. Name: string(fakeBalancerA),
  334. Config: json.RawMessage(`{}`),
  335. },
  336. },
  337. {
  338. cfg: &testBalancerConfig{
  339. ChildPolicy: []lbPolicy{fakeBalancerB},
  340. },
  341. expectedChildPolicy: &loadBalancingConfig{
  342. Name: string(fakeBalancerB),
  343. Config: json.RawMessage(`{}`),
  344. },
  345. },
  346. {
  347. cfg: &testBalancerConfig{},
  348. responseToSend: testCDSResp,
  349. expectedChildPolicy: &loadBalancingConfig{
  350. Name: "ROUND_ROBIN",
  351. },
  352. },
  353. } {
  354. addr, td, cleanup := setupServer(t)
  355. cleanups = append(cleanups, cleanup)
  356. test.cfg.BalancerName = addr
  357. workingBalancerConfig, _ := json.Marshal(test.cfg)
  358. if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
  359. t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
  360. }
  361. if test.responseToSend != nil {
  362. td.sendResp(&response{resp: test.responseToSend})
  363. }
  364. var i int
  365. for i = 0; i < 10; i++ {
  366. if edsLB := getLatestEdsBalancer(); edsLB != nil {
  367. select {
  368. case childPolicy := <-edsLB.childPolicy:
  369. if !reflect.DeepEqual(childPolicy, test.expectedChildPolicy) {
  370. t.Fatalf("got childPolicy %v, want %v", childPolicy, test.expectedChildPolicy)
  371. }
  372. case <-time.After(time.Second):
  373. t.Fatal("haven't got policy update after 1s")
  374. }
  375. break
  376. }
  377. time.Sleep(100 * time.Millisecond)
  378. }
  379. if i == 10 {
  380. t.Fatal("edsBalancer instance has not been created or updated after 1s")
  381. }
  382. }
  383. }
  384. // not in fallback mode, overwrite fallback info.
  385. // in fallback mode, update config or switch balancer.
  386. func (s) TestXdsBalanceHandleBalancerConfigFallbackUpdate(t *testing.T) {
  387. originalNewEDSBalancer := newEDSBalancer
  388. newEDSBalancer = newFakeEDSBalancer
  389. defer func() {
  390. newEDSBalancer = originalNewEDSBalancer
  391. }()
  392. builder := balancer.Get("xds")
  393. cc := newTestClientConn()
  394. lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
  395. if !ok {
  396. t.Fatalf("unable to type assert to *xdsBalancer")
  397. }
  398. defer lb.Close()
  399. addr, td, cleanup := setupServer(t)
  400. cfg := &testBalancerConfig{
  401. BalancerName: addr,
  402. ChildPolicy: []lbPolicy{fakeBalancerA},
  403. FallbackPolicy: []lbPolicy{fakeBalancerA},
  404. }
  405. workingBalancerConfig, _ := json.Marshal(cfg)
  406. if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
  407. t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
  408. }
  409. cfg.FallbackPolicy = []lbPolicy{fakeBalancerB}
  410. workingBalancerConfig, _ = json.Marshal(cfg)
  411. if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
  412. t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
  413. }
  414. td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
  415. var i int
  416. for i = 0; i < 10; i++ {
  417. if edsLB := getLatestEdsBalancer(); edsLB != nil {
  418. break
  419. }
  420. time.Sleep(100 * time.Millisecond)
  421. }
  422. if i == 10 {
  423. t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s")
  424. }
  425. cleanup()
  426. addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
  427. lb.HandleResolvedAddrs(addrs, nil)
  428. // verify fallback balancer B takes over
  429. select {
  430. case nsc := <-cc.newSubConns:
  431. if !reflect.DeepEqual(append(addrs, specialAddrForBalancerB), nsc) {
  432. t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerB))
  433. }
  434. case <-time.After(5 * time.Second):
  435. t.Fatalf("timeout when geting new subconn result")
  436. }
  437. cfg.FallbackPolicy = []lbPolicy{fakeBalancerA}
  438. workingBalancerConfig, _ = json.Marshal(cfg)
  439. if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
  440. t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
  441. }
  442. // verify fallback balancer A takes over
  443. select {
  444. case nsc := <-cc.newSubConns:
  445. if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) {
  446. t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA))
  447. }
  448. case <-time.After(2 * time.Second):
  449. t.Fatalf("timeout when geting new subconn result")
  450. }
  451. }
  452. func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) {
  453. originalNewEDSBalancer := newEDSBalancer
  454. newEDSBalancer = newFakeEDSBalancer
  455. defer func() {
  456. newEDSBalancer = originalNewEDSBalancer
  457. }()
  458. builder := balancer.Get("xds")
  459. cc := newTestClientConn()
  460. lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
  461. if !ok {
  462. t.Fatalf("unable to type assert to *xdsBalancer")
  463. }
  464. defer lb.Close()
  465. addr, td, cleanup := setupServer(t)
  466. defer cleanup()
  467. cfg := &testBalancerConfig{
  468. BalancerName: addr,
  469. ChildPolicy: []lbPolicy{fakeBalancerA},
  470. FallbackPolicy: []lbPolicy{fakeBalancerA},
  471. }
  472. workingBalancerConfig, _ := json.Marshal(cfg)
  473. if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
  474. t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
  475. }
  476. td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
  477. expectedScStateChange := &scStateChange{
  478. sc: &fakeSubConn{},
  479. state: connectivity.Ready,
  480. }
  481. var i int
  482. for i = 0; i < 10; i++ {
  483. if edsLB := getLatestEdsBalancer(); edsLB != nil {
  484. lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state)
  485. select {
  486. case scsc := <-edsLB.subconnStateChange:
  487. if !reflect.DeepEqual(scsc, expectedScStateChange) {
  488. t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
  489. }
  490. case <-time.After(time.Second):
  491. t.Fatal("haven't got subconn state change after 1s")
  492. }
  493. break
  494. }
  495. time.Sleep(100 * time.Millisecond)
  496. }
  497. if i == 10 {
  498. t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s")
  499. }
  500. // lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record
  501. // to make sure there's a new one created and get the pointer to it.
  502. lbABuilder.clearLastBalancer()
  503. cleanup()
  504. // switch to fallback
  505. // fallback balancer A takes over
  506. for i = 0; i < 10; i++ {
  507. if fblb := lbABuilder.getLastBalancer(); fblb != nil {
  508. lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state)
  509. select {
  510. case scsc := <-fblb.subconnStateChange:
  511. if !reflect.DeepEqual(scsc, expectedScStateChange) {
  512. t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
  513. }
  514. case <-time.After(time.Second):
  515. t.Fatal("haven't got subconn state change after 1s")
  516. }
  517. break
  518. }
  519. time.Sleep(100 * time.Millisecond)
  520. }
  521. if i == 10 {
  522. t.Fatal("balancerA instance has not been created after 1s")
  523. }
  524. }
  525. func (s) TestXdsBalancerFallbackSignalFromEdsBalancer(t *testing.T) {
  526. originalNewEDSBalancer := newEDSBalancer
  527. newEDSBalancer = newFakeEDSBalancer
  528. defer func() {
  529. newEDSBalancer = originalNewEDSBalancer
  530. }()
  531. builder := balancer.Get("xds")
  532. cc := newTestClientConn()
  533. lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
  534. if !ok {
  535. t.Fatalf("unable to type assert to *xdsBalancer")
  536. }
  537. defer lb.Close()
  538. addr, td, cleanup := setupServer(t)
  539. defer cleanup()
  540. cfg := &testBalancerConfig{
  541. BalancerName: addr,
  542. ChildPolicy: []lbPolicy{fakeBalancerA},
  543. FallbackPolicy: []lbPolicy{fakeBalancerA},
  544. }
  545. workingBalancerConfig, _ := json.Marshal(cfg)
  546. if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
  547. t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
  548. }
  549. td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
  550. expectedScStateChange := &scStateChange{
  551. sc: &fakeSubConn{},
  552. state: connectivity.Ready,
  553. }
  554. var i int
  555. for i = 0; i < 10; i++ {
  556. if edsLB := getLatestEdsBalancer(); edsLB != nil {
  557. lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state)
  558. select {
  559. case scsc := <-edsLB.subconnStateChange:
  560. if !reflect.DeepEqual(scsc, expectedScStateChange) {
  561. t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
  562. }
  563. case <-time.After(time.Second):
  564. t.Fatal("haven't got subconn state change after 1s")
  565. }
  566. break
  567. }
  568. time.Sleep(100 * time.Millisecond)
  569. }
  570. if i == 10 {
  571. t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s")
  572. }
  573. // lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record
  574. // to make sure there's a new one created and get the pointer to it.
  575. lbABuilder.clearLastBalancer()
  576. cleanup()
  577. // switch to fallback
  578. // fallback balancer A takes over
  579. for i = 0; i < 10; i++ {
  580. if fblb := lbABuilder.getLastBalancer(); fblb != nil {
  581. lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state)
  582. select {
  583. case scsc := <-fblb.subconnStateChange:
  584. if !reflect.DeepEqual(scsc, expectedScStateChange) {
  585. t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
  586. }
  587. case <-time.After(time.Second):
  588. t.Fatal("haven't got subconn state change after 1s")
  589. }
  590. break
  591. }
  592. time.Sleep(100 * time.Millisecond)
  593. }
  594. if i == 10 {
  595. t.Fatal("balancerA instance has not been created after 1s")
  596. }
  597. }
  598. func (s) TestXdsBalancerConfigParsingSelectingLBPolicy(t *testing.T) {
  599. tesCfg := &testBalancerConfig{
  600. BalancerName: "fake.foo.bar",
  601. ChildPolicy: []lbPolicy{fakeBalancerC, fakeBalancerA, fakeBalancerB}, // selects fakeBalancerA
  602. FallbackPolicy: []lbPolicy{fakeBalancerC, fakeBalancerB, fakeBalancerA}, // selects fakeBalancerB
  603. }
  604. js, _ := json.Marshal(tesCfg)
  605. var xdsCfg xdsConfig
  606. if err := json.Unmarshal(js, &xdsCfg); err != nil {
  607. t.Fatal("unable to unmarshal balancer config into xds config")
  608. }
  609. wantChildPolicy := &loadBalancingConfig{Name: string(fakeBalancerA), Config: json.RawMessage(`{}`)}
  610. if !reflect.DeepEqual(xdsCfg.ChildPolicy, wantChildPolicy) {
  611. t.Fatalf("got child policy %v, want %v", xdsCfg.ChildPolicy, wantChildPolicy)
  612. }
  613. wantFallbackPolicy := &loadBalancingConfig{Name: string(fakeBalancerB), Config: json.RawMessage(`{}`)}
  614. if !reflect.DeepEqual(xdsCfg.FallBackPolicy, wantFallbackPolicy) {
  615. t.Fatalf("got fallback policy %v, want %v", xdsCfg.FallBackPolicy, wantFallbackPolicy)
  616. }
  617. }