clientconn_state_transition_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "net"
  22. "sync"
  23. "testing"
  24. "time"
  25. "golang.org/x/net/http2"
  26. "google.golang.org/grpc/balancer"
  27. "google.golang.org/grpc/connectivity"
  28. "google.golang.org/grpc/internal/testutils"
  29. "google.golang.org/grpc/resolver"
  30. "google.golang.org/grpc/resolver/manual"
  31. )
  32. const stateRecordingBalancerName = "state_recoding_balancer"
  33. var testBalancerBuilder = newStateRecordingBalancerBuilder()
  34. func init() {
  35. balancer.Register(testBalancerBuilder)
  36. }
  37. // These tests use a pipeListener. This listener is similar to net.Listener
  38. // except that it is unbuffered, so each read and write will wait for the other
  39. // side's corresponding write or read.
  40. func (s) TestStateTransitions_SingleAddress(t *testing.T) {
  41. for _, test := range []struct {
  42. desc string
  43. want []connectivity.State
  44. server func(net.Listener) net.Conn
  45. }{
  46. {
  47. desc: "When the server returns server preface, the client enters READY.",
  48. want: []connectivity.State{
  49. connectivity.Connecting,
  50. connectivity.Ready,
  51. },
  52. server: func(lis net.Listener) net.Conn {
  53. conn, err := lis.Accept()
  54. if err != nil {
  55. t.Error(err)
  56. return nil
  57. }
  58. go keepReading(conn)
  59. framer := http2.NewFramer(conn, conn)
  60. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  61. t.Errorf("Error while writing settings frame. %v", err)
  62. return nil
  63. }
  64. return conn
  65. },
  66. },
  67. {
  68. desc: "When the connection is closed, the client enters TRANSIENT FAILURE.",
  69. want: []connectivity.State{
  70. connectivity.Connecting,
  71. connectivity.TransientFailure,
  72. },
  73. server: func(lis net.Listener) net.Conn {
  74. conn, err := lis.Accept()
  75. if err != nil {
  76. t.Error(err)
  77. return nil
  78. }
  79. conn.Close()
  80. return nil
  81. },
  82. },
  83. {
  84. desc: `When the server sends its connection preface, but the connection dies before the client can write its
  85. connection preface, the client enters TRANSIENT FAILURE.`,
  86. want: []connectivity.State{
  87. connectivity.Connecting,
  88. connectivity.TransientFailure,
  89. },
  90. server: func(lis net.Listener) net.Conn {
  91. conn, err := lis.Accept()
  92. if err != nil {
  93. t.Error(err)
  94. return nil
  95. }
  96. framer := http2.NewFramer(conn, conn)
  97. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  98. t.Errorf("Error while writing settings frame. %v", err)
  99. return nil
  100. }
  101. conn.Close()
  102. return nil
  103. },
  104. },
  105. {
  106. desc: `When the server reads the client connection preface but does not send its connection preface, the
  107. client enters TRANSIENT FAILURE.`,
  108. want: []connectivity.State{
  109. connectivity.Connecting,
  110. connectivity.TransientFailure,
  111. },
  112. server: func(lis net.Listener) net.Conn {
  113. conn, err := lis.Accept()
  114. if err != nil {
  115. t.Error(err)
  116. return nil
  117. }
  118. go keepReading(conn)
  119. return conn
  120. },
  121. },
  122. } {
  123. t.Log(test.desc)
  124. testStateTransitionSingleAddress(t, test.want, test.server)
  125. }
  126. }
  127. func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) {
  128. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  129. defer cancel()
  130. pl := testutils.NewPipeListener()
  131. defer pl.Close()
  132. // Launch the server.
  133. var conn net.Conn
  134. var connMu sync.Mutex
  135. go func() {
  136. connMu.Lock()
  137. conn = server(pl)
  138. connMu.Unlock()
  139. }()
  140. client, err := DialContext(ctx,
  141. "",
  142. WithWaitForHandshake(),
  143. WithInsecure(),
  144. WithBalancerName(stateRecordingBalancerName),
  145. WithDialer(pl.Dialer()),
  146. withBackoff(noBackoff{}),
  147. withMinConnectDeadline(func() time.Duration { return time.Millisecond * 100 }))
  148. if err != nil {
  149. t.Fatal(err)
  150. }
  151. defer client.Close()
  152. stateNotifications := testBalancerBuilder.nextStateNotifier()
  153. timeout := time.After(5 * time.Second)
  154. for i := 0; i < len(want); i++ {
  155. select {
  156. case <-timeout:
  157. t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
  158. case seen := <-stateNotifications:
  159. if seen != want[i] {
  160. t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
  161. }
  162. }
  163. }
  164. connMu.Lock()
  165. defer connMu.Unlock()
  166. if conn != nil {
  167. err = conn.Close()
  168. if err != nil {
  169. t.Fatal(err)
  170. }
  171. }
  172. }
  173. // When a READY connection is closed, the client enters TRANSIENT FAILURE before CONNECTING.
  174. func (s) TestStateTransitions_ReadyToTransientFailure(t *testing.T) {
  175. want := []connectivity.State{
  176. connectivity.Connecting,
  177. connectivity.Ready,
  178. connectivity.TransientFailure,
  179. connectivity.Connecting,
  180. }
  181. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  182. defer cancel()
  183. lis, err := net.Listen("tcp", "localhost:0")
  184. if err != nil {
  185. t.Fatalf("Error while listening. Err: %v", err)
  186. }
  187. defer lis.Close()
  188. sawReady := make(chan struct{})
  189. // Launch the server.
  190. go func() {
  191. conn, err := lis.Accept()
  192. if err != nil {
  193. t.Error(err)
  194. return
  195. }
  196. go keepReading(conn)
  197. framer := http2.NewFramer(conn, conn)
  198. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  199. t.Errorf("Error while writing settings frame. %v", err)
  200. return
  201. }
  202. // Prevents race between onPrefaceReceipt and onClose.
  203. <-sawReady
  204. conn.Close()
  205. }()
  206. client, err := DialContext(ctx, lis.Addr().String(), WithWaitForHandshake(), WithInsecure(), WithBalancerName(stateRecordingBalancerName))
  207. if err != nil {
  208. t.Fatal(err)
  209. }
  210. defer client.Close()
  211. stateNotifications := testBalancerBuilder.nextStateNotifier()
  212. timeout := time.After(5 * time.Second)
  213. for i := 0; i < len(want); i++ {
  214. select {
  215. case <-timeout:
  216. t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
  217. case seen := <-stateNotifications:
  218. if seen == connectivity.Ready {
  219. close(sawReady)
  220. }
  221. if seen != want[i] {
  222. t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
  223. }
  224. }
  225. }
  226. }
  227. // When the first connection is closed, the client enters stays in CONNECTING
  228. // until it tries the second address (which succeeds, and then it enters READY).
  229. func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {
  230. want := []connectivity.State{
  231. connectivity.Connecting,
  232. connectivity.Ready,
  233. }
  234. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  235. defer cancel()
  236. lis1, err := net.Listen("tcp", "localhost:0")
  237. if err != nil {
  238. t.Fatalf("Error while listening. Err: %v", err)
  239. }
  240. defer lis1.Close()
  241. lis2, err := net.Listen("tcp", "localhost:0")
  242. if err != nil {
  243. t.Fatalf("Error while listening. Err: %v", err)
  244. }
  245. defer lis2.Close()
  246. server1Done := make(chan struct{})
  247. server2Done := make(chan struct{})
  248. // Launch server 1.
  249. go func() {
  250. conn, err := lis1.Accept()
  251. if err != nil {
  252. t.Error(err)
  253. return
  254. }
  255. conn.Close()
  256. close(server1Done)
  257. }()
  258. // Launch server 2.
  259. go func() {
  260. conn, err := lis2.Accept()
  261. if err != nil {
  262. t.Error(err)
  263. return
  264. }
  265. go keepReading(conn)
  266. framer := http2.NewFramer(conn, conn)
  267. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  268. t.Errorf("Error while writing settings frame. %v", err)
  269. return
  270. }
  271. close(server2Done)
  272. }()
  273. rb := manual.NewBuilderWithScheme("whatever")
  274. rb.InitialState(resolver.State{Addresses: []resolver.Address{
  275. {Addr: lis1.Addr().String()},
  276. {Addr: lis2.Addr().String()},
  277. }})
  278. client, err := DialContext(ctx, "this-gets-overwritten", WithInsecure(), WithWaitForHandshake(), WithBalancerName(stateRecordingBalancerName), withResolverBuilder(rb))
  279. if err != nil {
  280. t.Fatal(err)
  281. }
  282. defer client.Close()
  283. stateNotifications := testBalancerBuilder.nextStateNotifier()
  284. timeout := time.After(5 * time.Second)
  285. for i := 0; i < len(want); i++ {
  286. select {
  287. case <-timeout:
  288. t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
  289. case seen := <-stateNotifications:
  290. if seen != want[i] {
  291. t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
  292. }
  293. }
  294. }
  295. select {
  296. case <-timeout:
  297. t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
  298. case <-server1Done:
  299. }
  300. select {
  301. case <-timeout:
  302. t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 2")
  303. case <-server2Done:
  304. }
  305. }
  306. // When there are multiple addresses, and we enter READY on one of them, a
  307. // later closure should cause the client to enter TRANSIENT FAILURE before it
  308. // re-enters CONNECTING.
  309. func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
  310. want := []connectivity.State{
  311. connectivity.Connecting,
  312. connectivity.Ready,
  313. connectivity.TransientFailure,
  314. connectivity.Connecting,
  315. }
  316. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  317. defer cancel()
  318. lis1, err := net.Listen("tcp", "localhost:0")
  319. if err != nil {
  320. t.Fatalf("Error while listening. Err: %v", err)
  321. }
  322. defer lis1.Close()
  323. // Never actually gets used; we just want it to be alive so that the resolver has two addresses to target.
  324. lis2, err := net.Listen("tcp", "localhost:0")
  325. if err != nil {
  326. t.Fatalf("Error while listening. Err: %v", err)
  327. }
  328. defer lis2.Close()
  329. server1Done := make(chan struct{})
  330. sawReady := make(chan struct{})
  331. // Launch server 1.
  332. go func() {
  333. conn, err := lis1.Accept()
  334. if err != nil {
  335. t.Error(err)
  336. return
  337. }
  338. go keepReading(conn)
  339. framer := http2.NewFramer(conn, conn)
  340. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  341. t.Errorf("Error while writing settings frame. %v", err)
  342. return
  343. }
  344. <-sawReady
  345. conn.Close()
  346. _, err = lis1.Accept()
  347. if err != nil {
  348. t.Error(err)
  349. return
  350. }
  351. close(server1Done)
  352. }()
  353. rb := manual.NewBuilderWithScheme("whatever")
  354. rb.InitialState(resolver.State{Addresses: []resolver.Address{
  355. {Addr: lis1.Addr().String()},
  356. {Addr: lis2.Addr().String()},
  357. }})
  358. client, err := DialContext(ctx, "this-gets-overwritten", WithInsecure(), WithWaitForHandshake(), WithBalancerName(stateRecordingBalancerName), withResolverBuilder(rb))
  359. if err != nil {
  360. t.Fatal(err)
  361. }
  362. defer client.Close()
  363. stateNotifications := testBalancerBuilder.nextStateNotifier()
  364. timeout := time.After(2 * time.Second)
  365. for i := 0; i < len(want); i++ {
  366. select {
  367. case <-timeout:
  368. t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
  369. case seen := <-stateNotifications:
  370. if seen == connectivity.Ready {
  371. close(sawReady)
  372. }
  373. if seen != want[i] {
  374. t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
  375. }
  376. }
  377. }
  378. select {
  379. case <-timeout:
  380. t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
  381. case <-server1Done:
  382. }
  383. }
  384. type stateRecordingBalancer struct {
  385. notifier chan<- connectivity.State
  386. balancer.Balancer
  387. }
  388. func (b *stateRecordingBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
  389. b.notifier <- s
  390. b.Balancer.HandleSubConnStateChange(sc, s)
  391. }
  392. func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) {
  393. b.notifier = r
  394. }
  395. func (b *stateRecordingBalancer) Close() {
  396. b.Balancer.Close()
  397. }
  398. type stateRecordingBalancerBuilder struct {
  399. mu sync.Mutex
  400. notifier chan connectivity.State // The notifier used in the last Balancer.
  401. }
  402. func newStateRecordingBalancerBuilder() *stateRecordingBalancerBuilder {
  403. return &stateRecordingBalancerBuilder{}
  404. }
  405. func (b *stateRecordingBalancerBuilder) Name() string {
  406. return stateRecordingBalancerName
  407. }
  408. func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
  409. stateNotifications := make(chan connectivity.State, 10)
  410. b.mu.Lock()
  411. b.notifier = stateNotifications
  412. b.mu.Unlock()
  413. return &stateRecordingBalancer{
  414. notifier: stateNotifications,
  415. Balancer: balancer.Get(PickFirstBalancerName).Build(cc, opts),
  416. }
  417. }
  418. func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.State {
  419. b.mu.Lock()
  420. defer b.mu.Unlock()
  421. ret := b.notifier
  422. b.notifier = nil
  423. return ret
  424. }
  425. type noBackoff struct{}
  426. func (b noBackoff) Backoff(int) time.Duration { return time.Duration(0) }
  427. // Keep reading until something causes the connection to die (EOF, server
  428. // closed, etc). Useful as a tool for mindlessly keeping the connection
  429. // healthy, since the client will error if things like client prefaces are not
  430. // accepted in a timely fashion.
  431. func keepReading(conn net.Conn) {
  432. buf := make([]byte, 1024)
  433. for _, err := conn.Read(buf); err == nil; _, err = conn.Read(buf) {
  434. }
  435. }