diff --git a/test/pickfirst_test.go b/test/pickfirst_test.go index 0e92a543ba4d..a7496e854125 100644 --- a/test/pickfirst_test.go +++ b/test/pickfirst_test.go @@ -20,7 +20,9 @@ package test import ( "context" + "errors" "fmt" + "strings" "testing" "time" @@ -788,3 +790,219 @@ func (s) TestPickFirst_AddressUpdateWithBalancerAttributes(t *testing.T) { t.Fatal(err) } } + +// Tests the case where the pick_first LB policy receives an error from the name +// resolver without previously receiving a good update. Verifies that the +// channel moves to TRANSIENT_FAILURE and that error received from the name +// resolver is propagated to the caller of an RPC. +func (s) TestPickFirst_ResolverError_NoPreviousUpdate(t *testing.T) { + cc, r, _ := setupPickFirst(t, 0) + + nrErr := errors.New("error from name resolver") + r.ReportError(nrErr) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + awaitState(ctx, t, cc, connectivity.TransientFailure) + + client := testgrpc.NewTestServiceClient(cc) + _, err := client.EmptyCall(ctx, &testpb.Empty{}) + if err == nil { + t.Fatalf("EmptyCall() succeeded when expected to fail with error: %v", nrErr) + } + if !strings.Contains(err.Error(), nrErr.Error()) { + t.Fatalf("EmptyCall() failed with error: %v, want error: %v", err, nrErr) + } +} + +// Tests the case where the pick_first LB policy receives an error from the name +// resolver after receiving a good update (and the channel is currently READY). +// The test verifies that the channel continues to use the previously received +// good update. +func (s) TestPickFirst_ResolverError_WithPreviousUpdate_Ready(t *testing.T) { + cc, r, backends := setupPickFirst(t, 1) + + addrs := stubBackendsToResolverAddrs(backends) + r.UpdateState(resolver.State{Addresses: addrs}) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil { + t.Fatal(err) + } + + nrErr := errors.New("error from name resolver") + r.ReportError(nrErr) + + // Ensure that RPCs continue to succeed for the next second. + client := testgrpc.NewTestServiceClient(cc) + for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) + } + } +} + +// Tests the case where the pick_first LB policy receives an error from the name +// resolver after receiving a good update (and the channel is currently in +// CONNECTING state). The test verifies that the channel continues to use the +// previously received good update, and that RPCs don't fail with the error +// received from the name resolver. +func (s) TestPickFirst_ResolverError_WithPreviousUpdate_Connecting(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + + // Listen on a local port and act like a server that blocks until the + // channel reaches CONNECTING and closes the connection without sending a + // server preface. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + waitForConnecting := make(chan struct{}) + go func() { + conn, err := lis.Accept() + if err != nil { + t.Errorf("Unexpected error when accepting a connection: %v", err) + } + defer conn.Close() + + select { + case <-waitForConnecting: + case <-ctx.Done(): + t.Error("Timeout when waiting for channel to move to CONNECTING state") + } + }() + + r := manual.NewBuilderWithScheme("whatever") + dopts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithResolvers(r), + grpc.WithDefaultServiceConfig(pickFirstServiceConfig), + } + cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + t.Cleanup(func() { cc.Close() }) + + addrs := []resolver.Address{{Addr: lis.Addr().String()}} + r.UpdateState(resolver.State{Addresses: addrs}) + awaitState(ctx, t, cc, connectivity.Connecting) + + nrErr := errors.New("error from name resolver") + r.ReportError(nrErr) + + // RPCs should fail with deadline exceed error as long as they are in + // CONNECTING and not the error returned by the name resolver. + client := testgrpc.NewTestServiceClient(cc) + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) { + t.Fatalf("EmptyCall() failed with error: %v, want error: %v", err, context.DeadlineExceeded) + } + + // Closing this channel leads to closing of the connection by our listener. + // gRPC should see this as a connection error. + close(waitForConnecting) + awaitState(ctx, t, cc, connectivity.TransientFailure) + checkForConnectionError(ctx, t, cc) +} + +// Tests the case where the pick_first LB policy receives an error from the name +// resolver after receiving a good update. The previous good update though has +// seen the channel move to TRANSIENT_FAILURE. The test verifies that the +// channel fails RPCs with the new error from the resolver. +func (s) TestPickFirst_ResolverError_WithPreviousUpdate_TransientFailure(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + + // Listen on a local port and act like a server that closes the connection + // without sending a server preface. + go func() { + conn, err := lis.Accept() + if err != nil { + t.Errorf("Unexpected error when accepting a connection: %v", err) + } + conn.Close() + }() + + r := manual.NewBuilderWithScheme("whatever") + dopts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithResolvers(r), + grpc.WithDefaultServiceConfig(pickFirstServiceConfig), + } + cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + t.Cleanup(func() { cc.Close() }) + + addrs := []resolver.Address{{Addr: lis.Addr().String()}} + r.UpdateState(resolver.State{Addresses: addrs}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + awaitState(ctx, t, cc, connectivity.TransientFailure) + checkForConnectionError(ctx, t, cc) + + // An error from the name resolver should result in RPCs failing with that + // error instead of the old error that caused the channel to move to + // TRANSIENT_FAILURE in the first place. + nrErr := errors.New("error from name resolver") + r.ReportError(nrErr) + client := testgrpc.NewTestServiceClient(cc) + for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); strings.Contains(err.Error(), nrErr.Error()) { + break + } + } + if ctx.Err() != nil { + t.Fatal("Timeout when waiting for RPCs to fail with error returned by the name resolver") + } +} + +func checkForConnectionError(ctx context.Context, t *testing.T, cc *grpc.ClientConn) { + t.Helper() + + // RPCs may fail on the client side in two ways, once the fake server closes + // the accepted connection: + // - writing the client preface succeeds, but not reading the server preface + // - writing the client preface fails + // In either case, we should see it fail with UNAVAILABLE. + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable { + t.Fatalf("EmptyCall() failed with error: %v, want code %v", err, codes.Unavailable) + } +} + +// Tests the case where the pick_first LB policy receives an update from the +// name resolver with no addresses after receiving a good update. The test +// verifies that the channel fails RPCs with an error indicating the fact that +// the name resolver returned no addresses. +func (s) TestPickFirst_ResolverError_ZeroAddresses_WithPreviousUpdate(t *testing.T) { + cc, r, backends := setupPickFirst(t, 1) + + addrs := stubBackendsToResolverAddrs(backends) + r.UpdateState(resolver.State{Addresses: addrs}) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil { + t.Fatal(err) + } + + r.UpdateState(resolver.State{}) + wantErr := "produced zero addresses" + client := testgrpc.NewTestServiceClient(cc) + for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); strings.Contains(err.Error(), wantErr) { + break + } + } + if ctx.Err() != nil { + t.Fatal("Timeout when waiting for RPCs to fail with error returned by the name resolver") + } +}