roundrobin_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package roundrobin_test
  19. import (
  20. "context"
  21. "fmt"
  22. "net"
  23. "sync"
  24. "testing"
  25. "time"
  26. "google.golang.org/grpc"
  27. "google.golang.org/grpc/balancer/roundrobin"
  28. "google.golang.org/grpc/codes"
  29. _ "google.golang.org/grpc/grpclog/glogger"
  30. "google.golang.org/grpc/internal/leakcheck"
  31. "google.golang.org/grpc/peer"
  32. "google.golang.org/grpc/resolver"
  33. "google.golang.org/grpc/resolver/manual"
  34. "google.golang.org/grpc/status"
  35. testpb "google.golang.org/grpc/test/grpc_testing"
  36. )
  37. type testServer struct {
  38. testpb.TestServiceServer
  39. }
  40. func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
  41. return &testpb.Empty{}, nil
  42. }
  43. func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
  44. return nil
  45. }
  46. type test struct {
  47. servers []*grpc.Server
  48. addresses []string
  49. }
  50. func (t *test) cleanup() {
  51. for _, s := range t.servers {
  52. s.Stop()
  53. }
  54. }
  55. func startTestServers(count int) (_ *test, err error) {
  56. t := &test{}
  57. defer func() {
  58. if err != nil {
  59. t.cleanup()
  60. }
  61. }()
  62. for i := 0; i < count; i++ {
  63. lis, err := net.Listen("tcp", "localhost:0")
  64. if err != nil {
  65. return nil, fmt.Errorf("failed to listen %v", err)
  66. }
  67. s := grpc.NewServer()
  68. testpb.RegisterTestServiceServer(s, &testServer{})
  69. t.servers = append(t.servers, s)
  70. t.addresses = append(t.addresses, lis.Addr().String())
  71. go func(s *grpc.Server, l net.Listener) {
  72. s.Serve(l)
  73. }(s, lis)
  74. }
  75. return t, nil
  76. }
  77. func TestOneBackend(t *testing.T) {
  78. defer leakcheck.Check(t)
  79. r, cleanup := manual.GenerateAndRegisterManualResolver()
  80. defer cleanup()
  81. test, err := startTestServers(1)
  82. if err != nil {
  83. t.Fatalf("failed to start servers: %v", err)
  84. }
  85. defer test.cleanup()
  86. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
  87. if err != nil {
  88. t.Fatalf("failed to dial: %v", err)
  89. }
  90. defer cc.Close()
  91. testc := testpb.NewTestServiceClient(cc)
  92. // The first RPC should fail because there's no address.
  93. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  94. defer cancel()
  95. if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
  96. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  97. }
  98. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}})
  99. // The second RPC should succeed.
  100. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  101. t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
  102. }
  103. }
  104. func TestBackendsRoundRobin(t *testing.T) {
  105. defer leakcheck.Check(t)
  106. r, cleanup := manual.GenerateAndRegisterManualResolver()
  107. defer cleanup()
  108. backendCount := 5
  109. test, err := startTestServers(backendCount)
  110. if err != nil {
  111. t.Fatalf("failed to start servers: %v", err)
  112. }
  113. defer test.cleanup()
  114. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
  115. if err != nil {
  116. t.Fatalf("failed to dial: %v", err)
  117. }
  118. defer cc.Close()
  119. testc := testpb.NewTestServiceClient(cc)
  120. // The first RPC should fail because there's no address.
  121. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  122. defer cancel()
  123. if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
  124. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  125. }
  126. var resolvedAddrs []resolver.Address
  127. for i := 0; i < backendCount; i++ {
  128. resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
  129. }
  130. r.UpdateState(resolver.State{Addresses: resolvedAddrs})
  131. var p peer.Peer
  132. // Make sure connections to all servers are up.
  133. for si := 0; si < backendCount; si++ {
  134. var connected bool
  135. for i := 0; i < 1000; i++ {
  136. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
  137. t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
  138. }
  139. if p.Addr.String() == test.addresses[si] {
  140. connected = true
  141. break
  142. }
  143. time.Sleep(time.Millisecond)
  144. }
  145. if !connected {
  146. t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
  147. }
  148. }
  149. for i := 0; i < 3*backendCount; i++ {
  150. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
  151. t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
  152. }
  153. if p.Addr.String() != test.addresses[i%backendCount] {
  154. t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
  155. }
  156. }
  157. }
  158. func TestAddressesRemoved(t *testing.T) {
  159. defer leakcheck.Check(t)
  160. r, cleanup := manual.GenerateAndRegisterManualResolver()
  161. defer cleanup()
  162. test, err := startTestServers(1)
  163. if err != nil {
  164. t.Fatalf("failed to start servers: %v", err)
  165. }
  166. defer test.cleanup()
  167. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
  168. if err != nil {
  169. t.Fatalf("failed to dial: %v", err)
  170. }
  171. defer cc.Close()
  172. testc := testpb.NewTestServiceClient(cc)
  173. // The first RPC should fail because there's no address.
  174. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  175. defer cancel()
  176. if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
  177. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  178. }
  179. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}})
  180. // The second RPC should succeed.
  181. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  182. t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
  183. }
  184. r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
  185. for i := 0; i < 1000; i++ {
  186. ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
  187. defer cancel()
  188. if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
  189. return
  190. }
  191. time.Sleep(time.Millisecond)
  192. }
  193. t.Fatalf("No RPC failed after removing all addresses, want RPC to fail with DeadlineExceeded")
  194. }
  195. func TestCloseWithPendingRPC(t *testing.T) {
  196. defer leakcheck.Check(t)
  197. r, cleanup := manual.GenerateAndRegisterManualResolver()
  198. defer cleanup()
  199. test, err := startTestServers(1)
  200. if err != nil {
  201. t.Fatalf("failed to start servers: %v", err)
  202. }
  203. defer test.cleanup()
  204. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
  205. if err != nil {
  206. t.Fatalf("failed to dial: %v", err)
  207. }
  208. testc := testpb.NewTestServiceClient(cc)
  209. var wg sync.WaitGroup
  210. for i := 0; i < 3; i++ {
  211. wg.Add(1)
  212. go func() {
  213. defer wg.Done()
  214. // This RPC blocks until cc is closed.
  215. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  216. if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) == codes.DeadlineExceeded {
  217. t.Errorf("RPC failed because of deadline after cc is closed; want error the client connection is closing")
  218. }
  219. cancel()
  220. }()
  221. }
  222. cc.Close()
  223. wg.Wait()
  224. }
  225. func TestNewAddressWhileBlocking(t *testing.T) {
  226. defer leakcheck.Check(t)
  227. r, cleanup := manual.GenerateAndRegisterManualResolver()
  228. defer cleanup()
  229. test, err := startTestServers(1)
  230. if err != nil {
  231. t.Fatalf("failed to start servers: %v", err)
  232. }
  233. defer test.cleanup()
  234. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
  235. if err != nil {
  236. t.Fatalf("failed to dial: %v", err)
  237. }
  238. defer cc.Close()
  239. testc := testpb.NewTestServiceClient(cc)
  240. // The first RPC should fail because there's no address.
  241. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  242. defer cancel()
  243. if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
  244. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  245. }
  246. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}})
  247. // The second RPC should succeed.
  248. ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
  249. defer cancel()
  250. if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  251. t.Fatalf("EmptyCall() = _, %v, want _, nil", err)
  252. }
  253. r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
  254. var wg sync.WaitGroup
  255. for i := 0; i < 3; i++ {
  256. wg.Add(1)
  257. go func() {
  258. defer wg.Done()
  259. // This RPC blocks until NewAddress is called.
  260. testc.EmptyCall(context.Background(), &testpb.Empty{})
  261. }()
  262. }
  263. time.Sleep(50 * time.Millisecond)
  264. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}})
  265. wg.Wait()
  266. }
  267. func TestOneServerDown(t *testing.T) {
  268. defer leakcheck.Check(t)
  269. r, cleanup := manual.GenerateAndRegisterManualResolver()
  270. defer cleanup()
  271. backendCount := 3
  272. test, err := startTestServers(backendCount)
  273. if err != nil {
  274. t.Fatalf("failed to start servers: %v", err)
  275. }
  276. defer test.cleanup()
  277. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake())
  278. if err != nil {
  279. t.Fatalf("failed to dial: %v", err)
  280. }
  281. defer cc.Close()
  282. testc := testpb.NewTestServiceClient(cc)
  283. // The first RPC should fail because there's no address.
  284. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  285. defer cancel()
  286. if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
  287. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  288. }
  289. var resolvedAddrs []resolver.Address
  290. for i := 0; i < backendCount; i++ {
  291. resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
  292. }
  293. r.UpdateState(resolver.State{Addresses: resolvedAddrs})
  294. var p peer.Peer
  295. // Make sure connections to all servers are up.
  296. for si := 0; si < backendCount; si++ {
  297. var connected bool
  298. for i := 0; i < 1000; i++ {
  299. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
  300. t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
  301. }
  302. if p.Addr.String() == test.addresses[si] {
  303. connected = true
  304. break
  305. }
  306. time.Sleep(time.Millisecond)
  307. }
  308. if !connected {
  309. t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
  310. }
  311. }
  312. for i := 0; i < 3*backendCount; i++ {
  313. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
  314. t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
  315. }
  316. if p.Addr.String() != test.addresses[i%backendCount] {
  317. t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
  318. }
  319. }
  320. // Stop one server, RPCs should roundrobin among the remaining servers.
  321. backendCount--
  322. test.servers[backendCount].Stop()
  323. // Loop until see server[backendCount-1] twice without seeing server[backendCount].
  324. var targetSeen int
  325. for i := 0; i < 1000; i++ {
  326. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
  327. targetSeen = 0
  328. t.Logf("EmptyCall() = _, %v, want _, <nil>", err)
  329. // Due to a race, this RPC could possibly get the connection that
  330. // was closing, and this RPC may fail. Keep trying when this
  331. // happens.
  332. continue
  333. }
  334. switch p.Addr.String() {
  335. case test.addresses[backendCount-1]:
  336. targetSeen++
  337. case test.addresses[backendCount]:
  338. // Reset targetSeen if peer is server[backendCount].
  339. targetSeen = 0
  340. }
  341. // Break to make sure the last picked address is server[-1], so the following for loop won't be flaky.
  342. if targetSeen >= 2 {
  343. break
  344. }
  345. }
  346. if targetSeen != 2 {
  347. t.Fatal("Failed to see server[backendCount-1] twice without seeing server[backendCount]")
  348. }
  349. for i := 0; i < 3*backendCount; i++ {
  350. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
  351. t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
  352. }
  353. if p.Addr.String() != test.addresses[i%backendCount] {
  354. t.Errorf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
  355. }
  356. }
  357. }
  358. func TestAllServersDown(t *testing.T) {
  359. defer leakcheck.Check(t)
  360. r, cleanup := manual.GenerateAndRegisterManualResolver()
  361. defer cleanup()
  362. backendCount := 3
  363. test, err := startTestServers(backendCount)
  364. if err != nil {
  365. t.Fatalf("failed to start servers: %v", err)
  366. }
  367. defer test.cleanup()
  368. cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake())
  369. if err != nil {
  370. t.Fatalf("failed to dial: %v", err)
  371. }
  372. defer cc.Close()
  373. testc := testpb.NewTestServiceClient(cc)
  374. // The first RPC should fail because there's no address.
  375. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  376. defer cancel()
  377. if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
  378. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  379. }
  380. var resolvedAddrs []resolver.Address
  381. for i := 0; i < backendCount; i++ {
  382. resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
  383. }
  384. r.UpdateState(resolver.State{Addresses: resolvedAddrs})
  385. var p peer.Peer
  386. // Make sure connections to all servers are up.
  387. for si := 0; si < backendCount; si++ {
  388. var connected bool
  389. for i := 0; i < 1000; i++ {
  390. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
  391. t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
  392. }
  393. if p.Addr.String() == test.addresses[si] {
  394. connected = true
  395. break
  396. }
  397. time.Sleep(time.Millisecond)
  398. }
  399. if !connected {
  400. t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
  401. }
  402. }
  403. for i := 0; i < 3*backendCount; i++ {
  404. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
  405. t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
  406. }
  407. if p.Addr.String() != test.addresses[i%backendCount] {
  408. t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
  409. }
  410. }
  411. // All servers are stopped, failfast RPC should fail with unavailable.
  412. for i := 0; i < backendCount; i++ {
  413. test.servers[i].Stop()
  414. }
  415. time.Sleep(100 * time.Millisecond)
  416. for i := 0; i < 1000; i++ {
  417. if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) == codes.Unavailable {
  418. return
  419. }
  420. time.Sleep(time.Millisecond)
  421. }
  422. t.Fatalf("Failfast RPCs didn't fail with Unavailable after all servers are stopped")
  423. }