balancer_test.go 24 KB


  1. /*
  2. *
  3. * Copyright 2016 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "fmt"
  22. "math"
  23. "strconv"
  24. "sync"
  25. "testing"
  26. "time"
  27. "google.golang.org/grpc/codes"
  28. _ "google.golang.org/grpc/grpclog/glogger"
  29. "google.golang.org/grpc/naming"
  30. "google.golang.org/grpc/status"
  31. // V1 balancer tests use passthrough resolver instead of dns.
  32. // TODO(bar) remove this when removing v1 balancer entirely.
  33. _ "google.golang.org/grpc/resolver/passthrough"
  34. )
  35. func pickFirstBalancerV1(r naming.Resolver) Balancer {
  36. return &pickFirst{&roundRobin{r: r}}
  37. }
  38. type testWatcher struct {
  39. // the channel to receives name resolution updates
  40. update chan *naming.Update
  41. // the side channel to get to know how many updates in a batch
  42. side chan int
  43. // the channel to notify update injector that the update reading is done
  44. readDone chan int
  45. }
  46. func (w *testWatcher) Next() (updates []*naming.Update, err error) {
  47. n := <-w.side
  48. if n == 0 {
  49. return nil, fmt.Errorf("w.side is closed")
  50. }
  51. for i := 0; i < n; i++ {
  52. u := <-w.update
  53. if u != nil {
  54. updates = append(updates, u)
  55. }
  56. }
  57. w.readDone <- 0
  58. return
  59. }
  60. func (w *testWatcher) Close() {
  61. close(w.side)
  62. }
  63. // Inject naming resolution updates to the testWatcher.
  64. func (w *testWatcher) inject(updates []*naming.Update) {
  65. w.side <- len(updates)
  66. for _, u := range updates {
  67. w.update <- u
  68. }
  69. <-w.readDone
  70. }
  71. type testNameResolver struct {
  72. w *testWatcher
  73. addr string
  74. }
  75. func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
  76. r.w = &testWatcher{
  77. update: make(chan *naming.Update, 1),
  78. side: make(chan int, 1),
  79. readDone: make(chan int),
  80. }
  81. r.w.side <- 1
  82. r.w.update <- &naming.Update{
  83. Op: naming.Add,
  84. Addr: r.addr,
  85. }
  86. go func() {
  87. <-r.w.readDone
  88. }()
  89. return r.w, nil
  90. }
  91. func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, *testNameResolver, func()) {
  92. var servers []*server
  93. for i := 0; i < numServers; i++ {
  94. s := newTestServer()
  95. servers = append(servers, s)
  96. go s.start(t, 0, maxStreams)
  97. s.wait(t, 2*time.Second)
  98. }
  99. // Point to server[0]
  100. addr := "localhost:" + servers[0].port
  101. return servers, &testNameResolver{
  102. addr: addr,
  103. }, func() {
  104. for i := 0; i < numServers; i++ {
  105. servers[i].stop()
  106. }
  107. }
  108. }
  109. func (s) TestNameDiscovery(t *testing.T) {
  110. // Start 2 servers on 2 ports.
  111. numServers := 2
  112. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  113. defer cleanup()
  114. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  115. if err != nil {
  116. t.Fatalf("Failed to create ClientConn: %v", err)
  117. }
  118. defer cc.Close()
  119. req := "port"
  120. var reply string
  121. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
  122. t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
  123. }
  124. // Inject the name resolution change to remove servers[0] and add servers[1].
  125. var updates []*naming.Update
  126. updates = append(updates, &naming.Update{
  127. Op: naming.Delete,
  128. Addr: "localhost:" + servers[0].port,
  129. })
  130. updates = append(updates, &naming.Update{
  131. Op: naming.Add,
  132. Addr: "localhost:" + servers[1].port,
  133. })
  134. r.w.inject(updates)
  135. // Loop until the rpcs in flight talks to servers[1].
  136. for {
  137. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  138. break
  139. }
  140. time.Sleep(10 * time.Millisecond)
  141. }
  142. }
  143. func (s) TestEmptyAddrs(t *testing.T) {
  144. servers, r, cleanup := startServers(t, 1, math.MaxUint32)
  145. defer cleanup()
  146. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  147. if err != nil {
  148. t.Fatalf("Failed to create ClientConn: %v", err)
  149. }
  150. defer cc.Close()
  151. var reply string
  152. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
  153. t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
  154. }
  155. // Inject name resolution change to remove the server so that there is no address
  156. // available after that.
  157. u := &naming.Update{
  158. Op: naming.Delete,
  159. Addr: "localhost:" + servers[0].port,
  160. }
  161. r.w.inject([]*naming.Update{u})
  162. // Loop until the above updates apply.
  163. for {
  164. time.Sleep(10 * time.Millisecond)
  165. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
  166. if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil {
  167. cancel()
  168. break
  169. }
  170. cancel()
  171. }
  172. }
  173. func (s) TestRoundRobin(t *testing.T) {
  174. // Start 3 servers on 3 ports.
  175. numServers := 3
  176. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  177. defer cleanup()
  178. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  179. if err != nil {
  180. t.Fatalf("Failed to create ClientConn: %v", err)
  181. }
  182. defer cc.Close()
  183. // Add servers[1] to the service discovery.
  184. u := &naming.Update{
  185. Op: naming.Add,
  186. Addr: "localhost:" + servers[1].port,
  187. }
  188. r.w.inject([]*naming.Update{u})
  189. req := "port"
  190. var reply string
  191. // Loop until servers[1] is up
  192. for {
  193. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  194. break
  195. }
  196. time.Sleep(10 * time.Millisecond)
  197. }
  198. // Add server2[2] to the service discovery.
  199. u = &naming.Update{
  200. Op: naming.Add,
  201. Addr: "localhost:" + servers[2].port,
  202. }
  203. r.w.inject([]*naming.Update{u})
  204. // Loop until both servers[2] are up.
  205. for {
  206. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
  207. break
  208. }
  209. time.Sleep(10 * time.Millisecond)
  210. }
  211. // Check the incoming RPCs served in a round-robin manner.
  212. for i := 0; i < 10; i++ {
  213. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[i%numServers].port {
  214. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", i, err, servers[i%numServers].port)
  215. }
  216. }
  217. }
  218. func (s) TestCloseWithPendingRPC(t *testing.T) {
  219. servers, r, cleanup := startServers(t, 1, math.MaxUint32)
  220. defer cleanup()
  221. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  222. if err != nil {
  223. t.Fatalf("Failed to create ClientConn: %v", err)
  224. }
  225. defer cc.Close()
  226. var reply string
  227. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
  228. t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
  229. }
  230. // Remove the server.
  231. updates := []*naming.Update{{
  232. Op: naming.Delete,
  233. Addr: "localhost:" + servers[0].port,
  234. }}
  235. r.w.inject(updates)
  236. // Loop until the above update applies.
  237. for {
  238. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
  239. if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
  240. cancel()
  241. break
  242. }
  243. time.Sleep(10 * time.Millisecond)
  244. cancel()
  245. }
  246. // Issue 2 RPCs which should be completed with error status once cc is closed.
  247. var wg sync.WaitGroup
  248. wg.Add(2)
  249. go func() {
  250. defer wg.Done()
  251. var reply string
  252. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
  253. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
  254. }
  255. }()
  256. go func() {
  257. defer wg.Done()
  258. var reply string
  259. time.Sleep(5 * time.Millisecond)
  260. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
  261. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
  262. }
  263. }()
  264. time.Sleep(5 * time.Millisecond)
  265. cc.Close()
  266. wg.Wait()
  267. }
  268. func (s) TestGetOnWaitChannel(t *testing.T) {
  269. servers, r, cleanup := startServers(t, 1, math.MaxUint32)
  270. defer cleanup()
  271. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  272. if err != nil {
  273. t.Fatalf("Failed to create ClientConn: %v", err)
  274. }
  275. defer cc.Close()
  276. // Remove all servers so that all upcoming RPCs will block on waitCh.
  277. updates := []*naming.Update{{
  278. Op: naming.Delete,
  279. Addr: "localhost:" + servers[0].port,
  280. }}
  281. r.w.inject(updates)
  282. for {
  283. var reply string
  284. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
  285. if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
  286. cancel()
  287. break
  288. }
  289. cancel()
  290. time.Sleep(10 * time.Millisecond)
  291. }
  292. var wg sync.WaitGroup
  293. wg.Add(1)
  294. go func() {
  295. defer wg.Done()
  296. var reply string
  297. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
  298. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
  299. }
  300. }()
  301. // Add a connected server to get the above RPC through.
  302. updates = []*naming.Update{{
  303. Op: naming.Add,
  304. Addr: "localhost:" + servers[0].port,
  305. }}
  306. r.w.inject(updates)
  307. // Wait until the above RPC succeeds.
  308. wg.Wait()
  309. }
  310. func (s) TestOneServerDown(t *testing.T) {
  311. // Start 2 servers.
  312. numServers := 2
  313. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  314. defer cleanup()
  315. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake())
  316. if err != nil {
  317. t.Fatalf("Failed to create ClientConn: %v", err)
  318. }
  319. defer cc.Close()
  320. // Add servers[1] to the service discovery.
  321. var updates []*naming.Update
  322. updates = append(updates, &naming.Update{
  323. Op: naming.Add,
  324. Addr: "localhost:" + servers[1].port,
  325. })
  326. r.w.inject(updates)
  327. req := "port"
  328. var reply string
  329. // Loop until servers[1] is up
  330. for {
  331. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  332. break
  333. }
  334. time.Sleep(10 * time.Millisecond)
  335. }
  336. var wg sync.WaitGroup
  337. numRPC := 100
  338. sleepDuration := 10 * time.Millisecond
  339. wg.Add(1)
  340. go func() {
  341. time.Sleep(sleepDuration)
  342. // After sleepDuration, kill server[0].
  343. servers[0].stop()
  344. wg.Done()
  345. }()
  346. // All non-failfast RPCs should not block because there's at least one connection available.
  347. for i := 0; i < numRPC; i++ {
  348. wg.Add(1)
  349. go func() {
  350. time.Sleep(sleepDuration)
  351. // After sleepDuration, invoke RPC.
  352. // server[0] is killed around the same time to make it racy between balancer and gRPC internals.
  353. cc.Invoke(context.Background(), "/foo/bar", &req, &reply, WaitForReady(true))
  354. wg.Done()
  355. }()
  356. }
  357. wg.Wait()
  358. }
  359. func (s) TestOneAddressRemoval(t *testing.T) {
  360. // Start 2 servers.
  361. numServers := 2
  362. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  363. defer cleanup()
  364. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  365. if err != nil {
  366. t.Fatalf("Failed to create ClientConn: %v", err)
  367. }
  368. defer cc.Close()
  369. // Add servers[1] to the service discovery.
  370. var updates []*naming.Update
  371. updates = append(updates, &naming.Update{
  372. Op: naming.Add,
  373. Addr: "localhost:" + servers[1].port,
  374. })
  375. r.w.inject(updates)
  376. req := "port"
  377. var reply string
  378. // Loop until servers[1] is up
  379. for {
  380. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  381. break
  382. }
  383. time.Sleep(10 * time.Millisecond)
  384. }
  385. var wg sync.WaitGroup
  386. numRPC := 100
  387. sleepDuration := 10 * time.Millisecond
  388. wg.Add(1)
  389. go func() {
  390. time.Sleep(sleepDuration)
  391. // After sleepDuration, delete server[0].
  392. var updates []*naming.Update
  393. updates = append(updates, &naming.Update{
  394. Op: naming.Delete,
  395. Addr: "localhost:" + servers[0].port,
  396. })
  397. r.w.inject(updates)
  398. wg.Done()
  399. }()
  400. // All non-failfast RPCs should not fail because there's at least one connection available.
  401. for i := 0; i < numRPC; i++ {
  402. wg.Add(1)
  403. go func() {
  404. var reply string
  405. time.Sleep(sleepDuration)
  406. // After sleepDuration, invoke RPC.
  407. // server[0] is removed around the same time to make it racy between balancer and gRPC internals.
  408. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
  409. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
  410. }
  411. wg.Done()
  412. }()
  413. }
  414. wg.Wait()
  415. }
  416. func checkServerUp(t *testing.T, currentServer *server) {
  417. req := "port"
  418. port := currentServer.port
  419. cc, err := Dial("passthrough:///localhost:"+port, WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  420. if err != nil {
  421. t.Fatalf("Failed to create ClientConn: %v", err)
  422. }
  423. defer cc.Close()
  424. var reply string
  425. for {
  426. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == port {
  427. break
  428. }
  429. time.Sleep(10 * time.Millisecond)
  430. }
  431. }
  432. func (s) TestPickFirstEmptyAddrs(t *testing.T) {
  433. servers, r, cleanup := startServers(t, 1, math.MaxUint32)
  434. defer cleanup()
  435. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  436. if err != nil {
  437. t.Fatalf("Failed to create ClientConn: %v", err)
  438. }
  439. defer cc.Close()
  440. var reply string
  441. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
  442. t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
  443. }
  444. // Inject name resolution change to remove the server so that there is no address
  445. // available after that.
  446. u := &naming.Update{
  447. Op: naming.Delete,
  448. Addr: "localhost:" + servers[0].port,
  449. }
  450. r.w.inject([]*naming.Update{u})
  451. // Loop until the above updates apply.
  452. for {
  453. time.Sleep(10 * time.Millisecond)
  454. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
  455. if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil {
  456. cancel()
  457. break
  458. }
  459. cancel()
  460. }
  461. }
  462. func (s) TestPickFirstCloseWithPendingRPC(t *testing.T) {
  463. servers, r, cleanup := startServers(t, 1, math.MaxUint32)
  464. defer cleanup()
  465. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  466. if err != nil {
  467. t.Fatalf("Failed to create ClientConn: %v", err)
  468. }
  469. defer cc.Close()
  470. var reply string
  471. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
  472. t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
  473. }
  474. // Remove the server.
  475. updates := []*naming.Update{{
  476. Op: naming.Delete,
  477. Addr: "localhost:" + servers[0].port,
  478. }}
  479. r.w.inject(updates)
  480. // Loop until the above update applies.
  481. for {
  482. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
  483. if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
  484. cancel()
  485. break
  486. }
  487. time.Sleep(10 * time.Millisecond)
  488. cancel()
  489. }
  490. // Issue 2 RPCs which should be completed with error status once cc is closed.
  491. var wg sync.WaitGroup
  492. wg.Add(2)
  493. go func() {
  494. defer wg.Done()
  495. var reply string
  496. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
  497. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
  498. }
  499. }()
  500. go func() {
  501. defer wg.Done()
  502. var reply string
  503. time.Sleep(5 * time.Millisecond)
  504. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
  505. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
  506. }
  507. }()
  508. time.Sleep(5 * time.Millisecond)
  509. cc.Close()
  510. wg.Wait()
  511. }
  512. func (s) TestPickFirstOrderAllServerUp(t *testing.T) {
  513. // Start 3 servers on 3 ports.
  514. numServers := 3
  515. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  516. defer cleanup()
  517. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  518. if err != nil {
  519. t.Fatalf("Failed to create ClientConn: %v", err)
  520. }
  521. defer cc.Close()
  522. // Add servers[1] and [2] to the service discovery.
  523. u := &naming.Update{
  524. Op: naming.Add,
  525. Addr: "localhost:" + servers[1].port,
  526. }
  527. r.w.inject([]*naming.Update{u})
  528. u = &naming.Update{
  529. Op: naming.Add,
  530. Addr: "localhost:" + servers[2].port,
  531. }
  532. r.w.inject([]*naming.Update{u})
  533. // Loop until all 3 servers are up
  534. checkServerUp(t, servers[0])
  535. checkServerUp(t, servers[1])
  536. checkServerUp(t, servers[2])
  537. // Check the incoming RPCs served in server[0]
  538. req := "port"
  539. var reply string
  540. for i := 0; i < 20; i++ {
  541. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
  542. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
  543. }
  544. time.Sleep(10 * time.Millisecond)
  545. }
  546. // Delete server[0] in the balancer, the incoming RPCs served in server[1]
  547. // For test addrconn, close server[0] instead
  548. u = &naming.Update{
  549. Op: naming.Delete,
  550. Addr: "localhost:" + servers[0].port,
  551. }
  552. r.w.inject([]*naming.Update{u})
  553. // Loop until it changes to server[1]
  554. for {
  555. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  556. break
  557. }
  558. time.Sleep(10 * time.Millisecond)
  559. }
  560. for i := 0; i < 20; i++ {
  561. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
  562. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
  563. }
  564. time.Sleep(10 * time.Millisecond)
  565. }
  566. // Add server[0] back to the balancer, the incoming RPCs served in server[1]
  567. // Add is append operation, the order of Notify now is {server[1].port server[2].port server[0].port}
  568. u = &naming.Update{
  569. Op: naming.Add,
  570. Addr: "localhost:" + servers[0].port,
  571. }
  572. r.w.inject([]*naming.Update{u})
  573. for i := 0; i < 20; i++ {
  574. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
  575. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
  576. }
  577. time.Sleep(10 * time.Millisecond)
  578. }
  579. // Delete server[1] in the balancer, the incoming RPCs served in server[2]
  580. u = &naming.Update{
  581. Op: naming.Delete,
  582. Addr: "localhost:" + servers[1].port,
  583. }
  584. r.w.inject([]*naming.Update{u})
  585. for {
  586. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
  587. break
  588. }
  589. time.Sleep(1 * time.Second)
  590. }
  591. for i := 0; i < 20; i++ {
  592. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[2].port {
  593. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
  594. }
  595. time.Sleep(10 * time.Millisecond)
  596. }
  597. // Delete server[2] in the balancer, the incoming RPCs served in server[0]
  598. u = &naming.Update{
  599. Op: naming.Delete,
  600. Addr: "localhost:" + servers[2].port,
  601. }
  602. r.w.inject([]*naming.Update{u})
  603. for {
  604. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
  605. break
  606. }
  607. time.Sleep(1 * time.Second)
  608. }
  609. for i := 0; i < 20; i++ {
  610. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
  611. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
  612. }
  613. time.Sleep(10 * time.Millisecond)
  614. }
  615. }
  616. func (s) TestPickFirstOrderOneServerDown(t *testing.T) {
  617. // Start 3 servers on 3 ports.
  618. numServers := 3
  619. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  620. defer cleanup()
  621. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake())
  622. if err != nil {
  623. t.Fatalf("Failed to create ClientConn: %v", err)
  624. }
  625. defer cc.Close()
  626. // Add servers[1] and [2] to the service discovery.
  627. u := &naming.Update{
  628. Op: naming.Add,
  629. Addr: "localhost:" + servers[1].port,
  630. }
  631. r.w.inject([]*naming.Update{u})
  632. u = &naming.Update{
  633. Op: naming.Add,
  634. Addr: "localhost:" + servers[2].port,
  635. }
  636. r.w.inject([]*naming.Update{u})
  637. // Loop until all 3 servers are up
  638. checkServerUp(t, servers[0])
  639. checkServerUp(t, servers[1])
  640. checkServerUp(t, servers[2])
  641. // Check the incoming RPCs served in server[0]
  642. req := "port"
  643. var reply string
  644. for i := 0; i < 20; i++ {
  645. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
  646. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
  647. }
  648. time.Sleep(10 * time.Millisecond)
  649. }
  650. // server[0] down, incoming RPCs served in server[1], but the order of Notify still remains
  651. // {server[0] server[1] server[2]}
  652. servers[0].stop()
  653. // Loop until it changes to server[1]
  654. for {
  655. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  656. break
  657. }
  658. time.Sleep(10 * time.Millisecond)
  659. }
  660. for i := 0; i < 20; i++ {
  661. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
  662. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
  663. }
  664. time.Sleep(10 * time.Millisecond)
  665. }
  666. // up the server[0] back, the incoming RPCs served in server[1]
  667. p, _ := strconv.Atoi(servers[0].port)
  668. servers[0] = newTestServer()
  669. go servers[0].start(t, p, math.MaxUint32)
  670. defer servers[0].stop()
  671. servers[0].wait(t, 2*time.Second)
  672. checkServerUp(t, servers[0])
  673. for i := 0; i < 20; i++ {
  674. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
  675. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
  676. }
  677. time.Sleep(10 * time.Millisecond)
  678. }
  679. // Delete server[1] in the balancer, the incoming RPCs served in server[0]
  680. u = &naming.Update{
  681. Op: naming.Delete,
  682. Addr: "localhost:" + servers[1].port,
  683. }
  684. r.w.inject([]*naming.Update{u})
  685. for {
  686. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
  687. break
  688. }
  689. time.Sleep(1 * time.Second)
  690. }
  691. for i := 0; i < 20; i++ {
  692. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
  693. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
  694. }
  695. time.Sleep(10 * time.Millisecond)
  696. }
  697. }
  698. func (s) TestPickFirstOneAddressRemoval(t *testing.T) {
  699. // Start 2 servers.
  700. numServers := 2
  701. servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
  702. defer cleanup()
  703. cc, err := Dial("passthrough:///localhost:"+servers[0].port, WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
  704. if err != nil {
  705. t.Fatalf("Failed to create ClientConn: %v", err)
  706. }
  707. defer cc.Close()
  708. // Add servers[1] to the service discovery.
  709. var updates []*naming.Update
  710. updates = append(updates, &naming.Update{
  711. Op: naming.Add,
  712. Addr: "localhost:" + servers[1].port,
  713. })
  714. r.w.inject(updates)
  715. // Create a new cc to Loop until servers[1] is up
  716. checkServerUp(t, servers[0])
  717. checkServerUp(t, servers[1])
  718. var wg sync.WaitGroup
  719. numRPC := 100
  720. sleepDuration := 10 * time.Millisecond
  721. wg.Add(1)
  722. go func() {
  723. time.Sleep(sleepDuration)
  724. // After sleepDuration, delete server[0].
  725. var updates []*naming.Update
  726. updates = append(updates, &naming.Update{
  727. Op: naming.Delete,
  728. Addr: "localhost:" + servers[0].port,
  729. })
  730. r.w.inject(updates)
  731. wg.Done()
  732. }()
  733. // All non-failfast RPCs should not fail because there's at least one connection available.
  734. for i := 0; i < numRPC; i++ {
  735. wg.Add(1)
  736. go func() {
  737. var reply string
  738. time.Sleep(sleepDuration)
  739. // After sleepDuration, invoke RPC.
  740. // server[0] is removed around the same time to make it racy between balancer and gRPC internals.
  741. if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
  742. t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
  743. }
  744. wg.Done()
  745. }()
  746. }
  747. wg.Wait()
  748. }