rpc_util_test.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "bytes"
  21. "compress/gzip"
  22. "io"
  23. "math"
  24. "reflect"
  25. "testing"
  26. "github.com/golang/protobuf/proto"
  27. "google.golang.org/grpc/codes"
  28. "google.golang.org/grpc/encoding"
  29. protoenc "google.golang.org/grpc/encoding/proto"
  30. "google.golang.org/grpc/internal/testutils"
  31. "google.golang.org/grpc/internal/transport"
  32. "google.golang.org/grpc/status"
  33. perfpb "google.golang.org/grpc/test/codec_perf"
  34. )
  35. type fullReader struct {
  36. reader io.Reader
  37. }
  38. func (f fullReader) Read(p []byte) (int, error) {
  39. return io.ReadFull(f.reader, p)
  40. }
  41. var _ CallOption = EmptyCallOption{} // ensure EmptyCallOption implements the interface
  42. func (s) TestSimpleParsing(t *testing.T) {
  43. bigMsg := bytes.Repeat([]byte{'x'}, 1<<24)
  44. for _, test := range []struct {
  45. // input
  46. p []byte
  47. // outputs
  48. err error
  49. b []byte
  50. pt payloadFormat
  51. }{
  52. {nil, io.EOF, nil, compressionNone},
  53. {[]byte{0, 0, 0, 0, 0}, nil, nil, compressionNone},
  54. {[]byte{0, 0, 0, 0, 1, 'a'}, nil, []byte{'a'}, compressionNone},
  55. {[]byte{1, 0}, io.ErrUnexpectedEOF, nil, compressionNone},
  56. {[]byte{0, 0, 0, 0, 10, 'a'}, io.ErrUnexpectedEOF, nil, compressionNone},
  57. // Check that messages with length >= 2^24 are parsed.
  58. {append([]byte{0, 1, 0, 0, 0}, bigMsg...), nil, bigMsg, compressionNone},
  59. } {
  60. buf := fullReader{bytes.NewReader(test.p)}
  61. parser := &parser{r: buf}
  62. pt, b, err := parser.recvMsg(math.MaxInt32)
  63. if err != test.err || !bytes.Equal(b, test.b) || pt != test.pt {
  64. t.Fatalf("parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, %v", test.p, pt, b, err, test.pt, test.b, test.err)
  65. }
  66. }
  67. }
  68. func (s) TestMultipleParsing(t *testing.T) {
  69. // Set a byte stream consists of 3 messages with their headers.
  70. p := []byte{0, 0, 0, 0, 1, 'a', 0, 0, 0, 0, 2, 'b', 'c', 0, 0, 0, 0, 1, 'd'}
  71. b := fullReader{bytes.NewReader(p)}
  72. parser := &parser{r: b}
  73. wantRecvs := []struct {
  74. pt payloadFormat
  75. data []byte
  76. }{
  77. {compressionNone, []byte("a")},
  78. {compressionNone, []byte("bc")},
  79. {compressionNone, []byte("d")},
  80. }
  81. for i, want := range wantRecvs {
  82. pt, data, err := parser.recvMsg(math.MaxInt32)
  83. if err != nil || pt != want.pt || !reflect.DeepEqual(data, want.data) {
  84. t.Fatalf("after %d calls, parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, <nil>",
  85. i, p, pt, data, err, want.pt, want.data)
  86. }
  87. }
  88. pt, data, err := parser.recvMsg(math.MaxInt32)
  89. if err != io.EOF {
  90. t.Fatalf("after %d recvMsgs calls, parser{%v}.recvMsg(_) = %v, %v, %v\nwant _, _, %v",
  91. len(wantRecvs), p, pt, data, err, io.EOF)
  92. }
  93. }
  94. func (s) TestEncode(t *testing.T) {
  95. for _, test := range []struct {
  96. // input
  97. msg proto.Message
  98. // outputs
  99. hdr []byte
  100. data []byte
  101. err error
  102. }{
  103. {nil, []byte{0, 0, 0, 0, 0}, []byte{}, nil},
  104. } {
  105. data, err := encode(encoding.GetCodec(protoenc.Name), test.msg)
  106. if err != test.err || !bytes.Equal(data, test.data) {
  107. t.Errorf("encode(_, %v) = %v, %v; want %v, %v", test.msg, data, err, test.data, test.err)
  108. continue
  109. }
  110. if hdr, _ := msgHeader(data, nil); !bytes.Equal(hdr, test.hdr) {
  111. t.Errorf("msgHeader(%v, false) = %v; want %v", data, hdr, test.hdr)
  112. }
  113. }
  114. }
  115. func (s) TestCompress(t *testing.T) {
  116. bestCompressor, err := NewGZIPCompressorWithLevel(gzip.BestCompression)
  117. if err != nil {
  118. t.Fatalf("Could not initialize gzip compressor with best compression.")
  119. }
  120. bestSpeedCompressor, err := NewGZIPCompressorWithLevel(gzip.BestSpeed)
  121. if err != nil {
  122. t.Fatalf("Could not initialize gzip compressor with best speed compression.")
  123. }
  124. defaultCompressor, err := NewGZIPCompressorWithLevel(gzip.BestSpeed)
  125. if err != nil {
  126. t.Fatalf("Could not initialize gzip compressor with default compression.")
  127. }
  128. level5, err := NewGZIPCompressorWithLevel(5)
  129. if err != nil {
  130. t.Fatalf("Could not initialize gzip compressor with level 5 compression.")
  131. }
  132. for _, test := range []struct {
  133. // input
  134. data []byte
  135. cp Compressor
  136. dc Decompressor
  137. // outputs
  138. err error
  139. }{
  140. {make([]byte, 1024), NewGZIPCompressor(), NewGZIPDecompressor(), nil},
  141. {make([]byte, 1024), bestCompressor, NewGZIPDecompressor(), nil},
  142. {make([]byte, 1024), bestSpeedCompressor, NewGZIPDecompressor(), nil},
  143. {make([]byte, 1024), defaultCompressor, NewGZIPDecompressor(), nil},
  144. {make([]byte, 1024), level5, NewGZIPDecompressor(), nil},
  145. } {
  146. b := new(bytes.Buffer)
  147. if err := test.cp.Do(b, test.data); err != test.err {
  148. t.Fatalf("Compressor.Do(_, %v) = %v, want %v", test.data, err, test.err)
  149. }
  150. if b.Len() >= len(test.data) {
  151. t.Fatalf("The compressor fails to compress data.")
  152. }
  153. if p, err := test.dc.Do(b); err != nil || !bytes.Equal(test.data, p) {
  154. t.Fatalf("Decompressor.Do(%v) = %v, %v, want %v, <nil>", b, p, err, test.data)
  155. }
  156. }
  157. }
  158. func (s) TestToRPCErr(t *testing.T) {
  159. for _, test := range []struct {
  160. // input
  161. errIn error
  162. // outputs
  163. errOut error
  164. }{
  165. {transport.ErrConnClosing, status.Error(codes.Unavailable, transport.ErrConnClosing.Desc)},
  166. {io.ErrUnexpectedEOF, status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())},
  167. } {
  168. err := toRPCErr(test.errIn)
  169. if _, ok := status.FromError(err); !ok {
  170. t.Errorf("toRPCErr{%v} returned type %T, want %T", test.errIn, err, status.Error)
  171. }
  172. if !testutils.StatusErrEqual(err, test.errOut) {
  173. t.Errorf("toRPCErr{%v} = %v \nwant %v", test.errIn, err, test.errOut)
  174. }
  175. }
  176. }
  177. func (s) TestParseDialTarget(t *testing.T) {
  178. for _, test := range []struct {
  179. target, wantNet, wantAddr string
  180. }{
  181. {"unix:etcd:0", "unix", "etcd:0"},
  182. {"unix:///tmp/unix-3", "unix", "/tmp/unix-3"},
  183. {"unix://domain", "unix", "domain"},
  184. {"unix://etcd:0", "unix", "etcd:0"},
  185. {"unix:///etcd:0", "unix", "/etcd:0"},
  186. {"passthrough://unix://domain", "tcp", "passthrough://unix://domain"},
  187. {"https://google.com:443", "tcp", "https://google.com:443"},
  188. {"dns:///google.com", "tcp", "dns:///google.com"},
  189. {"/unix/socket/address", "tcp", "/unix/socket/address"},
  190. } {
  191. gotNet, gotAddr := parseDialTarget(test.target)
  192. if gotNet != test.wantNet || gotAddr != test.wantAddr {
  193. t.Errorf("parseDialTarget(%q) = %s, %s want %s, %s", test.target, gotNet, gotAddr, test.wantNet, test.wantAddr)
  194. }
  195. }
  196. }
  197. // bmEncode benchmarks encoding a Protocol Buffer message containing mSize
  198. // bytes.
  199. func bmEncode(b *testing.B, mSize int) {
  200. cdc := encoding.GetCodec(protoenc.Name)
  201. msg := &perfpb.Buffer{Body: make([]byte, mSize)}
  202. encodeData, _ := encode(cdc, msg)
  203. encodedSz := int64(len(encodeData))
  204. b.ReportAllocs()
  205. b.ResetTimer()
  206. for i := 0; i < b.N; i++ {
  207. encode(cdc, msg)
  208. }
  209. b.SetBytes(encodedSz)
  210. }
  211. func BenchmarkEncode1B(b *testing.B) {
  212. bmEncode(b, 1)
  213. }
  214. func BenchmarkEncode1KiB(b *testing.B) {
  215. bmEncode(b, 1024)
  216. }
  217. func BenchmarkEncode8KiB(b *testing.B) {
  218. bmEncode(b, 8*1024)
  219. }
  220. func BenchmarkEncode64KiB(b *testing.B) {
  221. bmEncode(b, 64*1024)
  222. }
  223. func BenchmarkEncode512KiB(b *testing.B) {
  224. bmEncode(b, 512*1024)
  225. }
  226. func BenchmarkEncode1MiB(b *testing.B) {
  227. bmEncode(b, 1024*1024)
  228. }
  229. // bmCompressor benchmarks a compressor of a Protocol Buffer message containing
  230. // mSize bytes.
  231. func bmCompressor(b *testing.B, mSize int, cp Compressor) {
  232. payload := make([]byte, mSize)
  233. cBuf := bytes.NewBuffer(make([]byte, mSize))
  234. b.ReportAllocs()
  235. b.ResetTimer()
  236. for i := 0; i < b.N; i++ {
  237. cp.Do(cBuf, payload)
  238. cBuf.Reset()
  239. }
  240. }
  241. func BenchmarkGZIPCompressor1B(b *testing.B) {
  242. bmCompressor(b, 1, NewGZIPCompressor())
  243. }
  244. func BenchmarkGZIPCompressor1KiB(b *testing.B) {
  245. bmCompressor(b, 1024, NewGZIPCompressor())
  246. }
  247. func BenchmarkGZIPCompressor8KiB(b *testing.B) {
  248. bmCompressor(b, 8*1024, NewGZIPCompressor())
  249. }
  250. func BenchmarkGZIPCompressor64KiB(b *testing.B) {
  251. bmCompressor(b, 64*1024, NewGZIPCompressor())
  252. }
  253. func BenchmarkGZIPCompressor512KiB(b *testing.B) {
  254. bmCompressor(b, 512*1024, NewGZIPCompressor())
  255. }
  256. func BenchmarkGZIPCompressor1MiB(b *testing.B) {
  257. bmCompressor(b, 1024*1024, NewGZIPCompressor())
  258. }