gracefulstop_test.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package test
  19. import (
  20. "context"
  21. "fmt"
  22. "io"
  23. "net"
  24. "sync"
  25. "testing"
  26. "time"
  27. "golang.org/x/net/http2"
  28. "google.golang.org/grpc"
  29. "google.golang.org/grpc/internal/envconfig"
  30. testpb "google.golang.org/grpc/test/grpc_testing"
  31. )
  32. type delayListener struct {
  33. net.Listener
  34. closeCalled chan struct{}
  35. acceptCalled chan struct{}
  36. allowCloseCh chan struct{}
  37. cc *delayConn
  38. dialed bool
  39. }
  40. func (d *delayListener) Accept() (net.Conn, error) {
  41. select {
  42. case <-d.acceptCalled:
  43. // On the second call, block until closed, then return an error.
  44. <-d.closeCalled
  45. <-d.allowCloseCh
  46. return nil, fmt.Errorf("listener is closed")
  47. default:
  48. close(d.acceptCalled)
  49. conn, err := d.Listener.Accept()
  50. if err != nil {
  51. return nil, err
  52. }
  53. framer := http2.NewFramer(conn, conn)
  54. if err = framer.WriteSettings(http2.Setting{}); err != nil {
  55. return nil, err
  56. }
  57. // Allow closing of listener only after accept.
  58. // Note: Dial can return successfully, yet Accept
  59. // might now have finished.
  60. d.allowClose()
  61. return conn, err
  62. }
  63. }
  64. func (d *delayListener) allowClose() {
  65. close(d.allowCloseCh)
  66. }
  67. func (d *delayListener) Close() error {
  68. close(d.closeCalled)
  69. go func() {
  70. <-d.allowCloseCh
  71. d.Listener.Close()
  72. }()
  73. return nil
  74. }
  75. func (d *delayListener) allowClientRead() {
  76. d.cc.allowRead()
  77. }
  78. func (d *delayListener) Dial(ctx context.Context) (net.Conn, error) {
  79. if d.dialed {
  80. // Only hand out one connection (net.Dial can return more even after the
  81. // listener is closed). This is not thread-safe, but Dial should never be
  82. // called concurrently in this environment.
  83. return nil, fmt.Errorf("no more conns")
  84. }
  85. d.dialed = true
  86. c, err := (&net.Dialer{}).DialContext(ctx, "tcp", d.Listener.Addr().String())
  87. if err != nil {
  88. return nil, err
  89. }
  90. d.cc = &delayConn{Conn: c, blockRead: make(chan struct{})}
  91. return d.cc, nil
  92. }
  93. type delayConn struct {
  94. net.Conn
  95. blockRead chan struct{}
  96. }
  97. func (d *delayConn) allowRead() {
  98. close(d.blockRead)
  99. }
  100. func (d *delayConn) Read(b []byte) (n int, err error) {
  101. <-d.blockRead
  102. return d.Conn.Read(b)
  103. }
  104. func (s) TestGracefulStop(t *testing.T) {
  105. // We need to turn off RequireHandshake because if it were on, it would
  106. // block forever waiting to read the handshake, and the delayConn would
  107. // never let it (the delay is intended to block until later in the test).
  108. //
  109. // Restore current setting after test.
  110. old := envconfig.RequireHandshake
  111. defer func() { envconfig.RequireHandshake = old }()
  112. envconfig.RequireHandshake = envconfig.RequireHandshakeOff
  113. // This test ensures GracefulStop cannot race and break RPCs on new
  114. // connections created after GracefulStop was called but before
  115. // listener.Accept() returns a "closing" error.
  116. //
  117. // Steps of this test:
  118. // 1. Start Server
  119. // 2. GracefulStop() Server after listener's Accept is called, but don't
  120. // allow Accept() to exit when Close() is called on it.
  121. // 3. Create a new connection to the server after listener.Close() is called.
  122. // Server will want to send a GoAway on the new conn, but we delay client
  123. // reads until 5.
  124. // 4. Send an RPC on the new connection.
  125. // 5. Allow the client to read the GoAway. The RPC should complete
  126. // successfully.
  127. lis, err := net.Listen("tcp", "localhost:0")
  128. if err != nil {
  129. t.Fatalf("Error listenening: %v", err)
  130. }
  131. dlis := &delayListener{
  132. Listener: lis,
  133. acceptCalled: make(chan struct{}),
  134. closeCalled: make(chan struct{}),
  135. allowCloseCh: make(chan struct{}),
  136. }
  137. d := func(ctx context.Context, _ string) (net.Conn, error) { return dlis.Dial(ctx) }
  138. serverGotReq := make(chan struct{})
  139. ss := &stubServer{
  140. fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
  141. close(serverGotReq)
  142. _, err := stream.Recv()
  143. if err != nil {
  144. return err
  145. }
  146. return stream.Send(&testpb.StreamingOutputCallResponse{})
  147. },
  148. }
  149. s := grpc.NewServer()
  150. testpb.RegisterTestServiceServer(s, ss)
  151. // 1. Start Server
  152. wg := sync.WaitGroup{}
  153. wg.Add(1)
  154. go func() {
  155. s.Serve(dlis)
  156. wg.Done()
  157. }()
  158. // 2. GracefulStop() Server after listener's Accept is called, but don't
  159. // allow Accept() to exit when Close() is called on it.
  160. <-dlis.acceptCalled
  161. wg.Add(1)
  162. go func() {
  163. s.GracefulStop()
  164. wg.Done()
  165. }()
  166. // 3. Create a new connection to the server after listener.Close() is called.
  167. // Server will want to send a GoAway on the new conn, but we delay it
  168. // until 5.
  169. <-dlis.closeCalled // Block until GracefulStop calls dlis.Close()
  170. // Now dial. The listener's Accept method will return a valid connection,
  171. // even though GracefulStop has closed the listener.
  172. ctx, dialCancel := context.WithTimeout(context.Background(), 5*time.Second)
  173. defer dialCancel()
  174. cc, err := grpc.DialContext(ctx, "", grpc.WithInsecure(), grpc.WithBlock(), grpc.WithContextDialer(d))
  175. if err != nil {
  176. dlis.allowClientRead()
  177. t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
  178. }
  179. client := testpb.NewTestServiceClient(cc)
  180. defer cc.Close()
  181. // 4. Send an RPC on the new connection.
  182. // The server would send a GOAWAY first, but we are delaying the server's
  183. // writes for now until the client writes more than the preface.
  184. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  185. stream, err := client.FullDuplexCall(ctx)
  186. if err != nil {
  187. t.Fatalf("FullDuplexCall= _, %v; want _, <nil>", err)
  188. }
  189. go func() {
  190. // 5. Allow the client to read the GoAway. The RPC should complete
  191. // successfully.
  192. <-serverGotReq
  193. dlis.allowClientRead()
  194. }()
  195. if err := stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
  196. t.Fatalf("stream.Send(_) = %v, want <nil>", err)
  197. }
  198. if _, err := stream.Recv(); err != nil {
  199. t.Fatalf("stream.Recv() = _, %v, want _, <nil>", err)
  200. }
  201. if _, err := stream.Recv(); err != io.EOF {
  202. t.Fatalf("stream.Recv() = _, %v, want _, io.EOF", err)
  203. }
  204. // 5. happens above, then we finish the call.
  205. cancel()
  206. wg.Wait()
  207. }