clientconn_state_transition_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "net"
  22. "sync"
  23. "testing"
  24. "time"
  25. "golang.org/x/net/http2"
  26. "google.golang.org/grpc/balancer"
  27. "google.golang.org/grpc/connectivity"
  28. "google.golang.org/grpc/internal/testutils"
  29. "google.golang.org/grpc/resolver"
  30. "google.golang.org/grpc/resolver/manual"
  31. )
  32. const stateRecordingBalancerName = "state_recoding_balancer"
  33. var testBalancerBuilder = newStateRecordingBalancerBuilder()
  34. func init() {
  35. balancer.Register(testBalancerBuilder)
  36. }
  37. // These tests use a pipeListener. This listener is similar to net.Listener
  38. // except that it is unbuffered, so each read and write will wait for the other
  39. // side's corresponding write or read.
  40. func (s) TestStateTransitions_SingleAddress(t *testing.T) {
  41. for _, test := range []struct {
  42. desc string
  43. want []connectivity.State
  44. server func(net.Listener) net.Conn
  45. }{
  46. {
  47. desc: "When the server returns server preface, the client enters READY.",
  48. want: []connectivity.State{
  49. connectivity.Connecting,
  50. connectivity.Ready,
  51. },
  52. server: func(lis net.Listener) net.Conn {
  53. conn, err := lis.Accept()
  54. if err != nil {
  55. t.Error(err)
  56. return nil
  57. }
  58. go keepReading(conn)
  59. framer := http2.NewFramer(conn, conn)
  60. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  61. t.Errorf("Error while writing settings frame. %v", err)
  62. return nil
  63. }
  64. return conn
  65. },
  66. },
  67. {
  68. desc: "When the connection is closed, the client enters TRANSIENT FAILURE.",
  69. want: []connectivity.State{
  70. connectivity.Connecting,
  71. connectivity.TransientFailure,
  72. },
  73. server: func(lis net.Listener) net.Conn {
  74. conn, err := lis.Accept()
  75. if err != nil {
  76. t.Error(err)
  77. return nil
  78. }
  79. conn.Close()
  80. return nil
  81. },
  82. },
  83. {
  84. desc: `When the server sends its connection preface, but the connection dies before the client can write its
  85. connection preface, the client enters TRANSIENT FAILURE.`,
  86. want: []connectivity.State{
  87. connectivity.Connecting,
  88. connectivity.TransientFailure,
  89. },
  90. server: func(lis net.Listener) net.Conn {
  91. conn, err := lis.Accept()
  92. if err != nil {
  93. t.Error(err)
  94. return nil
  95. }
  96. framer := http2.NewFramer(conn, conn)
  97. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  98. t.Errorf("Error while writing settings frame. %v", err)
  99. return nil
  100. }
  101. conn.Close()
  102. return nil
  103. },
  104. },
  105. {
  106. desc: `When the server reads the client connection preface but does not send its connection preface, the
  107. client enters TRANSIENT FAILURE.`,
  108. want: []connectivity.State{
  109. connectivity.Connecting,
  110. connectivity.TransientFailure,
  111. },
  112. server: func(lis net.Listener) net.Conn {
  113. conn, err := lis.Accept()
  114. if err != nil {
  115. t.Error(err)
  116. return nil
  117. }
  118. go keepReading(conn)
  119. return conn
  120. },
  121. },
  122. } {
  123. t.Log(test.desc)
  124. testStateTransitionSingleAddress(t, test.want, test.server)
  125. }
  126. }
  127. func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) {
  128. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  129. defer cancel()
  130. pl := testutils.NewPipeListener()
  131. defer pl.Close()
  132. // Launch the server.
  133. var conn net.Conn
  134. var connMu sync.Mutex
  135. go func() {
  136. connMu.Lock()
  137. conn = server(pl)
  138. connMu.Unlock()
  139. }()
  140. client, err := DialContext(ctx,
  141. "",
  142. WithInsecure(),
  143. WithBalancerName(stateRecordingBalancerName),
  144. WithDialer(pl.Dialer()),
  145. withBackoff(noBackoff{}),
  146. withMinConnectDeadline(func() time.Duration { return time.Millisecond * 100 }))
  147. if err != nil {
  148. t.Fatal(err)
  149. }
  150. defer client.Close()
  151. stateNotifications := testBalancerBuilder.nextStateNotifier()
  152. timeout := time.After(5 * time.Second)
  153. for i := 0; i < len(want); i++ {
  154. select {
  155. case <-timeout:
  156. t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
  157. case seen := <-stateNotifications:
  158. if seen != want[i] {
  159. t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
  160. }
  161. }
  162. }
  163. connMu.Lock()
  164. defer connMu.Unlock()
  165. if conn != nil {
  166. err = conn.Close()
  167. if err != nil {
  168. t.Fatal(err)
  169. }
  170. }
  171. }
  172. // When a READY connection is closed, the client enters CONNECTING.
  173. func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {
  174. want := []connectivity.State{
  175. connectivity.Connecting,
  176. connectivity.Ready,
  177. connectivity.Connecting,
  178. }
  179. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  180. defer cancel()
  181. lis, err := net.Listen("tcp", "localhost:0")
  182. if err != nil {
  183. t.Fatalf("Error while listening. Err: %v", err)
  184. }
  185. defer lis.Close()
  186. sawReady := make(chan struct{})
  187. // Launch the server.
  188. go func() {
  189. conn, err := lis.Accept()
  190. if err != nil {
  191. t.Error(err)
  192. return
  193. }
  194. go keepReading(conn)
  195. framer := http2.NewFramer(conn, conn)
  196. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  197. t.Errorf("Error while writing settings frame. %v", err)
  198. return
  199. }
  200. // Prevents race between onPrefaceReceipt and onClose.
  201. <-sawReady
  202. conn.Close()
  203. }()
  204. client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBalancerName(stateRecordingBalancerName))
  205. if err != nil {
  206. t.Fatal(err)
  207. }
  208. defer client.Close()
  209. stateNotifications := testBalancerBuilder.nextStateNotifier()
  210. timeout := time.After(5 * time.Second)
  211. for i := 0; i < len(want); i++ {
  212. select {
  213. case <-timeout:
  214. t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
  215. case seen := <-stateNotifications:
  216. if seen == connectivity.Ready {
  217. close(sawReady)
  218. }
  219. if seen != want[i] {
  220. t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
  221. }
  222. }
  223. }
  224. }
  225. // When the first connection is closed, the client stays in CONNECTING until it
  226. // tries the second address (which succeeds, and then it enters READY).
  227. func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {
  228. want := []connectivity.State{
  229. connectivity.Connecting,
  230. connectivity.Ready,
  231. }
  232. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  233. defer cancel()
  234. lis1, err := net.Listen("tcp", "localhost:0")
  235. if err != nil {
  236. t.Fatalf("Error while listening. Err: %v", err)
  237. }
  238. defer lis1.Close()
  239. lis2, err := net.Listen("tcp", "localhost:0")
  240. if err != nil {
  241. t.Fatalf("Error while listening. Err: %v", err)
  242. }
  243. defer lis2.Close()
  244. server1Done := make(chan struct{})
  245. server2Done := make(chan struct{})
  246. // Launch server 1.
  247. go func() {
  248. conn, err := lis1.Accept()
  249. if err != nil {
  250. t.Error(err)
  251. return
  252. }
  253. conn.Close()
  254. close(server1Done)
  255. }()
  256. // Launch server 2.
  257. go func() {
  258. conn, err := lis2.Accept()
  259. if err != nil {
  260. t.Error(err)
  261. return
  262. }
  263. go keepReading(conn)
  264. framer := http2.NewFramer(conn, conn)
  265. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  266. t.Errorf("Error while writing settings frame. %v", err)
  267. return
  268. }
  269. close(server2Done)
  270. }()
  271. rb := manual.NewBuilderWithScheme("whatever")
  272. rb.InitialState(resolver.State{Addresses: []resolver.Address{
  273. {Addr: lis1.Addr().String()},
  274. {Addr: lis2.Addr().String()},
  275. }})
  276. client, err := DialContext(ctx, "whatever:///this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), WithResolvers(rb))
  277. if err != nil {
  278. t.Fatal(err)
  279. }
  280. defer client.Close()
  281. stateNotifications := testBalancerBuilder.nextStateNotifier()
  282. timeout := time.After(5 * time.Second)
  283. for i := 0; i < len(want); i++ {
  284. select {
  285. case <-timeout:
  286. t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
  287. case seen := <-stateNotifications:
  288. if seen != want[i] {
  289. t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
  290. }
  291. }
  292. }
  293. select {
  294. case <-timeout:
  295. t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
  296. case <-server1Done:
  297. }
  298. select {
  299. case <-timeout:
  300. t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 2")
  301. case <-server2Done:
  302. }
  303. }
  304. // When there are multiple addresses, and we enter READY on one of them, a
  305. // later closure should cause the client to enter CONNECTING
  306. func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
  307. want := []connectivity.State{
  308. connectivity.Connecting,
  309. connectivity.Ready,
  310. connectivity.Connecting,
  311. }
  312. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  313. defer cancel()
  314. lis1, err := net.Listen("tcp", "localhost:0")
  315. if err != nil {
  316. t.Fatalf("Error while listening. Err: %v", err)
  317. }
  318. defer lis1.Close()
  319. // Never actually gets used; we just want it to be alive so that the resolver has two addresses to target.
  320. lis2, err := net.Listen("tcp", "localhost:0")
  321. if err != nil {
  322. t.Fatalf("Error while listening. Err: %v", err)
  323. }
  324. defer lis2.Close()
  325. server1Done := make(chan struct{})
  326. sawReady := make(chan struct{})
  327. // Launch server 1.
  328. go func() {
  329. conn, err := lis1.Accept()
  330. if err != nil {
  331. t.Error(err)
  332. return
  333. }
  334. go keepReading(conn)
  335. framer := http2.NewFramer(conn, conn)
  336. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  337. t.Errorf("Error while writing settings frame. %v", err)
  338. return
  339. }
  340. <-sawReady
  341. conn.Close()
  342. _, err = lis1.Accept()
  343. if err != nil {
  344. t.Error(err)
  345. return
  346. }
  347. close(server1Done)
  348. }()
  349. rb := manual.NewBuilderWithScheme("whatever")
  350. rb.InitialState(resolver.State{Addresses: []resolver.Address{
  351. {Addr: lis1.Addr().String()},
  352. {Addr: lis2.Addr().String()},
  353. }})
  354. client, err := DialContext(ctx, "whatever:///this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), WithResolvers(rb))
  355. if err != nil {
  356. t.Fatal(err)
  357. }
  358. defer client.Close()
  359. stateNotifications := testBalancerBuilder.nextStateNotifier()
  360. timeout := time.After(2 * time.Second)
  361. for i := 0; i < len(want); i++ {
  362. select {
  363. case <-timeout:
  364. t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
  365. case seen := <-stateNotifications:
  366. if seen == connectivity.Ready {
  367. close(sawReady)
  368. }
  369. if seen != want[i] {
  370. t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
  371. }
  372. }
  373. }
  374. select {
  375. case <-timeout:
  376. t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
  377. case <-server1Done:
  378. }
  379. }
  380. type stateRecordingBalancer struct {
  381. notifier chan<- connectivity.State
  382. balancer.Balancer
  383. }
  384. func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
  385. b.notifier <- s.ConnectivityState
  386. b.Balancer.UpdateSubConnState(sc, s)
  387. }
  388. func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) {
  389. b.notifier = r
  390. }
  391. func (b *stateRecordingBalancer) Close() {
  392. b.Balancer.Close()
  393. }
  394. type stateRecordingBalancerBuilder struct {
  395. mu sync.Mutex
  396. notifier chan connectivity.State // The notifier used in the last Balancer.
  397. }
  398. func newStateRecordingBalancerBuilder() *stateRecordingBalancerBuilder {
  399. return &stateRecordingBalancerBuilder{}
  400. }
  401. func (b *stateRecordingBalancerBuilder) Name() string {
  402. return stateRecordingBalancerName
  403. }
  404. func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
  405. stateNotifications := make(chan connectivity.State, 10)
  406. b.mu.Lock()
  407. b.notifier = stateNotifications
  408. b.mu.Unlock()
  409. return &stateRecordingBalancer{
  410. notifier: stateNotifications,
  411. Balancer: balancer.Get(PickFirstBalancerName).Build(cc, opts),
  412. }
  413. }
  414. func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.State {
  415. b.mu.Lock()
  416. defer b.mu.Unlock()
  417. ret := b.notifier
  418. b.notifier = nil
  419. return ret
  420. }
  421. type noBackoff struct{}
  422. func (b noBackoff) Backoff(int) time.Duration { return time.Duration(0) }
  423. // Keep reading until something causes the connection to die (EOF, server
  424. // closed, etc). Useful as a tool for mindlessly keeping the connection
  425. // healthy, since the client will error if things like client prefaces are not
  426. // accepted in a timely fashion.
  427. func keepReading(conn net.Conn) {
  428. buf := make([]byte, 1024)
  429. for _, err := conn.Read(buf); err == nil; _, err = conn.Read(buf) {
  430. }
  431. }