Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpclb: filter out grpclb addresses if balancer in use is not grpclb #2509

Merged
merged 4 commits into from
Dec 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
)
Expand All @@ -47,6 +48,18 @@ func Register(b Builder) {
m[strings.ToLower(b.Name())] = b
}

// unregisterForTesting deletes the balancer with the given name from the
// balancer map.
//
// This function is not thread-safe.
func unregisterForTesting(name string) {
delete(m, name)
}

func init() {
internal.BalancerUnregister = unregisterForTesting
}

// Get returns the resolver builder registered with the given name.
// Note that the compare is done in a case-insenstive fashion.
// If no builder is register with the name, nil will be returned.
Expand Down
1 change: 1 addition & 0 deletions balancer/grpclb/grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
var remoteBalancerAddrs, backendAddrs []resolver.Address
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
a.Type = resolver.Backend
remoteBalancerAddrs = append(remoteBalancerAddrs, a)
} else {
backendAddrs = append(backendAddrs, a)
Expand Down
22 changes: 3 additions & 19 deletions balancer/grpclb/grpclb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,25 +628,10 @@ func TestBalancerDisconnects(t *testing.T) {
t.Fatalf("No RPC sent to second backend after 1 second")
}

type customGRPCLBBuilder struct {
balancer.Builder
name string
}

func (b *customGRPCLBBuilder) Name() string {
return b.name
}

const grpclbCustomFallbackName = "grpclb_with_custom_fallback_timeout"

func init() {
balancer.Register(&customGRPCLBBuilder{
Builder: newLBBuilderWithFallbackTimeout(100 * time.Millisecond),
name: grpclbCustomFallbackName,
})
}

func TestFallback(t *testing.T) {
balancer.Register(newLBBuilderWithFallbackTimeout(100 * time.Millisecond))
defer balancer.Register(newLBBuilder())

defer leakcheck.Check(t)

r, cleanup := manual.GenerateAndRegisterManualResolver()
Expand Down Expand Up @@ -684,7 +669,6 @@ func TestFallback(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithBalancerName(grpclbCustomFallbackName),
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
Expand Down
22 changes: 22 additions & 0 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,28 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co
}

func (ccb *ccBalancerWrapper) handleResolvedAddrs(addrs []resolver.Address, err error) {
if ccb.cc.curBalancerName != grpclbName {
var containsGRPCLB bool
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
containsGRPCLB = true
break
}
}
if containsGRPCLB {
// The current balancer is not grpclb, but addresses contain grpclb
// address. This means we failed to switch to grpclb, most likely
// because grpclb is not registered. Filter out all grpclb addresses
// from addrs before sending to balancer.
tempAddrs := make([]resolver.Address, 0, len(addrs))
for _, a := range addrs {
if a.Type != resolver.GRPCLB {
tempAddrs = append(tempAddrs, a)
}
}
addrs = tempAddrs
}
}
select {
case <-ccb.resolverUpdateCh:
default:
Expand Down
54 changes: 52 additions & 2 deletions balancer_switching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
_ "google.golang.org/grpc/grpclog/glogger"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/leakcheck"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
Expand Down Expand Up @@ -133,7 +134,7 @@ func TestSwitchBalancer(t *testing.T) {
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()

numServers := 2
const numServers = 2
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

Expand Down Expand Up @@ -165,7 +166,7 @@ func TestBalancerDialOption(t *testing.T) {
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()

numServers := 2
const numServers = 2
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

Expand Down Expand Up @@ -467,3 +468,52 @@ func TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
}
}

// Test that when switching to grpclb fails because grpclb is not registered,
// the fallback balancer will only get backend addresses, not the grpclb server
// address.
//
// The tests sends 3 server addresses (all backends) as resolved addresses, but
// claim the first one is grpclb server. The all RPCs should all be send to the
// other addresses, not the first one.
func TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
internal.BalancerUnregister("grpclb")
defer balancer.Register(&magicalLB{})

defer leakcheck.Check(t)
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()

const numServers = 3
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
r.NewAddress([]resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}})
// The default balancer is pickfirst.
if err := checkPickFirst(cc, servers[1:]); err != nil {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
// Try switching to grpclb by sending servers[0] as grpclb address. It's
// expected that servers[0] will be filtered out, so it will not be used by
// the balancer.
//
// If the filtering failed, servers[0] will be used for RPCs and the RPCs
// will succeed. The following checks will catch this and fail.
r.NewAddress([]resolver.Address{
{Addr: servers[0].addr, Type: resolver.GRPCLB},
{Addr: servers[1].addr}, {Addr: servers[2].addr}})
// Still check for pickfirst, but only with server[1] and server[2].
if err := checkPickFirst(cc, servers[1:]); err != nil {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
// Switch to roundrobin, anc check against server[1] and server[2].
cc.handleServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
if err := checkRoundRobin(cc, servers[1:]); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
}
2 changes: 2 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ var (
WithResolverBuilder interface{} // func (resolver.Builder) grpc.DialOption
// HealthCheckFunc is used to provide client-side LB channel health checking
HealthCheckFunc func(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), serviceName string) error
// BalancerUnregister is exported by package balancer to unregister a balancer.
BalancerUnregister func(name string)
)

const (
Expand Down