stats_test.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323
  1. /*
  2. *
  3. * Copyright 2016 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package stats_test
  19. import (
  20. "context"
  21. "fmt"
  22. "io"
  23. "net"
  24. "reflect"
  25. "sync"
  26. "testing"
  27. "time"
  28. "github.com/golang/protobuf/proto"
  29. "google.golang.org/grpc"
  30. "google.golang.org/grpc/internal/grpctest"
  31. "google.golang.org/grpc/metadata"
  32. "google.golang.org/grpc/stats"
  33. testpb "google.golang.org/grpc/stats/grpc_testing"
  34. "google.golang.org/grpc/status"
  35. )
  36. type s struct {
  37. grpctest.Tester
  38. }
  39. func Test(t *testing.T) {
  40. grpctest.RunSubTests(t, s{})
  41. }
  42. func init() {
  43. grpc.EnableTracing = false
  44. }
  45. type connCtxKey struct{}
  46. type rpcCtxKey struct{}
  47. var (
  48. // For headers sent to server:
  49. testMetadata = metadata.MD{
  50. "key1": []string{"value1"},
  51. "key2": []string{"value2"},
  52. "user-agent": []string{fmt.Sprintf("test/0.0.1 grpc-go/%s", grpc.Version)},
  53. }
  54. // For headers sent from server:
  55. testHeaderMetadata = metadata.MD{
  56. "hkey1": []string{"headerValue1"},
  57. "hkey2": []string{"headerValue2"},
  58. }
  59. // For trailers sent from server:
  60. testTrailerMetadata = metadata.MD{
  61. "tkey1": []string{"trailerValue1"},
  62. "tkey2": []string{"trailerValue2"},
  63. }
  64. // The id for which the service handler should return error.
  65. errorID int32 = 32202
  66. )
  67. type testServer struct {
  68. testpb.UnimplementedTestServiceServer
  69. }
  70. func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  71. if err := grpc.SendHeader(ctx, testHeaderMetadata); err != nil {
  72. return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", testHeaderMetadata, err)
  73. }
  74. if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil {
  75. return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err)
  76. }
  77. if in.Id == errorID {
  78. return nil, fmt.Errorf("got error id: %v", in.Id)
  79. }
  80. return &testpb.SimpleResponse{Id: in.Id}, nil
  81. }
  82. func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
  83. if err := stream.SendHeader(testHeaderMetadata); err != nil {
  84. return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil)
  85. }
  86. stream.SetTrailer(testTrailerMetadata)
  87. for {
  88. in, err := stream.Recv()
  89. if err == io.EOF {
  90. // read done.
  91. return nil
  92. }
  93. if err != nil {
  94. return err
  95. }
  96. if in.Id == errorID {
  97. return fmt.Errorf("got error id: %v", in.Id)
  98. }
  99. if err := stream.Send(&testpb.SimpleResponse{Id: in.Id}); err != nil {
  100. return err
  101. }
  102. }
  103. }
  104. func (s *testServer) ClientStreamCall(stream testpb.TestService_ClientStreamCallServer) error {
  105. if err := stream.SendHeader(testHeaderMetadata); err != nil {
  106. return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil)
  107. }
  108. stream.SetTrailer(testTrailerMetadata)
  109. for {
  110. in, err := stream.Recv()
  111. if err == io.EOF {
  112. // read done.
  113. return stream.SendAndClose(&testpb.SimpleResponse{Id: int32(0)})
  114. }
  115. if err != nil {
  116. return err
  117. }
  118. if in.Id == errorID {
  119. return fmt.Errorf("got error id: %v", in.Id)
  120. }
  121. }
  122. }
  123. func (s *testServer) ServerStreamCall(in *testpb.SimpleRequest, stream testpb.TestService_ServerStreamCallServer) error {
  124. if err := stream.SendHeader(testHeaderMetadata); err != nil {
  125. return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil)
  126. }
  127. stream.SetTrailer(testTrailerMetadata)
  128. if in.Id == errorID {
  129. return fmt.Errorf("got error id: %v", in.Id)
  130. }
  131. for i := 0; i < 5; i++ {
  132. if err := stream.Send(&testpb.SimpleResponse{Id: in.Id}); err != nil {
  133. return err
  134. }
  135. }
  136. return nil
  137. }
  138. // test is an end-to-end test. It should be created with the newTest
  139. // func, modified as needed, and then started with its startServer method.
  140. // It should be cleaned up with the tearDown method.
  141. type test struct {
  142. t *testing.T
  143. compress string
  144. clientStatsHandler stats.Handler
  145. serverStatsHandler stats.Handler
  146. testServer testpb.TestServiceServer // nil means none
  147. // srv and srvAddr are set once startServer is called.
  148. srv *grpc.Server
  149. srvAddr string
  150. cc *grpc.ClientConn // nil until requested via clientConn
  151. }
  152. func (te *test) tearDown() {
  153. if te.cc != nil {
  154. te.cc.Close()
  155. te.cc = nil
  156. }
  157. te.srv.Stop()
  158. }
  159. type testConfig struct {
  160. compress string
  161. }
  162. // newTest returns a new test using the provided testing.T and
  163. // environment. It is returned with default values. Tests should
  164. // modify it before calling its startServer and clientConn methods.
  165. func newTest(t *testing.T, tc *testConfig, ch stats.Handler, sh stats.Handler) *test {
  166. te := &test{
  167. t: t,
  168. compress: tc.compress,
  169. clientStatsHandler: ch,
  170. serverStatsHandler: sh,
  171. }
  172. return te
  173. }
  174. // startServer starts a gRPC server listening. Callers should defer a
  175. // call to te.tearDown to clean up.
  176. func (te *test) startServer(ts testpb.TestServiceServer) {
  177. te.testServer = ts
  178. lis, err := net.Listen("tcp", "localhost:0")
  179. if err != nil {
  180. te.t.Fatalf("Failed to listen: %v", err)
  181. }
  182. var opts []grpc.ServerOption
  183. if te.compress == "gzip" {
  184. opts = append(opts,
  185. grpc.RPCCompressor(grpc.NewGZIPCompressor()),
  186. grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
  187. )
  188. }
  189. if te.serverStatsHandler != nil {
  190. opts = append(opts, grpc.StatsHandler(te.serverStatsHandler))
  191. }
  192. s := grpc.NewServer(opts...)
  193. te.srv = s
  194. if te.testServer != nil {
  195. testpb.RegisterTestServiceServer(s, te.testServer)
  196. }
  197. go s.Serve(lis)
  198. te.srvAddr = lis.Addr().String()
  199. }
  200. func (te *test) clientConn() *grpc.ClientConn {
  201. if te.cc != nil {
  202. return te.cc
  203. }
  204. opts := []grpc.DialOption{
  205. grpc.WithInsecure(),
  206. grpc.WithBlock(),
  207. grpc.WithUserAgent("test/0.0.1"),
  208. }
  209. if te.compress == "gzip" {
  210. opts = append(opts,
  211. grpc.WithCompressor(grpc.NewGZIPCompressor()),
  212. grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
  213. )
  214. }
  215. if te.clientStatsHandler != nil {
  216. opts = append(opts, grpc.WithStatsHandler(te.clientStatsHandler))
  217. }
  218. var err error
  219. te.cc, err = grpc.Dial(te.srvAddr, opts...)
  220. if err != nil {
  221. te.t.Fatalf("Dial(%q) = %v", te.srvAddr, err)
  222. }
  223. return te.cc
  224. }
  225. type rpcType int
  226. const (
  227. unaryRPC rpcType = iota
  228. clientStreamRPC
  229. serverStreamRPC
  230. fullDuplexStreamRPC
  231. )
  232. type rpcConfig struct {
  233. count int // Number of requests and responses for streaming RPCs.
  234. success bool // Whether the RPC should succeed or return error.
  235. failfast bool
  236. callType rpcType // Type of RPC.
  237. }
  238. func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
  239. var (
  240. resp *testpb.SimpleResponse
  241. req *testpb.SimpleRequest
  242. err error
  243. )
  244. tc := testpb.NewTestServiceClient(te.clientConn())
  245. if c.success {
  246. req = &testpb.SimpleRequest{Id: errorID + 1}
  247. } else {
  248. req = &testpb.SimpleRequest{Id: errorID}
  249. }
  250. ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
  251. resp, err = tc.UnaryCall(ctx, req, grpc.WaitForReady(!c.failfast))
  252. return req, resp, err
  253. }
  254. func (te *test) doFullDuplexCallRoundtrip(c *rpcConfig) ([]*testpb.SimpleRequest, []*testpb.SimpleResponse, error) {
  255. var (
  256. reqs []*testpb.SimpleRequest
  257. resps []*testpb.SimpleResponse
  258. err error
  259. )
  260. tc := testpb.NewTestServiceClient(te.clientConn())
  261. stream, err := tc.FullDuplexCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.WaitForReady(!c.failfast))
  262. if err != nil {
  263. return reqs, resps, err
  264. }
  265. var startID int32
  266. if !c.success {
  267. startID = errorID
  268. }
  269. for i := 0; i < c.count; i++ {
  270. req := &testpb.SimpleRequest{
  271. Id: int32(i) + startID,
  272. }
  273. reqs = append(reqs, req)
  274. if err = stream.Send(req); err != nil {
  275. return reqs, resps, err
  276. }
  277. var resp *testpb.SimpleResponse
  278. if resp, err = stream.Recv(); err != nil {
  279. return reqs, resps, err
  280. }
  281. resps = append(resps, resp)
  282. }
  283. if err = stream.CloseSend(); err != nil && err != io.EOF {
  284. return reqs, resps, err
  285. }
  286. if _, err = stream.Recv(); err != io.EOF {
  287. return reqs, resps, err
  288. }
  289. return reqs, resps, nil
  290. }
  291. func (te *test) doClientStreamCall(c *rpcConfig) ([]*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
  292. var (
  293. reqs []*testpb.SimpleRequest
  294. resp *testpb.SimpleResponse
  295. err error
  296. )
  297. tc := testpb.NewTestServiceClient(te.clientConn())
  298. stream, err := tc.ClientStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.WaitForReady(!c.failfast))
  299. if err != nil {
  300. return reqs, resp, err
  301. }
  302. var startID int32
  303. if !c.success {
  304. startID = errorID
  305. }
  306. for i := 0; i < c.count; i++ {
  307. req := &testpb.SimpleRequest{
  308. Id: int32(i) + startID,
  309. }
  310. reqs = append(reqs, req)
  311. if err = stream.Send(req); err != nil {
  312. return reqs, resp, err
  313. }
  314. }
  315. resp, err = stream.CloseAndRecv()
  316. return reqs, resp, err
  317. }
  318. func (te *test) doServerStreamCall(c *rpcConfig) (*testpb.SimpleRequest, []*testpb.SimpleResponse, error) {
  319. var (
  320. req *testpb.SimpleRequest
  321. resps []*testpb.SimpleResponse
  322. err error
  323. )
  324. tc := testpb.NewTestServiceClient(te.clientConn())
  325. var startID int32
  326. if !c.success {
  327. startID = errorID
  328. }
  329. req = &testpb.SimpleRequest{Id: startID}
  330. stream, err := tc.ServerStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), req, grpc.WaitForReady(!c.failfast))
  331. if err != nil {
  332. return req, resps, err
  333. }
  334. for {
  335. var resp *testpb.SimpleResponse
  336. resp, err := stream.Recv()
  337. if err == io.EOF {
  338. return req, resps, nil
  339. } else if err != nil {
  340. return req, resps, err
  341. }
  342. resps = append(resps, resp)
  343. }
  344. }
  345. type expectedData struct {
  346. method string
  347. serverAddr string
  348. compression string
  349. reqIdx int
  350. requests []*testpb.SimpleRequest
  351. respIdx int
  352. responses []*testpb.SimpleResponse
  353. err error
  354. failfast bool
  355. }
  356. type gotData struct {
  357. ctx context.Context
  358. client bool
  359. s interface{} // This could be RPCStats or ConnStats.
  360. }
  361. const (
  362. begin int = iota
  363. end
  364. inPayload
  365. inHeader
  366. inTrailer
  367. outPayload
  368. outHeader
  369. // TODO: test outTrailer ?
  370. connBegin
  371. connEnd
  372. )
  373. func checkBegin(t *testing.T, d *gotData, e *expectedData) {
  374. var (
  375. ok bool
  376. st *stats.Begin
  377. )
  378. if st, ok = d.s.(*stats.Begin); !ok {
  379. t.Fatalf("got %T, want Begin", d.s)
  380. }
  381. if d.ctx == nil {
  382. t.Fatalf("d.ctx = nil, want <non-nil>")
  383. }
  384. if st.BeginTime.IsZero() {
  385. t.Fatalf("st.BeginTime = %v, want <non-zero>", st.BeginTime)
  386. }
  387. if d.client {
  388. if st.FailFast != e.failfast {
  389. t.Fatalf("st.FailFast = %v, want %v", st.FailFast, e.failfast)
  390. }
  391. }
  392. }
  393. func checkInHeader(t *testing.T, d *gotData, e *expectedData) {
  394. var (
  395. ok bool
  396. st *stats.InHeader
  397. )
  398. if st, ok = d.s.(*stats.InHeader); !ok {
  399. t.Fatalf("got %T, want InHeader", d.s)
  400. }
  401. if d.ctx == nil {
  402. t.Fatalf("d.ctx = nil, want <non-nil>")
  403. }
  404. if st.Compression != e.compression {
  405. t.Fatalf("st.Compression = %v, want %v", st.Compression, e.compression)
  406. }
  407. if d.client {
  408. // additional headers might be injected so instead of testing equality, test that all the
  409. // expected headers keys have the expected header values.
  410. for key := range testHeaderMetadata {
  411. if !reflect.DeepEqual(st.Header.Get(key), testHeaderMetadata.Get(key)) {
  412. t.Fatalf("st.Header[%s] = %v, want %v", key, st.Header.Get(key), testHeaderMetadata.Get(key))
  413. }
  414. }
  415. } else {
  416. if st.FullMethod != e.method {
  417. t.Fatalf("st.FullMethod = %s, want %v", st.FullMethod, e.method)
  418. }
  419. if st.LocalAddr.String() != e.serverAddr {
  420. t.Fatalf("st.LocalAddr = %v, want %v", st.LocalAddr, e.serverAddr)
  421. }
  422. // additional headers might be injected so instead of testing equality, test that all the
  423. // expected headers keys have the expected header values.
  424. for key := range testMetadata {
  425. if !reflect.DeepEqual(st.Header.Get(key), testMetadata.Get(key)) {
  426. t.Fatalf("st.Header[%s] = %v, want %v", key, st.Header.Get(key), testMetadata.Get(key))
  427. }
  428. }
  429. if connInfo, ok := d.ctx.Value(connCtxKey{}).(*stats.ConnTagInfo); ok {
  430. if connInfo.RemoteAddr != st.RemoteAddr {
  431. t.Fatalf("connInfo.RemoteAddr = %v, want %v", connInfo.RemoteAddr, st.RemoteAddr)
  432. }
  433. if connInfo.LocalAddr != st.LocalAddr {
  434. t.Fatalf("connInfo.LocalAddr = %v, want %v", connInfo.LocalAddr, st.LocalAddr)
  435. }
  436. } else {
  437. t.Fatalf("got context %v, want one with connCtxKey", d.ctx)
  438. }
  439. if rpcInfo, ok := d.ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo); ok {
  440. if rpcInfo.FullMethodName != st.FullMethod {
  441. t.Fatalf("rpcInfo.FullMethod = %s, want %v", rpcInfo.FullMethodName, st.FullMethod)
  442. }
  443. } else {
  444. t.Fatalf("got context %v, want one with rpcCtxKey", d.ctx)
  445. }
  446. }
  447. }
  448. func checkInPayload(t *testing.T, d *gotData, e *expectedData) {
  449. var (
  450. ok bool
  451. st *stats.InPayload
  452. )
  453. if st, ok = d.s.(*stats.InPayload); !ok {
  454. t.Fatalf("got %T, want InPayload", d.s)
  455. }
  456. if d.ctx == nil {
  457. t.Fatalf("d.ctx = nil, want <non-nil>")
  458. }
  459. if d.client {
  460. b, err := proto.Marshal(e.responses[e.respIdx])
  461. if err != nil {
  462. t.Fatalf("failed to marshal message: %v", err)
  463. }
  464. if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.responses[e.respIdx]) {
  465. t.Fatalf("st.Payload = %T, want %T", st.Payload, e.responses[e.respIdx])
  466. }
  467. e.respIdx++
  468. if string(st.Data) != string(b) {
  469. t.Fatalf("st.Data = %v, want %v", st.Data, b)
  470. }
  471. if st.Length != len(b) {
  472. t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
  473. }
  474. } else {
  475. b, err := proto.Marshal(e.requests[e.reqIdx])
  476. if err != nil {
  477. t.Fatalf("failed to marshal message: %v", err)
  478. }
  479. if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.requests[e.reqIdx]) {
  480. t.Fatalf("st.Payload = %T, want %T", st.Payload, e.requests[e.reqIdx])
  481. }
  482. e.reqIdx++
  483. if string(st.Data) != string(b) {
  484. t.Fatalf("st.Data = %v, want %v", st.Data, b)
  485. }
  486. if st.Length != len(b) {
  487. t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
  488. }
  489. }
  490. // Below are sanity checks that WireLength and RecvTime are populated.
  491. // TODO: check values of WireLength and RecvTime.
  492. if len(st.Data) > 0 && st.WireLength == 0 {
  493. t.Fatalf("st.WireLength = %v with non-empty data, want <non-zero>",
  494. st.WireLength)
  495. }
  496. if st.RecvTime.IsZero() {
  497. t.Fatalf("st.ReceivedTime = %v, want <non-zero>", st.RecvTime)
  498. }
  499. }
  500. func checkInTrailer(t *testing.T, d *gotData, e *expectedData) {
  501. var (
  502. ok bool
  503. st *stats.InTrailer
  504. )
  505. if st, ok = d.s.(*stats.InTrailer); !ok {
  506. t.Fatalf("got %T, want InTrailer", d.s)
  507. }
  508. if d.ctx == nil {
  509. t.Fatalf("d.ctx = nil, want <non-nil>")
  510. }
  511. if !st.Client {
  512. t.Fatalf("st IsClient = false, want true")
  513. }
  514. if !reflect.DeepEqual(st.Trailer, testTrailerMetadata) {
  515. t.Fatalf("st.Trailer = %v, want %v", st.Trailer, testTrailerMetadata)
  516. }
  517. }
  518. func checkOutHeader(t *testing.T, d *gotData, e *expectedData) {
  519. var (
  520. ok bool
  521. st *stats.OutHeader
  522. )
  523. if st, ok = d.s.(*stats.OutHeader); !ok {
  524. t.Fatalf("got %T, want OutHeader", d.s)
  525. }
  526. if d.ctx == nil {
  527. t.Fatalf("d.ctx = nil, want <non-nil>")
  528. }
  529. if st.Compression != e.compression {
  530. t.Fatalf("st.Compression = %v, want %v", st.Compression, e.compression)
  531. }
  532. if d.client {
  533. if st.FullMethod != e.method {
  534. t.Fatalf("st.FullMethod = %s, want %v", st.FullMethod, e.method)
  535. }
  536. if st.RemoteAddr.String() != e.serverAddr {
  537. t.Fatalf("st.RemoteAddr = %v, want %v", st.RemoteAddr, e.serverAddr)
  538. }
  539. // additional headers might be injected so instead of testing equality, test that all the
  540. // expected headers keys have the expected header values.
  541. for key := range testMetadata {
  542. if !reflect.DeepEqual(st.Header.Get(key), testMetadata.Get(key)) {
  543. t.Fatalf("st.Header[%s] = %v, want %v", key, st.Header.Get(key), testMetadata.Get(key))
  544. }
  545. }
  546. if rpcInfo, ok := d.ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo); ok {
  547. if rpcInfo.FullMethodName != st.FullMethod {
  548. t.Fatalf("rpcInfo.FullMethod = %s, want %v", rpcInfo.FullMethodName, st.FullMethod)
  549. }
  550. } else {
  551. t.Fatalf("got context %v, want one with rpcCtxKey", d.ctx)
  552. }
  553. } else {
  554. // additional headers might be injected so instead of testing equality, test that all the
  555. // expected headers keys have the expected header values.
  556. for key := range testHeaderMetadata {
  557. if !reflect.DeepEqual(st.Header.Get(key), testHeaderMetadata.Get(key)) {
  558. t.Fatalf("st.Header[%s] = %v, want %v", key, st.Header.Get(key), testHeaderMetadata.Get(key))
  559. }
  560. }
  561. }
  562. }
  563. func checkOutPayload(t *testing.T, d *gotData, e *expectedData) {
  564. var (
  565. ok bool
  566. st *stats.OutPayload
  567. )
  568. if st, ok = d.s.(*stats.OutPayload); !ok {
  569. t.Fatalf("got %T, want OutPayload", d.s)
  570. }
  571. if d.ctx == nil {
  572. t.Fatalf("d.ctx = nil, want <non-nil>")
  573. }
  574. if d.client {
  575. b, err := proto.Marshal(e.requests[e.reqIdx])
  576. if err != nil {
  577. t.Fatalf("failed to marshal message: %v", err)
  578. }
  579. if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.requests[e.reqIdx]) {
  580. t.Fatalf("st.Payload = %T, want %T", st.Payload, e.requests[e.reqIdx])
  581. }
  582. e.reqIdx++
  583. if string(st.Data) != string(b) {
  584. t.Fatalf("st.Data = %v, want %v", st.Data, b)
  585. }
  586. if st.Length != len(b) {
  587. t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
  588. }
  589. } else {
  590. b, err := proto.Marshal(e.responses[e.respIdx])
  591. if err != nil {
  592. t.Fatalf("failed to marshal message: %v", err)
  593. }
  594. if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.responses[e.respIdx]) {
  595. t.Fatalf("st.Payload = %T, want %T", st.Payload, e.responses[e.respIdx])
  596. }
  597. e.respIdx++
  598. if string(st.Data) != string(b) {
  599. t.Fatalf("st.Data = %v, want %v", st.Data, b)
  600. }
  601. if st.Length != len(b) {
  602. t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
  603. }
  604. }
  605. // Below are sanity checks that WireLength and SentTime are populated.
  606. // TODO: check values of WireLength and SentTime.
  607. if len(st.Data) > 0 && st.WireLength == 0 {
  608. t.Fatalf("st.WireLength = %v with non-empty data, want <non-zero>",
  609. st.WireLength)
  610. }
  611. if st.SentTime.IsZero() {
  612. t.Fatalf("st.SentTime = %v, want <non-zero>", st.SentTime)
  613. }
  614. }
  615. func checkOutTrailer(t *testing.T, d *gotData, e *expectedData) {
  616. var (
  617. ok bool
  618. st *stats.OutTrailer
  619. )
  620. if st, ok = d.s.(*stats.OutTrailer); !ok {
  621. t.Fatalf("got %T, want OutTrailer", d.s)
  622. }
  623. if d.ctx == nil {
  624. t.Fatalf("d.ctx = nil, want <non-nil>")
  625. }
  626. if st.Client {
  627. t.Fatalf("st IsClient = true, want false")
  628. }
  629. if !reflect.DeepEqual(st.Trailer, testTrailerMetadata) {
  630. t.Fatalf("st.Trailer = %v, want %v", st.Trailer, testTrailerMetadata)
  631. }
  632. }
  633. func checkEnd(t *testing.T, d *gotData, e *expectedData) {
  634. var (
  635. ok bool
  636. st *stats.End
  637. )
  638. if st, ok = d.s.(*stats.End); !ok {
  639. t.Fatalf("got %T, want End", d.s)
  640. }
  641. if d.ctx == nil {
  642. t.Fatalf("d.ctx = nil, want <non-nil>")
  643. }
  644. if st.BeginTime.IsZero() {
  645. t.Fatalf("st.BeginTime = %v, want <non-zero>", st.BeginTime)
  646. }
  647. if st.EndTime.IsZero() {
  648. t.Fatalf("st.EndTime = %v, want <non-zero>", st.EndTime)
  649. }
  650. actual, ok := status.FromError(st.Error)
  651. if !ok {
  652. t.Fatalf("expected st.Error to be a statusError, got %v (type %T)", st.Error, st.Error)
  653. }
  654. expectedStatus, _ := status.FromError(e.err)
  655. if actual.Code() != expectedStatus.Code() || actual.Message() != expectedStatus.Message() {
  656. t.Fatalf("st.Error = %v, want %v", st.Error, e.err)
  657. }
  658. if st.Client {
  659. if !reflect.DeepEqual(st.Trailer, testTrailerMetadata) {
  660. t.Fatalf("st.Trailer = %v, want %v", st.Trailer, testTrailerMetadata)
  661. }
  662. } else {
  663. if st.Trailer != nil {
  664. t.Fatalf("st.Trailer = %v, want nil", st.Trailer)
  665. }
  666. }
  667. }
  668. func checkConnBegin(t *testing.T, d *gotData, e *expectedData) {
  669. var (
  670. ok bool
  671. st *stats.ConnBegin
  672. )
  673. if st, ok = d.s.(*stats.ConnBegin); !ok {
  674. t.Fatalf("got %T, want ConnBegin", d.s)
  675. }
  676. if d.ctx == nil {
  677. t.Fatalf("d.ctx = nil, want <non-nil>")
  678. }
  679. st.IsClient() // TODO remove this.
  680. }
  681. func checkConnEnd(t *testing.T, d *gotData, e *expectedData) {
  682. var (
  683. ok bool
  684. st *stats.ConnEnd
  685. )
  686. if st, ok = d.s.(*stats.ConnEnd); !ok {
  687. t.Fatalf("got %T, want ConnEnd", d.s)
  688. }
  689. if d.ctx == nil {
  690. t.Fatalf("d.ctx = nil, want <non-nil>")
  691. }
  692. st.IsClient() // TODO remove this.
  693. }
  694. type statshandler struct {
  695. mu sync.Mutex
  696. gotRPC []*gotData
  697. gotConn []*gotData
  698. }
  699. func (h *statshandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
  700. return context.WithValue(ctx, connCtxKey{}, info)
  701. }
  702. func (h *statshandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
  703. return context.WithValue(ctx, rpcCtxKey{}, info)
  704. }
  705. func (h *statshandler) HandleConn(ctx context.Context, s stats.ConnStats) {
  706. h.mu.Lock()
  707. defer h.mu.Unlock()
  708. h.gotConn = append(h.gotConn, &gotData{ctx, s.IsClient(), s})
  709. }
  710. func (h *statshandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
  711. h.mu.Lock()
  712. defer h.mu.Unlock()
  713. h.gotRPC = append(h.gotRPC, &gotData{ctx, s.IsClient(), s})
  714. }
  715. func checkConnStats(t *testing.T, got []*gotData) {
  716. if len(got) <= 0 || len(got)%2 != 0 {
  717. for i, g := range got {
  718. t.Errorf(" - %v, %T = %+v, ctx: %v", i, g.s, g.s, g.ctx)
  719. }
  720. t.Fatalf("got %v stats, want even positive number", len(got))
  721. }
  722. // The first conn stats must be a ConnBegin.
  723. checkConnBegin(t, got[0], nil)
  724. // The last conn stats must be a ConnEnd.
  725. checkConnEnd(t, got[len(got)-1], nil)
  726. }
  727. func checkServerStats(t *testing.T, got []*gotData, expect *expectedData, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) {
  728. if len(got) != len(checkFuncs) {
  729. for i, g := range got {
  730. t.Errorf(" - %v, %T", i, g.s)
  731. }
  732. t.Fatalf("got %v stats, want %v stats", len(got), len(checkFuncs))
  733. }
  734. var rpcctx context.Context
  735. for i := 0; i < len(got); i++ {
  736. if _, ok := got[i].s.(stats.RPCStats); ok {
  737. if rpcctx != nil && got[i].ctx != rpcctx {
  738. t.Fatalf("got different contexts with stats %T", got[i].s)
  739. }
  740. rpcctx = got[i].ctx
  741. }
  742. }
  743. for i, f := range checkFuncs {
  744. f(t, got[i], expect)
  745. }
  746. }
  747. func testServerStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) {
  748. h := &statshandler{}
  749. te := newTest(t, tc, nil, h)
  750. te.startServer(&testServer{})
  751. defer te.tearDown()
  752. var (
  753. reqs []*testpb.SimpleRequest
  754. resps []*testpb.SimpleResponse
  755. err error
  756. method string
  757. req *testpb.SimpleRequest
  758. resp *testpb.SimpleResponse
  759. e error
  760. )
  761. switch cc.callType {
  762. case unaryRPC:
  763. method = "/grpc.testing.TestService/UnaryCall"
  764. req, resp, e = te.doUnaryCall(cc)
  765. reqs = []*testpb.SimpleRequest{req}
  766. resps = []*testpb.SimpleResponse{resp}
  767. err = e
  768. case clientStreamRPC:
  769. method = "/grpc.testing.TestService/ClientStreamCall"
  770. reqs, resp, e = te.doClientStreamCall(cc)
  771. resps = []*testpb.SimpleResponse{resp}
  772. err = e
  773. case serverStreamRPC:
  774. method = "/grpc.testing.TestService/ServerStreamCall"
  775. req, resps, e = te.doServerStreamCall(cc)
  776. reqs = []*testpb.SimpleRequest{req}
  777. err = e
  778. case fullDuplexStreamRPC:
  779. method = "/grpc.testing.TestService/FullDuplexCall"
  780. reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
  781. }
  782. if cc.success != (err == nil) {
  783. t.Fatalf("cc.success: %v, got error: %v", cc.success, err)
  784. }
  785. te.cc.Close()
  786. te.srv.GracefulStop() // Wait for the server to stop.
  787. for {
  788. h.mu.Lock()
  789. if len(h.gotRPC) >= len(checkFuncs) {
  790. h.mu.Unlock()
  791. break
  792. }
  793. h.mu.Unlock()
  794. time.Sleep(10 * time.Millisecond)
  795. }
  796. for {
  797. h.mu.Lock()
  798. if _, ok := h.gotConn[len(h.gotConn)-1].s.(*stats.ConnEnd); ok {
  799. h.mu.Unlock()
  800. break
  801. }
  802. h.mu.Unlock()
  803. time.Sleep(10 * time.Millisecond)
  804. }
  805. expect := &expectedData{
  806. serverAddr: te.srvAddr,
  807. compression: tc.compress,
  808. method: method,
  809. requests: reqs,
  810. responses: resps,
  811. err: err,
  812. }
  813. h.mu.Lock()
  814. checkConnStats(t, h.gotConn)
  815. h.mu.Unlock()
  816. checkServerStats(t, h.gotRPC, expect, checkFuncs)
  817. }
  818. func (s) TestServerStatsUnaryRPC(t *testing.T) {
  819. testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){
  820. checkInHeader,
  821. checkBegin,
  822. checkInPayload,
  823. checkOutHeader,
  824. checkOutPayload,
  825. checkOutTrailer,
  826. checkEnd,
  827. })
  828. }
  829. func (s) TestServerStatsUnaryRPCError(t *testing.T) {
  830. testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){
  831. checkInHeader,
  832. checkBegin,
  833. checkInPayload,
  834. checkOutHeader,
  835. checkOutTrailer,
  836. checkEnd,
  837. })
  838. }
  839. func (s) TestServerStatsClientStreamRPC(t *testing.T) {
  840. count := 5
  841. checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
  842. checkInHeader,
  843. checkBegin,
  844. checkOutHeader,
  845. }
  846. ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
  847. checkInPayload,
  848. }
  849. for i := 0; i < count; i++ {
  850. checkFuncs = append(checkFuncs, ioPayFuncs...)
  851. }
  852. checkFuncs = append(checkFuncs,
  853. checkOutPayload,
  854. checkOutTrailer,
  855. checkEnd,
  856. )
  857. testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: clientStreamRPC}, checkFuncs)
  858. }
  859. func (s) TestServerStatsClientStreamRPCError(t *testing.T) {
  860. count := 1
  861. testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: clientStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
  862. checkInHeader,
  863. checkBegin,
  864. checkOutHeader,
  865. checkInPayload,
  866. checkOutTrailer,
  867. checkEnd,
  868. })
  869. }
  870. func (s) TestServerStatsServerStreamRPC(t *testing.T) {
  871. count := 5
  872. checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
  873. checkInHeader,
  874. checkBegin,
  875. checkInPayload,
  876. checkOutHeader,
  877. }
  878. ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
  879. checkOutPayload,
  880. }
  881. for i := 0; i < count; i++ {
  882. checkFuncs = append(checkFuncs, ioPayFuncs...)
  883. }
  884. checkFuncs = append(checkFuncs,
  885. checkOutTrailer,
  886. checkEnd,
  887. )
  888. testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: serverStreamRPC}, checkFuncs)
  889. }
  890. func (s) TestServerStatsServerStreamRPCError(t *testing.T) {
  891. count := 5
  892. testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: serverStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
  893. checkInHeader,
  894. checkBegin,
  895. checkInPayload,
  896. checkOutHeader,
  897. checkOutTrailer,
  898. checkEnd,
  899. })
  900. }
  901. func (s) TestServerStatsFullDuplexRPC(t *testing.T) {
  902. count := 5
  903. checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
  904. checkInHeader,
  905. checkBegin,
  906. checkOutHeader,
  907. }
  908. ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
  909. checkInPayload,
  910. checkOutPayload,
  911. }
  912. for i := 0; i < count; i++ {
  913. checkFuncs = append(checkFuncs, ioPayFuncs...)
  914. }
  915. checkFuncs = append(checkFuncs,
  916. checkOutTrailer,
  917. checkEnd,
  918. )
  919. testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: fullDuplexStreamRPC}, checkFuncs)
  920. }
  921. func (s) TestServerStatsFullDuplexRPCError(t *testing.T) {
  922. count := 5
  923. testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: fullDuplexStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
  924. checkInHeader,
  925. checkBegin,
  926. checkOutHeader,
  927. checkInPayload,
  928. checkOutTrailer,
  929. checkEnd,
  930. })
  931. }
// checkFuncWithCount pairs a stats check with the number of times the
// corresponding stat is expected. checkClientStats sums the c fields to get
// the expected total number of stats and decrements c each time it runs f.
type checkFuncWithCount struct {
	// f validates one recorded stat against the expected data.
	f func(t *testing.T, d *gotData, e *expectedData)
	c int // expected count
}
  936. func checkClientStats(t *testing.T, got []*gotData, expect *expectedData, checkFuncs map[int]*checkFuncWithCount) {
  937. var expectLen int
  938. for _, v := range checkFuncs {
  939. expectLen += v.c
  940. }
  941. if len(got) != expectLen {
  942. for i, g := range got {
  943. t.Errorf(" - %v, %T", i, g.s)
  944. }
  945. t.Fatalf("got %v stats, want %v stats", len(got), expectLen)
  946. }
  947. var tagInfoInCtx *stats.RPCTagInfo
  948. for i := 0; i < len(got); i++ {
  949. if _, ok := got[i].s.(stats.RPCStats); ok {
  950. tagInfoInCtxNew, _ := got[i].ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo)
  951. if tagInfoInCtx != nil && tagInfoInCtx != tagInfoInCtxNew {
  952. t.Fatalf("got context containing different tagInfo with stats %T", got[i].s)
  953. }
  954. tagInfoInCtx = tagInfoInCtxNew
  955. }
  956. }
  957. for _, s := range got {
  958. switch s.s.(type) {
  959. case *stats.Begin:
  960. if checkFuncs[begin].c <= 0 {
  961. t.Fatalf("unexpected stats: %T", s.s)
  962. }
  963. checkFuncs[begin].f(t, s, expect)
  964. checkFuncs[begin].c--
  965. case *stats.OutHeader:
  966. if checkFuncs[outHeader].c <= 0 {
  967. t.Fatalf("unexpected stats: %T", s.s)
  968. }
  969. checkFuncs[outHeader].f(t, s, expect)
  970. checkFuncs[outHeader].c--
  971. case *stats.OutPayload:
  972. if checkFuncs[outPayload].c <= 0 {
  973. t.Fatalf("unexpected stats: %T", s.s)
  974. }
  975. checkFuncs[outPayload].f(t, s, expect)
  976. checkFuncs[outPayload].c--
  977. case *stats.InHeader:
  978. if checkFuncs[inHeader].c <= 0 {
  979. t.Fatalf("unexpected stats: %T", s.s)
  980. }
  981. checkFuncs[inHeader].f(t, s, expect)
  982. checkFuncs[inHeader].c--
  983. case *stats.InPayload:
  984. if checkFuncs[inPayload].c <= 0 {
  985. t.Fatalf("unexpected stats: %T", s.s)
  986. }
  987. checkFuncs[inPayload].f(t, s, expect)
  988. checkFuncs[inPayload].c--
  989. case *stats.InTrailer:
  990. if checkFuncs[inTrailer].c <= 0 {
  991. t.Fatalf("unexpected stats: %T", s.s)
  992. }
  993. checkFuncs[inTrailer].f(t, s, expect)
  994. checkFuncs[inTrailer].c--
  995. case *stats.End:
  996. if checkFuncs[end].c <= 0 {
  997. t.Fatalf("unexpected stats: %T", s.s)
  998. }
  999. checkFuncs[end].f(t, s, expect)
  1000. checkFuncs[end].c--
  1001. case *stats.ConnBegin:
  1002. if checkFuncs[connBegin].c <= 0 {
  1003. t.Fatalf("unexpected stats: %T", s.s)
  1004. }
  1005. checkFuncs[connBegin].f(t, s, expect)
  1006. checkFuncs[connBegin].c--
  1007. case *stats.ConnEnd:
  1008. if checkFuncs[connEnd].c <= 0 {
  1009. t.Fatalf("unexpected stats: %T", s.s)
  1010. }
  1011. checkFuncs[connEnd].f(t, s, expect)
  1012. checkFuncs[connEnd].c--
  1013. default:
  1014. t.Fatalf("unexpected stats: %T", s.s)
  1015. }
  1016. }
  1017. }
  1018. func testClientStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs map[int]*checkFuncWithCount) {
  1019. h := &statshandler{}
  1020. te := newTest(t, tc, h, nil)
  1021. te.startServer(&testServer{})
  1022. defer te.tearDown()
  1023. var (
  1024. reqs []*testpb.SimpleRequest
  1025. resps []*testpb.SimpleResponse
  1026. method string
  1027. err error
  1028. req *testpb.SimpleRequest
  1029. resp *testpb.SimpleResponse
  1030. e error
  1031. )
  1032. switch cc.callType {
  1033. case unaryRPC:
  1034. method = "/grpc.testing.TestService/UnaryCall"
  1035. req, resp, e = te.doUnaryCall(cc)
  1036. reqs = []*testpb.SimpleRequest{req}
  1037. resps = []*testpb.SimpleResponse{resp}
  1038. err = e
  1039. case clientStreamRPC:
  1040. method = "/grpc.testing.TestService/ClientStreamCall"
  1041. reqs, resp, e = te.doClientStreamCall(cc)
  1042. resps = []*testpb.SimpleResponse{resp}
  1043. err = e
  1044. case serverStreamRPC:
  1045. method = "/grpc.testing.TestService/ServerStreamCall"
  1046. req, resps, e = te.doServerStreamCall(cc)
  1047. reqs = []*testpb.SimpleRequest{req}
  1048. err = e
  1049. case fullDuplexStreamRPC:
  1050. method = "/grpc.testing.TestService/FullDuplexCall"
  1051. reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
  1052. }
  1053. if cc.success != (err == nil) {
  1054. t.Fatalf("cc.success: %v, got error: %v", cc.success, err)
  1055. }
  1056. te.cc.Close()
  1057. te.srv.GracefulStop() // Wait for the server to stop.
  1058. lenRPCStats := 0
  1059. for _, v := range checkFuncs {
  1060. lenRPCStats += v.c
  1061. }
  1062. for {
  1063. h.mu.Lock()
  1064. if len(h.gotRPC) >= lenRPCStats {
  1065. h.mu.Unlock()
  1066. break
  1067. }
  1068. h.mu.Unlock()
  1069. time.Sleep(10 * time.Millisecond)
  1070. }
  1071. for {
  1072. h.mu.Lock()
  1073. if _, ok := h.gotConn[len(h.gotConn)-1].s.(*stats.ConnEnd); ok {
  1074. h.mu.Unlock()
  1075. break
  1076. }
  1077. h.mu.Unlock()
  1078. time.Sleep(10 * time.Millisecond)
  1079. }
  1080. expect := &expectedData{
  1081. serverAddr: te.srvAddr,
  1082. compression: tc.compress,
  1083. method: method,
  1084. requests: reqs,
  1085. responses: resps,
  1086. failfast: cc.failfast,
  1087. err: err,
  1088. }
  1089. h.mu.Lock()
  1090. checkConnStats(t, h.gotConn)
  1091. h.mu.Unlock()
  1092. checkClientStats(t, h.gotRPC, expect, checkFuncs)
  1093. }
  1094. func (s) TestClientStatsUnaryRPC(t *testing.T) {
  1095. testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{
  1096. begin: {checkBegin, 1},
  1097. outHeader: {checkOutHeader, 1},
  1098. outPayload: {checkOutPayload, 1},
  1099. inHeader: {checkInHeader, 1},
  1100. inPayload: {checkInPayload, 1},
  1101. inTrailer: {checkInTrailer, 1},
  1102. end: {checkEnd, 1},
  1103. })
  1104. }
  1105. func (s) TestClientStatsUnaryRPCError(t *testing.T) {
  1106. testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{
  1107. begin: {checkBegin, 1},
  1108. outHeader: {checkOutHeader, 1},
  1109. outPayload: {checkOutPayload, 1},
  1110. inHeader: {checkInHeader, 1},
  1111. inTrailer: {checkInTrailer, 1},
  1112. end: {checkEnd, 1},
  1113. })
  1114. }
  1115. func (s) TestClientStatsClientStreamRPC(t *testing.T) {
  1116. count := 5
  1117. testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{
  1118. begin: {checkBegin, 1},
  1119. outHeader: {checkOutHeader, 1},
  1120. inHeader: {checkInHeader, 1},
  1121. outPayload: {checkOutPayload, count},
  1122. inTrailer: {checkInTrailer, 1},
  1123. inPayload: {checkInPayload, 1},
  1124. end: {checkEnd, 1},
  1125. })
  1126. }
  1127. func (s) TestClientStatsClientStreamRPCError(t *testing.T) {
  1128. count := 1
  1129. testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{
  1130. begin: {checkBegin, 1},
  1131. outHeader: {checkOutHeader, 1},
  1132. inHeader: {checkInHeader, 1},
  1133. outPayload: {checkOutPayload, 1},
  1134. inTrailer: {checkInTrailer, 1},
  1135. end: {checkEnd, 1},
  1136. })
  1137. }
  1138. func (s) TestClientStatsServerStreamRPC(t *testing.T) {
  1139. count := 5
  1140. testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{
  1141. begin: {checkBegin, 1},
  1142. outHeader: {checkOutHeader, 1},
  1143. outPayload: {checkOutPayload, 1},
  1144. inHeader: {checkInHeader, 1},
  1145. inPayload: {checkInPayload, count},
  1146. inTrailer: {checkInTrailer, 1},
  1147. end: {checkEnd, 1},
  1148. })
  1149. }
  1150. func (s) TestClientStatsServerStreamRPCError(t *testing.T) {
  1151. count := 5
  1152. testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{
  1153. begin: {checkBegin, 1},
  1154. outHeader: {checkOutHeader, 1},
  1155. outPayload: {checkOutPayload, 1},
  1156. inHeader: {checkInHeader, 1},
  1157. inTrailer: {checkInTrailer, 1},
  1158. end: {checkEnd, 1},
  1159. })
  1160. }
  1161. func (s) TestClientStatsFullDuplexRPC(t *testing.T) {
  1162. count := 5
  1163. testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{
  1164. begin: {checkBegin, 1},
  1165. outHeader: {checkOutHeader, 1},
  1166. outPayload: {checkOutPayload, count},
  1167. inHeader: {checkInHeader, 1},
  1168. inPayload: {checkInPayload, count},
  1169. inTrailer: {checkInTrailer, 1},
  1170. end: {checkEnd, 1},
  1171. })
  1172. }
  1173. func (s) TestClientStatsFullDuplexRPCError(t *testing.T) {
  1174. count := 5
  1175. testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{
  1176. begin: {checkBegin, 1},
  1177. outHeader: {checkOutHeader, 1},
  1178. outPayload: {checkOutPayload, 1},
  1179. inHeader: {checkInHeader, 1},
  1180. inTrailer: {checkInTrailer, 1},
  1181. end: {checkEnd, 1},
  1182. })
  1183. }
  1184. func (s) TestTags(t *testing.T) {
  1185. b := []byte{5, 2, 4, 3, 1}
  1186. ctx := stats.SetTags(context.Background(), b)
  1187. if tg := stats.OutgoingTags(ctx); !reflect.DeepEqual(tg, b) {
  1188. t.Errorf("OutgoingTags(%v) = %v; want %v", ctx, tg, b)
  1189. }
  1190. if tg := stats.Tags(ctx); tg != nil {
  1191. t.Errorf("Tags(%v) = %v; want nil", ctx, tg)
  1192. }
  1193. ctx = stats.SetIncomingTags(context.Background(), b)
  1194. if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, b) {
  1195. t.Errorf("Tags(%v) = %v; want %v", ctx, tg, b)
  1196. }
  1197. if tg := stats.OutgoingTags(ctx); tg != nil {
  1198. t.Errorf("OutgoingTags(%v) = %v; want nil", ctx, tg)
  1199. }
  1200. }
  1201. func (s) TestTrace(t *testing.T) {
  1202. b := []byte{5, 2, 4, 3, 1}
  1203. ctx := stats.SetTrace(context.Background(), b)
  1204. if tr := stats.OutgoingTrace(ctx); !reflect.DeepEqual(tr, b) {
  1205. t.Errorf("OutgoingTrace(%v) = %v; want %v", ctx, tr, b)
  1206. }
  1207. if tr := stats.Trace(ctx); tr != nil {
  1208. t.Errorf("Trace(%v) = %v; want nil", ctx, tr)
  1209. }
  1210. ctx = stats.SetIncomingTrace(context.Background(), b)
  1211. if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, b) {
  1212. t.Errorf("Trace(%v) = %v; want %v", ctx, tr, b)
  1213. }
  1214. if tr := stats.OutgoingTrace(ctx); tr != nil {
  1215. t.Errorf("OutgoingTrace(%v) = %v; want nil", ctx, tr)
  1216. }
  1217. }