123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- /*
- *
- * Copyright 2014 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package grpc
- import (
- "bytes"
- "compress/gzip"
- "io"
- "math"
- "reflect"
- "testing"
- "github.com/golang/protobuf/proto"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/encoding"
- protoenc "google.golang.org/grpc/encoding/proto"
- "google.golang.org/grpc/internal/testutils"
- "google.golang.org/grpc/internal/transport"
- "google.golang.org/grpc/status"
- perfpb "google.golang.org/grpc/test/codec_perf"
- )
// fullReader wraps an io.Reader so that each Read call goes through
// io.ReadFull: the destination slice is either filled completely or an error
// is reported.
type fullReader struct {
	reader io.Reader
}

// Read fills p from the wrapped reader using io.ReadFull and returns the
// number of bytes copied together with io.ReadFull's error (io.EOF when
// nothing could be read, io.ErrUnexpectedEOF when p was only partly filled).
func (f fullReader) Read(p []byte) (int, error) {
	n, err := io.ReadFull(f.reader, p)
	return n, err
}
- var _ CallOption = EmptyCallOption{} // ensure EmptyCallOption implements the interface
- func (s) TestSimpleParsing(t *testing.T) {
- bigMsg := bytes.Repeat([]byte{'x'}, 1<<24)
- for _, test := range []struct {
- // input
- p []byte
- // outputs
- err error
- b []byte
- pt payloadFormat
- }{
- {nil, io.EOF, nil, compressionNone},
- {[]byte{0, 0, 0, 0, 0}, nil, nil, compressionNone},
- {[]byte{0, 0, 0, 0, 1, 'a'}, nil, []byte{'a'}, compressionNone},
- {[]byte{1, 0}, io.ErrUnexpectedEOF, nil, compressionNone},
- {[]byte{0, 0, 0, 0, 10, 'a'}, io.ErrUnexpectedEOF, nil, compressionNone},
- // Check that messages with length >= 2^24 are parsed.
- {append([]byte{0, 1, 0, 0, 0}, bigMsg...), nil, bigMsg, compressionNone},
- } {
- buf := fullReader{bytes.NewReader(test.p)}
- parser := &parser{r: buf}
- pt, b, err := parser.recvMsg(math.MaxInt32)
- if err != test.err || !bytes.Equal(b, test.b) || pt != test.pt {
- t.Fatalf("parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, %v", test.p, pt, b, err, test.pt, test.b, test.err)
- }
- }
- }
- func (s) TestMultipleParsing(t *testing.T) {
- // Set a byte stream consists of 3 messages with their headers.
- p := []byte{0, 0, 0, 0, 1, 'a', 0, 0, 0, 0, 2, 'b', 'c', 0, 0, 0, 0, 1, 'd'}
- b := fullReader{bytes.NewReader(p)}
- parser := &parser{r: b}
- wantRecvs := []struct {
- pt payloadFormat
- data []byte
- }{
- {compressionNone, []byte("a")},
- {compressionNone, []byte("bc")},
- {compressionNone, []byte("d")},
- }
- for i, want := range wantRecvs {
- pt, data, err := parser.recvMsg(math.MaxInt32)
- if err != nil || pt != want.pt || !reflect.DeepEqual(data, want.data) {
- t.Fatalf("after %d calls, parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, <nil>",
- i, p, pt, data, err, want.pt, want.data)
- }
- }
- pt, data, err := parser.recvMsg(math.MaxInt32)
- if err != io.EOF {
- t.Fatalf("after %d recvMsgs calls, parser{%v}.recvMsg(_) = %v, %v, %v\nwant _, _, %v",
- len(wantRecvs), p, pt, data, err, io.EOF)
- }
- }
- func (s) TestEncode(t *testing.T) {
- for _, test := range []struct {
- // input
- msg proto.Message
- // outputs
- hdr []byte
- data []byte
- err error
- }{
- {nil, []byte{0, 0, 0, 0, 0}, []byte{}, nil},
- } {
- data, err := encode(encoding.GetCodec(protoenc.Name), test.msg)
- if err != test.err || !bytes.Equal(data, test.data) {
- t.Errorf("encode(_, %v) = %v, %v; want %v, %v", test.msg, data, err, test.data, test.err)
- continue
- }
- if hdr, _ := msgHeader(data, nil); !bytes.Equal(hdr, test.hdr) {
- t.Errorf("msgHeader(%v, false) = %v; want %v", data, hdr, test.hdr)
- }
- }
- }
- func (s) TestCompress(t *testing.T) {
- bestCompressor, err := NewGZIPCompressorWithLevel(gzip.BestCompression)
- if err != nil {
- t.Fatalf("Could not initialize gzip compressor with best compression.")
- }
- bestSpeedCompressor, err := NewGZIPCompressorWithLevel(gzip.BestSpeed)
- if err != nil {
- t.Fatalf("Could not initialize gzip compressor with best speed compression.")
- }
- defaultCompressor, err := NewGZIPCompressorWithLevel(gzip.BestSpeed)
- if err != nil {
- t.Fatalf("Could not initialize gzip compressor with default compression.")
- }
- level5, err := NewGZIPCompressorWithLevel(5)
- if err != nil {
- t.Fatalf("Could not initialize gzip compressor with level 5 compression.")
- }
- for _, test := range []struct {
- // input
- data []byte
- cp Compressor
- dc Decompressor
- // outputs
- err error
- }{
- {make([]byte, 1024), NewGZIPCompressor(), NewGZIPDecompressor(), nil},
- {make([]byte, 1024), bestCompressor, NewGZIPDecompressor(), nil},
- {make([]byte, 1024), bestSpeedCompressor, NewGZIPDecompressor(), nil},
- {make([]byte, 1024), defaultCompressor, NewGZIPDecompressor(), nil},
- {make([]byte, 1024), level5, NewGZIPDecompressor(), nil},
- } {
- b := new(bytes.Buffer)
- if err := test.cp.Do(b, test.data); err != test.err {
- t.Fatalf("Compressor.Do(_, %v) = %v, want %v", test.data, err, test.err)
- }
- if b.Len() >= len(test.data) {
- t.Fatalf("The compressor fails to compress data.")
- }
- if p, err := test.dc.Do(b); err != nil || !bytes.Equal(test.data, p) {
- t.Fatalf("Decompressor.Do(%v) = %v, %v, want %v, <nil>", b, p, err, test.data)
- }
- }
- }
- func (s) TestToRPCErr(t *testing.T) {
- for _, test := range []struct {
- // input
- errIn error
- // outputs
- errOut error
- }{
- {transport.ErrConnClosing, status.Error(codes.Unavailable, transport.ErrConnClosing.Desc)},
- {io.ErrUnexpectedEOF, status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())},
- } {
- err := toRPCErr(test.errIn)
- if _, ok := status.FromError(err); !ok {
- t.Errorf("toRPCErr{%v} returned type %T, want %T", test.errIn, err, status.Error)
- }
- if !testutils.StatusErrEqual(err, test.errOut) {
- t.Errorf("toRPCErr{%v} = %v \nwant %v", test.errIn, err, test.errOut)
- }
- }
- }
- func (s) TestParseDialTarget(t *testing.T) {
- for _, test := range []struct {
- target, wantNet, wantAddr string
- }{
- {"unix:etcd:0", "unix", "etcd:0"},
- {"unix:///tmp/unix-3", "unix", "/tmp/unix-3"},
- {"unix://domain", "unix", "domain"},
- {"unix://etcd:0", "unix", "etcd:0"},
- {"unix:///etcd:0", "unix", "/etcd:0"},
- {"passthrough://unix://domain", "tcp", "passthrough://unix://domain"},
- {"https://google.com:443", "tcp", "https://google.com:443"},
- {"dns:///google.com", "tcp", "dns:///google.com"},
- {"/unix/socket/address", "tcp", "/unix/socket/address"},
- } {
- gotNet, gotAddr := parseDialTarget(test.target)
- if gotNet != test.wantNet || gotAddr != test.wantAddr {
- t.Errorf("parseDialTarget(%q) = %s, %s want %s, %s", test.target, gotNet, gotAddr, test.wantNet, test.wantAddr)
- }
- }
- }
- // bmEncode benchmarks encoding a Protocol Buffer message containing mSize
- // bytes.
- func bmEncode(b *testing.B, mSize int) {
- cdc := encoding.GetCodec(protoenc.Name)
- msg := &perfpb.Buffer{Body: make([]byte, mSize)}
- encodeData, _ := encode(cdc, msg)
- encodedSz := int64(len(encodeData))
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- encode(cdc, msg)
- }
- b.SetBytes(encodedSz)
- }
- func BenchmarkEncode1B(b *testing.B) {
- bmEncode(b, 1)
- }
- func BenchmarkEncode1KiB(b *testing.B) {
- bmEncode(b, 1024)
- }
- func BenchmarkEncode8KiB(b *testing.B) {
- bmEncode(b, 8*1024)
- }
- func BenchmarkEncode64KiB(b *testing.B) {
- bmEncode(b, 64*1024)
- }
- func BenchmarkEncode512KiB(b *testing.B) {
- bmEncode(b, 512*1024)
- }
- func BenchmarkEncode1MiB(b *testing.B) {
- bmEncode(b, 1024*1024)
- }
- // bmCompressor benchmarks a compressor of a Protocol Buffer message containing
- // mSize bytes.
- func bmCompressor(b *testing.B, mSize int, cp Compressor) {
- payload := make([]byte, mSize)
- cBuf := bytes.NewBuffer(make([]byte, mSize))
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- cp.Do(cBuf, payload)
- cBuf.Reset()
- }
- }
- func BenchmarkGZIPCompressor1B(b *testing.B) {
- bmCompressor(b, 1, NewGZIPCompressor())
- }
- func BenchmarkGZIPCompressor1KiB(b *testing.B) {
- bmCompressor(b, 1024, NewGZIPCompressor())
- }
- func BenchmarkGZIPCompressor8KiB(b *testing.B) {
- bmCompressor(b, 8*1024, NewGZIPCompressor())
- }
- func BenchmarkGZIPCompressor64KiB(b *testing.B) {
- bmCompressor(b, 64*1024, NewGZIPCompressor())
- }
- func BenchmarkGZIPCompressor512KiB(b *testing.B) {
- bmCompressor(b, 512*1024, NewGZIPCompressor())
- }
- func BenchmarkGZIPCompressor1MiB(b *testing.B) {
- bmCompressor(b, 1024*1024, NewGZIPCompressor())
- }
|