grpclb_test.go 33 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153
  1. /*
  2. *
  3. * Copyright 2016 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpclb
  19. import (
  20. "context"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "net"
  25. "strconv"
  26. "strings"
  27. "sync"
  28. "sync/atomic"
  29. "testing"
  30. "time"
  31. durationpb "github.com/golang/protobuf/ptypes/duration"
  32. "google.golang.org/grpc"
  33. "google.golang.org/grpc/balancer"
  34. lbgrpc "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
  35. lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
  36. "google.golang.org/grpc/codes"
  37. "google.golang.org/grpc/credentials"
  38. _ "google.golang.org/grpc/grpclog/glogger"
  39. "google.golang.org/grpc/internal/leakcheck"
  40. "google.golang.org/grpc/metadata"
  41. "google.golang.org/grpc/peer"
  42. "google.golang.org/grpc/resolver"
  43. "google.golang.org/grpc/resolver/manual"
  44. "google.golang.org/grpc/status"
  45. testpb "google.golang.org/grpc/test/grpc_testing"
  46. )
  47. var (
  48. lbServerName = "bar.com"
  49. beServerName = "foo.com"
  50. lbToken = "iamatoken"
  51. // Resolver replaces localhost with fakeName in Next().
  52. // Dialer replaces fakeName with localhost when dialing.
  53. // This will test that custom dialer is passed from Dial to grpclb.
  54. fakeName = "fake.Name"
  55. )
  56. type serverNameCheckCreds struct {
  57. mu sync.Mutex
  58. sn string
  59. expected string
  60. }
  61. func (c *serverNameCheckCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
  62. if _, err := io.WriteString(rawConn, c.sn); err != nil {
  63. fmt.Printf("Failed to write the server name %s to the client %v", c.sn, err)
  64. return nil, nil, err
  65. }
  66. return rawConn, nil, nil
  67. }
  68. func (c *serverNameCheckCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
  69. c.mu.Lock()
  70. defer c.mu.Unlock()
  71. b := make([]byte, len(c.expected))
  72. errCh := make(chan error, 1)
  73. go func() {
  74. _, err := rawConn.Read(b)
  75. errCh <- err
  76. }()
  77. select {
  78. case err := <-errCh:
  79. if err != nil {
  80. fmt.Printf("Failed to read the server name from the server %v", err)
  81. return nil, nil, err
  82. }
  83. case <-ctx.Done():
  84. return nil, nil, ctx.Err()
  85. }
  86. if c.expected != string(b) {
  87. fmt.Printf("Read the server name %s want %s", string(b), c.expected)
  88. return nil, nil, errors.New("received unexpected server name")
  89. }
  90. return rawConn, nil, nil
  91. }
  92. func (c *serverNameCheckCreds) Info() credentials.ProtocolInfo {
  93. c.mu.Lock()
  94. defer c.mu.Unlock()
  95. return credentials.ProtocolInfo{}
  96. }
  97. func (c *serverNameCheckCreds) Clone() credentials.TransportCredentials {
  98. c.mu.Lock()
  99. defer c.mu.Unlock()
  100. return &serverNameCheckCreds{
  101. expected: c.expected,
  102. }
  103. }
  104. func (c *serverNameCheckCreds) OverrideServerName(s string) error {
  105. c.mu.Lock()
  106. defer c.mu.Unlock()
  107. c.expected = s
  108. return nil
  109. }
  110. // fakeNameDialer replaces fakeName with localhost when dialing.
  111. // This will test that custom dialer is passed from Dial to grpclb.
  112. func fakeNameDialer(ctx context.Context, addr string) (net.Conn, error) {
  113. addr = strings.Replace(addr, fakeName, "localhost", 1)
  114. return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
  115. }
  116. // merge merges the new client stats into current stats.
  117. //
  118. // It's a test-only method. rpcStats is defined in grpclb_picker.
  119. func (s *rpcStats) merge(cs *lbpb.ClientStats) {
  120. atomic.AddInt64(&s.numCallsStarted, cs.NumCallsStarted)
  121. atomic.AddInt64(&s.numCallsFinished, cs.NumCallsFinished)
  122. atomic.AddInt64(&s.numCallsFinishedWithClientFailedToSend, cs.NumCallsFinishedWithClientFailedToSend)
  123. atomic.AddInt64(&s.numCallsFinishedKnownReceived, cs.NumCallsFinishedKnownReceived)
  124. s.mu.Lock()
  125. for _, perToken := range cs.CallsFinishedWithDrop {
  126. s.numCallsDropped[perToken.LoadBalanceToken] += perToken.NumCalls
  127. }
  128. s.mu.Unlock()
  129. }
  130. func mapsEqual(a, b map[string]int64) bool {
  131. if len(a) != len(b) {
  132. return false
  133. }
  134. for k, v1 := range a {
  135. if v2, ok := b[k]; !ok || v1 != v2 {
  136. return false
  137. }
  138. }
  139. return true
  140. }
  141. func atomicEqual(a, b *int64) bool {
  142. return atomic.LoadInt64(a) == atomic.LoadInt64(b)
  143. }
  144. // equal compares two rpcStats.
  145. //
  146. // It's a test-only method. rpcStats is defined in grpclb_picker.
  147. func (s *rpcStats) equal(o *rpcStats) bool {
  148. if !atomicEqual(&s.numCallsStarted, &o.numCallsStarted) {
  149. return false
  150. }
  151. if !atomicEqual(&s.numCallsFinished, &o.numCallsFinished) {
  152. return false
  153. }
  154. if !atomicEqual(&s.numCallsFinishedWithClientFailedToSend, &o.numCallsFinishedWithClientFailedToSend) {
  155. return false
  156. }
  157. if !atomicEqual(&s.numCallsFinishedKnownReceived, &o.numCallsFinishedKnownReceived) {
  158. return false
  159. }
  160. s.mu.Lock()
  161. defer s.mu.Unlock()
  162. o.mu.Lock()
  163. defer o.mu.Unlock()
  164. return mapsEqual(s.numCallsDropped, o.numCallsDropped)
  165. }
  166. type remoteBalancer struct {
  167. sls chan *lbpb.ServerList
  168. statsDura time.Duration
  169. done chan struct{}
  170. stats *rpcStats
  171. }
  172. func newRemoteBalancer(intervals []time.Duration) *remoteBalancer {
  173. return &remoteBalancer{
  174. sls: make(chan *lbpb.ServerList, 1),
  175. done: make(chan struct{}),
  176. stats: newRPCStats(),
  177. }
  178. }
  179. func (b *remoteBalancer) stop() {
  180. close(b.sls)
  181. close(b.done)
  182. }
  183. func (b *remoteBalancer) BalanceLoad(stream lbgrpc.LoadBalancer_BalanceLoadServer) error {
  184. req, err := stream.Recv()
  185. if err != nil {
  186. return err
  187. }
  188. initReq := req.GetInitialRequest()
  189. if initReq.Name != beServerName {
  190. return status.Errorf(codes.InvalidArgument, "invalid service name: %v", initReq.Name)
  191. }
  192. resp := &lbpb.LoadBalanceResponse{
  193. LoadBalanceResponseType: &lbpb.LoadBalanceResponse_InitialResponse{
  194. InitialResponse: &lbpb.InitialLoadBalanceResponse{
  195. ClientStatsReportInterval: &durationpb.Duration{
  196. Seconds: int64(b.statsDura.Seconds()),
  197. Nanos: int32(b.statsDura.Nanoseconds() - int64(b.statsDura.Seconds())*1e9),
  198. },
  199. },
  200. },
  201. }
  202. if err := stream.Send(resp); err != nil {
  203. return err
  204. }
  205. go func() {
  206. for {
  207. var (
  208. req *lbpb.LoadBalanceRequest
  209. err error
  210. )
  211. if req, err = stream.Recv(); err != nil {
  212. return
  213. }
  214. b.stats.merge(req.GetClientStats())
  215. }
  216. }()
  217. for {
  218. select {
  219. case v := <-b.sls:
  220. resp = &lbpb.LoadBalanceResponse{
  221. LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{
  222. ServerList: v,
  223. },
  224. }
  225. case <-stream.Context().Done():
  226. return stream.Context().Err()
  227. }
  228. if err := stream.Send(resp); err != nil {
  229. return err
  230. }
  231. }
  232. }
  233. type testServer struct {
  234. testpb.TestServiceServer
  235. addr string
  236. fallback bool
  237. }
  238. const testmdkey = "testmd"
  239. func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
  240. md, ok := metadata.FromIncomingContext(ctx)
  241. if !ok {
  242. return nil, status.Error(codes.Internal, "failed to receive metadata")
  243. }
  244. if !s.fallback && (md == nil || md["lb-token"][0] != lbToken) {
  245. return nil, status.Errorf(codes.Internal, "received unexpected metadata: %v", md)
  246. }
  247. grpc.SetTrailer(ctx, metadata.Pairs(testmdkey, s.addr))
  248. return &testpb.Empty{}, nil
  249. }
  250. func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
  251. return nil
  252. }
  253. func startBackends(sn string, fallback bool, lis ...net.Listener) (servers []*grpc.Server) {
  254. for _, l := range lis {
  255. creds := &serverNameCheckCreds{
  256. sn: sn,
  257. }
  258. s := grpc.NewServer(grpc.Creds(creds))
  259. testpb.RegisterTestServiceServer(s, &testServer{addr: l.Addr().String(), fallback: fallback})
  260. servers = append(servers, s)
  261. go func(s *grpc.Server, l net.Listener) {
  262. s.Serve(l)
  263. }(s, l)
  264. }
  265. return
  266. }
  267. func stopBackends(servers []*grpc.Server) {
  268. for _, s := range servers {
  269. s.Stop()
  270. }
  271. }
  272. type testServers struct {
  273. lbAddr string
  274. ls *remoteBalancer
  275. lb *grpc.Server
  276. backends []*grpc.Server
  277. beIPs []net.IP
  278. bePorts []int
  279. lbListener net.Listener
  280. beListeners []net.Listener
  281. }
  282. func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), err error) {
  283. var (
  284. beListeners []net.Listener
  285. ls *remoteBalancer
  286. lb *grpc.Server
  287. beIPs []net.IP
  288. bePorts []int
  289. )
  290. for i := 0; i < numberOfBackends; i++ {
  291. // Start a backend.
  292. beLis, e := net.Listen("tcp", "localhost:0")
  293. if e != nil {
  294. err = fmt.Errorf("failed to listen %v", err)
  295. return
  296. }
  297. beIPs = append(beIPs, beLis.Addr().(*net.TCPAddr).IP)
  298. bePorts = append(bePorts, beLis.Addr().(*net.TCPAddr).Port)
  299. beListeners = append(beListeners, newRestartableListener(beLis))
  300. }
  301. backends := startBackends(beServerName, false, beListeners...)
  302. // Start a load balancer.
  303. lbLis, err := net.Listen("tcp", "localhost:0")
  304. if err != nil {
  305. err = fmt.Errorf("failed to create the listener for the load balancer %v", err)
  306. return
  307. }
  308. lbLis = newRestartableListener(lbLis)
  309. lbCreds := &serverNameCheckCreds{
  310. sn: lbServerName,
  311. }
  312. lb = grpc.NewServer(grpc.Creds(lbCreds))
  313. ls = newRemoteBalancer(nil)
  314. lbgrpc.RegisterLoadBalancerServer(lb, ls)
  315. go func() {
  316. lb.Serve(lbLis)
  317. }()
  318. tss = &testServers{
  319. lbAddr: net.JoinHostPort(fakeName, strconv.Itoa(lbLis.Addr().(*net.TCPAddr).Port)),
  320. ls: ls,
  321. lb: lb,
  322. backends: backends,
  323. beIPs: beIPs,
  324. bePorts: bePorts,
  325. lbListener: lbLis,
  326. beListeners: beListeners,
  327. }
  328. cleanup = func() {
  329. defer stopBackends(backends)
  330. defer func() {
  331. ls.stop()
  332. lb.Stop()
  333. }()
  334. }
  335. return
  336. }
  337. func TestGRPCLB(t *testing.T) {
  338. defer leakcheck.Check(t)
  339. r, cleanup := manual.GenerateAndRegisterManualResolver()
  340. defer cleanup()
  341. tss, cleanup, err := newLoadBalancer(1)
  342. if err != nil {
  343. t.Fatalf("failed to create new load balancer: %v", err)
  344. }
  345. defer cleanup()
  346. be := &lbpb.Server{
  347. IpAddress: tss.beIPs[0],
  348. Port: int32(tss.bePorts[0]),
  349. LoadBalanceToken: lbToken,
  350. }
  351. var bes []*lbpb.Server
  352. bes = append(bes, be)
  353. sl := &lbpb.ServerList{
  354. Servers: bes,
  355. }
  356. tss.ls.sls <- sl
  357. creds := serverNameCheckCreds{
  358. expected: beServerName,
  359. }
  360. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  361. defer cancel()
  362. cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
  363. grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer))
  364. if err != nil {
  365. t.Fatalf("Failed to dial to the backend %v", err)
  366. }
  367. defer cc.Close()
  368. testC := testpb.NewTestServiceClient(cc)
  369. r.UpdateState(resolver.State{Addresses: []resolver.Address{{
  370. Addr: tss.lbAddr,
  371. Type: resolver.GRPCLB,
  372. ServerName: lbServerName,
  373. }}})
  374. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
  375. t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
  376. }
  377. }
  378. // The remote balancer sends response with duplicates to grpclb client.
  379. func TestGRPCLBWeighted(t *testing.T) {
  380. defer leakcheck.Check(t)
  381. r, cleanup := manual.GenerateAndRegisterManualResolver()
  382. defer cleanup()
  383. tss, cleanup, err := newLoadBalancer(2)
  384. if err != nil {
  385. t.Fatalf("failed to create new load balancer: %v", err)
  386. }
  387. defer cleanup()
  388. beServers := []*lbpb.Server{{
  389. IpAddress: tss.beIPs[0],
  390. Port: int32(tss.bePorts[0]),
  391. LoadBalanceToken: lbToken,
  392. }, {
  393. IpAddress: tss.beIPs[1],
  394. Port: int32(tss.bePorts[1]),
  395. LoadBalanceToken: lbToken,
  396. }}
  397. portsToIndex := make(map[int]int)
  398. for i := range beServers {
  399. portsToIndex[tss.bePorts[i]] = i
  400. }
  401. creds := serverNameCheckCreds{
  402. expected: beServerName,
  403. }
  404. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  405. defer cancel()
  406. cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
  407. grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer))
  408. if err != nil {
  409. t.Fatalf("Failed to dial to the backend %v", err)
  410. }
  411. defer cc.Close()
  412. testC := testpb.NewTestServiceClient(cc)
  413. r.UpdateState(resolver.State{Addresses: []resolver.Address{{
  414. Addr: tss.lbAddr,
  415. Type: resolver.GRPCLB,
  416. ServerName: lbServerName,
  417. }}})
  418. sequences := []string{"00101", "00011"}
  419. for _, seq := range sequences {
  420. var (
  421. bes []*lbpb.Server
  422. p peer.Peer
  423. result string
  424. )
  425. for _, s := range seq {
  426. bes = append(bes, beServers[s-'0'])
  427. }
  428. tss.ls.sls <- &lbpb.ServerList{Servers: bes}
  429. for i := 0; i < 1000; i++ {
  430. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
  431. t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
  432. }
  433. result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port])
  434. }
  435. // The generated result will be in format of "0010100101".
  436. if !strings.Contains(result, strings.Repeat(seq, 2)) {
  437. t.Errorf("got result sequence %q, want patten %q", result, seq)
  438. }
  439. }
  440. }
  441. func TestDropRequest(t *testing.T) {
  442. defer leakcheck.Check(t)
  443. r, cleanup := manual.GenerateAndRegisterManualResolver()
  444. defer cleanup()
  445. tss, cleanup, err := newLoadBalancer(2)
  446. if err != nil {
  447. t.Fatalf("failed to create new load balancer: %v", err)
  448. }
  449. defer cleanup()
  450. tss.ls.sls <- &lbpb.ServerList{
  451. Servers: []*lbpb.Server{{
  452. IpAddress: tss.beIPs[0],
  453. Port: int32(tss.bePorts[0]),
  454. LoadBalanceToken: lbToken,
  455. Drop: false,
  456. }, {
  457. IpAddress: tss.beIPs[1],
  458. Port: int32(tss.bePorts[1]),
  459. LoadBalanceToken: lbToken,
  460. Drop: false,
  461. }, {
  462. Drop: true,
  463. }},
  464. }
  465. creds := serverNameCheckCreds{
  466. expected: beServerName,
  467. }
  468. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  469. defer cancel()
  470. cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
  471. grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer))
  472. if err != nil {
  473. t.Fatalf("Failed to dial to the backend %v", err)
  474. }
  475. defer cc.Close()
  476. testC := testpb.NewTestServiceClient(cc)
  477. r.UpdateState(resolver.State{Addresses: []resolver.Address{{
  478. Addr: tss.lbAddr,
  479. Type: resolver.GRPCLB,
  480. ServerName: lbServerName,
  481. }}})
  482. // Wait for the 1st, non-fail-fast RPC to succeed. This ensures both server
  483. // connections are made, because the first one has Drop set to true.
  484. var i int
  485. for i = 0; i < 1000; i++ {
  486. if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err == nil {
  487. break
  488. }
  489. time.Sleep(time.Millisecond)
  490. }
  491. if i >= 1000 {
  492. t.Fatalf("%v.SayHello(_, _) = _, %v, want _, <nil>", testC, err)
  493. }
  494. select {
  495. case <-ctx.Done():
  496. t.Fatal("timed out", ctx.Err())
  497. default:
  498. }
  499. for _, failfast := range []bool{true, false} {
  500. for i := 0; i < 3; i++ {
  501. // 1st RPCs pick the second item in server list. They should succeed
  502. // since they choose the non-drop-request backend according to the
  503. // round robin policy.
  504. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(!failfast)); err != nil {
  505. t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
  506. }
  507. // 2st RPCs should fail, because they pick last item in server list,
  508. // with Drop set to true.
  509. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(!failfast)); status.Code(err) != codes.Unavailable {
  510. t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, %s", testC, err, codes.Unavailable)
  511. }
  512. // 3rd RPCs pick the first item in server list. They should succeed
  513. // since they choose the non-drop-request backend according to the
  514. // round robin policy.
  515. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(!failfast)); err != nil {
  516. t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
  517. }
  518. }
  519. }
  520. tss.backends[0].Stop()
  521. // This last pick was backend 0. Closing backend 0 doesn't reset drop index
  522. // (for level 1 picking), so the following picks will be (backend1, drop,
  523. // backend1), instead of (backend, backend, drop) if drop index was reset.
  524. time.Sleep(time.Second)
  525. for i := 0; i < 3; i++ {
  526. var p peer.Peer
  527. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
  528. t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
  529. }
  530. if want := tss.bePorts[1]; p.Addr.(*net.TCPAddr).Port != want {
  531. t.Errorf("got peer: %v, want peer port: %v", p.Addr, want)
  532. }
  533. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.Unavailable {
  534. t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, %s", testC, err, codes.Unavailable)
  535. }
  536. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
  537. t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
  538. }
  539. if want := tss.bePorts[1]; p.Addr.(*net.TCPAddr).Port != want {
  540. t.Errorf("got peer: %v, want peer port: %v", p.Addr, want)
  541. }
  542. }
  543. }
  544. // When the balancer in use disconnects, grpclb should connect to the next address from resolved balancer address list.
  545. func TestBalancerDisconnects(t *testing.T) {
  546. defer leakcheck.Check(t)
  547. r, cleanup := manual.GenerateAndRegisterManualResolver()
  548. defer cleanup()
  549. var (
  550. tests []*testServers
  551. lbs []*grpc.Server
  552. )
  553. for i := 0; i < 2; i++ {
  554. tss, cleanup, err := newLoadBalancer(1)
  555. if err != nil {
  556. t.Fatalf("failed to create new load balancer: %v", err)
  557. }
  558. defer cleanup()
  559. be := &lbpb.Server{
  560. IpAddress: tss.beIPs[0],
  561. Port: int32(tss.bePorts[0]),
  562. LoadBalanceToken: lbToken,
  563. }
  564. var bes []*lbpb.Server
  565. bes = append(bes, be)
  566. sl := &lbpb.ServerList{
  567. Servers: bes,
  568. }
  569. tss.ls.sls <- sl
  570. tests = append(tests, tss)
  571. lbs = append(lbs, tss.lb)
  572. }
  573. creds := serverNameCheckCreds{
  574. expected: beServerName,
  575. }
  576. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  577. defer cancel()
  578. cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
  579. grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer))
  580. if err != nil {
  581. t.Fatalf("Failed to dial to the backend %v", err)
  582. }
  583. defer cc.Close()
  584. testC := testpb.NewTestServiceClient(cc)
  585. r.UpdateState(resolver.State{Addresses: []resolver.Address{{
  586. Addr: tests[0].lbAddr,
  587. Type: resolver.GRPCLB,
  588. ServerName: lbServerName,
  589. }, {
  590. Addr: tests[1].lbAddr,
  591. Type: resolver.GRPCLB,
  592. ServerName: lbServerName,
  593. }}})
  594. var p peer.Peer
  595. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
  596. t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
  597. }
  598. if p.Addr.(*net.TCPAddr).Port != tests[0].bePorts[0] {
  599. t.Fatalf("got peer: %v, want peer port: %v", p.Addr, tests[0].bePorts[0])
  600. }
  601. lbs[0].Stop()
  602. // Stop balancer[0], balancer[1] should be used by grpclb.
  603. // Check peer address to see if that happened.
  604. for i := 0; i < 1000; i++ {
  605. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
  606. t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
  607. }
  608. if p.Addr.(*net.TCPAddr).Port == tests[1].bePorts[0] {
  609. return
  610. }
  611. time.Sleep(time.Millisecond)
  612. }
  613. t.Fatalf("No RPC sent to second backend after 1 second")
  614. }
  615. func TestFallback(t *testing.T) {
  616. balancer.Register(newLBBuilderWithFallbackTimeout(100 * time.Millisecond))
  617. defer balancer.Register(newLBBuilder())
  618. defer leakcheck.Check(t)
  619. r, cleanup := manual.GenerateAndRegisterManualResolver()
  620. defer cleanup()
  621. tss, cleanup, err := newLoadBalancer(1)
  622. if err != nil {
  623. t.Fatalf("failed to create new load balancer: %v", err)
  624. }
  625. defer cleanup()
  626. // Start a standalone backend.
  627. beLis, err := net.Listen("tcp", "localhost:0")
  628. if err != nil {
  629. t.Fatalf("Failed to listen %v", err)
  630. }
  631. defer beLis.Close()
  632. standaloneBEs := startBackends(beServerName, true, beLis)
  633. defer stopBackends(standaloneBEs)
  634. be := &lbpb.Server{
  635. IpAddress: tss.beIPs[0],
  636. Port: int32(tss.bePorts[0]),
  637. LoadBalanceToken: lbToken,
  638. }
  639. var bes []*lbpb.Server
  640. bes = append(bes, be)
  641. sl := &lbpb.ServerList{
  642. Servers: bes,
  643. }
  644. tss.ls.sls <- sl
  645. creds := serverNameCheckCreds{
  646. expected: beServerName,
  647. }
  648. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  649. defer cancel()
  650. cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
  651. grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer))
  652. if err != nil {
  653. t.Fatalf("Failed to dial to the backend %v", err)
  654. }
  655. defer cc.Close()
  656. testC := testpb.NewTestServiceClient(cc)
  657. r.UpdateState(resolver.State{Addresses: []resolver.Address{{
  658. Addr: "invalid.address",
  659. Type: resolver.GRPCLB,
  660. ServerName: lbServerName,
  661. }, {
  662. Addr: beLis.Addr().String(),
  663. Type: resolver.Backend,
  664. ServerName: beServerName,
  665. }}})
  666. var p peer.Peer
  667. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
  668. t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, <nil>", err)
  669. }
  670. if p.Addr.String() != beLis.Addr().String() {
  671. t.Fatalf("got peer: %v, want peer: %v", p.Addr, beLis.Addr())
  672. }
  673. r.UpdateState(resolver.State{Addresses: []resolver.Address{{
  674. Addr: tss.lbAddr,
  675. Type: resolver.GRPCLB,
  676. ServerName: lbServerName,
  677. }, {
  678. Addr: beLis.Addr().String(),
  679. Type: resolver.Backend,
  680. ServerName: beServerName,
  681. }}})
  682. var backendUsed bool
  683. for i := 0; i < 1000; i++ {
  684. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
  685. t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
  686. }
  687. if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] {
  688. backendUsed = true
  689. break
  690. }
  691. time.Sleep(time.Millisecond)
  692. }
  693. if !backendUsed {
  694. t.Fatalf("No RPC sent to backend behind remote balancer after 1 second")
  695. }
  696. // Close backend and remote balancer connections, should use fallback.
  697. tss.beListeners[0].(*restartableListener).stopPreviousConns()
  698. tss.lbListener.(*restartableListener).stopPreviousConns()
  699. time.Sleep(time.Second)
  700. var fallbackUsed bool
  701. for i := 0; i < 1000; i++ {
  702. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
  703. t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
  704. }
  705. if p.Addr.String() == beLis.Addr().String() {
  706. fallbackUsed = true
  707. break
  708. }
  709. time.Sleep(time.Millisecond)
  710. }
  711. if !fallbackUsed {
  712. t.Fatalf("No RPC sent to fallback after 1 second")
  713. }
  714. // Restart backend and remote balancer, should not use backends.
  715. tss.beListeners[0].(*restartableListener).restart()
  716. tss.lbListener.(*restartableListener).restart()
  717. tss.ls.sls <- sl
  718. time.Sleep(time.Second)
  719. var backendUsed2 bool
  720. for i := 0; i < 1000; i++ {
  721. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
  722. t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
  723. }
  724. if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] {
  725. backendUsed2 = true
  726. break
  727. }
  728. time.Sleep(time.Millisecond)
  729. }
  730. if !backendUsed2 {
  731. t.Fatalf("No RPC sent to backend behind remote balancer after 1 second")
  732. }
  733. }
  734. // The remote balancer sends response with duplicates to grpclb client.
  735. func TestGRPCLBPickFirst(t *testing.T) {
  736. balancer.Register(newLBBuilderWithPickFirst())
  737. defer balancer.Register(newLBBuilder())
  738. defer leakcheck.Check(t)
  739. r, cleanup := manual.GenerateAndRegisterManualResolver()
  740. defer cleanup()
  741. tss, cleanup, err := newLoadBalancer(3)
  742. if err != nil {
  743. t.Fatalf("failed to create new load balancer: %v", err)
  744. }
  745. defer cleanup()
  746. beServers := []*lbpb.Server{{
  747. IpAddress: tss.beIPs[0],
  748. Port: int32(tss.bePorts[0]),
  749. LoadBalanceToken: lbToken,
  750. }, {
  751. IpAddress: tss.beIPs[1],
  752. Port: int32(tss.bePorts[1]),
  753. LoadBalanceToken: lbToken,
  754. }, {
  755. IpAddress: tss.beIPs[2],
  756. Port: int32(tss.bePorts[2]),
  757. LoadBalanceToken: lbToken,
  758. }}
  759. portsToIndex := make(map[int]int)
  760. for i := range beServers {
  761. portsToIndex[tss.bePorts[i]] = i
  762. }
  763. creds := serverNameCheckCreds{
  764. expected: beServerName,
  765. }
  766. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  767. defer cancel()
  768. cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
  769. grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer))
  770. if err != nil {
  771. t.Fatalf("Failed to dial to the backend %v", err)
  772. }
  773. defer cc.Close()
  774. testC := testpb.NewTestServiceClient(cc)
  775. r.UpdateState(resolver.State{Addresses: []resolver.Address{{
  776. Addr: tss.lbAddr,
  777. Type: resolver.GRPCLB,
  778. ServerName: lbServerName,
  779. }}})
  780. var p peer.Peer
  781. portPicked1 := 0
  782. tss.ls.sls <- &lbpb.ServerList{Servers: beServers[1:2]}
  783. for i := 0; i < 1000; i++ {
  784. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
  785. t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, <nil>", err)
  786. }
  787. if portPicked1 == 0 {
  788. portPicked1 = p.Addr.(*net.TCPAddr).Port
  789. continue
  790. }
  791. if portPicked1 != p.Addr.(*net.TCPAddr).Port {
  792. t.Fatalf("Different backends are picked for RPCs: %v vs %v", portPicked1, p.Addr.(*net.TCPAddr).Port)
  793. }
  794. }
  795. portPicked2 := portPicked1
  796. tss.ls.sls <- &lbpb.ServerList{Servers: beServers[:1]}
  797. for i := 0; i < 1000; i++ {
  798. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
  799. t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, <nil>", err)
  800. }
  801. if portPicked2 == portPicked1 {
  802. portPicked2 = p.Addr.(*net.TCPAddr).Port
  803. continue
  804. }
  805. if portPicked2 != p.Addr.(*net.TCPAddr).Port {
  806. t.Fatalf("Different backends are picked for RPCs: %v vs %v", portPicked2, p.Addr.(*net.TCPAddr).Port)
  807. }
  808. }
  809. portPicked := portPicked2
  810. tss.ls.sls <- &lbpb.ServerList{Servers: beServers[1:]}
  811. for i := 0; i < 1000; i++ {
  812. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
  813. t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, <nil>", err)
  814. }
  815. if portPicked == portPicked2 {
  816. portPicked = p.Addr.(*net.TCPAddr).Port
  817. continue
  818. }
  819. if portPicked != p.Addr.(*net.TCPAddr).Port {
  820. t.Fatalf("Different backends are picked for RPCs: %v vs %v", portPicked, p.Addr.(*net.TCPAddr).Port)
  821. }
  822. }
  823. }
  824. type failPreRPCCred struct{}
  825. func (failPreRPCCred) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
  826. if strings.Contains(uri[0], failtosendURI) {
  827. return nil, fmt.Errorf("rpc should fail to send")
  828. }
  829. return nil, nil
  830. }
  831. func (failPreRPCCred) RequireTransportSecurity() bool {
  832. return false
  833. }
  834. func checkStats(stats, expected *rpcStats) error {
  835. if !stats.equal(expected) {
  836. return fmt.Errorf("stats not equal: got %+v, want %+v", stats, expected)
  837. }
  838. return nil
  839. }
  840. func runAndGetStats(t *testing.T, drop bool, runRPCs func(*grpc.ClientConn)) *rpcStats {
  841. defer leakcheck.Check(t)
  842. r, cleanup := manual.GenerateAndRegisterManualResolver()
  843. defer cleanup()
  844. tss, cleanup, err := newLoadBalancer(1)
  845. if err != nil {
  846. t.Fatalf("failed to create new load balancer: %v", err)
  847. }
  848. defer cleanup()
  849. servers := []*lbpb.Server{{
  850. IpAddress: tss.beIPs[0],
  851. Port: int32(tss.bePorts[0]),
  852. LoadBalanceToken: lbToken,
  853. }}
  854. if drop {
  855. servers = append(servers, &lbpb.Server{
  856. LoadBalanceToken: lbToken,
  857. Drop: drop,
  858. })
  859. }
  860. tss.ls.sls <- &lbpb.ServerList{Servers: servers}
  861. tss.ls.statsDura = 100 * time.Millisecond
  862. creds := serverNameCheckCreds{expected: beServerName}
  863. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  864. defer cancel()
  865. cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
  866. grpc.WithTransportCredentials(&creds),
  867. grpc.WithPerRPCCredentials(failPreRPCCred{}),
  868. grpc.WithContextDialer(fakeNameDialer))
  869. if err != nil {
  870. t.Fatalf("Failed to dial to the backend %v", err)
  871. }
  872. defer cc.Close()
  873. r.UpdateState(resolver.State{Addresses: []resolver.Address{{
  874. Addr: tss.lbAddr,
  875. Type: resolver.GRPCLB,
  876. ServerName: lbServerName,
  877. }}})
  878. runRPCs(cc)
  879. time.Sleep(1 * time.Second)
  880. stats := tss.ls.stats
  881. return stats
  882. }
  883. const (
  884. countRPC = 40
  885. failtosendURI = "failtosend"
  886. )
  887. func TestGRPCLBStatsUnarySuccess(t *testing.T) {
  888. defer leakcheck.Check(t)
  889. stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
  890. testC := testpb.NewTestServiceClient(cc)
  891. // The first non-failfast RPC succeeds, all connections are up.
  892. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
  893. t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
  894. }
  895. for i := 0; i < countRPC-1; i++ {
  896. testC.EmptyCall(context.Background(), &testpb.Empty{})
  897. }
  898. })
  899. if err := checkStats(stats, &rpcStats{
  900. numCallsStarted: int64(countRPC),
  901. numCallsFinished: int64(countRPC),
  902. numCallsFinishedKnownReceived: int64(countRPC),
  903. }); err != nil {
  904. t.Fatal(err)
  905. }
  906. }
  907. func TestGRPCLBStatsUnaryDrop(t *testing.T) {
  908. defer leakcheck.Check(t)
  909. stats := runAndGetStats(t, true, func(cc *grpc.ClientConn) {
  910. testC := testpb.NewTestServiceClient(cc)
  911. // The first non-failfast RPC succeeds, all connections are up.
  912. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
  913. t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
  914. }
  915. for i := 0; i < countRPC-1; i++ {
  916. testC.EmptyCall(context.Background(), &testpb.Empty{})
  917. }
  918. })
  919. if err := checkStats(stats, &rpcStats{
  920. numCallsStarted: int64(countRPC),
  921. numCallsFinished: int64(countRPC),
  922. numCallsFinishedKnownReceived: int64(countRPC) / 2,
  923. numCallsDropped: map[string]int64{lbToken: int64(countRPC) / 2},
  924. }); err != nil {
  925. t.Fatal(err)
  926. }
  927. }
  928. func TestGRPCLBStatsUnaryFailedToSend(t *testing.T) {
  929. defer leakcheck.Check(t)
  930. stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
  931. testC := testpb.NewTestServiceClient(cc)
  932. // The first non-failfast RPC succeeds, all connections are up.
  933. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
  934. t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
  935. }
  936. for i := 0; i < countRPC-1; i++ {
  937. cc.Invoke(context.Background(), failtosendURI, &testpb.Empty{}, nil)
  938. }
  939. })
  940. if err := checkStats(stats, &rpcStats{
  941. numCallsStarted: int64(countRPC),
  942. numCallsFinished: int64(countRPC),
  943. numCallsFinishedWithClientFailedToSend: int64(countRPC - 1),
  944. numCallsFinishedKnownReceived: 1,
  945. }); err != nil {
  946. t.Fatal(err)
  947. }
  948. }
  949. func TestGRPCLBStatsStreamingSuccess(t *testing.T) {
  950. defer leakcheck.Check(t)
  951. stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
  952. testC := testpb.NewTestServiceClient(cc)
  953. // The first non-failfast RPC succeeds, all connections are up.
  954. stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true))
  955. if err != nil {
  956. t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
  957. }
  958. for {
  959. if _, err = stream.Recv(); err == io.EOF {
  960. break
  961. }
  962. }
  963. for i := 0; i < countRPC-1; i++ {
  964. stream, err = testC.FullDuplexCall(context.Background())
  965. if err == nil {
  966. // Wait for stream to end if err is nil.
  967. for {
  968. if _, err = stream.Recv(); err == io.EOF {
  969. break
  970. }
  971. }
  972. }
  973. }
  974. })
  975. if err := checkStats(stats, &rpcStats{
  976. numCallsStarted: int64(countRPC),
  977. numCallsFinished: int64(countRPC),
  978. numCallsFinishedKnownReceived: int64(countRPC),
  979. }); err != nil {
  980. t.Fatal(err)
  981. }
  982. }
  983. func TestGRPCLBStatsStreamingDrop(t *testing.T) {
  984. defer leakcheck.Check(t)
  985. stats := runAndGetStats(t, true, func(cc *grpc.ClientConn) {
  986. testC := testpb.NewTestServiceClient(cc)
  987. // The first non-failfast RPC succeeds, all connections are up.
  988. stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true))
  989. if err != nil {
  990. t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
  991. }
  992. for {
  993. if _, err = stream.Recv(); err == io.EOF {
  994. break
  995. }
  996. }
  997. for i := 0; i < countRPC-1; i++ {
  998. stream, err = testC.FullDuplexCall(context.Background())
  999. if err == nil {
  1000. // Wait for stream to end if err is nil.
  1001. for {
  1002. if _, err = stream.Recv(); err == io.EOF {
  1003. break
  1004. }
  1005. }
  1006. }
  1007. }
  1008. })
  1009. if err := checkStats(stats, &rpcStats{
  1010. numCallsStarted: int64(countRPC),
  1011. numCallsFinished: int64(countRPC),
  1012. numCallsFinishedKnownReceived: int64(countRPC) / 2,
  1013. numCallsDropped: map[string]int64{lbToken: int64(countRPC) / 2},
  1014. }); err != nil {
  1015. t.Fatal(err)
  1016. }
  1017. }
  1018. func TestGRPCLBStatsStreamingFailedToSend(t *testing.T) {
  1019. defer leakcheck.Check(t)
  1020. stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
  1021. testC := testpb.NewTestServiceClient(cc)
  1022. // The first non-failfast RPC succeeds, all connections are up.
  1023. stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true))
  1024. if err != nil {
  1025. t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
  1026. }
  1027. for {
  1028. if _, err = stream.Recv(); err == io.EOF {
  1029. break
  1030. }
  1031. }
  1032. for i := 0; i < countRPC-1; i++ {
  1033. cc.NewStream(context.Background(), &grpc.StreamDesc{}, failtosendURI)
  1034. }
  1035. })
  1036. if err := checkStats(stats, &rpcStats{
  1037. numCallsStarted: int64(countRPC),
  1038. numCallsFinished: int64(countRPC),
  1039. numCallsFinishedWithClientFailedToSend: int64(countRPC - 1),
  1040. numCallsFinishedKnownReceived: 1,
  1041. }); err != nil {
  1042. t.Fatal(err)
  1043. }
  1044. }