picker_wrapper_test.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "fmt"
  22. "sync/atomic"
  23. "testing"
  24. "time"
  25. "google.golang.org/grpc/balancer"
  26. "google.golang.org/grpc/connectivity"
  27. _ "google.golang.org/grpc/grpclog/glogger"
  28. "google.golang.org/grpc/internal/transport"
  29. )
  30. const goroutineCount = 5
  31. var (
  32. testT = &testTransport{}
  33. testSC = &acBalancerWrapper{ac: &addrConn{
  34. state: connectivity.Ready,
  35. transport: testT,
  36. }}
  37. testSCNotReady = &acBalancerWrapper{ac: &addrConn{
  38. state: connectivity.TransientFailure,
  39. }}
  40. )
  41. type testTransport struct {
  42. transport.ClientTransport
  43. }
  44. type testingPicker struct {
  45. err error
  46. sc balancer.SubConn
  47. maxCalled int64
  48. }
  49. func (p *testingPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
  50. if atomic.AddInt64(&p.maxCalled, -1) < 0 {
  51. return nil, nil, fmt.Errorf("pick called to many times (> goroutineCount)")
  52. }
  53. if p.err != nil {
  54. return nil, nil, p.err
  55. }
  56. return p.sc, nil, nil
  57. }
  58. func (s) TestBlockingPickTimeout(t *testing.T) {
  59. bp := newPickerWrapper()
  60. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  61. defer cancel()
  62. if _, _, err := bp.pick(ctx, true, balancer.PickOptions{}); err != context.DeadlineExceeded {
  63. t.Errorf("bp.pick returned error %v, want DeadlineExceeded", err)
  64. }
  65. }
  66. func (s) TestBlockingPick(t *testing.T) {
  67. bp := newPickerWrapper()
  68. // All goroutines should block because picker is nil in bp.
  69. var finishedCount uint64
  70. for i := goroutineCount; i > 0; i-- {
  71. go func() {
  72. if tr, _, err := bp.pick(context.Background(), true, balancer.PickOptions{}); err != nil || tr != testT {
  73. t.Errorf("bp.pick returned non-nil error: %v", err)
  74. }
  75. atomic.AddUint64(&finishedCount, 1)
  76. }()
  77. }
  78. time.Sleep(50 * time.Millisecond)
  79. if c := atomic.LoadUint64(&finishedCount); c != 0 {
  80. t.Errorf("finished goroutines count: %v, want 0", c)
  81. }
  82. bp.updatePicker(&testingPicker{sc: testSC, maxCalled: goroutineCount})
  83. }
  84. func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
  85. bp := newPickerWrapper()
  86. var finishedCount uint64
  87. bp.updatePicker(&testingPicker{err: balancer.ErrNoSubConnAvailable, maxCalled: goroutineCount})
  88. // All goroutines should block because picker returns no sc available.
  89. for i := goroutineCount; i > 0; i-- {
  90. go func() {
  91. if tr, _, err := bp.pick(context.Background(), true, balancer.PickOptions{}); err != nil || tr != testT {
  92. t.Errorf("bp.pick returned non-nil error: %v", err)
  93. }
  94. atomic.AddUint64(&finishedCount, 1)
  95. }()
  96. }
  97. time.Sleep(50 * time.Millisecond)
  98. if c := atomic.LoadUint64(&finishedCount); c != 0 {
  99. t.Errorf("finished goroutines count: %v, want 0", c)
  100. }
  101. bp.updatePicker(&testingPicker{sc: testSC, maxCalled: goroutineCount})
  102. }
  103. func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
  104. bp := newPickerWrapper()
  105. bp.updatePicker(&testingPicker{err: balancer.ErrTransientFailure, maxCalled: goroutineCount})
  106. var finishedCount uint64
  107. // All goroutines should block because picker returns transientFailure and
  108. // picks are not failfast.
  109. for i := goroutineCount; i > 0; i-- {
  110. go func() {
  111. if tr, _, err := bp.pick(context.Background(), false, balancer.PickOptions{}); err != nil || tr != testT {
  112. t.Errorf("bp.pick returned non-nil error: %v", err)
  113. }
  114. atomic.AddUint64(&finishedCount, 1)
  115. }()
  116. }
  117. time.Sleep(time.Millisecond)
  118. if c := atomic.LoadUint64(&finishedCount); c != 0 {
  119. t.Errorf("finished goroutines count: %v, want 0", c)
  120. }
  121. bp.updatePicker(&testingPicker{sc: testSC, maxCalled: goroutineCount})
  122. }
  123. func (s) TestBlockingPickSCNotReady(t *testing.T) {
  124. bp := newPickerWrapper()
  125. bp.updatePicker(&testingPicker{sc: testSCNotReady, maxCalled: goroutineCount})
  126. var finishedCount uint64
  127. // All goroutines should block because sc is not ready.
  128. for i := goroutineCount; i > 0; i-- {
  129. go func() {
  130. if tr, _, err := bp.pick(context.Background(), true, balancer.PickOptions{}); err != nil || tr != testT {
  131. t.Errorf("bp.pick returned non-nil error: %v", err)
  132. }
  133. atomic.AddUint64(&finishedCount, 1)
  134. }()
  135. }
  136. time.Sleep(time.Millisecond)
  137. if c := atomic.LoadUint64(&finishedCount); c != 0 {
  138. t.Errorf("finished goroutines count: %v, want 0", c)
  139. }
  140. bp.updatePicker(&testingPicker{sc: testSC, maxCalled: goroutineCount})
  141. }