benchmark.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. //go:generate protoc -I grpc_testing --go_out=plugins=grpc:grpc_testing grpc_testing/control.proto grpc_testing/messages.proto grpc_testing/payloads.proto grpc_testing/services.proto grpc_testing/stats.proto
/*
Package benchmark implements the building blocks to set up end-to-end gRPC benchmarks.
*/
  22. package benchmark
  23. import (
  24. "context"
  25. "fmt"
  26. "io"
  27. "log"
  28. "net"
  29. "google.golang.org/grpc"
  30. testpb "google.golang.org/grpc/benchmark/grpc_testing"
  31. "google.golang.org/grpc/codes"
  32. "google.golang.org/grpc/grpclog"
  33. "google.golang.org/grpc/status"
  34. )
  35. // Allows reuse of the same testpb.Payload object.
  36. func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) {
  37. if size < 0 {
  38. grpclog.Fatalf("Requested a response with invalid length %d", size)
  39. }
  40. body := make([]byte, size)
  41. switch t {
  42. case testpb.PayloadType_COMPRESSABLE:
  43. case testpb.PayloadType_UNCOMPRESSABLE:
  44. grpclog.Fatalf("PayloadType UNCOMPRESSABLE is not supported")
  45. default:
  46. grpclog.Fatalf("Unsupported payload type: %d", t)
  47. }
  48. p.Type = t
  49. p.Body = body
  50. }
  51. // NewPayload creates a payload with the given type and size.
  52. func NewPayload(t testpb.PayloadType, size int) *testpb.Payload {
  53. p := new(testpb.Payload)
  54. setPayload(p, t, size)
  55. return p
  56. }
  57. type testServer struct {
  58. }
  59. func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  60. return &testpb.SimpleResponse{
  61. Payload: NewPayload(in.ResponseType, int(in.ResponseSize)),
  62. }, nil
  63. }
  64. func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
  65. response := &testpb.SimpleResponse{
  66. Payload: new(testpb.Payload),
  67. }
  68. in := new(testpb.SimpleRequest)
  69. for {
  70. // use ServerStream directly to reuse the same testpb.SimpleRequest object
  71. err := stream.(grpc.ServerStream).RecvMsg(in)
  72. if err == io.EOF {
  73. // read done.
  74. return nil
  75. }
  76. if err != nil {
  77. return err
  78. }
  79. setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
  80. if err := stream.Send(response); err != nil {
  81. return err
  82. }
  83. }
  84. }
  85. func (s *testServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error {
  86. in := new(testpb.SimpleRequest)
  87. // Receive a message to learn response type and size.
  88. err := stream.RecvMsg(in)
  89. if err == io.EOF {
  90. // read done.
  91. return nil
  92. }
  93. if err != nil {
  94. return err
  95. }
  96. response := &testpb.SimpleResponse{
  97. Payload: new(testpb.Payload),
  98. }
  99. setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
  100. go func() {
  101. for {
  102. // Using RecvMsg rather than Recv to prevent reallocation of SimpleRequest.
  103. err := stream.RecvMsg(in)
  104. switch status.Code(err) {
  105. case codes.Canceled:
  106. case codes.OK:
  107. default:
  108. log.Fatalf("server recv error: %v", err)
  109. }
  110. }
  111. }()
  112. go func() {
  113. for {
  114. err := stream.Send(response)
  115. switch status.Code(err) {
  116. case codes.Unavailable:
  117. case codes.OK:
  118. default:
  119. log.Fatalf("server send error: %v", err)
  120. }
  121. }
  122. }()
  123. <-stream.Context().Done()
  124. return stream.Context().Err()
  125. }
  126. // byteBufServer is a gRPC server that sends and receives byte buffer.
  127. // The purpose is to benchmark the gRPC performance without protobuf serialization/deserialization overhead.
  128. type byteBufServer struct {
  129. respSize int32
  130. }
  131. // UnaryCall is an empty function and is not used for benchmark.
  132. // If bytebuf UnaryCall benchmark is needed later, the function body needs to be updated.
  133. func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  134. return &testpb.SimpleResponse{}, nil
  135. }
  136. func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
  137. for {
  138. var in []byte
  139. err := stream.(grpc.ServerStream).RecvMsg(&in)
  140. if err == io.EOF {
  141. return nil
  142. }
  143. if err != nil {
  144. return err
  145. }
  146. out := make([]byte, s.respSize)
  147. if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
  148. return err
  149. }
  150. }
  151. }
  152. func (s *byteBufServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error {
  153. for {
  154. var in []byte
  155. err := stream.(grpc.ServerStream).RecvMsg(&in)
  156. if err == io.EOF {
  157. return nil
  158. }
  159. if err != nil {
  160. return err
  161. }
  162. out := make([]byte, s.respSize)
  163. if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
  164. return err
  165. }
  166. }
  167. }
  168. // ServerInfo contains the information to create a gRPC benchmark server.
  169. type ServerInfo struct {
  170. // Type is the type of the server.
  171. // It should be "protobuf" or "bytebuf".
  172. Type string
  173. // Metadata is an optional configuration.
  174. // For "protobuf", it's ignored.
  175. // For "bytebuf", it should be an int representing response size.
  176. Metadata interface{}
  177. // Listener is the network listener for the server to use
  178. Listener net.Listener
  179. }
  180. // StartServer starts a gRPC server serving a benchmark service according to info.
  181. // It returns a function to stop the server.
  182. func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() {
  183. opts = append(opts, grpc.WriteBufferSize(128*1024))
  184. opts = append(opts, grpc.ReadBufferSize(128*1024))
  185. s := grpc.NewServer(opts...)
  186. switch info.Type {
  187. case "protobuf":
  188. testpb.RegisterBenchmarkServiceServer(s, &testServer{})
  189. case "bytebuf":
  190. respSize, ok := info.Metadata.(int32)
  191. if !ok {
  192. grpclog.Fatalf("failed to StartServer, invalid metadata: %v, for Type: %v", info.Metadata, info.Type)
  193. }
  194. testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize})
  195. default:
  196. grpclog.Fatalf("failed to StartServer, unknown Type: %v", info.Type)
  197. }
  198. go s.Serve(info.Listener)
  199. return func() {
  200. s.Stop()
  201. }
  202. }
  203. // DoUnaryCall performs an unary RPC with given stub and request and response sizes.
  204. func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) error {
  205. pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
  206. req := &testpb.SimpleRequest{
  207. ResponseType: pl.Type,
  208. ResponseSize: int32(respSize),
  209. Payload: pl,
  210. }
  211. if _, err := tc.UnaryCall(context.Background(), req); err != nil {
  212. return fmt.Errorf("/BenchmarkService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
  213. }
  214. return nil
  215. }
  216. // DoStreamingRoundTrip performs a round trip for a single streaming rpc.
  217. func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
  218. pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
  219. req := &testpb.SimpleRequest{
  220. ResponseType: pl.Type,
  221. ResponseSize: int32(respSize),
  222. Payload: pl,
  223. }
  224. if err := stream.Send(req); err != nil {
  225. return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err)
  226. }
  227. if _, err := stream.Recv(); err != nil {
  228. // EOF is a valid error here.
  229. if err == io.EOF {
  230. return nil
  231. }
  232. return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err)
  233. }
  234. return nil
  235. }
  236. // DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using a custom codec for byte buffer.
  237. func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
  238. out := make([]byte, reqSize)
  239. if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil {
  240. return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err)
  241. }
  242. var in []byte
  243. if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil {
  244. // EOF is a valid error here.
  245. if err == io.EOF {
  246. return nil
  247. }
  248. return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err)
  249. }
  250. return nil
  251. }
  252. // NewClientConn creates a gRPC client connection to addr.
  253. func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
  254. return NewClientConnWithContext(context.Background(), addr, opts...)
  255. }
  256. // NewClientConnWithContext creates a gRPC client connection to addr using ctx.
  257. func NewClientConnWithContext(ctx context.Context, addr string, opts ...grpc.DialOption) *grpc.ClientConn {
  258. opts = append(opts, grpc.WithWriteBufferSize(128*1024))
  259. opts = append(opts, grpc.WithReadBufferSize(128*1024))
  260. conn, err := grpc.DialContext(ctx, addr, opts...)
  261. if err != nil {
  262. grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err)
  263. }
  264. return conn
  265. }