pickfirst_test.go 12 KB


  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "math"
  22. "sync"
  23. "testing"
  24. "time"
  25. "google.golang.org/grpc/codes"
  26. "google.golang.org/grpc/resolver"
  27. "google.golang.org/grpc/resolver/manual"
  28. "google.golang.org/grpc/status"
  29. )
  30. func errorDesc(err error) string {
  31. if s, ok := status.FromError(err); ok {
  32. return s.Message()
  33. }
  34. return err.Error()
  35. }
  36. func (s) TestOneBackendPickfirst(t *testing.T) {
  37. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  38. defer rcleanup()
  39. numServers := 1
  40. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  41. defer scleanup()
  42. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  43. if err != nil {
  44. t.Fatalf("failed to dial: %v", err)
  45. }
  46. defer cc.Close()
  47. // The first RPC should fail because there's no address.
  48. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  49. defer cancel()
  50. req := "port"
  51. var reply string
  52. if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
  53. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  54. }
  55. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}}})
  56. // The second RPC should succeed.
  57. for i := 0; i < 1000; i++ {
  58. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
  59. return
  60. }
  61. time.Sleep(time.Millisecond)
  62. }
  63. t.Fatalf("EmptyCall() = _, %v, want _, %v", err, servers[0].port)
  64. }
  65. func (s) TestBackendsPickfirst(t *testing.T) {
  66. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  67. defer rcleanup()
  68. numServers := 2
  69. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  70. defer scleanup()
  71. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  72. if err != nil {
  73. t.Fatalf("failed to dial: %v", err)
  74. }
  75. defer cc.Close()
  76. // The first RPC should fail because there's no address.
  77. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  78. defer cancel()
  79. req := "port"
  80. var reply string
  81. if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
  82. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  83. }
  84. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}})
  85. // The second RPC should succeed with the first server.
  86. for i := 0; i < 1000; i++ {
  87. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
  88. return
  89. }
  90. time.Sleep(time.Millisecond)
  91. }
  92. t.Fatalf("EmptyCall() = _, %v, want _, %v", err, servers[0].port)
  93. }
  94. func (s) TestNewAddressWhileBlockingPickfirst(t *testing.T) {
  95. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  96. defer rcleanup()
  97. numServers := 1
  98. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  99. defer scleanup()
  100. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  101. if err != nil {
  102. t.Fatalf("failed to dial: %v", err)
  103. }
  104. defer cc.Close()
  105. // The first RPC should fail because there's no address.
  106. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  107. defer cancel()
  108. req := "port"
  109. var reply string
  110. if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
  111. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  112. }
  113. var wg sync.WaitGroup
  114. for i := 0; i < 3; i++ {
  115. wg.Add(1)
  116. go func() {
  117. defer wg.Done()
  118. // This RPC blocks until NewAddress is called.
  119. cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
  120. }()
  121. }
  122. time.Sleep(50 * time.Millisecond)
  123. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}}})
  124. wg.Wait()
  125. }
  126. func (s) TestCloseWithPendingRPCPickfirst(t *testing.T) {
  127. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  128. defer rcleanup()
  129. numServers := 1
  130. _, _, scleanup := startServers(t, numServers, math.MaxInt32)
  131. defer scleanup()
  132. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  133. if err != nil {
  134. t.Fatalf("failed to dial: %v", err)
  135. }
  136. defer cc.Close()
  137. // The first RPC should fail because there's no address.
  138. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  139. defer cancel()
  140. req := "port"
  141. var reply string
  142. if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
  143. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  144. }
  145. var wg sync.WaitGroup
  146. for i := 0; i < 3; i++ {
  147. wg.Add(1)
  148. go func() {
  149. defer wg.Done()
  150. // This RPC blocks until NewAddress is called.
  151. cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
  152. }()
  153. }
  154. time.Sleep(50 * time.Millisecond)
  155. cc.Close()
  156. wg.Wait()
  157. }
  158. func (s) TestOneServerDownPickfirst(t *testing.T) {
  159. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  160. defer rcleanup()
  161. numServers := 2
  162. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  163. defer scleanup()
  164. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  165. if err != nil {
  166. t.Fatalf("failed to dial: %v", err)
  167. }
  168. defer cc.Close()
  169. // The first RPC should fail because there's no address.
  170. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  171. defer cancel()
  172. req := "port"
  173. var reply string
  174. if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
  175. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  176. }
  177. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}})
  178. // The second RPC should succeed with the first server.
  179. for i := 0; i < 1000; i++ {
  180. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
  181. break
  182. }
  183. time.Sleep(time.Millisecond)
  184. }
  185. servers[0].stop()
  186. for i := 0; i < 1000; i++ {
  187. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  188. return
  189. }
  190. time.Sleep(time.Millisecond)
  191. }
  192. t.Fatalf("EmptyCall() = _, %v, want _, %v", err, servers[0].port)
  193. }
  194. func (s) TestAllServersDownPickfirst(t *testing.T) {
  195. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  196. defer rcleanup()
  197. numServers := 2
  198. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  199. defer scleanup()
  200. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  201. if err != nil {
  202. t.Fatalf("failed to dial: %v", err)
  203. }
  204. defer cc.Close()
  205. // The first RPC should fail because there's no address.
  206. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  207. defer cancel()
  208. req := "port"
  209. var reply string
  210. if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
  211. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  212. }
  213. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}})
  214. // The second RPC should succeed with the first server.
  215. for i := 0; i < 1000; i++ {
  216. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
  217. break
  218. }
  219. time.Sleep(time.Millisecond)
  220. }
  221. for i := 0; i < numServers; i++ {
  222. servers[i].stop()
  223. }
  224. for i := 0; i < 1000; i++ {
  225. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); status.Code(err) == codes.Unavailable {
  226. return
  227. }
  228. time.Sleep(time.Millisecond)
  229. }
  230. t.Fatalf("EmptyCall() = _, %v, want _, error with code unavailable", err)
  231. }
  232. func (s) TestAddressesRemovedPickfirst(t *testing.T) {
  233. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  234. defer rcleanup()
  235. numServers := 3
  236. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  237. defer scleanup()
  238. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  239. if err != nil {
  240. t.Fatalf("failed to dial: %v", err)
  241. }
  242. defer cc.Close()
  243. // The first RPC should fail because there's no address.
  244. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  245. defer cancel()
  246. req := "port"
  247. var reply string
  248. if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
  249. t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
  250. }
  251. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}, {Addr: servers[2].addr}}})
  252. for i := 0; i < 1000; i++ {
  253. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
  254. break
  255. }
  256. time.Sleep(time.Millisecond)
  257. }
  258. for i := 0; i < 20; i++ {
  259. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
  260. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
  261. }
  262. time.Sleep(10 * time.Millisecond)
  263. }
  264. // Remove server[0].
  265. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}}})
  266. for i := 0; i < 1000; i++ {
  267. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
  268. break
  269. }
  270. time.Sleep(time.Millisecond)
  271. }
  272. for i := 0; i < 20; i++ {
  273. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
  274. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
  275. }
  276. time.Sleep(10 * time.Millisecond)
  277. }
  278. // Append server[0], nothing should change.
  279. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}, {Addr: servers[0].addr}}})
  280. for i := 0; i < 20; i++ {
  281. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
  282. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
  283. }
  284. time.Sleep(10 * time.Millisecond)
  285. }
  286. // Remove server[1].
  287. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[2].addr}, {Addr: servers[0].addr}}})
  288. for i := 0; i < 1000; i++ {
  289. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
  290. break
  291. }
  292. time.Sleep(time.Millisecond)
  293. }
  294. for i := 0; i < 20; i++ {
  295. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[2].port {
  296. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
  297. }
  298. time.Sleep(10 * time.Millisecond)
  299. }
  300. // Remove server[2].
  301. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}}})
  302. for i := 0; i < 1000; i++ {
  303. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
  304. break
  305. }
  306. time.Sleep(time.Millisecond)
  307. }
  308. for i := 0; i < 20; i++ {
  309. if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
  310. t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
  311. }
  312. time.Sleep(10 * time.Millisecond)
  313. }
  314. }