diff --git a/balancer/rls/balancer_test.go b/balancer/rls/balancer_test.go index 45d95b2ddeca..1c2f9fca32a5 100644 --- a/balancer/rls/balancer_test.go +++ b/balancer/rls/balancer_test.go @@ -36,6 +36,7 @@ import ( rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/internal/testutils" + rlstest "google.golang.org/grpc/internal/testutils/rls" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" @@ -48,10 +49,10 @@ import ( // and the old one is closed. func (s) TestConfigUpdate_ControlChannel(t *testing.T) { // Start two RLS servers. - lis1 := newListenerWrapper(t, nil) - rlsServer1, rlsReqCh1 := setupFakeRLSServer(t, lis1) - lis2 := newListenerWrapper(t, nil) - rlsServer2, rlsReqCh2 := setupFakeRLSServer(t, lis2) + lis1 := testutils.NewListenerWrapper(t, nil) + rlsServer1, rlsReqCh1 := rlstest.SetupFakeRLSServer(t, lis1) + lis2 := testutils.NewListenerWrapper(t, nil) + rlsServer2, rlsReqCh2 := rlstest.SetupFakeRLSServer(t, lis2) // Build RLS service config with the RLS server pointing to the first one. // Set a very low value for maxAge to ensure that the entry expires soon. @@ -61,12 +62,12 @@ func (s) TestConfigUpdate_ControlChannel(t *testing.T) { // Start a couple of test backends, and set up the fake RLS servers to return // these as a target in the RLS response. backendCh1, backendAddress1 := startBackend(t) - rlsServer1.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse { - return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}} + rlsServer1.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}} }) backendCh2, backendAddress2 := startBackend(t) - rlsServer2.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse { - return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}} + rlsServer2.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}} }) // Register a manual resolver and push the RLS service config through it. @@ -84,11 +85,11 @@ func (s) TestConfigUpdate_ControlChannel(t *testing.T) { makeTestRPCAndExpectItToReachBackend(ctx, t, cc, backendCh1) // Ensure a connection is established to the first RLS server. - val, err := lis1.newConnCh.Receive(ctx) + val, err := lis1.NewConnCh.Receive(ctx) if err != nil { t.Fatal("Timeout expired when waiting for LB policy to create control channel") } - conn1 := val.(*connWrapper) + conn1 := val.(*testutils.ConnWrapper) // Make sure an RLS request is sent out. verifyRLSRequest(t, rlsReqCh1, true) @@ -105,12 +106,12 @@ func (s) TestConfigUpdate_ControlChannel(t *testing.T) { r.UpdateState(resolver.State{ServiceConfig: sc}) // Ensure a connection is established to the second RLS server. - if _, err := lis2.newConnCh.Receive(ctx); err != nil { + if _, err := lis2.NewConnCh.Receive(ctx); err != nil { t.Fatal("Timeout expired when waiting for LB policy to create control channel") } // Ensure the connection to the old one is closed. - if _, err := conn1.closeCh.Receive(ctx); err != nil { + if _, err := conn1.CloseCh.Receive(ctx); err != nil { t.Fatal("Timeout expired when waiting for LB policy to close control channel") } @@ -136,8 +137,8 @@ func (s) TestConfigUpdate_ControlChannelWithCreds(t *testing.T) { } // Start an RLS server with the wrapped listener and credentials. - lis := newListenerWrapper(t, nil) - rlsServer, rlsReqCh := setupFakeRLSServer(t, lis, grpc.Creds(serverCreds)) + lis := testutils.NewListenerWrapper(t, nil) + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis, grpc.Creds(serverCreds)) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Build RLS service config. @@ -147,8 +148,8 @@ func (s) TestConfigUpdate_ControlChannelWithCreds(t *testing.T) { // and set up the fake RLS server to return this as the target in the RLS // response. backendCh, backendAddress := startBackend(t, grpc.Creds(serverCreds)) - rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse { - return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}} + rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}} }) // Register a manual resolver and push the RLS service config through it. @@ -173,7 +174,7 @@ func (s) TestConfigUpdate_ControlChannelWithCreds(t *testing.T) { verifyRLSRequest(t, rlsReqCh, true) // Ensure a connection is established to the first RLS server. - if _, err := lis.newConnCh.Receive(ctx); err != nil { + if _, err := lis.NewConnCh.Receive(ctx); err != nil { t.Fatal("Timeout expired when waiting for LB policy to create control channel") } } @@ -184,7 +185,7 @@ func (s) TestConfigUpdate_ControlChannelWithCreds(t *testing.T) { // provided service config is applied for the control channel. func (s) TestConfigUpdate_ControlChannelServiceConfig(t *testing.T) { // Start an RLS server and set the throttler to never throttle requests. - rlsServer, rlsReqCh := setupFakeRLSServer(t, nil) + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Register a balancer to be used for the control channel, and set up a @@ -211,8 +212,8 @@ func (s) TestConfigUpdate_ControlChannelServiceConfig(t *testing.T) { // Start a test backend, and set up the fake RLS server to return this as a // target in the RLS response. backendCh, backendAddress := startBackend(t) - rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse { - return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}} + rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}} }) // Register a manual resolver and push the RLS service config through it. @@ -244,7 +245,7 @@ func (s) TestConfigUpdate_ControlChannelServiceConfig(t *testing.T) { // target after the config has been applied. func (s) TestConfigUpdate_DefaultTarget(t *testing.T) { // Start an RLS server and set the throttler to always throttle requests. - rlsServer, _ := setupFakeRLSServer(t, nil) + rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil) overrideAdaptiveThrottler(t, alwaysThrottlingThrottler()) // Build RLS service config with a default target. @@ -284,7 +285,7 @@ func (s) TestConfigUpdate_DefaultTarget(t *testing.T) { // child policy configuration are propagated correctly. func (s) TestConfigUpdate_ChildPolicyConfigs(t *testing.T) { // Start an RLS server and set the throttler to never throttle requests. - rlsServer, rlsReqCh := setupFakeRLSServer(t, nil) + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Start a default backend and a test backend. @@ -292,8 +293,8 @@ func (s) TestConfigUpdate_ChildPolicyConfigs(t *testing.T) { testBackendCh, testBackendAddress := startBackend(t) // Set up the RLS server to respond with the test backend. - rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse { - return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}} + rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}} }) // Set up a test balancer callback to push configs received by child policies. @@ -411,7 +412,7 @@ func (s) TestConfigUpdate_ChildPolicyConfigs(t *testing.T) { // handled by closing the old balancer and creating a new one. func (s) TestConfigUpdate_ChildPolicyChange(t *testing.T) { // Start an RLS server and set the throttler to never throttle requests. - rlsServer, _ := setupFakeRLSServer(t, nil) + rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Set up balancer callbacks. @@ -507,14 +508,14 @@ func (s) TestConfigUpdate_ChildPolicyChange(t *testing.T) { // the caller of the RPC. func (s) TestConfigUpdate_BadChildPolicyConfigs(t *testing.T) { // Start an RLS server and set the throttler to never throttle requests. - rlsServer, rlsReqCh := setupFakeRLSServer(t, nil) + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Set up the RLS server to respond with a bad target field which is expected // to cause the child policy's ParseTarget to fail and should result in the LB // policy creating a lame child policy wrapper. - rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse { - return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{e2e.RLSChildPolicyBadTarget}}} + rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{e2e.RLSChildPolicyBadTarget}}} }) // Build RLS service config with a default target. This default backend is @@ -567,7 +568,7 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) { defer func() { minEvictDuration = origMinEvictDuration }() // Start an RLS server and set the throttler to never throttle requests. - rlsServer, rlsReqCh := setupFakeRLSServer(t, nil) + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Register an LB policy to act as the child policy for RLS LB policy. @@ -582,14 +583,14 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) { // these as targets in the RLS response, based on request keys. backendCh1, backendAddress1 := startBackend(t) backendCh2, backendAddress2 := startBackend(t) - rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse { + rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { if req.KeyMap["k1"] == "v1" { - return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}} + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}} } if req.KeyMap["k2"] == "v2" { - return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}} + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}} } - return &e2e.RouteLookupResponse{Err: errors.New("no keys in request metadata")} + return &rlstest.RouteLookupResponse{Err: errors.New("no keys in request metadata")} }) // Register a manual resolver and push the RLS service config through it. @@ -661,7 +662,7 @@ func (s) TestDataCachePurging(t *testing.T) { defer func() { dataCachePurgeHook = origDataCachePurgeHook }() // Start an RLS server and set the throttler to never throttle requests. - rlsServer, rlsReqCh := setupFakeRLSServer(t, nil) + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Register an LB policy to act as the child policy for RLS LB policy. @@ -678,8 +679,8 @@ func (s) TestDataCachePurging(t *testing.T) { // Start a test backend, and set up the fake RLS server to return this as a // target in the RLS response. backendCh, backendAddress := startBackend(t) - rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse { - return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}} + rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}} }) // Register a manual resolver and push the RLS service config through it. @@ -740,7 +741,7 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) { // Start an RLS server with the restartable listener and set the throttler to // never throttle requests. - rlsServer, rlsReqCh := setupFakeRLSServer(t, lis) + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Override the reset backoff hook to get notified. @@ -769,8 +770,8 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) { // Start a test backend, and set up the fake RLS server to return this as a // target in the RLS response. backendCh, backendAddress := startBackend(t) - rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse { - return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}} + rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}} }) // Register a manual resolver and push the RLS service config through it. @@ -818,7 +819,11 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) { ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1") makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh) - <-resetBackoffDone + select { + case <-ctx.Done(): + t.Fatalf("Timed out waiting for resetBackoffDone") + case <-resetBackoffDone: + } // The fact that the above RPC succeeded indicates that the control channel // has moved back to READY. The connectivity state monitoring code should have diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go index 7ecdc6bec5fd..ccba8e36f976 100644 --- a/balancer/rls/control_channel_test.go +++ b/balancer/rls/control_channel_test.go @@ -32,11 +32,11 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc" "google.golang.org/grpc/balancer" - "google.golang.org/grpc/balancer/rls/internal/test/e2e" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal" rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1" + rlstest "google.golang.org/grpc/internal/testutils/rls" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/grpc/testdata" @@ -47,7 +47,7 @@ import ( // indicates that the control channel needs to be throttled. func (s) TestControlChannelThrottled(t *testing.T) { // Start an RLS server and set the throttler to always throttle requests. - rlsServer, rlsReqCh := setupFakeRLSServer(t, nil) + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil) overrideAdaptiveThrottler(t, alwaysThrottlingThrottler()) // Create a control channel to the fake RLS server. @@ -70,12 +70,12 @@ func (s) TestControlChannelThrottled(t *testing.T) { // TestLookupFailure tests the case where the RLS server responds with an error. func (s) TestLookupFailure(t *testing.T) { // Start an RLS server and set the throttler to never throttle requests. - rlsServer, _ := setupFakeRLSServer(t, nil) + rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Setup the RLS server to respond with errors. - rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse { - return &e2e.RouteLookupResponse{Err: errors.New("rls failure")} + rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { + return &rlstest.RouteLookupResponse{Err: errors.New("rls failure")} }) // Create a control channel to the fake RLS server. @@ -114,7 +114,7 @@ func (s) TestLookupDeadlineExceeded(t *testing.T) { } // Start an RLS server and set the throttler to never throttle. - rlsServer, _ := setupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor)) + rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor)) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Create a control channel with a small deadline. @@ -246,7 +246,7 @@ var ( Reason: rlspb.RouteLookupRequest_REASON_MISS, StaleHeaderData: staleHeaderData, } - lookupResponse = &e2e.RouteLookupResponse{ + lookupResponse = &rlstest.RouteLookupResponse{ Resp: &rlspb.RouteLookupResponse{ Targets: wantTargets, HeaderData: wantHeaderData, @@ -256,11 +256,11 @@ var ( func testControlChannelCredsSuccess(t *testing.T, sopts []grpc.ServerOption, bopts balancer.BuildOptions) { // Start an RLS server and set the throttler to never throttle requests. - rlsServer, _ := setupFakeRLSServer(t, nil, sopts...) + rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, sopts...) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Setup the RLS server to respond with a valid response. - rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse { + rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { return lookupResponse }) @@ -356,7 +356,7 @@ func testControlChannelCredsFailure(t *testing.T, sopts []grpc.ServerOption, bop // Start an RLS server and set the throttler to never throttle requests. The // creds failures happen before the RPC handler on the server is invoked. // So, there is need to setup the request and responses on the fake server. - rlsServer, _ := setupFakeRLSServer(t, nil, sopts...) + rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, sopts...) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Create the control channel to the fake server. @@ -454,7 +454,7 @@ func (*unsupportedCredsBundle) NewWithMode(mode string) (credentials.Bundle, err // TestNewControlChannelUnsupportedCredsBundle tests the case where the control // channel is configured with a bundle which does not support the mode we use. func (s) TestNewControlChannelUnsupportedCredsBundle(t *testing.T) { - rlsServer, _ := setupFakeRLSServer(t, nil) + rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil) // Create the control channel to the fake server. ctrlCh, err := newControlChannel(rlsServer.Address, "", defaultTestTimeout, balancer.BuildOptions{CredsBundle: &unsupportedCredsBundle{}}, nil) diff --git a/balancer/rls/helpers_test.go b/balancer/rls/helpers_test.go index 26123f8ce855..5fca54a63ace 100644 --- a/balancer/rls/helpers_test.go +++ b/balancer/rls/helpers_test.go @@ -20,7 +20,6 @@ package rls import ( "context" - "net" "strings" "sync" "testing" @@ -35,7 +34,6 @@ import ( rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/internal/stubserver" - "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/serviceconfig" @@ -62,52 +60,6 @@ func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } -// connWrapper wraps a net.Conn and pushes on a channel when closed. -type connWrapper struct { - net.Conn - closeCh *testutils.Channel -} - -func (cw *connWrapper) Close() error { - err := cw.Conn.Close() - cw.closeCh.Replace(nil) - return err -} - -// listenerWrapper wraps a net.Listener and the returned net.Conn. -// -// It pushes on a channel whenever it accepts a new connection. -type listenerWrapper struct { - net.Listener - newConnCh *testutils.Channel -} - -func (l *listenerWrapper) Accept() (net.Conn, error) { - c, err := l.Listener.Accept() - if err != nil { - return nil, err - } - closeCh := testutils.NewChannel() - conn := &connWrapper{Conn: c, closeCh: closeCh} - l.newConnCh.Send(conn) - return conn, nil -} - -func newListenerWrapper(t *testing.T, lis net.Listener) *listenerWrapper { - if lis == nil { - var err error - lis, err = testutils.LocalTCPListener() - if err != nil { - t.Fatal(err) - } - } - - return &listenerWrapper{ - Listener: lis, - newConnCh: testutils.NewChannel(), - } -} - // fakeBackoffStrategy is a fake implementation of the backoff.Strategy // interface, for tests to inject the backoff duration. type fakeBackoffStrategy struct { @@ -173,29 +125,6 @@ func overrideAdaptiveThrottler(t *testing.T, f *fakeThrottler) { t.Cleanup(func() { newAdaptiveThrottler = origAdaptiveThrottler }) } -// setupFakeRLSServer starts and returns a fake RouteLookupService server -// listening on the given listener or on a random local port. Also returns a -// channel for tests to get notified whenever the RouteLookup RPC is invoked on -// the fake server. -// -// This function sets up the fake server to respond with an empty response for -// the RouteLookup RPCs. Tests can override this by calling the -// SetResponseCallback() method on the returned fake server. -func setupFakeRLSServer(t *testing.T, lis net.Listener, opts ...grpc.ServerOption) (*e2e.FakeRouteLookupServer, chan struct{}) { - s, cancel := e2e.StartFakeRouteLookupServer(t, lis, opts...) - t.Logf("Started fake RLS server at %q", s.Address) - - ch := make(chan struct{}, 1) - s.SetRequestCallback(func(request *rlspb.RouteLookupRequest) { - select { - case ch <- struct{}{}: - default: - } - }) - t.Cleanup(cancel) - return s, ch -} - // buildBasicRLSConfig constructs a basic service config for the RLS LB policy // with header matching rules. This expects the passed child policy name to // have been registered by the caller. diff --git a/balancer/rls/picker_test.go b/balancer/rls/picker_test.go index cfe45477f0bc..a52aaa5e563d 100644 --- a/balancer/rls/picker_test.go +++ b/balancer/rls/picker_test.go @@ -25,10 +25,10 @@ import ( "time" "google.golang.org/grpc" - "google.golang.org/grpc/balancer/rls/internal/test/e2e" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1" + rlstest "google.golang.org/grpc/internal/testutils/rls" "google.golang.org/protobuf/types/known/durationpb" ) @@ -36,7 +36,7 @@ import ( // and no pending request either, and the ensuing RLS request is throttled. func (s) TestPick_DataCacheMiss_NoPendingEntry_ThrottledWithDefaultTarget(t *testing.T) { // Start an RLS server and set the throttler to always throttle requests. - rlsServer, rlsReqCh := setupFakeRLSServer(t, nil) + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil) overrideAdaptiveThrottler(t, alwaysThrottlingThrottler()) // Build RLS service config with a default target. @@ -68,7 +68,7 @@ func (s) TestPick_DataCacheMiss_NoPendingEntry_ThrottledWithDefaultTarget(t *tes // expected to fail with an RLS throttled error. func (s) TestPick_DataCacheMiss_NoPendingEntry_ThrottledWithoutDefaultTarget(t *testing.T) { // Start an RLS server and set the throttler to always throttle requests. - rlsServer, rlsReqCh := setupFakeRLSServer(t, nil) + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil) overrideAdaptiveThrottler(t, alwaysThrottlingThrottler()) // Build an RLS config without a default target. @@ -99,7 +99,7 @@ func (s) TestPick_DataCacheMiss_NoPendingEntry_ThrottledWithoutDefaultTarget(t * // deadline exceeded error. func (s) TestPick_DataCacheMiss_NoPendingEntry_NotThrottled(t *testing.T) { // Start an RLS server and set the throttler to never throttle requests. - rlsServer, rlsReqCh := setupFakeRLSServer(t, nil) + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Build an RLS config without a default target. @@ -158,7 +158,7 @@ func (s) TestPick_DataCacheMiss_PendingEntryExists(t *testing.T) { } // Start an RLS server and set the throttler to never throttle. - rlsServer, _ := setupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor)) + rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor)) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Build RLS service config with an optional default target. @@ -203,7 +203,7 @@ func (s) TestPick_DataCacheMiss_PendingEntryExists(t *testing.T) { // delegated to the child policy. func (s) TestPick_DataCacheHit_NoPendingEntry_ValidEntry(t *testing.T) { // Start an RLS server and set the throttler to never throttle requests. - rlsServer, rlsReqCh := setupFakeRLSServer(t, nil) + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Build the RLS config without a default target. @@ -212,8 +212,8 @@ func (s) TestPick_DataCacheHit_NoPendingEntry_ValidEntry(t *testing.T) { // Start a test backend, and setup the fake RLS server to return this as a // target in the RLS response. testBackendCh, testBackendAddress := startBackend(t) - rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse { - return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}} + rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}} }) // Register a manual resolver and push the RLS service config through it. @@ -264,7 +264,7 @@ func (s) TestPick_DataCacheHit_NoPendingEntry_StaleEntry(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { // Start an RLS server and setup the throttler appropriately. - rlsServer, rlsReqCh := setupFakeRLSServer(t, nil) + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil) var throttler *fakeThrottler if test.throttled { throttler = oneTimeAllowingThrottler() @@ -283,8 +283,8 @@ func (s) TestPick_DataCacheHit_NoPendingEntry_StaleEntry(t *testing.T) { // Start a test backend, and setup the fake RLS server to return // this as a target in the RLS response. testBackendCh, testBackendAddress := startBackend(t) - rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse { - return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}} + rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}} }) // Register a manual resolver and push the RLS service config @@ -364,7 +364,7 @@ func (s) TestPick_DataCacheHit_NoPendingEntry_ExpiredEntry(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { // Start an RLS server and setup the throttler appropriately. - rlsServer, rlsReqCh := setupFakeRLSServer(t, nil) + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil) var throttler *fakeThrottler if test.throttled { throttler = oneTimeAllowingThrottler() @@ -390,8 +390,8 @@ func (s) TestPick_DataCacheHit_NoPendingEntry_ExpiredEntry(t *testing.T) { // Start a test backend, and setup the fake RLS server to return // this as a target in the RLS response. testBackendCh, testBackendAddress := startBackend(t) - rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse { - return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}} + rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}} }) // Register a manual resolver and push the RLS service config @@ -462,7 +462,7 @@ func (s) TestPick_DataCacheHit_NoPendingEntry_ExpiredEntryInBackoff(t *testing.T for _, test := range tests { t.Run(test.name, func(t *testing.T) { // Start an RLS server and set the throttler to never throttle requests. - rlsServer, rlsReqCh := setupFakeRLSServer(t, nil) + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Override the backoff strategy to return a large backoff which @@ -488,8 +488,8 @@ func (s) TestPick_DataCacheHit_NoPendingEntry_ExpiredEntryInBackoff(t *testing.T // Start a test backend, and set up the fake RLS server to return this as // a target in the RLS response. testBackendCh, testBackendAddress := startBackend(t) - rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse { - return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}} + rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}} }) // Register a manual resolver and push the RLS service config through it. @@ -513,8 +513,8 @@ func (s) TestPick_DataCacheHit_NoPendingEntry_ExpiredEntryInBackoff(t *testing.T // Set up the fake RLS server to return errors. This will push the cache // entry into backoff. var rlsLastErr = errors.New("last RLS request failed") - rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse { - return &e2e.RouteLookupResponse{Err: rlsLastErr} + rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { + return &rlstest.RouteLookupResponse{Err: rlsLastErr} }) // Since the RLS server is now configured to return errors, this will push @@ -567,7 +567,7 @@ func (s) TestPick_DataCacheHit_PendingEntryExists_StaleEntry(t *testing.T) { } // Start an RLS server and set the throttler to never throttle. - rlsServer, _ := setupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor)) + rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor)) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Build RLS service config with an optional default target. @@ -584,8 +584,8 @@ func (s) TestPick_DataCacheHit_PendingEntryExists_StaleEntry(t *testing.T) { // Start a test backend, and setup the fake RLS server to return // this as a target in the RLS response. testBackendCh, testBackendAddress := startBackend(t) - rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse { - return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}} + rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}} }) // Register a manual resolver and push the RLS service config @@ -662,7 +662,7 @@ func (s) TestPick_DataCacheHit_PendingEntryExists_ExpiredEntry(t *testing.T) { } // Start an RLS server and set the throttler to never throttle. - rlsServer, _ := setupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor)) + rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor)) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Build RLS service config with an optional default target. @@ -677,8 +677,8 @@ func (s) TestPick_DataCacheHit_PendingEntryExists_ExpiredEntry(t *testing.T) { // Start a test backend, and setup the fake RLS server to return // this as a target in the RLS response. testBackendCh, testBackendAddress := startBackend(t) - rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse { - return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}} + rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}} }) // Register a manual resolver and push the RLS service config diff --git a/balancer/rls/internal/test/e2e/rls_fakeserver.go b/internal/testutils/rls/fake_rls_server.go similarity index 77% rename from balancer/rls/internal/test/e2e/rls_fakeserver.go rename to internal/testutils/rls/fake_rls_server.go index 521985412822..e64c9de3ae7f 100644 --- a/balancer/rls/internal/test/e2e/rls_fakeserver.go +++ b/internal/testutils/rls/fake_rls_server.go @@ -1,6 +1,6 @@ /* * - * Copyright 2020 gRPC authors. + * Copyright 2022 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,8 @@ * */ -package e2e +// Package rls contains utilities for RouteLookupService e2e tests. +package rls import ( "context" @@ -39,6 +40,29 @@ type RouteLookupResponse struct { Err error } +// SetupFakeRLSServer starts and returns a fake RouteLookupService server +// listening on the given listener or on a random local port. Also returns a +// channel for tests to get notified whenever the RouteLookup RPC is invoked on +// the fake server. +// +// This function sets up the fake server to respond with an empty response for +// the RouteLookup RPCs. Tests can override this by calling the +// SetResponseCallback() method on the returned fake server. +func SetupFakeRLSServer(t *testing.T, lis net.Listener, opts ...grpc.ServerOption) (*FakeRouteLookupServer, chan struct{}) { + s, cancel := StartFakeRouteLookupServer(t, lis, opts...) + t.Logf("Started fake RLS server at %q", s.Address) + + ch := make(chan struct{}, 1) + s.SetRequestCallback(func(request *rlspb.RouteLookupRequest) { + select { + case ch <- struct{}{}: + default: + } + }) + t.Cleanup(cancel) + return s, ch +} + // FakeRouteLookupServer is a fake implementation of the RouteLookupService. // // It is safe for concurrent use. diff --git a/internal/testutils/wrappers.go b/internal/testutils/wrappers.go new file mode 100644 index 000000000000..c9b596d8851c --- /dev/null +++ b/internal/testutils/wrappers.go @@ -0,0 +1,74 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package testutils + +import ( + "net" + "testing" +) + +// ConnWrapper wraps a net.Conn and pushes on a channel when closed. +type ConnWrapper struct { + net.Conn + CloseCh *Channel +} + +// Close closes the connection and sends a value on the close channel. +func (cw *ConnWrapper) Close() error { + err := cw.Conn.Close() + cw.CloseCh.Replace(nil) + return err +} + +// ListenerWrapper wraps a net.Listener and the returned net.Conn. +// +// It pushes on a channel whenever it accepts a new connection. +type ListenerWrapper struct { + net.Listener + NewConnCh *Channel +} + +// Accept wraps the Listener Accept and sends the accepted connection on a +// channel. +func (l *ListenerWrapper) Accept() (net.Conn, error) { + c, err := l.Listener.Accept() + if err != nil { + return nil, err + } + closeCh := NewChannel() + conn := &ConnWrapper{Conn: c, CloseCh: closeCh} + l.NewConnCh.Send(conn) + return conn, nil +} + +// NewListenerWrapper returns a ListenerWrapper. +func NewListenerWrapper(t *testing.T, lis net.Listener) *ListenerWrapper { + if lis == nil { + var err error + lis, err = LocalTCPListener() + if err != nil { + t.Fatal(err) + } + } + + return &ListenerWrapper{ + Listener: lis, + NewConnCh: NewChannel(), + } +} diff --git a/xds/internal/clusterspecifier/cluster_specifier.go b/xds/internal/clusterspecifier/cluster_specifier.go index 9bb30f16589f..b95a101116ed 100644 --- a/xds/internal/clusterspecifier/cluster_specifier.go +++ b/xds/internal/clusterspecifier/cluster_specifier.go @@ -65,3 +65,8 @@ func Register(cs ClusterSpecifier) { func Get(typeURL string) ClusterSpecifier { return m[typeURL] } + +// UnregisterForTesting unregisters the ClusterSpecifier for testing purposes. +func UnregisterForTesting(typeURL string) { + delete(m, typeURL) +} diff --git a/xds/internal/clusterspecifier/rls/rls.go b/xds/internal/clusterspecifier/rls/rls.go index 037795834c9b..69fb7f4a9098 100644 --- a/xds/internal/clusterspecifier/rls/rls.go +++ b/xds/internal/clusterspecifier/rls/rls.go @@ -40,6 +40,28 @@ func init() { } } +// RegisterForTesting registers the RLS Cluster Specifier Plugin for testing +// purposes, regardless of the XDSRLS environment variable. This is needed +// because there is no way to set the XDSRLS environment variable to true in a +// test before init() in this package is run. +// +// TODO: Remove this function once the RLS env var is removed. +func RegisterForTesting() { + clusterspecifier.Register(rls{}) +} + +// UnregisterForTesting unregisters the RLS Cluster Specifier Plugin for testing +// purposes. This is needed because there is no way to unregister the RLS +// Cluster Specifier Plugin after registering it solely for testing purposes +// using rls.RegisterForTesting(). +// +// TODO: Remove this function once the RLS env var is removed. +func UnregisterForTesting() { + for _, typeURL := range rls.TypeURLs(rls{}) { + clusterspecifier.UnregisterForTesting(typeURL) + } +} + type rls struct{} func (rls) TypeURLs() []string { diff --git a/xds/internal/httpfilter/rbac/rbac.go b/xds/internal/httpfilter/rbac/rbac.go index bb85dc80d460..3dc4b56826e6 100644 --- a/xds/internal/httpfilter/rbac/rbac.go +++ b/xds/internal/httpfilter/rbac/rbac.go @@ -43,16 +43,21 @@ func init() { } } -// RegisterForTesting registers the RBAC HTTP Filter for testing purposes, regardless -// of the RBAC environment variable. This is needed because there is no way to set the RBAC -// environment variable to true in a test before init() in this package is run. +// RegisterForTesting registers the RBAC HTTP Filter for testing purposes, +// regardless of the RBAC environment variable. This is needed because there is +// no way to set the RBAC environment variable to true in a test before init() +// in this package is run. +// +// TODO: Remove this function once the RBAC env var is removed. func RegisterForTesting() { httpfilter.Register(builder{}) } -// UnregisterForTesting unregisters the RBAC HTTP Filter for testing purposes. This is needed because -// there is no way to unregister the HTTP Filter after registering it solely for testing purposes using -// rbac.RegisterForTesting() +// UnregisterForTesting unregisters the RBAC HTTP Filter for testing purposes. +// This is needed because there is no way to unregister the HTTP Filter after +// registering it solely for testing purposes using rbac.RegisterForTesting(). +// +// TODO: Remove this function once the RBAC env var is removed. func UnregisterForTesting() { for _, typeURL := range builder.TypeURLs(builder{}) { httpfilter.UnregisterForTesting(typeURL) diff --git a/xds/internal/test/xds_client_integration_test.go b/xds/internal/test/xds_client_integration_test.go index 6a9a8c9688f0..e9e3fd584bf3 100644 --- a/xds/internal/test/xds_client_integration_test.go +++ b/xds/internal/test/xds_client_integration_test.go @@ -30,11 +30,23 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/envconfig" + rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" + rlstest "google.golang.org/grpc/internal/testutils/rls" "google.golang.org/grpc/status" + rlscsp "google.golang.org/grpc/xds/internal/clusterspecifier/rls" "google.golang.org/grpc/xds/internal/testutils/e2e" + "google.golang.org/protobuf/types/known/durationpb" + _ "google.golang.org/grpc/balancer/rls" // Register the RLS Load Balancing policy. + _ "google.golang.org/grpc/xds/internal/clusterspecifier/rls" // Register the RLS Cluster Specifier Plugin. + + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" wrapperspb "github.com/golang/protobuf/ptypes/wrappers" testpb "google.golang.org/grpc/test/grpc_testing" @@ -246,3 +258,127 @@ func (s) TestClientSideRetry(t *testing.T) { }) } } + +// defaultClientResourcesWithRLSCSP returns a set of resources (LDS, RDS, CDS, EDS) for a +// client to connect to a server with a RLS Load Balancer as a child of Cluster Manager. +func defaultClientResourcesWithRLSCSP(params e2e.ResourceParams, rlsProto *rlspb.RouteLookupConfig) e2e.UpdateOptions { + routeConfigName := "route-" + params.DialTarget + clusterName := "cluster-" + params.DialTarget + endpointsName := "endpoints-" + params.DialTarget + return e2e.UpdateOptions{ + NodeID: params.NodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(params.DialTarget, routeConfigName)}, + Routes: []*v3routepb.RouteConfiguration{defaultRouteConfigWithRLSCSP(routeConfigName, params.DialTarget, rlsProto)}, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, endpointsName, params.SecLevel)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(endpointsName, params.Host, []uint32{params.Port})}, + } +} + +// defaultRouteConfigWithRLSCSP returns a basic xds RouteConfig resource with an +// RLS Cluster Specifier Plugin configured as the route. +func defaultRouteConfigWithRLSCSP(routeName, ldsTarget string, rlsProto *rlspb.RouteLookupConfig) *v3routepb.RouteConfiguration { + return &v3routepb.RouteConfiguration{ + Name: routeName, + ClusterSpecifierPlugins: []*v3routepb.ClusterSpecifierPlugin{ + { + Extension: &v3corepb.TypedExtensionConfig{ + Name: "rls-csp", + TypedConfig: testutils.MarshalAny(&rlspb.RouteLookupClusterSpecifier{ + RouteLookupConfig: rlsProto, + }), + }, + }, + }, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{ldsTarget}, + Routes: []*v3routepb.Route{{ + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, + Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_ClusterSpecifierPlugin{ClusterSpecifierPlugin: "rls-csp"}, + }}, + }}, + }}, + } +} + +// TestRLSinxDS tests an xDS configured system with a RLS Balancer present. +// This test sets up the RLS Balancer using the RLS Cluster Specifier Plugin, +// spins up a test service and has a fake RLS Server correctly respond with a target +// corresponding to this test service. This test asserts an RPC proceeds as normal +// with the RLS Balancer as part of system. +func (s) TestRLSinxDS(t *testing.T) { + oldRLS := envconfig.XDSRLS + envconfig.XDSRLS = true + rlscsp.RegisterForTesting() + defer func() { + envconfig.XDSRLS = oldRLS + rlscsp.UnregisterForTesting() + }() + + // Set up all components and configuration necessary - management server, + // xDS resolver, fake RLS Server, and xDS configuration which specifies a + // RLS Balancer that communicates to this set up fake RLS Server. + managementServer, nodeID, _, resolver, cleanup1 := setupManagementServer(t) + defer cleanup1() + port, cleanup2 := clientSetup(t, &testService{}) + defer cleanup2() + + lis := testutils.NewListenerWrapper(t, nil) + rlsServer, rlsRequestCh := rlstest.SetupFakeRLSServer(t, lis) + rlsProto := &rlspb.RouteLookupConfig{ + GrpcKeybuilders: []*rlspb.GrpcKeyBuilder{{Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "grpc.testing.TestService"}}}}, + LookupService: rlsServer.Address, + LookupServiceTimeout: durationpb.New(defaultTestTimeout), + CacheSizeBytes: 1024, + } + + const serviceName = "my-service-client-side-xds" + resources := defaultClientResourcesWithRLSCSP(e2e.ResourceParams{ + DialTarget: serviceName, + NodeID: nodeID, + Host: "localhost", + Port: port, + SecLevel: e2e.SecurityLevelNone, + }, rlsProto) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Configure the fake RLS Server to set the RLS Balancers child CDS + // Cluster's name as the target for the RPC to use. + rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{"cluster-" + serviceName}}} + }) + + // Create a ClientConn and make a successful RPC. + cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + client := testpb.NewTestServiceClient(cc) + // Successfully sending the RPC will require the RLS Load Balancer to + // communicate with the fake RLS Server for information about the target. + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } + + // These RLS Verifications makes sure the RLS Load Balancer is actually part + // of the xDS Configured system that correctly sends out RPC. + + // Verify connection is established to RLS Server. + if _, err = lis.NewConnCh.Receive(ctx); err != nil { + t.Fatal("Timeout when waiting for RLS LB policy to create control channel") + } + + // Verify an rls request is sent out to fake RLS Server. + select { + case <-ctx.Done(): + t.Fatalf("Timeout when waiting for an RLS request to be sent out") + case <-rlsRequestCh: + } +}