main.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. /*
  19. Package main provides benchmark with setting flags.
  20. An example to run some benchmarks with profiling enabled:
  21. go run benchmark/benchmain/main.go -benchtime=10s -workloads=all \
  22. -compression=gzip -maxConcurrentCalls=1 -trace=off \
  23. -reqSizeBytes=1,1048576 -respSizeBytes=1,1048576 -networkMode=Local \
  24. -cpuProfile=cpuProf -memProfile=memProf -memProfileRate=10000 -resultFile=result
  25. As a suggestion, when creating a branch, you can run this benchmark and save the result
  26. file "-resultFile=basePerf", and later when you at the middle of the work or finish the
  27. work, you can get the benchmark result and compare it with the base anytime.
  28. Assume there are two result files names as "basePerf" and "curPerf" created by adding
  29. -resultFile=basePerf and -resultFile=curPerf.
  30. To format the curPerf, run:
  31. go run benchmark/benchresult/main.go curPerf
  32. To observe how the performance changes based on a base result, run:
  33. go run benchmark/benchresult/main.go basePerf curPerf
  34. */
  35. package main
  36. import (
  37. "context"
  38. "encoding/gob"
  39. "flag"
  40. "fmt"
  41. "io"
  42. "io/ioutil"
  43. "log"
  44. "net"
  45. "os"
  46. "reflect"
  47. "runtime"
  48. "runtime/pprof"
  49. "strings"
  50. "sync"
  51. "sync/atomic"
  52. "time"
  53. "google.golang.org/grpc"
  54. bm "google.golang.org/grpc/benchmark"
  55. "google.golang.org/grpc/benchmark/flags"
  56. testpb "google.golang.org/grpc/benchmark/grpc_testing"
  57. "google.golang.org/grpc/benchmark/latency"
  58. "google.golang.org/grpc/benchmark/stats"
  59. "google.golang.org/grpc/grpclog"
  60. "google.golang.org/grpc/internal/channelz"
  61. "google.golang.org/grpc/keepalive"
  62. "google.golang.org/grpc/test/bufconn"
  63. )
  64. var (
  65. workloads = flags.StringWithAllowedValues("workloads", workloadsAll,
  66. fmt.Sprintf("Workloads to execute - One of: %v", strings.Join(allWorkloads, ", ")), allWorkloads)
  67. traceMode = flags.StringWithAllowedValues("trace", toggleModeOff,
  68. fmt.Sprintf("Trace mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
  69. preloaderMode = flags.StringWithAllowedValues("preloader", toggleModeOff,
  70. fmt.Sprintf("Preloader mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
  71. channelzOn = flags.StringWithAllowedValues("channelz", toggleModeOff,
  72. fmt.Sprintf("Channelz mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
  73. compressorMode = flags.StringWithAllowedValues("compression", compModeOff,
  74. fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompModes, ", ")), allCompModes)
  75. networkMode = flags.StringWithAllowedValues("networkMode", networkModeNone,
  76. "Network mode includes LAN, WAN, Local and Longhaul", allNetworkModes)
  77. readLatency = flags.DurationSlice("latency", defaultReadLatency, "Simulated one-way network latency - may be a comma-separated list")
  78. readKbps = flags.IntSlice("kbps", defaultReadKbps, "Simulated network throughput (in kbps) - may be a comma-separated list")
  79. readMTU = flags.IntSlice("mtu", defaultReadMTU, "Simulated network MTU (Maximum Transmission Unit) - may be a comma-separated list")
  80. maxConcurrentCalls = flags.IntSlice("maxConcurrentCalls", defaultMaxConcurrentCalls, "Number of concurrent RPCs during benchmarks")
  81. readReqSizeBytes = flags.IntSlice("reqSizeBytes", nil, "Request size in bytes - may be a comma-separated list")
  82. readRespSizeBytes = flags.IntSlice("respSizeBytes", nil, "Response size in bytes - may be a comma-separated list")
  83. reqPayloadCurveFiles = flags.StringSlice("reqPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape a random distribution of request payload sizes")
  84. respPayloadCurveFiles = flags.StringSlice("respPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape a random distribution of response payload sizes")
  85. benchTime = flag.Duration("benchtime", time.Second, "Configures the amount of time to run each benchmark")
  86. memProfile = flag.String("memProfile", "", "Enables memory profiling output to the filename provided.")
  87. memProfileRate = flag.Int("memProfileRate", 512*1024, "Configures the memory profiling rate. \n"+
  88. "memProfile should be set before setting profile rate. To include every allocated block in the profile, "+
  89. "set MemProfileRate to 1. To turn off profiling entirely, set MemProfileRate to 0. 512 * 1024 by default.")
  90. cpuProfile = flag.String("cpuProfile", "", "Enables CPU profiling output to the filename provided")
  91. benchmarkResultFile = flag.String("resultFile", "", "Save the benchmark result into a binary file")
  92. useBufconn = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O")
  93. enableKeepalive = flag.Bool("enable_keepalive", false, "Enable client keepalive. \n"+
  94. "Keepalive.Time is set to 10s, Keepalive.Timeout is set to 1s, Keepalive.PermitWithoutStream is set to true.")
  95. )
  96. const (
  97. workloadsUnary = "unary"
  98. workloadsStreaming = "streaming"
  99. workloadsUnconstrained = "unconstrained"
  100. workloadsAll = "all"
  101. // Compression modes.
  102. compModeOff = "off"
  103. compModeGzip = "gzip"
  104. compModeNop = "nop"
  105. compModeAll = "all"
  106. // Toggle modes.
  107. toggleModeOff = "off"
  108. toggleModeOn = "on"
  109. toggleModeBoth = "both"
  110. // Network modes.
  111. networkModeNone = "none"
  112. networkModeLocal = "Local"
  113. networkModeLAN = "LAN"
  114. networkModeWAN = "WAN"
  115. networkLongHaul = "Longhaul"
  116. numStatsBuckets = 10
  117. warmupCallCount = 10
  118. warmuptime = time.Second
  119. )
  120. var (
  121. allWorkloads = []string{workloadsUnary, workloadsStreaming, workloadsUnconstrained, workloadsAll}
  122. allCompModes = []string{compModeOff, compModeGzip, compModeNop, compModeAll}
  123. allToggleModes = []string{toggleModeOff, toggleModeOn, toggleModeBoth}
  124. allNetworkModes = []string{networkModeNone, networkModeLocal, networkModeLAN, networkModeWAN, networkLongHaul}
  125. defaultReadLatency = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay.
  126. defaultReadKbps = []int{0, 10240} // if non-positive, infinite
  127. defaultReadMTU = []int{0} // if non-positive, infinite
  128. defaultMaxConcurrentCalls = []int{1, 8, 64, 512}
  129. defaultReqSizeBytes = []int{1, 1024, 1024 * 1024}
  130. defaultRespSizeBytes = []int{1, 1024, 1024 * 1024}
  131. networks = map[string]latency.Network{
  132. networkModeLocal: latency.Local,
  133. networkModeLAN: latency.LAN,
  134. networkModeWAN: latency.WAN,
  135. networkLongHaul: latency.Longhaul,
  136. }
  137. keepaliveTime = 10 * time.Second
  138. keepaliveTimeout = 1 * time.Second
  139. // This is 0.8*keepaliveTime to prevent connection issues because of server
  140. // keepalive enforcement.
  141. keepaliveMinTime = 8 * time.Second
  142. )
  143. // runModes indicates the workloads to run. This is initialized with a call to
  144. // `runModesFromWorkloads`, passing the workloads flag set by the user.
  145. type runModes struct {
  146. unary, streaming, unconstrained bool
  147. }
  148. // runModesFromWorkloads determines the runModes based on the value of
  149. // workloads flag set by the user.
  150. func runModesFromWorkloads(workload string) runModes {
  151. r := runModes{}
  152. switch workload {
  153. case workloadsUnary:
  154. r.unary = true
  155. case workloadsStreaming:
  156. r.streaming = true
  157. case workloadsUnconstrained:
  158. r.unconstrained = true
  159. case workloadsAll:
  160. r.unary = true
  161. r.streaming = true
  162. r.unconstrained = true
  163. default:
  164. log.Fatalf("Unknown workloads setting: %v (want one of: %v)",
  165. workloads, strings.Join(allWorkloads, ", "))
  166. }
  167. return r
  168. }
  169. type startFunc func(mode string, bf stats.Features)
  170. type stopFunc func(count uint64)
  171. type ucStopFunc func(req uint64, resp uint64)
  172. type rpcCallFunc func(pos int)
  173. type rpcSendFunc func(pos int)
  174. type rpcRecvFunc func(pos int)
  175. type rpcCleanupFunc func()
  176. func unaryBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
  177. caller, cleanup := makeFuncUnary(bf)
  178. defer cleanup()
  179. runBenchmark(caller, start, stop, bf, s, workloadsUnary)
  180. }
  181. func streamBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
  182. caller, cleanup := makeFuncStream(bf)
  183. defer cleanup()
  184. runBenchmark(caller, start, stop, bf, s, workloadsStreaming)
  185. }
  186. func unconstrainedStreamBenchmark(start startFunc, stop ucStopFunc, bf stats.Features, s *stats.Stats) {
  187. var sender rpcSendFunc
  188. var recver rpcRecvFunc
  189. var cleanup rpcCleanupFunc
  190. if bf.EnablePreloader {
  191. sender, recver, cleanup = makeFuncUnconstrainedStreamPreloaded(bf)
  192. } else {
  193. sender, recver, cleanup = makeFuncUnconstrainedStream(bf)
  194. }
  195. defer cleanup()
  196. var req, resp uint64
  197. go func() {
  198. // Resets the counters once warmed up
  199. <-time.NewTimer(warmuptime).C
  200. atomic.StoreUint64(&req, 0)
  201. atomic.StoreUint64(&resp, 0)
  202. start(workloadsUnconstrained, bf)
  203. }()
  204. bmEnd := time.Now().Add(bf.BenchTime + warmuptime)
  205. var wg sync.WaitGroup
  206. wg.Add(2 * bf.MaxConcurrentCalls)
  207. for i := 0; i < bf.MaxConcurrentCalls; i++ {
  208. go func(pos int) {
  209. defer wg.Done()
  210. for {
  211. t := time.Now()
  212. if t.After(bmEnd) {
  213. return
  214. }
  215. sender(pos)
  216. atomic.AddUint64(&req, 1)
  217. }
  218. }(i)
  219. go func(pos int) {
  220. defer wg.Done()
  221. for {
  222. t := time.Now()
  223. if t.After(bmEnd) {
  224. return
  225. }
  226. recver(pos)
  227. atomic.AddUint64(&resp, 1)
  228. }
  229. }(i)
  230. }
  231. wg.Wait()
  232. stop(req, resp)
  233. }
  234. // makeClient returns a gRPC client for the grpc.testing.BenchmarkService
  235. // service. The client is configured using the different options in the passed
  236. // 'bf'. Also returns a cleanup function to close the client and release
  237. // resources.
  238. func makeClient(bf stats.Features) (testpb.BenchmarkServiceClient, func()) {
  239. nw := &latency.Network{Kbps: bf.Kbps, Latency: bf.Latency, MTU: bf.MTU}
  240. opts := []grpc.DialOption{}
  241. sopts := []grpc.ServerOption{}
  242. if bf.ModeCompressor == compModeNop {
  243. sopts = append(sopts,
  244. grpc.RPCCompressor(nopCompressor{}),
  245. grpc.RPCDecompressor(nopDecompressor{}),
  246. )
  247. opts = append(opts,
  248. grpc.WithCompressor(nopCompressor{}),
  249. grpc.WithDecompressor(nopDecompressor{}),
  250. )
  251. }
  252. if bf.ModeCompressor == compModeGzip {
  253. sopts = append(sopts,
  254. grpc.RPCCompressor(grpc.NewGZIPCompressor()),
  255. grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
  256. )
  257. opts = append(opts,
  258. grpc.WithCompressor(grpc.NewGZIPCompressor()),
  259. grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
  260. )
  261. }
  262. if bf.EnableKeepalive {
  263. sopts = append(sopts,
  264. grpc.KeepaliveParams(keepalive.ServerParameters{
  265. Time: keepaliveTime,
  266. Timeout: keepaliveTimeout,
  267. }),
  268. grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
  269. MinTime: keepaliveMinTime,
  270. PermitWithoutStream: true,
  271. }),
  272. )
  273. opts = append(opts,
  274. grpc.WithKeepaliveParams(keepalive.ClientParameters{
  275. Time: keepaliveTime,
  276. Timeout: keepaliveTimeout,
  277. PermitWithoutStream: true,
  278. }),
  279. )
  280. }
  281. sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(bf.MaxConcurrentCalls+1)))
  282. opts = append(opts, grpc.WithInsecure())
  283. var lis net.Listener
  284. if bf.UseBufConn {
  285. bcLis := bufconn.Listen(256 * 1024)
  286. lis = bcLis
  287. opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
  288. return nw.ContextDialer(func(context.Context, string, string) (net.Conn, error) {
  289. return bcLis.Dial()
  290. })(ctx, "", "")
  291. }))
  292. } else {
  293. var err error
  294. lis, err = net.Listen("tcp", "localhost:0")
  295. if err != nil {
  296. grpclog.Fatalf("Failed to listen: %v", err)
  297. }
  298. opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
  299. return nw.ContextDialer((&net.Dialer{}).DialContext)(ctx, "tcp", lis.Addr().String())
  300. }))
  301. }
  302. lis = nw.Listener(lis)
  303. stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...)
  304. conn := bm.NewClientConn("" /* target not used */, opts...)
  305. return testpb.NewBenchmarkServiceClient(conn), func() {
  306. conn.Close()
  307. stopper()
  308. }
  309. }
  310. func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
  311. tc, cleanup := makeClient(bf)
  312. return func(int) {
  313. reqSizeBytes := bf.ReqSizeBytes
  314. respSizeBytes := bf.RespSizeBytes
  315. if bf.ReqPayloadCurve != nil {
  316. reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom()
  317. }
  318. if bf.RespPayloadCurve != nil {
  319. respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
  320. }
  321. unaryCaller(tc, reqSizeBytes, respSizeBytes)
  322. }, cleanup
  323. }
  324. func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
  325. tc, cleanup := makeClient(bf)
  326. streams := make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
  327. for i := 0; i < bf.MaxConcurrentCalls; i++ {
  328. stream, err := tc.StreamingCall(context.Background())
  329. if err != nil {
  330. grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
  331. }
  332. streams[i] = stream
  333. }
  334. return func(pos int) {
  335. reqSizeBytes := bf.ReqSizeBytes
  336. respSizeBytes := bf.RespSizeBytes
  337. if bf.ReqPayloadCurve != nil {
  338. reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom()
  339. }
  340. if bf.RespPayloadCurve != nil {
  341. respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
  342. }
  343. streamCaller(streams[pos], reqSizeBytes, respSizeBytes)
  344. }, cleanup
  345. }
  346. func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
  347. streams, req, cleanup := setupUnconstrainedStream(bf)
  348. preparedMsg := make([]*grpc.PreparedMsg, len(streams))
  349. for i, stream := range streams {
  350. preparedMsg[i] = &grpc.PreparedMsg{}
  351. err := preparedMsg[i].Encode(stream, req)
  352. if err != nil {
  353. grpclog.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[i], req, stream, err)
  354. }
  355. }
  356. return func(pos int) {
  357. streams[pos].SendMsg(preparedMsg[pos])
  358. }, func(pos int) {
  359. streams[pos].Recv()
  360. }, cleanup
  361. }
  362. func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
  363. streams, req, cleanup := setupUnconstrainedStream(bf)
  364. return func(pos int) {
  365. streams[pos].Send(req)
  366. }, func(pos int) {
  367. streams[pos].Recv()
  368. }, cleanup
  369. }
  370. func setupUnconstrainedStream(bf stats.Features) ([]testpb.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) {
  371. tc, cleanup := makeClient(bf)
  372. streams := make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
  373. for i := 0; i < bf.MaxConcurrentCalls; i++ {
  374. stream, err := tc.UnconstrainedStreamingCall(context.Background())
  375. if err != nil {
  376. grpclog.Fatalf("%v.UnconstrainedStreamingCall(_) = _, %v", tc, err)
  377. }
  378. streams[i] = stream
  379. }
  380. pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, bf.ReqSizeBytes)
  381. req := &testpb.SimpleRequest{
  382. ResponseType: pl.Type,
  383. ResponseSize: int32(bf.RespSizeBytes),
  384. Payload: pl,
  385. }
  386. return streams, req, cleanup
  387. }
  388. // Makes a UnaryCall gRPC request using the given BenchmarkServiceClient and
  389. // request and response sizes.
  390. func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) {
  391. if err := bm.DoUnaryCall(client, reqSize, respSize); err != nil {
  392. grpclog.Fatalf("DoUnaryCall failed: %v", err)
  393. }
  394. }
  395. func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
  396. if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
  397. grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err)
  398. }
  399. }
  400. func runBenchmark(caller rpcCallFunc, start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats, mode string) {
  401. // Warm up connection.
  402. for i := 0; i < warmupCallCount; i++ {
  403. caller(0)
  404. }
  405. // Run benchmark.
  406. start(mode, bf)
  407. var wg sync.WaitGroup
  408. wg.Add(bf.MaxConcurrentCalls)
  409. bmEnd := time.Now().Add(bf.BenchTime)
  410. var count uint64
  411. for i := 0; i < bf.MaxConcurrentCalls; i++ {
  412. go func(pos int) {
  413. defer wg.Done()
  414. for {
  415. t := time.Now()
  416. if t.After(bmEnd) {
  417. return
  418. }
  419. start := time.Now()
  420. caller(pos)
  421. elapse := time.Since(start)
  422. atomic.AddUint64(&count, 1)
  423. s.AddDuration(elapse)
  424. }
  425. }(i)
  426. }
  427. wg.Wait()
  428. stop(count)
  429. }
  430. // benchOpts represents all configurable options available while running this
  431. // benchmark. This is built from the values passed as flags.
  432. type benchOpts struct {
  433. rModes runModes
  434. benchTime time.Duration
  435. memProfileRate int
  436. memProfile string
  437. cpuProfile string
  438. networkMode string
  439. benchmarkResultFile string
  440. useBufconn bool
  441. enableKeepalive bool
  442. features *featureOpts
  443. }
  444. // featureOpts represents options which can have multiple values. The user
  445. // usually provides a comma-separated list of options for each of these
  446. // features through command line flags. We generate all possible combinations
  447. // for the provided values and run the benchmarks for each combination.
  448. type featureOpts struct {
  449. enableTrace []bool
  450. readLatencies []time.Duration
  451. readKbps []int
  452. readMTU []int
  453. maxConcurrentCalls []int
  454. reqSizeBytes []int
  455. respSizeBytes []int
  456. reqPayloadCurves []*stats.PayloadCurve
  457. respPayloadCurves []*stats.PayloadCurve
  458. compModes []string
  459. enableChannelz []bool
  460. enablePreloader []bool
  461. }
  462. // makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each
  463. // element of the slice (indexed by 'featuresIndex' enum) contains the number
  464. // of features to be exercised by the benchmark code.
  465. // For example: Index 0 of the returned slice contains the number of values for
  466. // enableTrace feature, while index 1 contains the number of value of
  467. // readLatencies feature and so on.
  468. func makeFeaturesNum(b *benchOpts) []int {
  469. featuresNum := make([]int, stats.MaxFeatureIndex)
  470. for i := 0; i < len(featuresNum); i++ {
  471. switch stats.FeatureIndex(i) {
  472. case stats.EnableTraceIndex:
  473. featuresNum[i] = len(b.features.enableTrace)
  474. case stats.ReadLatenciesIndex:
  475. featuresNum[i] = len(b.features.readLatencies)
  476. case stats.ReadKbpsIndex:
  477. featuresNum[i] = len(b.features.readKbps)
  478. case stats.ReadMTUIndex:
  479. featuresNum[i] = len(b.features.readMTU)
  480. case stats.MaxConcurrentCallsIndex:
  481. featuresNum[i] = len(b.features.maxConcurrentCalls)
  482. case stats.ReqSizeBytesIndex:
  483. featuresNum[i] = len(b.features.reqSizeBytes)
  484. case stats.RespSizeBytesIndex:
  485. featuresNum[i] = len(b.features.respSizeBytes)
  486. case stats.ReqPayloadCurveIndex:
  487. featuresNum[i] = len(b.features.reqPayloadCurves)
  488. case stats.RespPayloadCurveIndex:
  489. featuresNum[i] = len(b.features.respPayloadCurves)
  490. case stats.CompModesIndex:
  491. featuresNum[i] = len(b.features.compModes)
  492. case stats.EnableChannelzIndex:
  493. featuresNum[i] = len(b.features.enableChannelz)
  494. case stats.EnablePreloaderIndex:
  495. featuresNum[i] = len(b.features.enablePreloader)
  496. default:
  497. log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex)
  498. }
  499. }
  500. return featuresNum
  501. }
  502. // sharedFeatures returns a bool slice which acts as a bitmask. Each item in
  503. // the slice represents a feature, indexed by 'featureIndex' enum. The bit is
  504. // set to 1 if the corresponding feature does not have multiple value, so is
  505. // shared amongst all benchmarks.
  506. func sharedFeatures(featuresNum []int) []bool {
  507. result := make([]bool, len(featuresNum))
  508. for i, num := range featuresNum {
  509. if num <= 1 {
  510. result[i] = true
  511. }
  512. }
  513. return result
  514. }
  515. // generateFeatures generates all combinations of the provided feature options.
  516. // While all the feature options are stored in the benchOpts struct, the input
  517. // parameter 'featuresNum' is a slice indexed by 'featureIndex' enum containing
  518. // the number of values for each feature.
  519. // For example, let's say the user sets -workloads=all and
  520. // -maxConcurrentCalls=1,100, this would end up with the following
  521. // combinations:
  522. // [workloads: unary, maxConcurrentCalls=1]
  523. // [workloads: unary, maxConcurrentCalls=1]
  524. // [workloads: streaming, maxConcurrentCalls=100]
  525. // [workloads: streaming, maxConcurrentCalls=100]
  526. // [workloads: unconstrained, maxConcurrentCalls=1]
  527. // [workloads: unconstrained, maxConcurrentCalls=100]
  528. func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
  529. // curPos and initialPos are two slices where each value acts as an index
  530. // into the appropriate feature slice maintained in benchOpts.features. This
  531. // loop generates all possible combinations of features by changing one value
  532. // at a time, and once curPos becomes equal to initialPos, we have explored
  533. // all options.
  534. var result []stats.Features
  535. var curPos []int
  536. initialPos := make([]int, stats.MaxFeatureIndex)
  537. for !reflect.DeepEqual(initialPos, curPos) {
  538. if curPos == nil {
  539. curPos = make([]int, stats.MaxFeatureIndex)
  540. }
  541. f := stats.Features{
  542. // These features stay the same for each iteration.
  543. NetworkMode: b.networkMode,
  544. UseBufConn: b.useBufconn,
  545. EnableKeepalive: b.enableKeepalive,
  546. BenchTime: b.benchTime,
  547. // These features can potentially change for each iteration.
  548. EnableTrace: b.features.enableTrace[curPos[stats.EnableTraceIndex]],
  549. Latency: b.features.readLatencies[curPos[stats.ReadLatenciesIndex]],
  550. Kbps: b.features.readKbps[curPos[stats.ReadKbpsIndex]],
  551. MTU: b.features.readMTU[curPos[stats.ReadMTUIndex]],
  552. MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[stats.MaxConcurrentCallsIndex]],
  553. ModeCompressor: b.features.compModes[curPos[stats.CompModesIndex]],
  554. EnableChannelz: b.features.enableChannelz[curPos[stats.EnableChannelzIndex]],
  555. EnablePreloader: b.features.enablePreloader[curPos[stats.EnablePreloaderIndex]],
  556. }
  557. if len(b.features.reqPayloadCurves) == 0 {
  558. f.ReqSizeBytes = b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]]
  559. } else {
  560. f.ReqPayloadCurve = b.features.reqPayloadCurves[curPos[stats.ReqPayloadCurveIndex]]
  561. }
  562. if len(b.features.respPayloadCurves) == 0 {
  563. f.RespSizeBytes = b.features.respSizeBytes[curPos[stats.RespSizeBytesIndex]]
  564. } else {
  565. f.RespPayloadCurve = b.features.respPayloadCurves[curPos[stats.RespPayloadCurveIndex]]
  566. }
  567. result = append(result, f)
  568. addOne(curPos, featuresNum)
  569. }
  570. return result
  571. }
  572. // addOne mutates the input slice 'features' by changing one feature, thus
  573. // arriving at the next combination of feature values. 'featuresMaxPosition'
  574. // provides the numbers of allowed values for each feature, indexed by
  575. // 'featureIndex' enum.
  576. func addOne(features []int, featuresMaxPosition []int) {
  577. for i := len(features) - 1; i >= 0; i-- {
  578. if featuresMaxPosition[i] == 0 {
  579. continue
  580. }
  581. features[i] = (features[i] + 1)
  582. if features[i]/featuresMaxPosition[i] == 0 {
  583. break
  584. }
  585. features[i] = features[i] % featuresMaxPosition[i]
  586. }
  587. }
  588. // processFlags reads the command line flags and builds benchOpts. Specifying
  589. // invalid values for certain flags will cause flag.Parse() to fail, and the
  590. // program to terminate.
  591. // This *SHOULD* be the only place where the flags are accessed. All other
  592. // parts of the benchmark code should rely on the returned benchOpts.
  593. func processFlags() *benchOpts {
  594. flag.Parse()
  595. if flag.NArg() != 0 {
  596. log.Fatal("Error: unparsed arguments: ", flag.Args())
  597. }
  598. opts := &benchOpts{
  599. rModes: runModesFromWorkloads(*workloads),
  600. benchTime: *benchTime,
  601. memProfileRate: *memProfileRate,
  602. memProfile: *memProfile,
  603. cpuProfile: *cpuProfile,
  604. networkMode: *networkMode,
  605. benchmarkResultFile: *benchmarkResultFile,
  606. useBufconn: *useBufconn,
  607. enableKeepalive: *enableKeepalive,
  608. features: &featureOpts{
  609. enableTrace: setToggleMode(*traceMode),
  610. readLatencies: append([]time.Duration(nil), *readLatency...),
  611. readKbps: append([]int(nil), *readKbps...),
  612. readMTU: append([]int(nil), *readMTU...),
  613. maxConcurrentCalls: append([]int(nil), *maxConcurrentCalls...),
  614. reqSizeBytes: append([]int(nil), *readReqSizeBytes...),
  615. respSizeBytes: append([]int(nil), *readRespSizeBytes...),
  616. compModes: setCompressorMode(*compressorMode),
  617. enableChannelz: setToggleMode(*channelzOn),
  618. enablePreloader: setToggleMode(*preloaderMode),
  619. },
  620. }
  621. if len(*reqPayloadCurveFiles) == 0 {
  622. if len(opts.features.reqSizeBytes) == 0 {
  623. opts.features.reqSizeBytes = defaultReqSizeBytes
  624. }
  625. } else {
  626. if len(opts.features.reqSizeBytes) != 0 {
  627. log.Fatalf("you may not specify -reqPayloadCurveFiles and -reqSizeBytes at the same time")
  628. }
  629. for _, file := range *reqPayloadCurveFiles {
  630. pc, err := stats.NewPayloadCurve(file)
  631. if err != nil {
  632. log.Fatalf("cannot load payload curve file %s: %v", file, err)
  633. }
  634. opts.features.reqPayloadCurves = append(opts.features.reqPayloadCurves, pc)
  635. }
  636. opts.features.reqSizeBytes = nil
  637. }
  638. if len(*respPayloadCurveFiles) == 0 {
  639. if len(opts.features.respSizeBytes) == 0 {
  640. opts.features.respSizeBytes = defaultRespSizeBytes
  641. }
  642. } else {
  643. if len(opts.features.respSizeBytes) != 0 {
  644. log.Fatalf("you may not specify -respPayloadCurveFiles and -respSizeBytes at the same time")
  645. }
  646. for _, file := range *respPayloadCurveFiles {
  647. pc, err := stats.NewPayloadCurve(file)
  648. if err != nil {
  649. log.Fatalf("cannot load payload curve file %s: %v", file, err)
  650. }
  651. opts.features.respPayloadCurves = append(opts.features.respPayloadCurves, pc)
  652. }
  653. opts.features.respSizeBytes = nil
  654. }
  655. // Re-write latency, kpbs and mtu if network mode is set.
  656. if network, ok := networks[opts.networkMode]; ok {
  657. opts.features.readLatencies = []time.Duration{network.Latency}
  658. opts.features.readKbps = []int{network.Kbps}
  659. opts.features.readMTU = []int{network.MTU}
  660. }
  661. return opts
  662. }
  663. func setToggleMode(val string) []bool {
  664. switch val {
  665. case toggleModeOn:
  666. return []bool{true}
  667. case toggleModeOff:
  668. return []bool{false}
  669. case toggleModeBoth:
  670. return []bool{false, true}
  671. default:
  672. // This should never happen because a wrong value passed to this flag would
  673. // be caught during flag.Parse().
  674. return []bool{}
  675. }
  676. }
  677. func setCompressorMode(val string) []string {
  678. switch val {
  679. case compModeNop, compModeGzip, compModeOff:
  680. return []string{val}
  681. case compModeAll:
  682. return []string{compModeNop, compModeGzip, compModeOff}
  683. default:
  684. // This should never happen because a wrong value passed to this flag would
  685. // be caught during flag.Parse().
  686. return []string{}
  687. }
  688. }
  689. func main() {
  690. opts := processFlags()
  691. before(opts)
  692. s := stats.NewStats(numStatsBuckets)
  693. featuresNum := makeFeaturesNum(opts)
  694. sf := sharedFeatures(featuresNum)
  695. var (
  696. start = func(mode string, bf stats.Features) { s.StartRun(mode, bf, sf) }
  697. stop = func(count uint64) { s.EndRun(count) }
  698. ucStop = func(req uint64, resp uint64) { s.EndUnconstrainedRun(req, resp) }
  699. )
  700. for _, bf := range opts.generateFeatures(featuresNum) {
  701. grpc.EnableTracing = bf.EnableTrace
  702. if bf.EnableChannelz {
  703. channelz.TurnOn()
  704. }
  705. if opts.rModes.unary {
  706. unaryBenchmark(start, stop, bf, s)
  707. }
  708. if opts.rModes.streaming {
  709. streamBenchmark(start, stop, bf, s)
  710. }
  711. if opts.rModes.unconstrained {
  712. unconstrainedStreamBenchmark(start, ucStop, bf, s)
  713. }
  714. }
  715. after(opts, s.GetResults())
  716. }
  717. func before(opts *benchOpts) {
  718. if opts.memProfile != "" {
  719. runtime.MemProfileRate = opts.memProfileRate
  720. }
  721. if opts.cpuProfile != "" {
  722. f, err := os.Create(opts.cpuProfile)
  723. if err != nil {
  724. fmt.Fprintf(os.Stderr, "testing: %s\n", err)
  725. return
  726. }
  727. if err := pprof.StartCPUProfile(f); err != nil {
  728. fmt.Fprintf(os.Stderr, "testing: can't start cpu profile: %s\n", err)
  729. f.Close()
  730. return
  731. }
  732. }
  733. }
  734. func after(opts *benchOpts, data []stats.BenchResults) {
  735. if opts.cpuProfile != "" {
  736. pprof.StopCPUProfile() // flushes profile to disk
  737. }
  738. if opts.memProfile != "" {
  739. f, err := os.Create(opts.memProfile)
  740. if err != nil {
  741. fmt.Fprintf(os.Stderr, "testing: %s\n", err)
  742. os.Exit(2)
  743. }
  744. runtime.GC() // materialize all statistics
  745. if err = pprof.WriteHeapProfile(f); err != nil {
  746. fmt.Fprintf(os.Stderr, "testing: can't write heap profile %s: %s\n", opts.memProfile, err)
  747. os.Exit(2)
  748. }
  749. f.Close()
  750. }
  751. if opts.benchmarkResultFile != "" {
  752. f, err := os.Create(opts.benchmarkResultFile)
  753. if err != nil {
  754. log.Fatalf("testing: can't write benchmark result %s: %s\n", opts.benchmarkResultFile, err)
  755. }
  756. dataEncoder := gob.NewEncoder(f)
  757. dataEncoder.Encode(data)
  758. f.Close()
  759. }
  760. }
  761. // nopCompressor is a compressor that just copies data.
  762. type nopCompressor struct{}
  763. func (nopCompressor) Do(w io.Writer, p []byte) error {
  764. n, err := w.Write(p)
  765. if err != nil {
  766. return err
  767. }
  768. if n != len(p) {
  769. return fmt.Errorf("nopCompressor.Write: wrote %v bytes; want %v", n, len(p))
  770. }
  771. return nil
  772. }
  773. func (nopCompressor) Type() string { return compModeNop }
  774. // nopDecompressor is a decompressor that just copies data.
  775. type nopDecompressor struct{}
  776. func (nopDecompressor) Do(r io.Reader) ([]byte, error) { return ioutil.ReadAll(r) }
  777. func (nopDecompressor) Type() string { return compModeNop }