benchmark.go 8.5 KB


  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  /*
  Package benchmark implements the building blocks to set up end-to-end gRPC benchmarks.
  */
  21. package benchmark
  22. import (
  23. "context"
  24. "fmt"
  25. "io"
  26. "log"
  27. "net"
  28. "google.golang.org/grpc"
  29. testpb "google.golang.org/grpc/benchmark/grpc_testing"
  30. "google.golang.org/grpc/codes"
  31. "google.golang.org/grpc/grpclog"
  32. "google.golang.org/grpc/status"
  33. )
  34. // Allows reuse of the same testpb.Payload object.
  35. func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) {
  36. if size < 0 {
  37. grpclog.Fatalf("Requested a response with invalid length %d", size)
  38. }
  39. body := make([]byte, size)
  40. switch t {
  41. case testpb.PayloadType_COMPRESSABLE:
  42. case testpb.PayloadType_UNCOMPRESSABLE:
  43. grpclog.Fatalf("PayloadType UNCOMPRESSABLE is not supported")
  44. default:
  45. grpclog.Fatalf("Unsupported payload type: %d", t)
  46. }
  47. p.Type = t
  48. p.Body = body
  49. }
  50. // NewPayload creates a payload with the given type and size.
  51. func NewPayload(t testpb.PayloadType, size int) *testpb.Payload {
  52. p := new(testpb.Payload)
  53. setPayload(p, t, size)
  54. return p
  55. }
  // testServer is the benchmark service implementation that StartServer
  // registers for the "protobuf" server type. It embeds
  // testpb.UnimplementedBenchmarkServiceServer and carries no state of its own.
  type testServer struct {
  	testpb.UnimplementedBenchmarkServiceServer
  }
  59. func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  60. return &testpb.SimpleResponse{
  61. Payload: NewPayload(in.ResponseType, int(in.ResponseSize)),
  62. }, nil
  63. }
  64. func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
  65. response := &testpb.SimpleResponse{
  66. Payload: new(testpb.Payload),
  67. }
  68. in := new(testpb.SimpleRequest)
  69. for {
  70. // use ServerStream directly to reuse the same testpb.SimpleRequest object
  71. err := stream.(grpc.ServerStream).RecvMsg(in)
  72. if err == io.EOF {
  73. // read done.
  74. return nil
  75. }
  76. if err != nil {
  77. return err
  78. }
  79. setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
  80. if err := stream.Send(response); err != nil {
  81. return err
  82. }
  83. }
  84. }
  85. func (s *testServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error {
  86. in := new(testpb.SimpleRequest)
  87. // Receive a message to learn response type and size.
  88. err := stream.RecvMsg(in)
  89. if err == io.EOF {
  90. // read done.
  91. return nil
  92. }
  93. if err != nil {
  94. return err
  95. }
  96. response := &testpb.SimpleResponse{
  97. Payload: new(testpb.Payload),
  98. }
  99. setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
  100. go func() {
  101. for {
  102. // Using RecvMsg rather than Recv to prevent reallocation of SimpleRequest.
  103. err := stream.RecvMsg(in)
  104. switch status.Code(err) {
  105. case codes.Canceled:
  106. return
  107. case codes.OK:
  108. default:
  109. log.Fatalf("server recv error: %v", err)
  110. }
  111. }
  112. }()
  113. go func() {
  114. for {
  115. err := stream.Send(response)
  116. switch status.Code(err) {
  117. case codes.Unavailable:
  118. return
  119. case codes.OK:
  120. default:
  121. log.Fatalf("server send error: %v", err)
  122. }
  123. }
  124. }()
  125. <-stream.Context().Done()
  126. return stream.Context().Err()
  127. }
  // byteBufServer is a gRPC server that sends and receives byte buffers.
  // The purpose is to benchmark the gRPC performance without protobuf
  // serialization/deserialization overhead.
  type byteBufServer struct {
  	testpb.UnimplementedBenchmarkServiceServer
  	// respSize is the length, in bytes, of each response buffer the streaming
  	// handlers allocate and send.
  	respSize int32
  }
  134. // UnaryCall is an empty function and is not used for benchmark.
  135. // If bytebuf UnaryCall benchmark is needed later, the function body needs to be updated.
  136. func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  137. return &testpb.SimpleResponse{}, nil
  138. }
  139. func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
  140. for {
  141. var in []byte
  142. err := stream.(grpc.ServerStream).RecvMsg(&in)
  143. if err == io.EOF {
  144. return nil
  145. }
  146. if err != nil {
  147. return err
  148. }
  149. out := make([]byte, s.respSize)
  150. if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
  151. return err
  152. }
  153. }
  154. }
  155. func (s *byteBufServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error {
  156. for {
  157. var in []byte
  158. err := stream.(grpc.ServerStream).RecvMsg(&in)
  159. if err == io.EOF {
  160. return nil
  161. }
  162. if err != nil {
  163. return err
  164. }
  165. out := make([]byte, s.respSize)
  166. if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
  167. return err
  168. }
  169. }
  170. }
  // ServerInfo contains the information to create a gRPC benchmark server.
  type ServerInfo struct {
  	// Type is the type of the server.
  	// It should be "protobuf" or "bytebuf".
  	Type string
  	// Metadata is an optional configuration.
  	// For "protobuf", it's ignored.
  	// For "bytebuf", it must be an int32 holding the response size;
  	// StartServer type-asserts it to int32 and calls grpclog.Fatalf when the
  	// assertion fails.
  	Metadata interface{}
  	// Listener is the network listener for the server to use.
  	Listener net.Listener
  }
  183. // StartServer starts a gRPC server serving a benchmark service according to info.
  184. // It returns a function to stop the server.
  185. func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() {
  186. opts = append(opts, grpc.WriteBufferSize(128*1024))
  187. opts = append(opts, grpc.ReadBufferSize(128*1024))
  188. s := grpc.NewServer(opts...)
  189. switch info.Type {
  190. case "protobuf":
  191. testpb.RegisterBenchmarkServiceServer(s, &testServer{})
  192. case "bytebuf":
  193. respSize, ok := info.Metadata.(int32)
  194. if !ok {
  195. grpclog.Fatalf("failed to StartServer, invalid metadata: %v, for Type: %v", info.Metadata, info.Type)
  196. }
  197. testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize})
  198. default:
  199. grpclog.Fatalf("failed to StartServer, unknown Type: %v", info.Type)
  200. }
  201. go s.Serve(info.Listener)
  202. return func() {
  203. s.Stop()
  204. }
  205. }
  206. // DoUnaryCall performs an unary RPC with given stub and request and response sizes.
  207. func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) error {
  208. pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
  209. req := &testpb.SimpleRequest{
  210. ResponseType: pl.Type,
  211. ResponseSize: int32(respSize),
  212. Payload: pl,
  213. }
  214. if _, err := tc.UnaryCall(context.Background(), req); err != nil {
  215. return fmt.Errorf("/BenchmarkService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
  216. }
  217. return nil
  218. }
  219. // DoStreamingRoundTrip performs a round trip for a single streaming rpc.
  220. func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
  221. pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
  222. req := &testpb.SimpleRequest{
  223. ResponseType: pl.Type,
  224. ResponseSize: int32(respSize),
  225. Payload: pl,
  226. }
  227. if err := stream.Send(req); err != nil {
  228. return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err)
  229. }
  230. if _, err := stream.Recv(); err != nil {
  231. // EOF is a valid error here.
  232. if err == io.EOF {
  233. return nil
  234. }
  235. return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err)
  236. }
  237. return nil
  238. }
  239. // DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using a custom codec for byte buffer.
  240. func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
  241. out := make([]byte, reqSize)
  242. if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil {
  243. return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err)
  244. }
  245. var in []byte
  246. if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil {
  247. // EOF is a valid error here.
  248. if err == io.EOF {
  249. return nil
  250. }
  251. return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err)
  252. }
  253. return nil
  254. }
  255. // NewClientConn creates a gRPC client connection to addr.
  256. func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
  257. return NewClientConnWithContext(context.Background(), addr, opts...)
  258. }
  259. // NewClientConnWithContext creates a gRPC client connection to addr using ctx.
  260. func NewClientConnWithContext(ctx context.Context, addr string, opts ...grpc.DialOption) *grpc.ClientConn {
  261. opts = append(opts, grpc.WithWriteBufferSize(128*1024))
  262. opts = append(opts, grpc.WithReadBufferSize(128*1024))
  263. conn, err := grpc.DialContext(ctx, addr, opts...)
  264. if err != nil {
  265. grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err)
  266. }
  267. return conn
  268. }