Skip to content

Commit

Permalink
pickfirst: add tests for resolver error scenarios (#6484)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Jul 28, 2023
1 parent b8d36ca commit 20c51a9
Showing 1 changed file with 218 additions and 0 deletions.
218 changes: 218 additions & 0 deletions test/pickfirst_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ package test

import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -788,3 +790,219 @@ func (s) TestPickFirst_AddressUpdateWithBalancerAttributes(t *testing.T) {
t.Fatal(err)
}
}

// Tests the scenario in which the name resolver reports an error to the
// pick_first LB policy before any good update has ever been delivered. The
// channel is expected to transition to TRANSIENT_FAILURE, and an RPC made on
// it is expected to fail with an error that carries the resolver's error text.
func (s) TestPickFirst_ResolverError_NoPreviousUpdate(t *testing.T) {
	cc, r, _ := setupPickFirst(t, 0)

	// Push the error before the resolver has produced any addresses.
	resolverErr := errors.New("error from name resolver")
	r.ReportError(resolverErr)

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	awaitState(ctx, t, cc, connectivity.TransientFailure)

	// The RPC must fail, and the failure must mention the resolver error.
	_, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{})
	switch {
	case err == nil:
		t.Fatalf("EmptyCall() succeeded when expected to fail with error: %v", resolverErr)
	case !strings.Contains(err.Error(), resolverErr.Error()):
		t.Fatalf("EmptyCall() failed with error: %v, want error: %v", err, resolverErr)
	}
}

// Tests the scenario in which the name resolver reports an error to the
// pick_first LB policy after a good update has already been received and RPCs
// are reaching the backend. The channel is expected to keep using the
// previously received good update, so RPCs must continue to succeed.
func (s) TestPickFirst_ResolverError_WithPreviousUpdate_Ready(t *testing.T) {
	cc, r, backends := setupPickFirst(t, 1)

	// Deliver a good update and confirm RPCs reach the first backend.
	backendAddrs := stubBackendsToResolverAddrs(backends)
	r.UpdateState(resolver.State{Addresses: backendAddrs})

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := pickfirst.CheckRPCsToBackend(ctx, cc, backendAddrs[0]); err != nil {
		t.Fatal(err)
	}

	// Now report a resolver error on top of the good update.
	r.ReportError(errors.New("error from name resolver"))

	// Ensure that RPCs continue to succeed for the next second, pausing for
	// a short interval between attempts.
	client := testgrpc.NewTestServiceClient(cc)
	deadline := time.Now().Add(time.Second)
	for time.Now().Before(deadline) {
		_, err := client.EmptyCall(ctx, &testpb.Empty{})
		if err != nil {
			t.Fatalf("EmptyCall() failed: %v", err)
		}
		time.Sleep(defaultTestShortTimeout)
	}
}

// Tests the case where the pick_first LB policy receives an error from the name
// resolver after receiving a good update (and the channel is currently in
// CONNECTING state). The test verifies that the channel continues to use the
// previously received good update, and that RPCs don't fail with the error
// received from the name resolver.
func (s) TestPickFirst_ResolverError_WithPreviousUpdate_Connecting(t *testing.T) {
	lis, err := testutils.LocalTCPListener()
	if err != nil {
		t.Fatalf("net.Listen() failed: %v", err)
	}

	// Listen on a local port and act like a server that blocks until the
	// channel reaches CONNECTING and closes the connection without sending a
	// server preface.
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	waitForConnecting := make(chan struct{})
	go func() {
		conn, err := lis.Accept()
		if err != nil {
			t.Errorf("Unexpected error when accepting a connection: %v", err)
			// conn is nil when Accept fails; return instead of falling
			// through to the deferred Close, which would panic.
			return
		}
		defer conn.Close()

		// Hold the connection open (without writing anything) until the test
		// signals that it is done with the CONNECTING phase.
		select {
		case <-waitForConnecting:
		case <-ctx.Done():
			t.Error("Timeout when waiting for channel to move to CONNECTING state")
		}
	}()

	r := manual.NewBuilderWithScheme("whatever")
	dopts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithResolvers(r),
		grpc.WithDefaultServiceConfig(pickFirstServiceConfig),
	}
	cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
	if err != nil {
		t.Fatalf("grpc.Dial() failed: %v", err)
	}
	t.Cleanup(func() { cc.Close() })

	addrs := []resolver.Address{{Addr: lis.Addr().String()}}
	r.UpdateState(resolver.State{Addresses: addrs})
	awaitState(ctx, t, cc, connectivity.Connecting)

	nrErr := errors.New("error from name resolver")
	r.ReportError(nrErr)

	// RPCs should fail with a deadline exceeded error as long as the channel
	// is in CONNECTING, and not with the error returned by the name resolver.
	// A nil error (unexpected success) is also treated as a test failure
	// rather than being dereferenced.
	client := testgrpc.NewTestServiceClient(cc)
	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
	defer sCancel()
	if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); err == nil || !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) {
		t.Fatalf("EmptyCall() failed with error: %v, want error: %v", err, context.DeadlineExceeded)
	}

	// Closing this channel leads to closing of the connection by our listener.
	// gRPC should see this as a connection error.
	close(waitForConnecting)
	awaitState(ctx, t, cc, connectivity.TransientFailure)
	checkForConnectionError(ctx, t, cc)
}

