123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535 |
- /*
- *
- * Copyright 2017 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package grpc
- import (
- "context"
- "fmt"
- "math"
- "testing"
- "time"
- "google.golang.org/grpc/balancer"
- "google.golang.org/grpc/balancer/roundrobin"
- "google.golang.org/grpc/internal"
- "google.golang.org/grpc/resolver"
- "google.golang.org/grpc/resolver/manual"
- "google.golang.org/grpc/serviceconfig"
- )
- var _ balancer.Builder = &magicalLB{}
- var _ balancer.Balancer = &magicalLB{}
- // magicalLB is a ringer for grpclb. It is used to avoid circular dependencies on the grpclb package
- type magicalLB struct{}
- func (b *magicalLB) Name() string {
- return "grpclb"
- }
- func (b *magicalLB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
- return b
- }
- func (b *magicalLB) ResolverError(error) {}
- func (b *magicalLB) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {}
- func (b *magicalLB) UpdateClientConnState(balancer.ClientConnState) error {
- return nil
- }
- func (b *magicalLB) Close() {}
- func init() {
- balancer.Register(&magicalLB{})
- }
- func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, func()) {
- var servers []*server
- for i := 0; i < numServers; i++ {
- s := newTestServer()
- servers = append(servers, s)
- go s.start(t, 0, maxStreams)
- s.wait(t, 2*time.Second)
- }
- return servers, func() {
- for i := 0; i < numServers; i++ {
- servers[i].stop()
- }
- }
- }
- func checkPickFirst(cc *ClientConn, servers []*server) error {
- var (
- req = "port"
- reply string
- err error
- )
- connected := false
- for i := 0; i < 5000; i++ {
- if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); errorDesc(err) == servers[0].port {
- if connected {
- // connected is set to false if peer is not server[0]. So if
- // connected is true here, this is the second time we saw
- // server[0] in a row. Break because pickfirst is in effect.
- break
- }
- connected = true
- } else {
- connected = false
- }
- time.Sleep(time.Millisecond)
- }
- if !connected {
- return fmt.Errorf("pickfirst is not in effect after 5 second, EmptyCall() = _, %v, want _, %v", err, servers[0].port)
- }
- // The following RPCs should all succeed with the first server.
- for i := 0; i < 3; i++ {
- err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
- if errorDesc(err) != servers[0].port {
- return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[0].port, err)
- }
- }
- return nil
- }
- func checkRoundRobin(cc *ClientConn, servers []*server) error {
- var (
- req = "port"
- reply string
- err error
- )
- // Make sure connections to all servers are up.
- for i := 0; i < 2; i++ {
- // Do this check twice, otherwise the first RPC's transport may still be
- // picked by the closing pickfirst balancer, and the test becomes flaky.
- for _, s := range servers {
- var up bool
- for i := 0; i < 5000; i++ {
- if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); errorDesc(err) == s.port {
- up = true
- break
- }
- time.Sleep(time.Millisecond)
- }
- if !up {
- return fmt.Errorf("server %v is not up within 5 second", s.port)
- }
- }
- }
- serverCount := len(servers)
- for i := 0; i < 3*serverCount; i++ {
- err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
- if errorDesc(err) != servers[i%serverCount].port {
- return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[i%serverCount].port, err)
- }
- }
- return nil
- }
- func (s) TestSwitchBalancer(t *testing.T) {
- r := manual.NewBuilderWithScheme("whatever")
- const numServers = 2
- servers, scleanup := startServers(t, numServers, math.MaxInt32)
- defer scleanup()
- cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
- if err != nil {
- t.Fatalf("failed to dial: %v", err)
- }
- defer cc.Close()
- addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}
- r.UpdateState(resolver.State{Addresses: addrs})
- // The default balancer is pickfirst.
- if err := checkPickFirst(cc, servers); err != nil {
- t.Fatalf("check pickfirst returned non-nil error: %v", err)
- }
- // Switch to roundrobin.
- cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs}, nil)
- if err := checkRoundRobin(cc, servers); err != nil {
- t.Fatalf("check roundrobin returned non-nil error: %v", err)
- }
- // Switch to pickfirst.
- cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs}, nil)
- if err := checkPickFirst(cc, servers); err != nil {
- t.Fatalf("check pickfirst returned non-nil error: %v", err)
- }
- }
- // Test that balancer specified by dial option will not be overridden.
- func (s) TestBalancerDialOption(t *testing.T) {
- r := manual.NewBuilderWithScheme("whatever")
- const numServers = 2
- servers, scleanup := startServers(t, numServers, math.MaxInt32)
- defer scleanup()
- cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name))
- if err != nil {
- t.Fatalf("failed to dial: %v", err)
- }
- defer cc.Close()
- addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}
- r.UpdateState(resolver.State{Addresses: addrs})
- // The init balancer is roundrobin.
- if err := checkRoundRobin(cc, servers); err != nil {
- t.Fatalf("check roundrobin returned non-nil error: %v", err)
- }
- // Switch to pickfirst.
- cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs}, nil)
- // Balancer is still roundrobin.
- if err := checkRoundRobin(cc, servers); err != nil {
- t.Fatalf("check roundrobin returned non-nil error: %v", err)
- }
- }
- // First addr update contains grpclb.
- func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) {
- r := manual.NewBuilderWithScheme("whatever")
- cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
- if err != nil {
- t.Fatalf("failed to dial: %v", err)
- }
- defer cc.Close()
- // ClientConn will switch balancer to grpclb when receives an address of
- // type GRPCLB.
- r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}})
- var isGRPCLB bool
- for i := 0; i < 5000; i++ {
- cc.mu.Lock()
- isGRPCLB = cc.curBalancerName == "grpclb"
- cc.mu.Unlock()
- if isGRPCLB {
- break
- }
- time.Sleep(time.Millisecond)
- }
- if !isGRPCLB {
- t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
- }
- // New update containing new backend and new grpclb. Should not switch
- // balancer.
- r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}})
- for i := 0; i < 200; i++ {
- cc.mu.Lock()
- isGRPCLB = cc.curBalancerName == "grpclb"
- cc.mu.Unlock()
- if !isGRPCLB {
- break
- }
- time.Sleep(time.Millisecond)
- }
- if !isGRPCLB {
- t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb")
- }
- var isPickFirst bool
- // Switch balancer to pickfirst.
- r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
- for i := 0; i < 5000; i++ {
- cc.mu.Lock()
- isPickFirst = cc.curBalancerName == PickFirstBalancerName
- cc.mu.Unlock()
- if isPickFirst {
- break
- }
- time.Sleep(time.Millisecond)
- }
- if !isPickFirst {
- t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
- }
- }
- // First addr update does not contain grpclb.
- func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) {
- r := manual.NewBuilderWithScheme("whatever")
- cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
- if err != nil {
- t.Fatalf("failed to dial: %v", err)
- }
- defer cc.Close()
- r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
- var isPickFirst bool
- for i := 0; i < 5000; i++ {
- cc.mu.Lock()
- isPickFirst = cc.curBalancerName == PickFirstBalancerName
- cc.mu.Unlock()
- if isPickFirst {
- break
- }
- time.Sleep(time.Millisecond)
- }
- if !isPickFirst {
- t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
- }
- // ClientConn will switch balancer to grpclb when receives an address of
- // type GRPCLB.
- r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}})
- var isGRPCLB bool
- for i := 0; i < 5000; i++ {
- cc.mu.Lock()
- isGRPCLB = cc.curBalancerName == "grpclb"
- cc.mu.Unlock()
- if isGRPCLB {
- break
- }
- time.Sleep(time.Millisecond)
- }
- if !isGRPCLB {
- t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
- }
- // New update containing new backend and new grpclb. Should not switch
- // balancer.
- r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}})
- for i := 0; i < 200; i++ {
- cc.mu.Lock()
- isGRPCLB = cc.curBalancerName == "grpclb"
- cc.mu.Unlock()
- if !isGRPCLB {
- break
- }
- time.Sleep(time.Millisecond)
- }
- if !isGRPCLB {
- t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb")
- }
- // Switch balancer back.
- r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
- for i := 0; i < 5000; i++ {
- cc.mu.Lock()
- isPickFirst = cc.curBalancerName == PickFirstBalancerName
- cc.mu.Unlock()
- if isPickFirst {
- break
- }
- time.Sleep(time.Millisecond)
- }
- if !isPickFirst {
- t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
- }
- }
- // Test that if the current balancer is roundrobin, after switching to grpclb,
- // when the resolved address doesn't contain grpclb addresses, balancer will be
- // switched back to roundrobin.
- func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
- r := manual.NewBuilderWithScheme("whatever")
- cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
- if err != nil {
- t.Fatalf("failed to dial: %v", err)
- }
- defer cc.Close()
- sc := parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)
- r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
- var isRoundRobin bool
- for i := 0; i < 5000; i++ {
- cc.mu.Lock()
- isRoundRobin = cc.curBalancerName == "round_robin"
- cc.mu.Unlock()
- if isRoundRobin {
- break
- }
- time.Sleep(time.Millisecond)
- }
- if !isRoundRobin {
- t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
- }
- // ClientConn will switch balancer to grpclb when receives an address of
- // type GRPCLB.
- r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}}, ServiceConfig: sc})
- var isGRPCLB bool
- for i := 0; i < 5000; i++ {
- cc.mu.Lock()
- isGRPCLB = cc.curBalancerName == "grpclb"
- cc.mu.Unlock()
- if isGRPCLB {
- break
- }
- time.Sleep(time.Millisecond)
- }
- if !isGRPCLB {
- t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
- }
- // Switch balancer back.
- r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
- for i := 0; i < 5000; i++ {
- cc.mu.Lock()
- isRoundRobin = cc.curBalancerName == "round_robin"
- cc.mu.Unlock()
- if isRoundRobin {
- break
- }
- time.Sleep(time.Millisecond)
- }
- if !isRoundRobin {
- t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
- }
- }
- // Test that if resolved address list contains grpclb, the balancer option in
- // service config won't take effect. But when there's no grpclb address in a new
- // resolved address list, balancer will be switched to the new one.
- func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
- r := manual.NewBuilderWithScheme("whatever")
- cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
- if err != nil {
- t.Fatalf("failed to dial: %v", err)
- }
- defer cc.Close()
- r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
- var isPickFirst bool
- for i := 0; i < 5000; i++ {
- cc.mu.Lock()
- isPickFirst = cc.curBalancerName == PickFirstBalancerName
- cc.mu.Unlock()
- if isPickFirst {
- break
- }
- time.Sleep(time.Millisecond)
- }
- if !isPickFirst {
- t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
- }
- // ClientConn will switch balancer to grpclb when receives an address of
- // type GRPCLB.
- addrs := []resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}}
- r.UpdateState(resolver.State{Addresses: addrs})
- var isGRPCLB bool
- for i := 0; i < 5000; i++ {
- cc.mu.Lock()
- isGRPCLB = cc.curBalancerName == "grpclb"
- cc.mu.Unlock()
- if isGRPCLB {
- break
- }
- time.Sleep(time.Millisecond)
- }
- if !isGRPCLB {
- t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
- }
- sc := parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)
- r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc})
- var isRoundRobin bool
- for i := 0; i < 200; i++ {
- cc.mu.Lock()
- isRoundRobin = cc.curBalancerName == "round_robin"
- cc.mu.Unlock()
- if isRoundRobin {
- break
- }
- time.Sleep(time.Millisecond)
- }
- // Balancer should NOT switch to round_robin because resolved list contains
- // grpclb.
- if isRoundRobin {
- t.Fatalf("within 200 ms, cc.balancer switched to round_robin, want grpclb")
- }
- // Switch balancer back.
- r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
- for i := 0; i < 5000; i++ {
- cc.mu.Lock()
- isRoundRobin = cc.curBalancerName == "round_robin"
- cc.mu.Unlock()
- if isRoundRobin {
- break
- }
- time.Sleep(time.Millisecond)
- }
- if !isRoundRobin {
- t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
- }
- }
- // Test that when switching to grpclb fails because grpclb is not registered,
- // the fallback balancer will only get backend addresses, not the grpclb server
- // address.
- //
- // The tests sends 3 server addresses (all backends) as resolved addresses, but
- // claim the first one is grpclb server. The all RPCs should all be send to the
- // other addresses, not the first one.
- func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
- internal.BalancerUnregister("grpclb")
- defer balancer.Register(&magicalLB{})
- r := manual.NewBuilderWithScheme("whatever")
- const numServers = 3
- servers, scleanup := startServers(t, numServers, math.MaxInt32)
- defer scleanup()
- cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
- if err != nil {
- t.Fatalf("failed to dial: %v", err)
- }
- defer cc.Close()
- r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}}})
- // The default balancer is pickfirst.
- if err := checkPickFirst(cc, servers[1:]); err != nil {
- t.Fatalf("check pickfirst returned non-nil error: %v", err)
- }
- // Try switching to grpclb by sending servers[0] as grpclb address. It's
- // expected that servers[0] will be filtered out, so it will not be used by
- // the balancer.
- //
- // If the filtering failed, servers[0] will be used for RPCs and the RPCs
- // will succeed. The following checks will catch this and fail.
- addrs := []resolver.Address{
- {Addr: servers[0].addr, Type: resolver.GRPCLB},
- {Addr: servers[1].addr}, {Addr: servers[2].addr}}
- r.UpdateState(resolver.State{Addresses: addrs})
- // Still check for pickfirst, but only with server[1] and server[2].
- if err := checkPickFirst(cc, servers[1:]); err != nil {
- t.Fatalf("check pickfirst returned non-nil error: %v", err)
- }
- // Switch to roundrobin, and check against server[1] and server[2].
- cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs}, nil)
- if err := checkRoundRobin(cc, servers[1:]); err != nil {
- t.Fatalf("check roundrobin returned non-nil error: %v", err)
- }
- }
- func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
- scpr := r.CC.ParseServiceConfig(s)
- if scpr.Err != nil {
- panic(fmt.Sprintf("Error parsing config %q: %v", s, scpr.Err))
- }
- return scpr
- }
|