channel.go 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. /*
  2. *
  3. * Copyright 2020 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package testutils
  18. import (
  19. "errors"
  20. "time"
  21. )
  22. // ErrRecvTimeout is an error to indicate that a receive operation on the
  23. // channel timed out.
  24. var ErrRecvTimeout = errors.New("timed out when waiting for value on channel")
  25. const (
  26. // DefaultChanRecvTimeout is the default timeout for receive operations on the
  27. // underlying channel.
  28. DefaultChanRecvTimeout = 1 * time.Second
  29. // DefaultChanBufferSize is the default buffer size of the underlying channel.
  30. DefaultChanBufferSize = 1
  31. )
  32. // Channel wraps a generic channel and provides a timed receive operation.
  33. type Channel struct {
  34. ch chan interface{}
  35. }
  36. // Send sends value on the underlying channel.
  37. func (cwt *Channel) Send(value interface{}) {
  38. cwt.ch <- value
  39. }
  40. // Receive returns the value received on the underlying channel, or
  41. // ErrRecvTimeout if DefaultChanRecvTimeout amount of time elapses.
  42. func (cwt *Channel) Receive() (interface{}, error) {
  43. timer := time.NewTimer(DefaultChanRecvTimeout)
  44. select {
  45. case <-timer.C:
  46. return nil, ErrRecvTimeout
  47. case got := <-cwt.ch:
  48. timer.Stop()
  49. return got, nil
  50. }
  51. }
  52. // NewChannel returns a new Channel.
  53. func NewChannel() *Channel {
  54. return NewChannelWithSize(DefaultChanBufferSize)
  55. }
  56. // NewChannelWithSize returns a new Channel with a buffer of bufSize.
  57. func NewChannelWithSize(bufSize int) *Channel {
  58. return &Channel{ch: make(chan interface{}, bufSize)}
  59. }