// Tests the case where the pick_first LB policy receives an error from the name
// resolver after receiving a good update. The previous good update though has
// seen the channel move to TRANSIENT_FAILURE. The test verifies that the
// channel fails RPCs with the new error from the resolver.
func (s) TestPickFirst_ResolverError_WithPreviousUpdate_TransientFailure(t *testing.T) {
	lis, err := testutils.LocalTCPListener()
	if err != nil {
		t.Fatalf("net.Listen() failed: %v", err)
	}

	// Listen on a local port and act like a server that closes the connection
	// without sending a server preface.
	go func() {
		conn, err := lis.Accept()
		if err != nil {
			t.Errorf("Unexpected error when accepting a connection: %v", err)
			// conn is nil when Accept fails; calling Close on it would
			// panic, so stop here.
			return
		}
		conn.Close()
	}()

	r := manual.NewBuilderWithScheme("whatever")
	dopts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithResolvers(r),
		grpc.WithDefaultServiceConfig(pickFirstServiceConfig),
	}
	cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
	if err != nil {
		t.Fatalf("grpc.Dial() failed: %v", err)
	}
	t.Cleanup(func() { cc.Close() })

	addrs := []resolver.Address{{Addr: lis.Addr().String()}}
	r.UpdateState(resolver.State{Addresses: addrs})
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	awaitState(ctx, t, cc, connectivity.TransientFailure)
	checkForConnectionError(ctx, t, cc)

	// An error from the name resolver should result in RPCs failing with that
	// error instead of the old error that caused the channel to move to
	// TRANSIENT_FAILURE in the first place.
	nrErr := errors.New("error from name resolver")
	r.ReportError(nrErr)
	client := testgrpc.NewTestServiceClient(cc)
	// Poll until an RPC fails with the resolver error or the test context
	// expires. A nil error simply means "not yet" and must not be
	// dereferenced.
	for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
		if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil && strings.Contains(err.Error(), nrErr.Error()) {
			break
		}
	}
	if ctx.Err() != nil {
		t.Fatal("Timeout when waiting for RPCs to fail with error returned by the name resolver")
	}
}

// checkForConnectionError makes a single EmptyCall RPC on cc and fails the
// calling test unless the status code of the result is UNAVAILABLE.
func checkForConnectionError(ctx context.Context, t *testing.T, cc *grpc.ClientConn) {
	t.Helper()

	// Once the fake server closes the accepted connection, the client side
	// can observe the failure in one of two ways:
	// - the client preface is written successfully, but reading the server
	//   preface fails
	// - writing the client preface itself fails
	// Both outcomes are expected to surface as UNAVAILABLE.
	_, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{})
	if status.Code(err) == codes.Unavailable {
		return
	}
	t.Fatalf("EmptyCall() failed with error: %v, want code %v", err, codes.Unavailable)
}

// Tests the case where the pick_first LB policy receives an update from the
// name resolver with no addresses after receiving a good update. The test
// verifies that the channel fails RPCs with an error indicating the fact that
// the name resolver returned no addresses.
func (s) TestPickFirst_ResolverError_ZeroAddresses_WithPreviousUpdate(t *testing.T) {
	cc, r, backends := setupPickFirst(t, 1)

	addrs := stubBackendsToResolverAddrs(backends)
	r.UpdateState(resolver.State{Addresses: addrs})

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
		t.Fatal(err)
	}

	// Push an update that carries no addresses at all.
	r.UpdateState(resolver.State{})
	wantErr := "produced zero addresses"
	client := testgrpc.NewTestServiceClient(cc)
	// Poll until an RPC fails with the expected error or the test context
	// expires. An RPC that still succeeds (nil error) means the update has
	// not taken effect yet, so keep polling instead of dereferencing nil.
	for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
		if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil && strings.Contains(err.Error(), wantErr) {
			break
		}
	}
	if ctx.Err() != nil {
		t.Fatal("Timeout when waiting for RPCs to fail with error returned by the name resolver")
	}
}

0 comments on commit 20c51a9

Please sign in to comment.