latency_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package latency
  19. import (
  20. "bytes"
  21. "fmt"
  22. "net"
  23. "reflect"
  24. "sync"
  25. "testing"
  26. "time"
  27. "google.golang.org/grpc/internal/grpctest"
  28. )
  29. type s struct {
  30. grpctest.Tester
  31. }
  32. func Test(t *testing.T) {
  33. grpctest.RunSubTests(t, s{})
  34. }
  // bufConn is a net.Conn implemented by a bytes.Buffer (which is a ReadWriter).
  // Read and Write are promoted from the embedded buffer, so bytes written to a
  // bufConn are the bytes later read back from it. Every other net.Conn method
  // is a stub that panics with "unimplemented".
  type bufConn struct {
  	*bytes.Buffer
  }

  // Compile-time check that bufConn provides the full net.Conn method set.
  var _ net.Conn = bufConn{}

  // Close is a stub; it always panics.
  func (bufConn) Close() error { panic("unimplemented") }

  // LocalAddr is a stub; it always panics.
  func (bufConn) LocalAddr() net.Addr { panic("unimplemented") }

  // RemoteAddr is a stub; it always panics.
  func (bufConn) RemoteAddr() net.Addr { panic("unimplemented") }

  // SetDeadline is a stub; it always panics.
  func (bufConn) SetDeadline(t time.Time) error { panic("unimplemented") }

  // SetReadDeadline is a stub; it always panics.
  func (bufConn) SetReadDeadline(t time.Time) error { panic("unimplemented") }

  // SetWriteDeadline is a stub; it always panics.
  func (bufConn) SetWriteDeadline(t time.Time) error { panic("unimplemented") }
  45. func restoreHooks() func() {
  46. s := sleep
  47. n := now
  48. return func() {
  49. sleep = s
  50. now = n
  51. }
  52. }
  53. func (s) TestConn(t *testing.T) {
  54. defer restoreHooks()()
  55. // Constant time.
  56. now = func() time.Time { return time.Unix(123, 456) }
  57. // Capture sleep times for checking later.
  58. var sleepTimes []time.Duration
  59. sleep = func(t time.Duration) { sleepTimes = append(sleepTimes, t) }
  60. wantSleeps := func(want ...time.Duration) {
  61. if !reflect.DeepEqual(want, sleepTimes) {
  62. t.Fatalf("sleepTimes = %v; want %v", sleepTimes, want)
  63. }
  64. sleepTimes = nil
  65. }
  66. // Use a fairly high latency to cause a large BDP and avoid sleeps while
  67. // writing due to simulation of full buffers.
  68. latency := 1 * time.Second
  69. c, err := (&Network{Kbps: 1, Latency: latency, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
  70. if err != nil {
  71. t.Fatalf("Unexpected error creating connection: %v", err)
  72. }
  73. wantSleeps(latency) // Connection creation delay.
  74. // 1 kbps = 128 Bps. Divides evenly by 1 second using nanos.
  75. byteLatency := time.Duration(time.Second / 128)
  76. write := func(b []byte) {
  77. n, err := c.Write(b)
  78. if n != len(b) || err != nil {
  79. t.Fatalf("c.Write(%v) = %v, %v; want %v, nil", b, n, err, len(b))
  80. }
  81. }
  82. write([]byte{1, 2, 3, 4, 5}) // One full packet
  83. pkt1Time := latency + byteLatency*5
  84. write([]byte{6}) // One partial packet
  85. pkt2Time := pkt1Time + byteLatency
  86. write([]byte{7, 8, 9, 10, 11, 12, 13}) // Two packets
  87. pkt3Time := pkt2Time + byteLatency*5
  88. pkt4Time := pkt3Time + byteLatency*2
  89. // No reads, so no sleeps yet.
  90. wantSleeps()
  91. read := func(n int, want []byte) {
  92. b := make([]byte, n)
  93. if rd, err := c.Read(b); err != nil || rd != len(want) {
  94. t.Fatalf("c.Read(<%v bytes>) = %v, %v; want %v, nil", n, rd, err, len(want))
  95. }
  96. if !reflect.DeepEqual(b[:len(want)], want) {
  97. t.Fatalf("read %v; want %v", b, want)
  98. }
  99. }
  100. read(1, []byte{1})
  101. wantSleeps(pkt1Time)
  102. read(1, []byte{2})
  103. wantSleeps()
  104. read(3, []byte{3, 4, 5})
  105. wantSleeps()
  106. read(2, []byte{6})
  107. wantSleeps(pkt2Time)
  108. read(2, []byte{7, 8})
  109. wantSleeps(pkt3Time)
  110. read(10, []byte{9, 10, 11})
  111. wantSleeps()
  112. read(10, []byte{12, 13})
  113. wantSleeps(pkt4Time)
  114. }
  115. func (s) TestSync(t *testing.T) {
  116. defer restoreHooks()()
  117. // Infinitely fast CPU: time doesn't pass unless sleep is called.
  118. tn := time.Unix(123, 0)
  119. now = func() time.Time { return tn }
  120. sleep = func(d time.Duration) { tn = tn.Add(d) }
  121. // Simulate a 20ms latency network, then run sync across that and expect to
  122. // measure 20ms latency, or 10ms additional delay for a 30ms network.
  123. slowConn, err := (&Network{Kbps: 0, Latency: 20 * time.Millisecond, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
  124. if err != nil {
  125. t.Fatalf("Unexpected error creating connection: %v", err)
  126. }
  127. c, err := (&Network{Latency: 30 * time.Millisecond}).Conn(slowConn)
  128. if err != nil {
  129. t.Fatalf("Unexpected error creating connection: %v", err)
  130. }
  131. if c.(*conn).delay != 10*time.Millisecond {
  132. t.Fatalf("c.delay = %v; want 10ms", c.(*conn).delay)
  133. }
  134. }
  135. func (s) TestSyncTooSlow(t *testing.T) {
  136. defer restoreHooks()()
  137. // Infinitely fast CPU: time doesn't pass unless sleep is called.
  138. tn := time.Unix(123, 0)
  139. now = func() time.Time { return tn }
  140. sleep = func(d time.Duration) { tn = tn.Add(d) }
  141. // Simulate a 10ms latency network, then attempt to simulate a 5ms latency
  142. // network and expect an error.
  143. slowConn, err := (&Network{Kbps: 0, Latency: 10 * time.Millisecond, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
  144. if err != nil {
  145. t.Fatalf("Unexpected error creating connection: %v", err)
  146. }
  147. errWant := "measured network latency (10ms) higher than desired latency (5ms)"
  148. if _, err := (&Network{Latency: 5 * time.Millisecond}).Conn(slowConn); err == nil || err.Error() != errWant {
  149. t.Fatalf("Conn() = _, %q; want _, %q", err, errWant)
  150. }
  151. }
  152. func (s) TestListenerAndDialer(t *testing.T) {
  153. defer restoreHooks()()
  154. tn := time.Unix(123, 0)
  155. startTime := tn
  156. mu := &sync.Mutex{}
  157. now = func() time.Time {
  158. mu.Lock()
  159. defer mu.Unlock()
  160. return tn
  161. }
  162. // Use a fairly high latency to cause a large BDP and avoid sleeps while
  163. // writing due to simulation of full buffers.
  164. n := &Network{Kbps: 2, Latency: 1 * time.Second, MTU: 10}
  165. // 2 kbps = .25 kBps = 256 Bps
  166. byteLatency := func(n int) time.Duration {
  167. return time.Duration(n) * time.Second / 256
  168. }
  169. // Create a real listener and wrap it.
  170. l, err := net.Listen("tcp", "localhost:0")
  171. if err != nil {
  172. t.Fatalf("Unexpected error creating listener: %v", err)
  173. }
  174. defer l.Close()
  175. l = n.Listener(l)
  176. var serverConn net.Conn
  177. var scErr error
  178. scDone := make(chan struct{})
  179. go func() {
  180. serverConn, scErr = l.Accept()
  181. close(scDone)
  182. }()
  183. // Create a dialer and use it.
  184. clientConn, err := n.TimeoutDialer(net.DialTimeout)("tcp", l.Addr().String(), 2*time.Second)
  185. if err != nil {
  186. t.Fatalf("Unexpected error dialing: %v", err)
  187. }
  188. defer clientConn.Close()
  189. // Block until server's Conn is available.
  190. <-scDone
  191. if scErr != nil {
  192. t.Fatalf("Unexpected error listening: %v", scErr)
  193. }
  194. defer serverConn.Close()
  195. // sleep (only) advances tn. Done after connections established so sync detects zero delay.
  196. sleep = func(d time.Duration) {
  197. mu.Lock()
  198. defer mu.Unlock()
  199. if d > 0 {
  200. tn = tn.Add(d)
  201. }
  202. }
  203. seq := func(a, b int) []byte {
  204. buf := make([]byte, b-a)
  205. for i := 0; i < b-a; i++ {
  206. buf[i] = byte(i + a)
  207. }
  208. return buf
  209. }
  210. pkt1 := seq(0, 10)
  211. pkt2 := seq(10, 30)
  212. pkt3 := seq(30, 35)
  213. write := func(c net.Conn, b []byte) {
  214. n, err := c.Write(b)
  215. if n != len(b) || err != nil {
  216. t.Fatalf("c.Write(%v) = %v, %v; want %v, nil", b, n, err, len(b))
  217. }
  218. }
  219. write(serverConn, pkt1)
  220. write(serverConn, pkt2)
  221. write(serverConn, pkt3)
  222. write(clientConn, pkt3)
  223. write(clientConn, pkt1)
  224. write(clientConn, pkt2)
  225. if tn != startTime {
  226. t.Fatalf("unexpected sleep in write; tn = %v; want %v", tn, startTime)
  227. }
  228. read := func(c net.Conn, n int, want []byte, timeWant time.Time) {
  229. b := make([]byte, n)
  230. if rd, err := c.Read(b); err != nil || rd != len(want) {
  231. t.Fatalf("c.Read(<%v bytes>) = %v, %v; want %v, nil (read: %v)", n, rd, err, len(want), b[:rd])
  232. }
  233. if !reflect.DeepEqual(b[:len(want)], want) {
  234. t.Fatalf("read %v; want %v", b, want)
  235. }
  236. if !tn.Equal(timeWant) {
  237. t.Errorf("tn after read(%v) = %v; want %v", want, tn, timeWant)
  238. }
  239. }
  240. read(clientConn, len(pkt1)+1, pkt1, startTime.Add(n.Latency+byteLatency(len(pkt1))))
  241. read(serverConn, len(pkt3)+1, pkt3, tn) // tn was advanced by the above read; pkt3 is shorter than pkt1
  242. read(clientConn, len(pkt2), pkt2[:10], startTime.Add(n.Latency+byteLatency(len(pkt1)+10)))
  243. read(clientConn, len(pkt2), pkt2[10:], startTime.Add(n.Latency+byteLatency(len(pkt1)+len(pkt2))))
  244. read(clientConn, len(pkt3), pkt3, startTime.Add(n.Latency+byteLatency(len(pkt1)+len(pkt2)+len(pkt3))))
  245. read(serverConn, len(pkt1), pkt1, tn) // tn already past the arrival time due to prior reads
  246. read(serverConn, len(pkt2), pkt2[:10], tn)
  247. read(serverConn, len(pkt2), pkt2[10:], tn)
  248. // Sleep awhile and make sure the read happens disregarding previous writes
  249. // (lastSendEnd handling).
  250. sleep(10 * time.Second)
  251. write(clientConn, pkt1)
  252. read(serverConn, len(pkt1), pkt1, tn.Add(n.Latency+byteLatency(len(pkt1))))
  253. // Send, sleep longer than the network delay, then make sure the read happens
  254. // instantly.
  255. write(serverConn, pkt1)
  256. sleep(10 * time.Second)
  257. read(clientConn, len(pkt1), pkt1, tn)
  258. }
  259. func (s) TestBufferBloat(t *testing.T) {
  260. defer restoreHooks()()
  261. // Infinitely fast CPU: time doesn't pass unless sleep is called.
  262. tn := time.Unix(123, 0)
  263. now = func() time.Time { return tn }
  264. // Capture sleep times for checking later.
  265. var sleepTimes []time.Duration
  266. sleep = func(d time.Duration) {
  267. sleepTimes = append(sleepTimes, d)
  268. tn = tn.Add(d)
  269. }
  270. wantSleeps := func(want ...time.Duration) error {
  271. if !reflect.DeepEqual(want, sleepTimes) {
  272. return fmt.Errorf("sleepTimes = %v; want %v", sleepTimes, want)
  273. }
  274. sleepTimes = nil
  275. return nil
  276. }
  277. n := &Network{Kbps: 8 /* 1KBps */, Latency: time.Second, MTU: 8}
  278. bdpBytes := (n.Kbps * 1024 / 8) * int(n.Latency/time.Second) // 1024
  279. c, err := n.Conn(bufConn{&bytes.Buffer{}})
  280. if err != nil {
  281. t.Fatalf("Unexpected error creating connection: %v", err)
  282. }
  283. wantSleeps(n.Latency) // Connection creation delay.
  284. write := func(n int, sleeps ...time.Duration) {
  285. if wt, err := c.Write(make([]byte, n)); err != nil || wt != n {
  286. t.Fatalf("c.Write(<%v bytes>) = %v, %v; want %v, nil", n, wt, err, n)
  287. }
  288. if err := wantSleeps(sleeps...); err != nil {
  289. t.Fatalf("After writing %v bytes: %v", n, err)
  290. }
  291. }
  292. read := func(n int, sleeps ...time.Duration) {
  293. if rd, err := c.Read(make([]byte, n)); err != nil || rd != n {
  294. t.Fatalf("c.Read(_) = %v, %v; want %v, nil", rd, err, n)
  295. }
  296. if err := wantSleeps(sleeps...); err != nil {
  297. t.Fatalf("After reading %v bytes: %v", n, err)
  298. }
  299. }
  300. write(8) // No reads and buffer not full, so no sleeps yet.
  301. read(8, time.Second+n.pktTime(8))
  302. write(bdpBytes) // Fill the buffer.
  303. write(1) // We can send one extra packet even when the buffer is full.
  304. write(n.MTU, n.pktTime(1)) // Make sure we sleep to clear the previous write.
  305. write(1, n.pktTime(n.MTU))
  306. write(n.MTU+1, n.pktTime(1), n.pktTime(n.MTU))
  307. tn = tn.Add(10 * time.Second) // Wait long enough for the buffer to clear.
  308. write(bdpBytes) // No sleeps required.
  309. }