123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838 |
- /*
- *
- * Copyright 2017 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- /*
- Package main provides benchmark with setting flags.
- An example to run some benchmarks with profiling enabled:
- go run benchmark/benchmain/main.go -benchtime=10s -workloads=all \
- -compression=gzip -maxConcurrentCalls=1 -trace=off \
- -reqSizeBytes=1,1048576 -respSizeBytes=1,1048576 -networkMode=Local \
- -cpuProfile=cpuProf -memProfile=memProf -memProfileRate=10000 -resultFile=result
- As a suggestion, when creating a branch, you can run this benchmark and save the result
- file "-resultFile=basePerf", and later when you at the middle of the work or finish the
- work, you can get the benchmark result and compare it with the base anytime.
- Assume there are two result files names as "basePerf" and "curPerf" created by adding
- -resultFile=basePerf and -resultFile=curPerf.
- To format the curPerf, run:
- go run benchmark/benchresult/main.go curPerf
- To observe how the performance changes based on a base result, run:
- go run benchmark/benchresult/main.go basePerf curPerf
- */
- package main
- import (
- "context"
- "encoding/gob"
- "flag"
- "fmt"
- "io"
- "io/ioutil"
- "log"
- "net"
- "os"
- "reflect"
- "runtime"
- "runtime/pprof"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "google.golang.org/grpc"
- bm "google.golang.org/grpc/benchmark"
- "google.golang.org/grpc/benchmark/flags"
- testpb "google.golang.org/grpc/benchmark/grpc_testing"
- "google.golang.org/grpc/benchmark/latency"
- "google.golang.org/grpc/benchmark/stats"
- "google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/internal/channelz"
- "google.golang.org/grpc/keepalive"
- "google.golang.org/grpc/test/bufconn"
- )
- var (
- workloads = flags.StringWithAllowedValues("workloads", workloadsAll,
- fmt.Sprintf("Workloads to execute - One of: %v", strings.Join(allWorkloads, ", ")), allWorkloads)
- traceMode = flags.StringWithAllowedValues("trace", toggleModeOff,
- fmt.Sprintf("Trace mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
- preloaderMode = flags.StringWithAllowedValues("preloader", toggleModeOff,
- fmt.Sprintf("Preloader mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
- channelzOn = flags.StringWithAllowedValues("channelz", toggleModeOff,
- fmt.Sprintf("Channelz mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
- compressorMode = flags.StringWithAllowedValues("compression", compModeOff,
- fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompModes, ", ")), allCompModes)
- networkMode = flags.StringWithAllowedValues("networkMode", networkModeNone,
- "Network mode includes LAN, WAN, Local and Longhaul", allNetworkModes)
- readLatency = flags.DurationSlice("latency", defaultReadLatency, "Simulated one-way network latency - may be a comma-separated list")
- readKbps = flags.IntSlice("kbps", defaultReadKbps, "Simulated network throughput (in kbps) - may be a comma-separated list")
- readMTU = flags.IntSlice("mtu", defaultReadMTU, "Simulated network MTU (Maximum Transmission Unit) - may be a comma-separated list")
- maxConcurrentCalls = flags.IntSlice("maxConcurrentCalls", defaultMaxConcurrentCalls, "Number of concurrent RPCs during benchmarks")
- readReqSizeBytes = flags.IntSlice("reqSizeBytes", nil, "Request size in bytes - may be a comma-separated list")
- readRespSizeBytes = flags.IntSlice("respSizeBytes", nil, "Response size in bytes - may be a comma-separated list")
- reqPayloadCurveFiles = flags.StringSlice("reqPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape a random distribution of request payload sizes")
- respPayloadCurveFiles = flags.StringSlice("respPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape a random distribution of response payload sizes")
- benchTime = flag.Duration("benchtime", time.Second, "Configures the amount of time to run each benchmark")
- memProfile = flag.String("memProfile", "", "Enables memory profiling output to the filename provided.")
- memProfileRate = flag.Int("memProfileRate", 512*1024, "Configures the memory profiling rate. \n"+
- "memProfile should be set before setting profile rate. To include every allocated block in the profile, "+
- "set MemProfileRate to 1. To turn off profiling entirely, set MemProfileRate to 0. 512 * 1024 by default.")
- cpuProfile = flag.String("cpuProfile", "", "Enables CPU profiling output to the filename provided")
- benchmarkResultFile = flag.String("resultFile", "", "Save the benchmark result into a binary file")
- useBufconn = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O")
- enableKeepalive = flag.Bool("enable_keepalive", false, "Enable client keepalive. \n"+
- "Keepalive.Time is set to 10s, Keepalive.Timeout is set to 1s, Keepalive.PermitWithoutStream is set to true.")
- )
- const (
- workloadsUnary = "unary"
- workloadsStreaming = "streaming"
- workloadsUnconstrained = "unconstrained"
- workloadsAll = "all"
- // Compression modes.
- compModeOff = "off"
- compModeGzip = "gzip"
- compModeNop = "nop"
- compModeAll = "all"
- // Toggle modes.
- toggleModeOff = "off"
- toggleModeOn = "on"
- toggleModeBoth = "both"
- // Network modes.
- networkModeNone = "none"
- networkModeLocal = "Local"
- networkModeLAN = "LAN"
- networkModeWAN = "WAN"
- networkLongHaul = "Longhaul"
- numStatsBuckets = 10
- warmupCallCount = 10
- warmuptime = time.Second
- )
- var (
- allWorkloads = []string{workloadsUnary, workloadsStreaming, workloadsUnconstrained, workloadsAll}
- allCompModes = []string{compModeOff, compModeGzip, compModeNop, compModeAll}
- allToggleModes = []string{toggleModeOff, toggleModeOn, toggleModeBoth}
- allNetworkModes = []string{networkModeNone, networkModeLocal, networkModeLAN, networkModeWAN, networkLongHaul}
- defaultReadLatency = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay.
- defaultReadKbps = []int{0, 10240} // if non-positive, infinite
- defaultReadMTU = []int{0} // if non-positive, infinite
- defaultMaxConcurrentCalls = []int{1, 8, 64, 512}
- defaultReqSizeBytes = []int{1, 1024, 1024 * 1024}
- defaultRespSizeBytes = []int{1, 1024, 1024 * 1024}
- networks = map[string]latency.Network{
- networkModeLocal: latency.Local,
- networkModeLAN: latency.LAN,
- networkModeWAN: latency.WAN,
- networkLongHaul: latency.Longhaul,
- }
- keepaliveTime = 10 * time.Second
- keepaliveTimeout = 1 * time.Second
- // This is 0.8*keepaliveTime to prevent connection issues because of server
- // keepalive enforcement.
- keepaliveMinTime = 8 * time.Second
- )
- // runModes indicates the workloads to run. This is initialized with a call to
- // `runModesFromWorkloads`, passing the workloads flag set by the user.
- type runModes struct {
- unary, streaming, unconstrained bool
- }
- // runModesFromWorkloads determines the runModes based on the value of
- // workloads flag set by the user.
- func runModesFromWorkloads(workload string) runModes {
- r := runModes{}
- switch workload {
- case workloadsUnary:
- r.unary = true
- case workloadsStreaming:
- r.streaming = true
- case workloadsUnconstrained:
- r.unconstrained = true
- case workloadsAll:
- r.unary = true
- r.streaming = true
- r.unconstrained = true
- default:
- log.Fatalf("Unknown workloads setting: %v (want one of: %v)",
- workloads, strings.Join(allWorkloads, ", "))
- }
- return r
- }
- type startFunc func(mode string, bf stats.Features)
- type stopFunc func(count uint64)
- type ucStopFunc func(req uint64, resp uint64)
- type rpcCallFunc func(pos int)
- type rpcSendFunc func(pos int)
- type rpcRecvFunc func(pos int)
- type rpcCleanupFunc func()
- func unaryBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
- caller, cleanup := makeFuncUnary(bf)
- defer cleanup()
- runBenchmark(caller, start, stop, bf, s, workloadsUnary)
- }
- func streamBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
- caller, cleanup := makeFuncStream(bf)
- defer cleanup()
- runBenchmark(caller, start, stop, bf, s, workloadsStreaming)
- }
- func unconstrainedStreamBenchmark(start startFunc, stop ucStopFunc, bf stats.Features, s *stats.Stats) {
- var sender rpcSendFunc
- var recver rpcRecvFunc
- var cleanup rpcCleanupFunc
- if bf.EnablePreloader {
- sender, recver, cleanup = makeFuncUnconstrainedStreamPreloaded(bf)
- } else {
- sender, recver, cleanup = makeFuncUnconstrainedStream(bf)
- }
- defer cleanup()
- var req, resp uint64
- go func() {
- // Resets the counters once warmed up
- <-time.NewTimer(warmuptime).C
- atomic.StoreUint64(&req, 0)
- atomic.StoreUint64(&resp, 0)
- start(workloadsUnconstrained, bf)
- }()
- bmEnd := time.Now().Add(bf.BenchTime + warmuptime)
- var wg sync.WaitGroup
- wg.Add(2 * bf.MaxConcurrentCalls)
- for i := 0; i < bf.MaxConcurrentCalls; i++ {
- go func(pos int) {
- defer wg.Done()
- for {
- t := time.Now()
- if t.After(bmEnd) {
- return
- }
- sender(pos)
- atomic.AddUint64(&req, 1)
- }
- }(i)
- go func(pos int) {
- defer wg.Done()
- for {
- t := time.Now()
- if t.After(bmEnd) {
- return
- }
- recver(pos)
- atomic.AddUint64(&resp, 1)
- }
- }(i)
- }
- wg.Wait()
- stop(req, resp)
- }
- // makeClient returns a gRPC client for the grpc.testing.BenchmarkService
- // service. The client is configured using the different options in the passed
- // 'bf'. Also returns a cleanup function to close the client and release
- // resources.
- func makeClient(bf stats.Features) (testpb.BenchmarkServiceClient, func()) {
- nw := &latency.Network{Kbps: bf.Kbps, Latency: bf.Latency, MTU: bf.MTU}
- opts := []grpc.DialOption{}
- sopts := []grpc.ServerOption{}
- if bf.ModeCompressor == compModeNop {
- sopts = append(sopts,
- grpc.RPCCompressor(nopCompressor{}),
- grpc.RPCDecompressor(nopDecompressor{}),
- )
- opts = append(opts,
- grpc.WithCompressor(nopCompressor{}),
- grpc.WithDecompressor(nopDecompressor{}),
- )
- }
- if bf.ModeCompressor == compModeGzip {
- sopts = append(sopts,
- grpc.RPCCompressor(grpc.NewGZIPCompressor()),
- grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
- )
- opts = append(opts,
- grpc.WithCompressor(grpc.NewGZIPCompressor()),
- grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
- )
- }
- if bf.EnableKeepalive {
- sopts = append(sopts,
- grpc.KeepaliveParams(keepalive.ServerParameters{
- Time: keepaliveTime,
- Timeout: keepaliveTimeout,
- }),
- grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
- MinTime: keepaliveMinTime,
- PermitWithoutStream: true,
- }),
- )
- opts = append(opts,
- grpc.WithKeepaliveParams(keepalive.ClientParameters{
- Time: keepaliveTime,
- Timeout: keepaliveTimeout,
- PermitWithoutStream: true,
- }),
- )
- }
- sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(bf.MaxConcurrentCalls+1)))
- opts = append(opts, grpc.WithInsecure())
- var lis net.Listener
- if bf.UseBufConn {
- bcLis := bufconn.Listen(256 * 1024)
- lis = bcLis
- opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
- return nw.ContextDialer(func(context.Context, string, string) (net.Conn, error) {
- return bcLis.Dial()
- })(ctx, "", "")
- }))
- } else {
- var err error
- lis, err = net.Listen("tcp", "localhost:0")
- if err != nil {
- grpclog.Fatalf("Failed to listen: %v", err)
- }
- opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
- return nw.ContextDialer((&net.Dialer{}).DialContext)(ctx, "tcp", lis.Addr().String())
- }))
- }
- lis = nw.Listener(lis)
- stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...)
- conn := bm.NewClientConn("" /* target not used */, opts...)
- return testpb.NewBenchmarkServiceClient(conn), func() {
- conn.Close()
- stopper()
- }
- }
- func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
- tc, cleanup := makeClient(bf)
- return func(int) {
- reqSizeBytes := bf.ReqSizeBytes
- respSizeBytes := bf.RespSizeBytes
- if bf.ReqPayloadCurve != nil {
- reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom()
- }
- if bf.RespPayloadCurve != nil {
- respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
- }
- unaryCaller(tc, reqSizeBytes, respSizeBytes)
- }, cleanup
- }
- func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
- tc, cleanup := makeClient(bf)
- streams := make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
- for i := 0; i < bf.MaxConcurrentCalls; i++ {
- stream, err := tc.StreamingCall(context.Background())
- if err != nil {
- grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
- }
- streams[i] = stream
- }
- return func(pos int) {
- reqSizeBytes := bf.ReqSizeBytes
- respSizeBytes := bf.RespSizeBytes
- if bf.ReqPayloadCurve != nil {
- reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom()
- }
- if bf.RespPayloadCurve != nil {
- respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
- }
- streamCaller(streams[pos], reqSizeBytes, respSizeBytes)
- }, cleanup
- }
- func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
- streams, req, cleanup := setupUnconstrainedStream(bf)
- preparedMsg := make([]*grpc.PreparedMsg, len(streams))
- for i, stream := range streams {
- preparedMsg[i] = &grpc.PreparedMsg{}
- err := preparedMsg[i].Encode(stream, req)
- if err != nil {
- grpclog.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[i], req, stream, err)
- }
- }
- return func(pos int) {
- streams[pos].SendMsg(preparedMsg[pos])
- }, func(pos int) {
- streams[pos].Recv()
- }, cleanup
- }
- func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
- streams, req, cleanup := setupUnconstrainedStream(bf)
- return func(pos int) {
- streams[pos].Send(req)
- }, func(pos int) {
- streams[pos].Recv()
- }, cleanup
- }
- func setupUnconstrainedStream(bf stats.Features) ([]testpb.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) {
- tc, cleanup := makeClient(bf)
- streams := make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
- for i := 0; i < bf.MaxConcurrentCalls; i++ {
- stream, err := tc.UnconstrainedStreamingCall(context.Background())
- if err != nil {
- grpclog.Fatalf("%v.UnconstrainedStreamingCall(_) = _, %v", tc, err)
- }
- streams[i] = stream
- }
- pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, bf.ReqSizeBytes)
- req := &testpb.SimpleRequest{
- ResponseType: pl.Type,
- ResponseSize: int32(bf.RespSizeBytes),
- Payload: pl,
- }
- return streams, req, cleanup
- }
- // Makes a UnaryCall gRPC request using the given BenchmarkServiceClient and
- // request and response sizes.
- func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) {
- if err := bm.DoUnaryCall(client, reqSize, respSize); err != nil {
- grpclog.Fatalf("DoUnaryCall failed: %v", err)
- }
- }
- func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
- if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
- grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err)
- }
- }
- func runBenchmark(caller rpcCallFunc, start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats, mode string) {
- // Warm up connection.
- for i := 0; i < warmupCallCount; i++ {
- caller(0)
- }
- // Run benchmark.
- start(mode, bf)
- var wg sync.WaitGroup
- wg.Add(bf.MaxConcurrentCalls)
- bmEnd := time.Now().Add(bf.BenchTime)
- var count uint64
- for i := 0; i < bf.MaxConcurrentCalls; i++ {
- go func(pos int) {
- defer wg.Done()
- for {
- t := time.Now()
- if t.After(bmEnd) {
- return
- }
- start := time.Now()
- caller(pos)
- elapse := time.Since(start)
- atomic.AddUint64(&count, 1)
- s.AddDuration(elapse)
- }
- }(i)
- }
- wg.Wait()
- stop(count)
- }
- // benchOpts represents all configurable options available while running this
- // benchmark. This is built from the values passed as flags.
- type benchOpts struct {
- rModes runModes
- benchTime time.Duration
- memProfileRate int
- memProfile string
- cpuProfile string
- networkMode string
- benchmarkResultFile string
- useBufconn bool
- enableKeepalive bool
- features *featureOpts
- }
- // featureOpts represents options which can have multiple values. The user
- // usually provides a comma-separated list of options for each of these
- // features through command line flags. We generate all possible combinations
- // for the provided values and run the benchmarks for each combination.
- type featureOpts struct {
- enableTrace []bool
- readLatencies []time.Duration
- readKbps []int
- readMTU []int
- maxConcurrentCalls []int
- reqSizeBytes []int
- respSizeBytes []int
- reqPayloadCurves []*stats.PayloadCurve
- respPayloadCurves []*stats.PayloadCurve
- compModes []string
- enableChannelz []bool
- enablePreloader []bool
- }
- // makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each
- // element of the slice (indexed by 'featuresIndex' enum) contains the number
- // of features to be exercised by the benchmark code.
- // For example: Index 0 of the returned slice contains the number of values for
- // enableTrace feature, while index 1 contains the number of value of
- // readLatencies feature and so on.
- func makeFeaturesNum(b *benchOpts) []int {
- featuresNum := make([]int, stats.MaxFeatureIndex)
- for i := 0; i < len(featuresNum); i++ {
- switch stats.FeatureIndex(i) {
- case stats.EnableTraceIndex:
- featuresNum[i] = len(b.features.enableTrace)
- case stats.ReadLatenciesIndex:
- featuresNum[i] = len(b.features.readLatencies)
- case stats.ReadKbpsIndex:
- featuresNum[i] = len(b.features.readKbps)
- case stats.ReadMTUIndex:
- featuresNum[i] = len(b.features.readMTU)
- case stats.MaxConcurrentCallsIndex:
- featuresNum[i] = len(b.features.maxConcurrentCalls)
- case stats.ReqSizeBytesIndex:
- featuresNum[i] = len(b.features.reqSizeBytes)
- case stats.RespSizeBytesIndex:
- featuresNum[i] = len(b.features.respSizeBytes)
- case stats.ReqPayloadCurveIndex:
- featuresNum[i] = len(b.features.reqPayloadCurves)
- case stats.RespPayloadCurveIndex:
- featuresNum[i] = len(b.features.respPayloadCurves)
- case stats.CompModesIndex:
- featuresNum[i] = len(b.features.compModes)
- case stats.EnableChannelzIndex:
- featuresNum[i] = len(b.features.enableChannelz)
- case stats.EnablePreloaderIndex:
- featuresNum[i] = len(b.features.enablePreloader)
- default:
- log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex)
- }
- }
- return featuresNum
- }
- // sharedFeatures returns a bool slice which acts as a bitmask. Each item in
- // the slice represents a feature, indexed by 'featureIndex' enum. The bit is
- // set to 1 if the corresponding feature does not have multiple value, so is
- // shared amongst all benchmarks.
- func sharedFeatures(featuresNum []int) []bool {
- result := make([]bool, len(featuresNum))
- for i, num := range featuresNum {
- if num <= 1 {
- result[i] = true
- }
- }
- return result
- }
- // generateFeatures generates all combinations of the provided feature options.
- // While all the feature options are stored in the benchOpts struct, the input
- // parameter 'featuresNum' is a slice indexed by 'featureIndex' enum containing
- // the number of values for each feature.
- // For example, let's say the user sets -workloads=all and
- // -maxConcurrentCalls=1,100, this would end up with the following
- // combinations:
- // [workloads: unary, maxConcurrentCalls=1]
- // [workloads: unary, maxConcurrentCalls=1]
- // [workloads: streaming, maxConcurrentCalls=100]
- // [workloads: streaming, maxConcurrentCalls=100]
- // [workloads: unconstrained, maxConcurrentCalls=1]
- // [workloads: unconstrained, maxConcurrentCalls=100]
- func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
- // curPos and initialPos are two slices where each value acts as an index
- // into the appropriate feature slice maintained in benchOpts.features. This
- // loop generates all possible combinations of features by changing one value
- // at a time, and once curPos becomes equal to initialPos, we have explored
- // all options.
- var result []stats.Features
- var curPos []int
- initialPos := make([]int, stats.MaxFeatureIndex)
- for !reflect.DeepEqual(initialPos, curPos) {
- if curPos == nil {
- curPos = make([]int, stats.MaxFeatureIndex)
- }
- f := stats.Features{
- // These features stay the same for each iteration.
- NetworkMode: b.networkMode,
- UseBufConn: b.useBufconn,
- EnableKeepalive: b.enableKeepalive,
- BenchTime: b.benchTime,
- // These features can potentially change for each iteration.
- EnableTrace: b.features.enableTrace[curPos[stats.EnableTraceIndex]],
- Latency: b.features.readLatencies[curPos[stats.ReadLatenciesIndex]],
- Kbps: b.features.readKbps[curPos[stats.ReadKbpsIndex]],
- MTU: b.features.readMTU[curPos[stats.ReadMTUIndex]],
- MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[stats.MaxConcurrentCallsIndex]],
- ModeCompressor: b.features.compModes[curPos[stats.CompModesIndex]],
- EnableChannelz: b.features.enableChannelz[curPos[stats.EnableChannelzIndex]],
- EnablePreloader: b.features.enablePreloader[curPos[stats.EnablePreloaderIndex]],
- }
- if len(b.features.reqPayloadCurves) == 0 {
- f.ReqSizeBytes = b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]]
- } else {
- f.ReqPayloadCurve = b.features.reqPayloadCurves[curPos[stats.ReqPayloadCurveIndex]]
- }
- if len(b.features.respPayloadCurves) == 0 {
- f.RespSizeBytes = b.features.respSizeBytes[curPos[stats.RespSizeBytesIndex]]
- } else {
- f.RespPayloadCurve = b.features.respPayloadCurves[curPos[stats.RespPayloadCurveIndex]]
- }
- result = append(result, f)
- addOne(curPos, featuresNum)
- }
- return result
- }
- // addOne mutates the input slice 'features' by changing one feature, thus
- // arriving at the next combination of feature values. 'featuresMaxPosition'
- // provides the numbers of allowed values for each feature, indexed by
- // 'featureIndex' enum.
- func addOne(features []int, featuresMaxPosition []int) {
- for i := len(features) - 1; i >= 0; i-- {
- if featuresMaxPosition[i] == 0 {
- continue
- }
- features[i] = (features[i] + 1)
- if features[i]/featuresMaxPosition[i] == 0 {
- break
- }
- features[i] = features[i] % featuresMaxPosition[i]
- }
- }
- // processFlags reads the command line flags and builds benchOpts. Specifying
- // invalid values for certain flags will cause flag.Parse() to fail, and the
- // program to terminate.
- // This *SHOULD* be the only place where the flags are accessed. All other
- // parts of the benchmark code should rely on the returned benchOpts.
- func processFlags() *benchOpts {
- flag.Parse()
- if flag.NArg() != 0 {
- log.Fatal("Error: unparsed arguments: ", flag.Args())
- }
- opts := &benchOpts{
- rModes: runModesFromWorkloads(*workloads),
- benchTime: *benchTime,
- memProfileRate: *memProfileRate,
- memProfile: *memProfile,
- cpuProfile: *cpuProfile,
- networkMode: *networkMode,
- benchmarkResultFile: *benchmarkResultFile,
- useBufconn: *useBufconn,
- enableKeepalive: *enableKeepalive,
- features: &featureOpts{
- enableTrace: setToggleMode(*traceMode),
- readLatencies: append([]time.Duration(nil), *readLatency...),
- readKbps: append([]int(nil), *readKbps...),
- readMTU: append([]int(nil), *readMTU...),
- maxConcurrentCalls: append([]int(nil), *maxConcurrentCalls...),
- reqSizeBytes: append([]int(nil), *readReqSizeBytes...),
- respSizeBytes: append([]int(nil), *readRespSizeBytes...),
- compModes: setCompressorMode(*compressorMode),
- enableChannelz: setToggleMode(*channelzOn),
- enablePreloader: setToggleMode(*preloaderMode),
- },
- }
- if len(*reqPayloadCurveFiles) == 0 {
- if len(opts.features.reqSizeBytes) == 0 {
- opts.features.reqSizeBytes = defaultReqSizeBytes
- }
- } else {
- if len(opts.features.reqSizeBytes) != 0 {
- log.Fatalf("you may not specify -reqPayloadCurveFiles and -reqSizeBytes at the same time")
- }
- for _, file := range *reqPayloadCurveFiles {
- pc, err := stats.NewPayloadCurve(file)
- if err != nil {
- log.Fatalf("cannot load payload curve file %s: %v", file, err)
- }
- opts.features.reqPayloadCurves = append(opts.features.reqPayloadCurves, pc)
- }
- opts.features.reqSizeBytes = nil
- }
- if len(*respPayloadCurveFiles) == 0 {
- if len(opts.features.respSizeBytes) == 0 {
- opts.features.respSizeBytes = defaultRespSizeBytes
- }
- } else {
- if len(opts.features.respSizeBytes) != 0 {
- log.Fatalf("you may not specify -respPayloadCurveFiles and -respSizeBytes at the same time")
- }
- for _, file := range *respPayloadCurveFiles {
- pc, err := stats.NewPayloadCurve(file)
- if err != nil {
- log.Fatalf("cannot load payload curve file %s: %v", file, err)
- }
- opts.features.respPayloadCurves = append(opts.features.respPayloadCurves, pc)
- }
- opts.features.respSizeBytes = nil
- }
- // Re-write latency, kpbs and mtu if network mode is set.
- if network, ok := networks[opts.networkMode]; ok {
- opts.features.readLatencies = []time.Duration{network.Latency}
- opts.features.readKbps = []int{network.Kbps}
- opts.features.readMTU = []int{network.MTU}
- }
- return opts
- }
- func setToggleMode(val string) []bool {
- switch val {
- case toggleModeOn:
- return []bool{true}
- case toggleModeOff:
- return []bool{false}
- case toggleModeBoth:
- return []bool{false, true}
- default:
- // This should never happen because a wrong value passed to this flag would
- // be caught during flag.Parse().
- return []bool{}
- }
- }
- func setCompressorMode(val string) []string {
- switch val {
- case compModeNop, compModeGzip, compModeOff:
- return []string{val}
- case compModeAll:
- return []string{compModeNop, compModeGzip, compModeOff}
- default:
- // This should never happen because a wrong value passed to this flag would
- // be caught during flag.Parse().
- return []string{}
- }
- }
- func main() {
- opts := processFlags()
- before(opts)
- s := stats.NewStats(numStatsBuckets)
- featuresNum := makeFeaturesNum(opts)
- sf := sharedFeatures(featuresNum)
- var (
- start = func(mode string, bf stats.Features) { s.StartRun(mode, bf, sf) }
- stop = func(count uint64) { s.EndRun(count) }
- ucStop = func(req uint64, resp uint64) { s.EndUnconstrainedRun(req, resp) }
- )
- for _, bf := range opts.generateFeatures(featuresNum) {
- grpc.EnableTracing = bf.EnableTrace
- if bf.EnableChannelz {
- channelz.TurnOn()
- }
- if opts.rModes.unary {
- unaryBenchmark(start, stop, bf, s)
- }
- if opts.rModes.streaming {
- streamBenchmark(start, stop, bf, s)
- }
- if opts.rModes.unconstrained {
- unconstrainedStreamBenchmark(start, ucStop, bf, s)
- }
- }
- after(opts, s.GetResults())
- }
- func before(opts *benchOpts) {
- if opts.memProfile != "" {
- runtime.MemProfileRate = opts.memProfileRate
- }
- if opts.cpuProfile != "" {
- f, err := os.Create(opts.cpuProfile)
- if err != nil {
- fmt.Fprintf(os.Stderr, "testing: %s\n", err)
- return
- }
- if err := pprof.StartCPUProfile(f); err != nil {
- fmt.Fprintf(os.Stderr, "testing: can't start cpu profile: %s\n", err)
- f.Close()
- return
- }
- }
- }
- func after(opts *benchOpts, data []stats.BenchResults) {
- if opts.cpuProfile != "" {
- pprof.StopCPUProfile() // flushes profile to disk
- }
- if opts.memProfile != "" {
- f, err := os.Create(opts.memProfile)
- if err != nil {
- fmt.Fprintf(os.Stderr, "testing: %s\n", err)
- os.Exit(2)
- }
- runtime.GC() // materialize all statistics
- if err = pprof.WriteHeapProfile(f); err != nil {
- fmt.Fprintf(os.Stderr, "testing: can't write heap profile %s: %s\n", opts.memProfile, err)
- os.Exit(2)
- }
- f.Close()
- }
- if opts.benchmarkResultFile != "" {
- f, err := os.Create(opts.benchmarkResultFile)
- if err != nil {
- log.Fatalf("testing: can't write benchmark result %s: %s\n", opts.benchmarkResultFile, err)
- }
- dataEncoder := gob.NewEncoder(f)
- dataEncoder.Encode(data)
- f.Close()
- }
- }
- // nopCompressor is a compressor that just copies data.
- type nopCompressor struct{}
- func (nopCompressor) Do(w io.Writer, p []byte) error {
- n, err := w.Write(p)
- if err != nil {
- return err
- }
- if n != len(p) {
- return fmt.Errorf("nopCompressor.Write: wrote %v bytes; want %v", n, len(p))
- }
- return nil
- }
- func (nopCompressor) Type() string { return compModeNop }
- // nopDecompressor is a decompressor that just copies data.
- type nopDecompressor struct{}
- func (nopDecompressor) Do(r io.Reader) ([]byte, error) { return ioutil.ReadAll(r) }
- func (nopDecompressor) Type() string { return compModeNop }
|