balancer_test.go 24 KB


  1. /*
  2. *
  3. * Copyright 2016 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "fmt"
  22. "math"
  23. "strconv"
  24. "sync"
  25. "testing"
  26. "time"
  27. "google.golang.org/grpc/codes"
  28. "google.golang.org/grpc/naming"
  29. "google.golang.org/grpc/status"
  30. )
  31. func pickFirstBalancerV1(r naming.Resolver) Balancer {
  32. return &pickFirst{&roundRobin{r: r}}
  33. }
  34. type testWatcher struct {
  35. // the channel to receives name resolution updates
  36. update chan *naming.Update
  37. // the side channel to get to know how many updates in a batch
  38. side chan int
  39. // the channel to notify update injector that the update reading is done
  40. readDone chan int
  41. }
  42. func (w *testWatcher) Next() (updates []*naming.Update, err error) {
  43. n := <-w.side
  44. if n == 0 {
  45. return nil, fmt.Errorf("w.side is closed")
  46. }
  47. for i := 0; i < n; i++ {
  48. u := <-w.update
  49. if u != nil {
  50. updates = append(updates, u)
  51. }
  52. }
  53. w.readDone <- 0
  54. return
  55. }
  56. func (w *testWatcher) Close() {
  57. close(w.side)
  58. }
  59. // Inject naming resolution updates to the testWatcher.
  60. func (w *testWatcher) inject(updates []*naming.Update) {
  61. w.side <- len(updates)
  62. for _, u := range updates {
  63. w.update <- u
  64. }
  65. <-w.readDone
  66. }
  67. type testNameResolver struct {
  68. w *testWatcher
  69. addr string
  70. }
  71. func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
  72. r.w = &testWatcher{
  73. update: make(chan *naming.Update, 1),
  74. side: make(chan int, 1),
  75. readDone: make(chan int),
  76. }
  77. r.w.side <- 1
  78. r.w.update <- &naming.Update{
  79. Op: naming.Add,
  80. Addr: r.addr,
  81. }
  82. go func() {
  83. <-r.w.readDone
  84. }()
  85. return r.w, nil
  86. }
  87. func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, *testNameResolver, func()) {
  88. var servers []*server
  89. for i := 0; i < numServers; i++ {
  90. s := newTestServer()
  91. servers = append(servers, s)
  92. go s.start(t, 0, maxStreams)
  93. s.wait(t, 2*time.Second)
  94. }
  95. // Point to server[0]
  96. addr := "localhost:" + servers[0].port
  97. return servers, &testNameResolver{
  98. addr: addr,
  99. }, func() {
  100. for i := 0; i < numServers; i++ {
  101. servers[i].stop()
  102. }
  103. }
  104. }
  105. func (s) TestNameDiscovery(t *testing.T) {
  106. // Start 2 servers on 2 ports.
  107. numServers := 2
  108. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  109. defer cleanup()
  110. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  111. if err != nil {
  112. t.Fatalf("Failed to create ClientConn: %v", err)
  113. }
  114. defer cc.Close()
  115. req := "port"
  116. var reply string
  117. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
  118. t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
  119. }
  120. // Inject the name resolution change to remove servers[0] and add servers[1].
  121. var updates []*naming.Update
  122. updates = append(updates, &naming.Update{
  123. Op: naming.Delete,
  124. Addr: "localhost:" + servers[0].port,
  125. })
  126. updates = append(updates, &naming.Update{
  127. Op: naming.Add,
  128. Addr: "localhost:" + servers[1].port,
  129. })
  130. r.w.inject(updates)
  131. // Loop until the rpcs in flight talks to servers[1].
  132. for {
  133. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  134. break
  135. }
  136. time.Sleep(10 * time.Millisecond)
  137. }
  138. }
  139. func (s) TestEmptyAddrs(t *testing.T) {
  140. servers, r, cleanup := startServers(t, 1, math.MaxUint32)
  141. defer cleanup()
  142. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  143. if err != nil {
  144. t.Fatalf("Failed to create ClientConn: %v", err)
  145. }
  146. defer cc.Close()
  147. var reply string
  148. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
  149. t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
  150. }
  151. // Inject name resolution change to remove the server so that there is no address
  152. // available after that.
  153. u := &naming.Update{
  154. Op: naming.Delete,
  155. Addr: "localhost:" + servers[0].port,
  156. }
  157. r.w.inject([]*naming.Update{u})
  158. // Loop until the above updates apply.
  159. for {
  160. time.Sleep(10 * time.Millisecond)
  161. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
  162. if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil {
  163. cancel()
  164. break
  165. }
  166. cancel()
  167. }
  168. }
  169. func (s) TestRoundRobin(t *testing.T) {
  170. // Start 3 servers on 3 ports.
  171. numServers := 3
  172. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  173. defer cleanup()
  174. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  175. if err != nil {
  176. t.Fatalf("Failed to create ClientConn: %v", err)
  177. }
  178. defer cc.Close()
  179. // Add servers[1] to the service discovery.
  180. u := &naming.Update{
  181. Op: naming.Add,
  182. Addr: "localhost:" + servers[1].port,
  183. }
  184. r.w.inject([]*naming.Update{u})
  185. req := "port"
  186. var reply string
  187. // Loop until servers[1] is up
  188. for {
  189. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  190. break
  191. }
  192. time.Sleep(10 * time.Millisecond)
  193. }
  194. // Add server2[2] to the service discovery.
  195. u = &naming.Update{
  196. Op: naming.Add,
  197. Addr: "localhost:" + servers[2].port,
  198. }
  199. r.w.inject([]*naming.Update{u})
  200. // Loop until both servers[2] are up.
  201. for {
  202. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
  203. break
  204. }
  205. time.Sleep(10 * time.Millisecond)
  206. }
  207. // Check the incoming RPCs served in a round-robin manner.
  208. for i := 0; i < 10; i++ {
  209. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[i%numServers].port {
  210. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", i, err, servers[i%numServers].port)
  211. }
  212. }
  213. }
  214. func (s) TestCloseWithPendingRPC(t *testing.T) {
  215. servers, r, cleanup := startServers(t, 1, math.MaxUint32)
  216. defer cleanup()
  217. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  218. if err != nil {
  219. t.Fatalf("Failed to create ClientConn: %v", err)
  220. }
  221. defer cc.Close()
  222. var reply string
  223. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
  224. t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
  225. }
  226. // Remove the server.
  227. updates := []*naming.Update{{
  228. Op: naming.Delete,
  229. Addr: "localhost:" + servers[0].port,
  230. }}
  231. r.w.inject(updates)
  232. // Loop until the above update applies.
  233. for {
  234. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
  235. if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
  236. cancel()
  237. break
  238. }
  239. time.Sleep(10 * time.Millisecond)
  240. cancel()
  241. }
  242. // Issue 2 RPCs which should be completed with error status once cc is closed.
  243. var wg sync.WaitGroup
  244. wg.Add(2)
  245. go func() {
  246. defer wg.Done()
  247. var reply string
  248. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
  249. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
  250. }
  251. }()
  252. go func() {
  253. defer wg.Done()
  254. var reply string
  255. time.Sleep(5 * time.Millisecond)
  256. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
  257. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
  258. }
  259. }()
  260. time.Sleep(5 * time.Millisecond)
  261. cc.Close()
  262. wg.Wait()
  263. }
  264. func (s) TestGetOnWaitChannel(t *testing.T) {
  265. servers, r, cleanup := startServers(t, 1, math.MaxUint32)
  266. defer cleanup()
  267. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  268. if err != nil {
  269. t.Fatalf("Failed to create ClientConn: %v", err)
  270. }
  271. defer cc.Close()
  272. // Remove all servers so that all upcoming RPCs will block on waitCh.
  273. updates := []*naming.Update{{
  274. Op: naming.Delete,
  275. Addr: "localhost:" + servers[0].port,
  276. }}
  277. r.w.inject(updates)
  278. for {
  279. var reply string
  280. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
  281. if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
  282. cancel()
  283. break
  284. }
  285. cancel()
  286. time.Sleep(10 * time.Millisecond)
  287. }
  288. var wg sync.WaitGroup
  289. wg.Add(1)
  290. go func() {
  291. defer wg.Done()
  292. var reply string
  293. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
  294. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
  295. }
  296. }()
  297. // Add a connected server to get the above RPC through.
  298. updates = []*naming.Update{{
  299. Op: naming.Add,
  300. Addr: "localhost:" + servers[0].port,
  301. }}
  302. r.w.inject(updates)
  303. // Wait until the above RPC succeeds.
  304. wg.Wait()
  305. }
  306. func (s) TestOneServerDown(t *testing.T) {
  307. // Start 2 servers.
  308. numServers := 2
  309. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  310. defer cleanup()
  311. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  312. if err != nil {
  313. t.Fatalf("Failed to create ClientConn: %v", err)
  314. }
  315. defer cc.Close()
  316. // Add servers[1] to the service discovery.
  317. var updates []*naming.Update
  318. updates = append(updates, &naming.Update{
  319. Op: naming.Add,
  320. Addr: "localhost:" + servers[1].port,
  321. })
  322. r.w.inject(updates)
  323. req := "port"
  324. var reply string
  325. // Loop until servers[1] is up
  326. for {
  327. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  328. break
  329. }
  330. time.Sleep(10 * time.Millisecond)
  331. }
  332. var wg sync.WaitGroup
  333. numRPC := 100
  334. sleepDuration := 10 * time.Millisecond
  335. wg.Add(1)
  336. go func() {
  337. time.Sleep(sleepDuration)
  338. // After sleepDuration, kill server[0].
  339. servers[0].stop()
  340. wg.Done()
  341. }()
  342. // All non-failfast RPCs should not block because there's at least one connection available.
  343. for i := 0; i < numRPC; i++ {
  344. wg.Add(1)
  345. go func() {
  346. time.Sleep(sleepDuration)
  347. // After sleepDuration, invoke RPC.
  348. // server[0] is killed around the same time to make it racy between balancer and gRPC internals.
  349. cc.Invoke(context.Background(), "/foo/bar", &req, &reply, WaitForReady(true))
  350. wg.Done()
  351. }()
  352. }
  353. wg.Wait()
  354. }
  355. func (s) TestOneAddressRemoval(t *testing.T) {
  356. // Start 2 servers.
  357. numServers := 2
  358. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  359. defer cleanup()
  360. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  361. if err != nil {
  362. t.Fatalf("Failed to create ClientConn: %v", err)
  363. }
  364. defer cc.Close()
  365. // Add servers[1] to the service discovery.
  366. var updates []*naming.Update
  367. updates = append(updates, &naming.Update{
  368. Op: naming.Add,
  369. Addr: "localhost:" + servers[1].port,
  370. })
  371. r.w.inject(updates)
  372. req := "port"
  373. var reply string
  374. // Loop until servers[1] is up
  375. for {
  376. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  377. break
  378. }
  379. time.Sleep(10 * time.Millisecond)
  380. }
  381. var wg sync.WaitGroup
  382. numRPC := 100
  383. sleepDuration := 10 * time.Millisecond
  384. wg.Add(1)
  385. go func() {
  386. time.Sleep(sleepDuration)
  387. // After sleepDuration, delete server[0].
  388. var updates []*naming.Update
  389. updates = append(updates, &naming.Update{
  390. Op: naming.Delete,
  391. Addr: "localhost:" + servers[0].port,
  392. })
  393. r.w.inject(updates)
  394. wg.Done()
  395. }()
  396. // All non-failfast RPCs should not fail because there's at least one connection available.
  397. for i := 0; i < numRPC; i++ {
  398. wg.Add(1)
  399. go func() {
  400. var reply string
  401. time.Sleep(sleepDuration)
  402. // After sleepDuration, invoke RPC.
  403. // server[0] is removed around the same time to make it racy between balancer and gRPC internals.
  404. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
  405. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
  406. }
  407. wg.Done()
  408. }()
  409. }
  410. wg.Wait()
  411. }
  412. func checkServerUp(t *testing.T, currentServer *server) {
  413. req := "port"
  414. port := currentServer.port
  415. cc, err := Dial("passthrough:///localhost:"+port, WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  416. if err != nil {
  417. t.Fatalf("Failed to create ClientConn: %v", err)
  418. }
  419. defer cc.Close()
  420. var reply string
  421. for {
  422. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == port {
  423. break
  424. }
  425. time.Sleep(10 * time.Millisecond)
  426. }
  427. }
  428. func (s) TestPickFirstEmptyAddrs(t *testing.T) {
  429. servers, r, cleanup := startServers(t, 1, math.MaxUint32)
  430. defer cleanup()
  431. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  432. if err != nil {
  433. t.Fatalf("Failed to create ClientConn: %v", err)
  434. }
  435. defer cc.Close()
  436. var reply string
  437. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
  438. t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
  439. }
  440. // Inject name resolution change to remove the server so that there is no address
  441. // available after that.
  442. u := &naming.Update{
  443. Op: naming.Delete,
  444. Addr: "localhost:" + servers[0].port,
  445. }
  446. r.w.inject([]*naming.Update{u})
  447. // Loop until the above updates apply.
  448. for {
  449. time.Sleep(10 * time.Millisecond)
  450. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
  451. if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil {
  452. cancel()
  453. break
  454. }
  455. cancel()
  456. }
  457. }
  458. func (s) TestPickFirstCloseWithPendingRPC(t *testing.T) {
  459. servers, r, cleanup := startServers(t, 1, math.MaxUint32)
  460. defer cleanup()
  461. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  462. if err != nil {
  463. t.Fatalf("Failed to create ClientConn: %v", err)
  464. }
  465. defer cc.Close()
  466. var reply string
  467. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
  468. t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
  469. }
  470. // Remove the server.
  471. updates := []*naming.Update{{
  472. Op: naming.Delete,
  473. Addr: "localhost:" + servers[0].port,
  474. }}
  475. r.w.inject(updates)
  476. // Loop until the above update applies.
  477. for {
  478. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
  479. if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
  480. cancel()
  481. break
  482. }
  483. time.Sleep(10 * time.Millisecond)
  484. cancel()
  485. }
  486. // Issue 2 RPCs which should be completed with error status once cc is closed.
  487. var wg sync.WaitGroup
  488. wg.Add(2)
  489. go func() {
  490. defer wg.Done()
  491. var reply string
  492. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
  493. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
  494. }
  495. }()
  496. go func() {
  497. defer wg.Done()
  498. var reply string
  499. time.Sleep(5 * time.Millisecond)
  500. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
  501. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
  502. }
  503. }()
  504. time.Sleep(5 * time.Millisecond)
  505. cc.Close()
  506. wg.Wait()
  507. }
  508. func (s) TestPickFirstOrderAllServerUp(t *testing.T) {
  509. // Start 3 servers on 3 ports.
  510. numServers := 3
  511. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  512. defer cleanup()
  513. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  514. if err != nil {
  515. t.Fatalf("Failed to create ClientConn: %v", err)
  516. }
  517. defer cc.Close()
  518. // Add servers[1] and [2] to the service discovery.
  519. u := &naming.Update{
  520. Op: naming.Add,
  521. Addr: "localhost:" + servers[1].port,
  522. }
  523. r.w.inject([]*naming.Update{u})
  524. u = &naming.Update{
  525. Op: naming.Add,
  526. Addr: "localhost:" + servers[2].port,
  527. }
  528. r.w.inject([]*naming.Update{u})
  529. // Loop until all 3 servers are up
  530. checkServerUp(t, servers[0])
  531. checkServerUp(t, servers[1])
  532. checkServerUp(t, servers[2])
  533. // Check the incoming RPCs served in server[0]
  534. req := "port"
  535. var reply string
  536. for i := 0; i < 20; i++ {
  537. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
  538. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
  539. }
  540. time.Sleep(10 * time.Millisecond)
  541. }
  542. // Delete server[0] in the balancer, the incoming RPCs served in server[1]
  543. // For test addrconn, close server[0] instead
  544. u = &naming.Update{
  545. Op: naming.Delete,
  546. Addr: "localhost:" + servers[0].port,
  547. }
  548. r.w.inject([]*naming.Update{u})
  549. // Loop until it changes to server[1]
  550. for {
  551. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  552. break
  553. }
  554. time.Sleep(10 * time.Millisecond)
  555. }
  556. for i := 0; i < 20; i++ {
  557. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
  558. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
  559. }
  560. time.Sleep(10 * time.Millisecond)
  561. }
  562. // Add server[0] back to the balancer, the incoming RPCs served in server[1]
  563. // Add is append operation, the order of Notify now is {server[1].port server[2].port server[0].port}
  564. u = &naming.Update{
  565. Op: naming.Add,
  566. Addr: "localhost:" + servers[0].port,
  567. }
  568. r.w.inject([]*naming.Update{u})
  569. for i := 0; i < 20; i++ {
  570. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
  571. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
  572. }
  573. time.Sleep(10 * time.Millisecond)
  574. }
  575. // Delete server[1] in the balancer, the incoming RPCs served in server[2]
  576. u = &naming.Update{
  577. Op: naming.Delete,
  578. Addr: "localhost:" + servers[1].port,
  579. }
  580. r.w.inject([]*naming.Update{u})
  581. for {
  582. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
  583. break
  584. }
  585. time.Sleep(1 * time.Second)
  586. }
  587. for i := 0; i < 20; i++ {
  588. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[2].port {
  589. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
  590. }
  591. time.Sleep(10 * time.Millisecond)
  592. }
  593. // Delete server[2] in the balancer, the incoming RPCs served in server[0]
  594. u = &naming.Update{
  595. Op: naming.Delete,
  596. Addr: "localhost:" + servers[2].port,
  597. }
  598. r.w.inject([]*naming.Update{u})
  599. for {
  600. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
  601. break
  602. }
  603. time.Sleep(1 * time.Second)
  604. }
  605. for i := 0; i < 20; i++ {
  606. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
  607. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
  608. }
  609. time.Sleep(10 * time.Millisecond)
  610. }
  611. }
  612. func (s) TestPickFirstOrderOneServerDown(t *testing.T) {
  613. // Start 3 servers on 3 ports.
  614. numServers := 3
  615. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  616. defer cleanup()
  617. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  618. if err != nil {
  619. t.Fatalf("Failed to create ClientConn: %v", err)
  620. }
  621. defer cc.Close()
  622. // Add servers[1] and [2] to the service discovery.
  623. u := &naming.Update{
  624. Op: naming.Add,
  625. Addr: "localhost:" + servers[1].port,
  626. }
  627. r.w.inject([]*naming.Update{u})
  628. u = &naming.Update{
  629. Op: naming.Add,
  630. Addr: "localhost:" + servers[2].port,
  631. }
  632. r.w.inject([]*naming.Update{u})
  633. // Loop until all 3 servers are up
  634. checkServerUp(t, servers[0])
  635. checkServerUp(t, servers[1])
  636. checkServerUp(t, servers[2])
  637. // Check the incoming RPCs served in server[0]
  638. req := "port"
  639. var reply string
  640. for i := 0; i < 20; i++ {
  641. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
  642. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
  643. }
  644. time.Sleep(10 * time.Millisecond)
  645. }
  646. // server[0] down, incoming RPCs served in server[1], but the order of Notify still remains
  647. // {server[0] server[1] server[2]}
  648. servers[0].stop()
  649. // Loop until it changes to server[1]
  650. for {
  651. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  652. break
  653. }
  654. time.Sleep(10 * time.Millisecond)
  655. }
  656. for i := 0; i < 20; i++ {
  657. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
  658. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
  659. }
  660. time.Sleep(10 * time.Millisecond)
  661. }
  662. // up the server[0] back, the incoming RPCs served in server[1]
  663. p, _ := strconv.Atoi(servers[0].port)
  664. servers[0] = newTestServer()
  665. go servers[0].start(t, p, math.MaxUint32)
  666. defer servers[0].stop()
  667. servers[0].wait(t, 2*time.Second)
  668. checkServerUp(t, servers[0])
  669. for i := 0; i < 20; i++ {
  670. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
  671. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
  672. }
  673. time.Sleep(10 * time.Millisecond)
  674. }
  675. // Delete server[1] in the balancer, the incoming RPCs served in server[0]
  676. u = &naming.Update{
  677. Op: naming.Delete,
  678. Addr: "localhost:" + servers[1].port,
  679. }
  680. r.w.inject([]*naming.Update{u})
  681. for {
  682. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
  683. break
  684. }
  685. time.Sleep(1 * time.Second)
  686. }
  687. for i := 0; i < 20; i++ {
  688. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
  689. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
  690. }
  691. time.Sleep(10 * time.Millisecond)
  692. }
  693. }
  694. func (s) TestPickFirstOneAddressRemoval(t *testing.T) {
  695. // Start 2 servers.
  696. numServers := 2
  697. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  698. defer cleanup()
  699. cc, err := Dial("passthrough:///localhost:"+servers[0].port, WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  700. if err != nil {
  701. t.Fatalf("Failed to create ClientConn: %v", err)
  702. }
  703. defer cc.Close()
  704. // Add servers[1] to the service discovery.
  705. var updates []*naming.Update
  706. updates = append(updates, &naming.Update{
  707. Op: naming.Add,
  708. Addr: "localhost:" + servers[1].port,
  709. })
  710. r.w.inject(updates)
  711. // Create a new cc to Loop until servers[1] is up
  712. checkServerUp(t, servers[0])
  713. checkServerUp(t, servers[1])
  714. var wg sync.WaitGroup
  715. numRPC := 100
  716. sleepDuration := 10 * time.Millisecond
  717. wg.Add(1)
  718. go func() {
  719. time.Sleep(sleepDuration)
  720. // After sleepDuration, delete server[0].
  721. var updates []*naming.Update
  722. updates = append(updates, &naming.Update{
  723. Op: naming.Delete,
  724. Addr: "localhost:" + servers[0].port,
  725. })
  726. r.w.inject(updates)
  727. wg.Done()
  728. }()
  729. // All non-failfast RPCs should not fail because there's at least one connection available.
  730. for i := 0; i < numRPC; i++ {
  731. wg.Add(1)
  732. go func() {
  733. var reply string
  734. time.Sleep(sleepDuration)
  735. // After sleepDuration, invoke RPC.
  736. // server[0] is removed around the same time to make it racy between balancer and gRPC internals.
  737. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
  738. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
  739. }
  740. wg.Done()
  741. }()
  742. }
  743. wg.Wait()
  744. }