From 229c8b4adc1920f01debe3cd10a82ba2f9ea1041 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 3 Jun 2021 10:54:48 -0700 Subject: [PATCH 01/16] [xds_client_in_attributes] xds: cleanup xds balancer tests to pass xds_client in resolver state RELEASE NOTES: N/A --- xds/internal/xdsclient/attributes.go | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/xds/internal/xdsclient/attributes.go b/xds/internal/xdsclient/attributes.go index 99060177e1e3..1ccd95c73a80 100644 --- a/xds/internal/xdsclient/attributes.go +++ b/xds/internal/xdsclient/attributes.go @@ -17,20 +17,34 @@ package xdsclient -import "google.golang.org/grpc/resolver" +import ( + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/xds/internal/client/bootstrap" + "google.golang.org/grpc/xds/internal/client/load" +) type clientKeyType string const clientKey = clientKeyType("grpc.xds.internal.client.Client") +// Interface contains a subset of the *Client methods that are needed by the +// balancers. +type Interface interface { + WatchCluster(string, func(ClusterUpdate, error)) func() + WatchEndpoints(clusterName string, edsCb func(EndpointsUpdate, error)) (cancel func()) + BootstrapConfig() *bootstrap.Config + ReportLoad(server string) (*load.Store, func()) + Close() +} + // FromResolverState returns the Client from state, or nil if not present. -func FromResolverState(state resolver.State) *Client { - cs, _ := state.Attributes.Value(clientKey).(*Client) +func FromResolverState(state resolver.State) Interface { + cs, _ := state.Attributes.Value(clientKey).(Interface) return cs } // SetClient sets c in state and returns the new state. -func SetClient(state resolver.State, c *Client) resolver.State { +func SetClient(state resolver.State, c Interface) resolver.State { state.Attributes = state.Attributes.WithValues(clientKey, c) return state } From a869c3a6666d4da59b841d6e5419340df65a157d Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 3 Jun 2021 11:10:23 -0700 Subject: [PATCH 02/16] [xds_client_in_attributes] lrs --- xds/internal/balancer/lrs/balancer.go | 27 ++-------------------- xds/internal/balancer/lrs/balancer_test.go | 9 +++----- 2 files changed, 5 insertions(+), 31 deletions(-) diff --git a/xds/internal/balancer/lrs/balancer.go b/xds/internal/balancer/lrs/balancer.go index 75a8cbb0dd7b..f19fc374b013 100644 --- a/xds/internal/balancer/lrs/balancer.go +++ b/xds/internal/balancer/lrs/balancer.go @@ -36,8 +36,6 @@ func init() { balancer.Register(bb{}) } -var newXDSClient func() (xdsClient, error) - // Name is the name of the LRS balancer. const Name = "lrs_experimental" @@ -50,17 +48,6 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal } b.logger = prefixLogger(b) b.logger.Infof("Created") - - if newXDSClient != nil { - // For tests - client, err := newXDSClient() - if err != nil { - b.logger.Errorf("failed to create xds-client: %v", err) - return nil - } - b.xdsClient = newXDSClientWrapper(client) - } - return b } @@ -169,15 +156,8 @@ func (ccw *ccWrapper) UpdateState(s balancer.State) { ccw.ClientConn.UpdateState(s) } -// xdsClient contains only the xds_client methods needed by LRS -// balancer. It's defined so we can override xdsclient in tests. -type xdsClient interface { - ReportLoad(server string) (*load.Store, func()) - Close() -} - type xdsClientWrapper struct { - c xdsClient + c xdsclient.Interface cancelLoadReport func() clusterName string edsServiceName string @@ -187,7 +167,7 @@ type xdsClientWrapper struct { loadWrapper *loadstore.Wrapper } -func newXDSClientWrapper(c xdsClient) *xdsClientWrapper { +func newXDSClientWrapper(c xdsclient.Interface) *xdsClientWrapper { return &xdsClientWrapper{ c: c, loadWrapper: loadstore.NewWrapper(), @@ -256,7 +236,4 @@ func (w *xdsClientWrapper) close() { w.cancelLoadReport() w.cancelLoadReport = nil } - if newXDSClient != nil { - w.c.Close() - } } diff --git a/xds/internal/balancer/lrs/balancer_test.go b/xds/internal/balancer/lrs/balancer_test.go index 9ffa2894dad8..5308d96cc734 100644 --- a/xds/internal/balancer/lrs/balancer_test.go +++ b/xds/internal/balancer/lrs/balancer_test.go @@ -33,6 +33,7 @@ import ( internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/resolver" xdsinternal "google.golang.org/grpc/xds/internal" + xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/testutils/fakeclient" ) @@ -55,9 +56,7 @@ var ( // server (empty string). func TestLoadReporting(t *testing.T) { xdsC := fakeclient.NewClient() - oldNewXDSClient := newXDSClient - newXDSClient = func() (xdsClient, error) { return xdsC, nil } - defer func() { newXDSClient = oldNewXDSClient }() + defer xdsC.Close() builder := balancer.Get(Name) cc := testutils.NewTestClientConn(t) @@ -65,9 +64,7 @@ func TestLoadReporting(t *testing.T) { defer lrsB.Close() if err := lrsB.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{ - Addresses: testBackendAddrs, - }, + ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), BalancerConfig: &LBConfig{ ClusterName: testClusterName, EDSServiceName: testServiceName, From d718b2447cd802e4b42537a0dbb7eb902389b7af Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 3 Jun 2021 11:14:08 -0700 Subject: [PATCH 03/16] [xds_client_in_attributes] cluster impl --- .../balancer/clusterimpl/balancer_test.go | 48 +++++-------------- .../balancer/clusterimpl/clusterimpl.go | 29 ++--------- 2 files changed, 15 insertions(+), 62 deletions(-) diff --git a/xds/internal/balancer/clusterimpl/balancer_test.go b/xds/internal/balancer/clusterimpl/balancer_test.go index 404dfb22d005..eaaa73dabf1b 100644 --- a/xds/internal/balancer/clusterimpl/balancer_test.go +++ b/xds/internal/balancer/clusterimpl/balancer_test.go @@ -74,9 +74,7 @@ func init() { func TestDropByCategory(t *testing.T) { defer xdsclient.ClearCounterForTesting(testClusterName) xdsC := fakeclient.NewClient() - oldNewXDSClient := newXDSClient - newXDSClient = func() (xdsClient, error) { return xdsC, nil } - defer func() { newXDSClient = oldNewXDSClient }() + defer xdsC.Close() builder := balancer.Get(Name) cc := testutils.NewTestClientConn(t) @@ -89,9 +87,7 @@ func TestDropByCategory(t *testing.T) { dropDenominator = 2 ) if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{ - Addresses: testBackendAddrs, - }, + ResolverState: client.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, @@ -176,9 +172,7 @@ func TestDropByCategory(t *testing.T) { dropDenominator2 = 4 ) if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{ - Addresses: testBackendAddrs, - }, + ResolverState: client.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, @@ -232,9 +226,7 @@ func TestDropByCategory(t *testing.T) { func TestDropCircuitBreaking(t *testing.T) { defer xdsclient.ClearCounterForTesting(testClusterName) xdsC := fakeclient.NewClient() - oldNewXDSClient := newXDSClient - newXDSClient = func() (xdsClient, error) { return xdsC, nil } - defer func() { newXDSClient = oldNewXDSClient }() + defer xdsC.Close() builder := balancer.Get(Name) cc := testutils.NewTestClientConn(t) @@ -243,9 +235,7 @@ func TestDropCircuitBreaking(t *testing.T) { var maxRequest uint32 = 50 if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{ - Addresses: testBackendAddrs, - }, + ResolverState: client.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, @@ -344,9 +334,7 @@ func TestDropCircuitBreaking(t *testing.T) { func TestPickerUpdateAfterClose(t *testing.T) { defer xdsclient.ClearCounterForTesting(testClusterName) xdsC := fakeclient.NewClient() - oldNewXDSClient := newXDSClient - newXDSClient = func() (xdsClient, error) { return xdsC, nil } - defer func() { newXDSClient = oldNewXDSClient }() + defer xdsC.Close() builder := balancer.Get(Name) cc := testutils.NewTestClientConn(t) @@ -354,9 +342,7 @@ func TestPickerUpdateAfterClose(t *testing.T) { var maxRequest uint32 = 50 if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{ - Addresses: testBackendAddrs, - }, + ResolverState: client.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, @@ -389,9 +375,7 @@ func TestPickerUpdateAfterClose(t *testing.T) { func TestClusterNameInAddressAttributes(t *testing.T) { defer xdsclient.ClearCounterForTesting(testClusterName) xdsC := fakeclient.NewClient() - oldNewXDSClient := newXDSClient - newXDSClient = func() (xdsClient, error) { return xdsC, nil } - defer func() { newXDSClient = oldNewXDSClient }() + defer xdsC.Close() builder := balancer.Get(Name) cc := testutils.NewTestClientConn(t) @@ -399,9 +383,7 @@ func TestClusterNameInAddressAttributes(t *testing.T) { defer b.Close() if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{ - Addresses: testBackendAddrs, - }, + ResolverState: client.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, @@ -450,9 +432,7 @@ func TestClusterNameInAddressAttributes(t *testing.T) { const testClusterName2 = "test-cluster-2" var addr2 = resolver.Address{Addr: "2.2.2.2"} if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{ - Addresses: []resolver.Address{addr2}, - }, + ResolverState: client.SetClient(resolver.State{Addresses: []resolver.Address{addr2}}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName2, EDSServiceName: testServiceName, @@ -480,9 +460,7 @@ func TestClusterNameInAddressAttributes(t *testing.T) { func TestReResolution(t *testing.T) { defer xdsclient.ClearCounterForTesting(testClusterName) xdsC := fakeclient.NewClient() - oldNewXDSClient := newXDSClient - newXDSClient = func() (xdsClient, error) { return xdsC, nil } - defer func() { newXDSClient = oldNewXDSClient }() + defer xdsC.Close() builder := balancer.Get(Name) cc := testutils.NewTestClientConn(t) @@ -490,9 +468,7 @@ func TestReResolution(t *testing.T) { defer b.Close() if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{ - Addresses: testBackendAddrs, - }, + ResolverState: client.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index 9f3acafbc92b..b338eed8121c 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -52,8 +52,6 @@ func init() { balancer.Register(bb{}) } -var newXDSClient func() (xdsClient, error) - type bb struct{} func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { @@ -67,18 +65,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba requestCountMax: defaultRequestCountMax, } b.logger = prefixLogger(b) - - if newXDSClient != nil { - // For tests - client, err := newXDSClient() - if err != nil { - b.logger.Errorf("failed to create xds-client: %v", err) - return nil - } - b.xdsClient = client - } go b.run() - b.logger.Infof("Created") return b } @@ -91,13 +78,6 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err return parseConfig(c) } -// xdsClient contains only the xds_client methods needed by LRS -// balancer. It's defined so we can override xdsclient in tests. -type xdsClient interface { - ReportLoad(server string) (*load.Store, func()) - Close() -} - type clusterImplBalancer struct { balancer.ClientConn @@ -113,9 +93,9 @@ type clusterImplBalancer struct { closed *grpcsync.Event done *grpcsync.Event - bOpts balancer.BuildOptions - logger *grpclog.PrefixLogger - xdsClient xdsClient + bOpts balancer.BuildOptions + logger *grpclog.PrefixLogger + xdsC xdsclient.Interface config *LBConfig childLB balancer.Balancer @@ -328,9 +308,6 @@ func (b *clusterImplBalancer) Close() { b.childLB.Close() b.childLB = nil } - if newXDSClient != nil { - b.xdsClient.Close() - } <-b.done.Done() b.logger.Infof("Shutdown") } From f65828f512b22c6b91dcbe4f3e06c994b1609b14 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 3 Jun 2021 11:19:48 -0700 Subject: [PATCH 04/16] [xds_client_in_attributes] eds --- xds/internal/balancer/edsbalancer/eds.go | 25 +------------------ xds/internal/balancer/edsbalancer/eds_test.go | 19 +++++++++++--- .../balancer/edsbalancer/xds_lrs_test.go | 7 +++--- 3 files changed, 20 insertions(+), 31 deletions(-) diff --git a/xds/internal/balancer/edsbalancer/eds.go b/xds/internal/balancer/edsbalancer/eds.go index ffc46cea469b..4ecba5683e9a 100644 --- a/xds/internal/balancer/edsbalancer/eds.go +++ b/xds/internal/balancer/edsbalancer/eds.go @@ -41,19 +41,10 @@ import ( const edsName = "eds_experimental" -// xdsClient contains only the xds_client methods needed by EDS -// balancer. It's defined so we can override xdsclient.New function in tests. -type xdsClient interface { - WatchEndpoints(clusterName string, edsCb func(xdsclient.EndpointsUpdate, error)) (cancel func()) - ReportLoad(server string) (loadStore *load.Store, cancel func()) - Close() -} - var ( newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions, enqueueState func(priorityType, balancer.State), lw load.PerClusterReporter, logger *grpclog.PrefixLogger) edsBalancerImplInterface { return newEDSBalancerImpl(cc, opts, enqueueState, lw, logger) } - newXDSClient func() (xdsClient, error) ) func init() { @@ -74,17 +65,6 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal config: &EDSConfig{}, } x.logger = prefixLogger(x) - - if newXDSClient != nil { - // For tests - client, err := newXDSClient() - if err != nil { - x.logger.Errorf("xds: failed to create xds-client: %v", err) - return nil - } - x.xdsClient = client - } - x.edsImpl = newEDSBalancer(x.cc, opts, x.enqueueChildBalancerState, x.loadWrapper, x.logger) x.logger.Infof("Created") go x.run() @@ -144,7 +124,7 @@ type edsBalancer struct { xdsClientUpdate chan *edsUpdate childPolicyUpdate *buffer.Unbounded - xdsClient xdsClient + xdsClient xdsclient.Interface loadWrapper *loadstore.Wrapper config *EDSConfig // may change when passed a different service config edsImpl edsBalancerImplInterface @@ -174,9 +154,6 @@ func (b *edsBalancer) run() { b.edsImpl.updateState(u.priority, u.s) case <-b.closed.Done(): b.cancelWatch() - if newXDSClient != nil { - b.xdsClient.Close() - } b.edsImpl.close() b.logger.Infof("Shutdown") b.done.Fire() diff --git a/xds/internal/balancer/edsbalancer/eds_test.go b/xds/internal/balancer/edsbalancer/eds_test.go index 7e16076751ab..c20e8206b9ec 100644 --- a/xds/internal/balancer/edsbalancer/eds_test.go +++ b/xds/internal/balancer/edsbalancer/eds_test.go @@ -255,8 +255,6 @@ func waitForNewEDSLB(ctx context.Context, ch *testutils.Channel) (*fakeEDSBalanc // cleanup. func setup(edsLBCh *testutils.Channel) (*fakeclient.Client, func()) { xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar) - oldNewXDSClient := newXDSClient - newXDSClient = func() (xdsClient, error) { return xdsC, nil } origNewEDSBalancer := newEDSBalancer newEDSBalancer = func(cc balancer.ClientConn, _ balancer.BuildOptions, _ func(priorityType, balancer.State), _ load.PerClusterReporter, _ *grpclog.PrefixLogger) edsBalancerImplInterface { @@ -266,7 +264,7 @@ func setup(edsLBCh *testutils.Channel) (*fakeclient.Client, func()) { } return xdsC, func() { newEDSBalancer = origNewEDSBalancer - newXDSClient = oldNewXDSClient + xdsC.Close() } } @@ -348,6 +346,7 @@ func (s) TestConfigChildPolicyUpdate(t *testing.T) { Config: json.RawMessage("{}"), } if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{ ChildPolicy: lbCfgA, ClusterName: testEDSClusterName, @@ -377,6 +376,7 @@ func (s) TestConfigChildPolicyUpdate(t *testing.T) { Config: json.RawMessage("{}"), } if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{ ChildPolicy: lbCfgB, ClusterName: testEDSClusterName, @@ -421,6 +421,7 @@ func (s) TestSubConnStateChange(t *testing.T) { } if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{EDSServiceName: testServiceName}, }); err != nil { t.Fatalf("edsB.UpdateClientConnState() failed: %v", err) @@ -467,6 +468,7 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) { } if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{EDSServiceName: testServiceName}, }); err != nil { t.Fatal(err) @@ -511,6 +513,7 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) { // An update with the same service name should not trigger a new watch. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{EDSServiceName: testServiceName}, }); err != nil { t.Fatal(err) @@ -549,6 +552,7 @@ func (s) TestErrorFromResolver(t *testing.T) { } if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{EDSServiceName: testServiceName}, }); err != nil { t.Fatal(err) @@ -589,6 +593,7 @@ func (s) TestErrorFromResolver(t *testing.T) { // An update with the same service name should trigger a new watch, because // the previous watch was canceled. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{EDSServiceName: testServiceName}, }); err != nil { t.Fatal(err) @@ -640,6 +645,7 @@ func (s) TestClientWatchEDS(t *testing.T) { defer cancel() // If eds service name is not set, should watch for cluster name. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{ClusterName: "cluster-1"}, }); err != nil { t.Fatal(err) @@ -651,6 +657,7 @@ func (s) TestClientWatchEDS(t *testing.T) { // Update with an non-empty edsServiceName should trigger an EDS watch for // the same. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{EDSServiceName: "foobar-1"}, }); err != nil { t.Fatal(err) @@ -664,6 +671,7 @@ func (s) TestClientWatchEDS(t *testing.T) { // registered watch will be cancelled, which will result in an EDS request // with no resource names being sent to the server. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{EDSServiceName: "foobar-2"}, }); err != nil { t.Fatal(err) @@ -677,7 +685,7 @@ func (s) TestClientWatchEDS(t *testing.T) { // service name from an update's config. func (s) TestCounterUpdate(t *testing.T) { edsLBCh := testutils.NewChannel() - _, cleanup := setup(edsLBCh) + xdsC, cleanup := setup(edsLBCh) defer cleanup() builder := balancer.Get(edsName) @@ -690,6 +698,7 @@ func (s) TestCounterUpdate(t *testing.T) { var testCountMax uint32 = 100 // Update should trigger counter update with provided service name. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{ ClusterName: "foobar-1", MaxConcurrentRequests: &testCountMax, @@ -724,6 +733,7 @@ func (s) TestClusterNameUpdateInAddressAttributes(t *testing.T) { // Update should trigger counter update with provided service name. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{ ClusterName: "foobar-1", }, @@ -743,6 +753,7 @@ func (s) TestClusterNameUpdateInAddressAttributes(t *testing.T) { // Update should trigger counter update with provided service name. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{ ClusterName: "foobar-2", }, diff --git a/xds/internal/balancer/edsbalancer/xds_lrs_test.go b/xds/internal/balancer/edsbalancer/xds_lrs_test.go index d5b40dd98d32..86e36fceaedf 100644 --- a/xds/internal/balancer/edsbalancer/xds_lrs_test.go +++ b/xds/internal/balancer/edsbalancer/xds_lrs_test.go @@ -25,6 +25,8 @@ import ( "testing" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/resolver" + xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/testutils/fakeclient" ) @@ -33,9 +35,7 @@ import ( // server (empty string). func (s) TestXDSLoadReporting(t *testing.T) { xdsC := fakeclient.NewClient() - oldNewXDSClient := newXDSClient - newXDSClient = func() (xdsClient, error) { return xdsC, nil } - defer func() { newXDSClient = oldNewXDSClient }() + defer xdsC.Close() builder := balancer.Get(edsName) edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{}) @@ -45,6 +45,7 @@ func (s) TestXDSLoadReporting(t *testing.T) { defer edsB.Close() if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{ EDSServiceName: testEDSClusterName, LrsLoadReportingServerName: new(string), From f0d9bf4655ea9c554920d5be9b4def98cc5c5b92 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 3 Jun 2021 11:43:17 -0700 Subject: [PATCH 05/16] [xds_client_in_attributes] cds --- .../balancer/cdsbalancer/cdsbalancer.go | 26 +------------------ .../cdsbalancer/cdsbalancer_security_test.go | 8 ++---- .../balancer/cdsbalancer/cdsbalancer_test.go | 25 +++++++++--------- .../balancer/cdsbalancer/cluster_handler.go | 4 +-- 4 files changed, 17 insertions(+), 46 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 7278c624361d..2b1318a1fdcb 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -36,7 +36,6 @@ import ( "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/edsbalancer" "google.golang.org/grpc/xds/internal/xdsclient" - "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" ) const ( @@ -59,7 +58,6 @@ var ( // not deal with subConns. return builder.Build(cc, opts), nil } - newXDSClient func() (xdsClient, error) buildProvider = buildProviderFunc ) @@ -84,17 +82,6 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal } b.logger = prefixLogger((b)) b.logger.Infof("Created") - - if newXDSClient != nil { - // For tests - client, err := newXDSClient() - if err != nil { - b.logger.Errorf("failed to create xds-client: %v", err) - return nil - } - b.xdsClient = client - } - var creds credentials.TransportCredentials switch { case opts.DialCreds != nil: @@ -137,14 +124,6 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err return &cfg, nil } -// xdsClient contains methods from xdsClient.Client which are used by -// the cdsBalancer. This will be faked out in unittests. -type xdsClient interface { - WatchCluster(string, func(xdsclient.ClusterUpdate, error)) func() - BootstrapConfig() *bootstrap.Config - Close() -} - // ccUpdate wraps a clientConn update received from gRPC (pushed from the // xdsResolver). A valid clusterName causes the cdsBalancer to register a CDS // watcher with the xdsClient, while a non-nil error causes it to cancel the @@ -184,7 +163,7 @@ type cdsBalancer struct { ccw *ccWrapper // ClientConn interface passed to child LB. bOpts balancer.BuildOptions // BuildOptions passed to child LB. updateCh *buffer.Unbounded // Channel for gRPC and xdsClient updates. - xdsClient xdsClient // xDS client to watch Cluster resource. + xdsClient xdsclient.Interface // xDS client to watch Cluster resource. cancelWatch func() // Cluster watch cancel func. edsLB balancer.Balancer // EDS child policy. clusterToWatch string @@ -407,9 +386,6 @@ func (b *cdsBalancer) run() { b.edsLB.Close() b.edsLB = nil } - if newXDSClient != nil { - b.xdsClient.Close() - } if b.cachedRoot != nil { b.cachedRoot.Close() } diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go index 9964b9de925c..067bc2b05369 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go @@ -133,11 +133,7 @@ func (p *fakeProvider) Close() { // xDSCredentials. func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *xdstestutils.TestClientConn, func()) { t.Helper() - xdsC := fakeclient.NewClient() - oldNewXDSClient := newXDSClient - newXDSClient = func() (xdsClient, error) { return xdsC, nil } - builder := balancer.Get(cdsName) if builder == nil { t.Fatalf("balancer.Get(%q) returned nil", cdsName) @@ -164,7 +160,7 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS } // Push a ClientConnState update to the CDS balancer with a cluster name. - if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != nil { + if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil { t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err) } @@ -181,8 +177,8 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS } return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() { - newXDSClient = oldNewXDSClient newEDSBalancer = oldEDSBalancerBuilder + xdsC.Close() } } diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 5c5161807be3..5a798e414fe9 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -173,7 +173,7 @@ func (tb *testEDSBalancer) waitForClose(ctx context.Context) error { // cdsCCS is a helper function to construct a good update passed from the // xdsResolver to the cdsBalancer. -func cdsCCS(cluster string) balancer.ClientConnState { +func cdsCCS(cluster string, xdsC xdsclient.Interface) balancer.ClientConnState { const cdsLBConfig = `{ "loadBalancingConfig":[ { @@ -185,9 +185,9 @@ func cdsCCS(cluster string) balancer.ClientConnState { }` jsonSC := fmt.Sprintf(cdsLBConfig, cluster) return balancer.ClientConnState{ - ResolverState: resolver.State{ + ResolverState: xdsclient.SetClient(resolver.State{ ServiceConfig: internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(jsonSC), - }, + }, xdsC), BalancerConfig: &lbConfig{ClusterName: clusterName}, } } @@ -211,11 +211,7 @@ func edsCCS(service string, countMax *uint32, enableLRS bool) balancer.ClientCon // newEDSBalancer function to return it), and also returns a cleanup function. func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *xdstestutils.TestClientConn, func()) { t.Helper() - xdsC := fakeclient.NewClient() - oldNewXDSClient := newXDSClient - newXDSClient = func() (xdsClient, error) { return xdsC, nil } - builder := balancer.Get(cdsName) if builder == nil { t.Fatalf("balancer.Get(%q) returned nil", cdsName) @@ -232,7 +228,7 @@ func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *x return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() { newEDSBalancer = oldEDSBalancerBuilder - newXDSClient = oldNewXDSClient + xdsC.Close() } } @@ -242,7 +238,7 @@ func setupWithWatch(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBal t.Helper() xdsC, cdsB, edsB, tcc, cancel := setup(t) - if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != nil { + if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil { t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err) } @@ -262,6 +258,9 @@ func setupWithWatch(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBal // cdsBalancer with different inputs and verifies that the CDS watch API on the // provided xdsClient is invoked appropriately. func (s) TestUpdateClientConnState(t *testing.T) { + xdsC := fakeclient.NewClient() + defer xdsC.Close() + tests := []struct { name string ccs balancer.ClientConnState @@ -280,14 +279,14 @@ func (s) TestUpdateClientConnState(t *testing.T) { }, { name: "happy-good-case", - ccs: cdsCCS(clusterName), + ccs: cdsCCS(clusterName, xdsC), wantCluster: clusterName, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - xdsC, cdsB, _, _, cancel := setup(t) + _, cdsB, _, _, cancel := setup(t) defer func() { cancel() cdsB.Close() @@ -324,7 +323,7 @@ func (s) TestUpdateClientConnStateWithSameState(t *testing.T) { }() // This is the same clientConn update sent in setupWithWatch(). - if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != nil { + if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil { t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err) } // The above update should not result in a new watch being registered. @@ -660,7 +659,7 @@ func (s) TestClose(t *testing.T) { // Make sure that the UpdateClientConnState() method on the CDS balancer // returns error. - if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != errBalancerClosed { + if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != errBalancerClosed { t.Fatalf("UpdateClientConnState() after close returned %v, want %v", err, errBalancerClosed) } diff --git a/xds/internal/balancer/cdsbalancer/cluster_handler.go b/xds/internal/balancer/cdsbalancer/cluster_handler.go index c38d1a6c31a6..0ce91fb99eb1 100644 --- a/xds/internal/balancer/cdsbalancer/cluster_handler.go +++ b/xds/internal/balancer/cdsbalancer/cluster_handler.go @@ -40,7 +40,7 @@ type clusterHandler struct { // CDS Balancer cares about is the most recent update. updateChannel chan clusterHandlerUpdate - xdsClient xdsClient + xdsClient xdsclient.Interface } func (ch *clusterHandler) updateRootCluster(rootClusterName string) { @@ -112,7 +112,7 @@ type clusterNode struct { // CreateClusterNode creates a cluster node from a given clusterName. This will // also start the watch for that cluster. -func createClusterNode(clusterName string, xdsClient xdsClient, topLevelHandler *clusterHandler) *clusterNode { +func createClusterNode(clusterName string, xdsClient xdsclient.Interface, topLevelHandler *clusterHandler) *clusterNode { c := &clusterNode{ clusterHandler: topLevelHandler, } From c120b7f6f2bf98b8693d85f4d38983ae4bc7fea1 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 4 Jun 2021 14:34:09 -0700 Subject: [PATCH 06/16] [xds_client_in_attributes] fix --- xds/internal/balancer/clusterimpl/balancer_test.go | 14 +++++++------- xds/internal/balancer/clusterimpl/clusterimpl.go | 6 +++--- xds/internal/balancer/edsbalancer/xds_lrs_test.go | 2 +- xds/internal/balancer/lrs/balancer_test.go | 2 +- xds/internal/xdsclient/attributes.go | 4 ++-- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/xds/internal/balancer/clusterimpl/balancer_test.go b/xds/internal/balancer/clusterimpl/balancer_test.go index eaaa73dabf1b..ab3613bec31d 100644 --- a/xds/internal/balancer/clusterimpl/balancer_test.go +++ b/xds/internal/balancer/clusterimpl/balancer_test.go @@ -87,7 +87,7 @@ func TestDropByCategory(t *testing.T) { dropDenominator = 2 ) if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: client.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), + ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, @@ -172,7 +172,7 @@ func TestDropByCategory(t *testing.T) { dropDenominator2 = 4 ) if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: client.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), + ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, @@ -235,7 +235,7 @@ func TestDropCircuitBreaking(t *testing.T) { var maxRequest uint32 = 50 if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: client.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), + ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, @@ -342,7 +342,7 @@ func TestPickerUpdateAfterClose(t *testing.T) { var maxRequest uint32 = 50 if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: client.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), + ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, @@ -383,7 +383,7 @@ func TestClusterNameInAddressAttributes(t *testing.T) { defer b.Close() if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: client.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), + ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, @@ -432,7 +432,7 @@ func TestClusterNameInAddressAttributes(t *testing.T) { const testClusterName2 = "test-cluster-2" var addr2 = resolver.Address{Addr: "2.2.2.2"} if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: client.SetClient(resolver.State{Addresses: []resolver.Address{addr2}}, xdsC), + ResolverState: xdsclient.SetClient(resolver.State{Addresses: []resolver.Address{addr2}}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName2, EDSServiceName: testServiceName, @@ -468,7 +468,7 @@ func TestReResolution(t *testing.T) { defer b.Close() if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: client.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), + ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index b338eed8121c..29b35999ba97 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -93,9 +93,9 @@ type clusterImplBalancer struct { closed *grpcsync.Event done *grpcsync.Event - bOpts balancer.BuildOptions - logger *grpclog.PrefixLogger - xdsC xdsclient.Interface + bOpts balancer.BuildOptions + logger *grpclog.PrefixLogger + xdsClient xdsclient.Interface config *LBConfig childLB balancer.Balancer diff --git a/xds/internal/balancer/edsbalancer/xds_lrs_test.go b/xds/internal/balancer/edsbalancer/xds_lrs_test.go index 86e36fceaedf..3dcbf5e259c7 100644 --- a/xds/internal/balancer/edsbalancer/xds_lrs_test.go +++ b/xds/internal/balancer/edsbalancer/xds_lrs_test.go @@ -26,8 +26,8 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/resolver" - xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/testutils/fakeclient" + "google.golang.org/grpc/xds/internal/xdsclient" ) // TestXDSLoadReporting verifies that the edsBalancer starts the loadReport diff --git a/xds/internal/balancer/lrs/balancer_test.go b/xds/internal/balancer/lrs/balancer_test.go index 5308d96cc734..c0ec9cc41dd3 100644 --- a/xds/internal/balancer/lrs/balancer_test.go +++ b/xds/internal/balancer/lrs/balancer_test.go @@ -33,9 +33,9 @@ import ( internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/resolver" xdsinternal "google.golang.org/grpc/xds/internal" - xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/testutils/fakeclient" + "google.golang.org/grpc/xds/internal/xdsclient" ) const defaultTestTimeout = 1 * time.Second diff --git a/xds/internal/xdsclient/attributes.go b/xds/internal/xdsclient/attributes.go index 1ccd95c73a80..5025aee6c761 100644 --- a/xds/internal/xdsclient/attributes.go +++ b/xds/internal/xdsclient/attributes.go @@ -19,8 +19,8 @@ package xdsclient import ( "google.golang.org/grpc/resolver" - "google.golang.org/grpc/xds/internal/client/bootstrap" - "google.golang.org/grpc/xds/internal/client/load" + "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" + "google.golang.org/grpc/xds/internal/xdsclient/load" ) type clientKeyType string From d85bfc96c52434fc324c5bb72b355d109e3227fb Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 4 Jun 2021 14:36:59 -0700 Subject: [PATCH 07/16] [xds_client_in_attributes] server --- xds/internal/xdsclient/attributes.go | 1 + xds/server.go | 15 +++------------ xds/server_test.go | 8 ++++---- 3 files changed, 8 insertions(+), 16 deletions(-) diff --git a/xds/internal/xdsclient/attributes.go b/xds/internal/xdsclient/attributes.go index 5025aee6c761..858e66a16c59 100644 --- a/xds/internal/xdsclient/attributes.go +++ b/xds/internal/xdsclient/attributes.go @@ -30,6 +30,7 @@ const clientKey = clientKeyType("grpc.xds.internal.client.Client") // Interface contains a subset of the *Client methods that are needed by the // balancers. type Interface interface { + WatchListener(string, func(ListenerUpdate, error)) func() WatchCluster(string, func(ClusterUpdate, error)) func() WatchEndpoints(clusterName string, edsCb func(EndpointsUpdate, error)) (cancel func()) BootstrapConfig() *bootstrap.Config diff --git a/xds/server.go b/xds/server.go index 989859bc65c8..f013037ca020 100644 --- a/xds/server.go +++ b/xds/server.go @@ -35,14 +35,13 @@ import ( "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/xds/internal/server" "google.golang.org/grpc/xds/internal/xdsclient" - "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" ) const serverPrefix = "[xds-server %p] " var ( // These new functions will be overridden in unit tests. - newXDSClient = func() (xdsClient, error) { + newXDSClient = func() (xdsclient.Interface, error) { return xdsclient.New() } newGRPCServer = func(opts ...grpc.ServerOption) grpcServer { @@ -58,14 +57,6 @@ func prefixLogger(p *GRPCServer) *internalgrpclog.PrefixLogger { return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(serverPrefix, p)) } -// xdsClient contains methods from xdsClient.Client which are used by -// the server. This is useful for overriding in unit tests. -type xdsClient interface { - WatchListener(string, func(xdsclient.ListenerUpdate, error)) func() - BootstrapConfig() *bootstrap.Config - Close() -} - // grpcServer contains methods from grpc.Server which are used by the // GRPCServer type here. This is useful for overriding in unit tests. type grpcServer interface { @@ -90,7 +81,7 @@ type GRPCServer struct { // beginning of Serve(), where we have to decide if we have to create a // client or use an existing one. clientMu sync.Mutex - xdsC xdsClient + xdsC xdsclient.Interface } // NewGRPCServer creates an xDS-enabled gRPC server using the passed in opts. @@ -156,7 +147,7 @@ func (s *GRPCServer) initXDSClient() error { newXDSClient := newXDSClient if s.opts.bootstrapContents != nil { - newXDSClient = func() (xdsClient, error) { + newXDSClient = func() (xdsclient.Interface, error) { return xdsclient.NewClientWithBootstrapContents(s.opts.bootstrapContents) } } diff --git a/xds/server_test.go b/xds/server_test.go index 27a33da091d0..e82fd182e51d 100644 --- a/xds/server_test.go +++ b/xds/server_test.go @@ -247,7 +247,7 @@ func (p *fakeProvider) Close() { func setupOverrides() (*fakeGRPCServer, *testutils.Channel, func()) { clientCh := testutils.NewChannel() origNewXDSClient := newXDSClient - newXDSClient = func() (xdsClient, error) { + newXDSClient = func() (xdsclient.Interface, error) { c := fakeclient.NewClient() c.SetBootstrapConfig(&bootstrap.Config{ BalancerName: "dummyBalancer", @@ -277,7 +277,7 @@ func setupOverrides() (*fakeGRPCServer, *testutils.Channel, func()) { func setupOverridesForXDSCreds(includeCertProviderCfg bool) (*testutils.Channel, func()) { clientCh := testutils.NewChannel() origNewXDSClient := newXDSClient - newXDSClient = func() (xdsClient, error) { + newXDSClient = func() (xdsclient.Interface, error) { c := fakeclient.NewClient() bc := &bootstrap.Config{ BalancerName: "dummyBalancer", @@ -544,7 +544,7 @@ func (s) TestServeBootstrapConfigInvalid(t *testing.T) { // xdsClient with the specified bootstrap configuration. clientCh := testutils.NewChannel() origNewXDSClient := newXDSClient - newXDSClient = func() (xdsClient, error) { + newXDSClient = func() (xdsclient.Interface, error) { c := fakeclient.NewClient() c.SetBootstrapConfig(test.bootstrapConfig) clientCh.Send(c) @@ -587,7 +587,7 @@ func (s) TestServeBootstrapConfigInvalid(t *testing.T) { // verifies that Server() exits with a non-nil error. func (s) TestServeNewClientFailure(t *testing.T) { origNewXDSClient := newXDSClient - newXDSClient = func() (xdsClient, error) { + newXDSClient = func() (xdsclient.Interface, error) { return nil, errors.New("xdsClient creation failed") } defer func() { newXDSClient = origNewXDSClient }() From addf9ec45fc4f5b9d556092763853fdbe69652b1 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 4 Jun 2021 14:38:54 -0700 Subject: [PATCH 08/16] [xds_client_in_attributes] googlec2p --- xds/googledirectpath/googlec2p.go | 8 ++------ xds/googledirectpath/googlec2p_test.go | 4 +++- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/xds/googledirectpath/googlec2p.go b/xds/googledirectpath/googlec2p.go index af487ec4a736..a19c28006527 100644 --- a/xds/googledirectpath/googlec2p.go +++ b/xds/googledirectpath/googlec2p.go @@ -62,15 +62,11 @@ const ( dnsName, xdsName = "dns", "xds" ) -type xdsClient interface { - Close() -} - // For overriding in unittests. var ( onGCE = googlecloud.OnGCE - newClientWithConfig = func(config *bootstrap.Config) (xdsClient, error) { + newClientWithConfig = func(config *bootstrap.Config) (xdsclient.Interface, error) { return xdsclient.NewWithConfig(config) } @@ -139,7 +135,7 @@ func (c2pResolverBuilder) Scheme() string { type c2pResolver struct { resolver.Resolver - client xdsClient + client xdsclient.Interface } func (r *c2pResolver) Close() { diff --git a/xds/googledirectpath/googlec2p_test.go b/xds/googledirectpath/googlec2p_test.go index fb68fa23a1d0..2028a65f9e59 100644 --- a/xds/googledirectpath/googlec2p_test.go +++ b/xds/googledirectpath/googlec2p_test.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc/internal/xds/env" "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal/version" + "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/known/structpb" @@ -130,6 +131,7 @@ func TestBuildNotOnGCE(t *testing.T) { } type testXDSClient struct { + xdsclient.Interface closed chan struct{} } @@ -177,7 +179,7 @@ func TestBuildXDS(t *testing.T) { configCh := make(chan *bootstrap.Config, 1) oldNewClient := newClientWithConfig - newClientWithConfig = func(config *bootstrap.Config) (xdsClient, error) { + newClientWithConfig = func(config *bootstrap.Config) (xdsclient.Interface, error) { configCh <- config return tXDSClient, nil } From d470faf134c1742e87928a1e35c1a36682aa69be Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 4 Jun 2021 14:40:50 -0700 Subject: [PATCH 09/16] [xds_client_in_attributes] csds --- xds/csds/csds.go | 17 +++-------------- xds/csds/csds_test.go | 13 +++---------- xds/internal/xdsclient/attributes.go | 9 ++++++++- 3 files changed, 14 insertions(+), 25 deletions(-) diff --git a/xds/csds/csds.go b/xds/csds/csds.go index d32bebac81bc..44803f82e3e3 100644 --- a/xds/csds/csds.go +++ b/xds/csds/csds.go @@ -38,27 +38,16 @@ import ( "google.golang.org/grpc/grpclog" "google.golang.org/grpc/status" "google.golang.org/grpc/xds/internal/xdsclient" - "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/protobuf/types/known/timestamppb" _ "google.golang.org/grpc/xds/internal/xdsclient/v2" // Register v2 xds_client. _ "google.golang.org/grpc/xds/internal/xdsclient/v3" // Register v3 xds_client. ) -// xdsClient contains methods from xdsClient.Client which are used by -// the server. This is useful for overriding in unit tests. -type xdsClient interface { - DumpLDS() (string, map[string]xdsclient.UpdateWithMD) - DumpRDS() (string, map[string]xdsclient.UpdateWithMD) - DumpCDS() (string, map[string]xdsclient.UpdateWithMD) - DumpEDS() (string, map[string]xdsclient.UpdateWithMD) - BootstrapConfig() *bootstrap.Config - Close() -} - var ( logger = grpclog.Component("xds") - newXDSClient = func() xdsClient { + newXDSClient = func() xdsclient.Interface { + // FIXME: this is no longer necessary. c, err := xdsclient.New() if err != nil { // If err is not nil, c is a typed nil (of type *xdsclient.Client). @@ -76,7 +65,7 @@ var ( type ClientStatusDiscoveryServer struct { // xdsClient will always be the same in practice. But we keep a copy in each // server instance for testing. - xdsClient xdsClient + xdsClient xdsclient.Interface } // NewClientStatusDiscoveryServer returns an implementation of the CSDS server that can be diff --git a/xds/csds/csds_test.go b/xds/csds/csds_test.go index 7f0e90bebc1e..fdbe1f23173d 100644 --- a/xds/csds/csds_test.go +++ b/xds/csds/csds_test.go @@ -59,13 +59,6 @@ const ( defaultTestTimeout = 10 * time.Second ) -type xdsClientWithWatch interface { - WatchListener(string, func(xdsclient.ListenerUpdate, error)) func() - WatchRouteConfig(string, func(xdsclient.RouteConfigUpdate, error)) func() - WatchCluster(string, func(xdsclient.ClusterUpdate, error)) func() - WatchEndpoints(string, func(xdsclient.EndpointsUpdate, error)) func() -} - var cmpOpts = cmp.Options{ cmpopts.EquateEmpty(), cmp.Comparer(func(a, b *timestamppb.Timestamp) bool { return true }), @@ -250,7 +243,7 @@ func TestCSDS(t *testing.T) { } } -func commonSetup(t *testing.T) (xdsClientWithWatch, *e2e.ManagementServer, string, v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, func()) { +func commonSetup(t *testing.T) (xdsclient.Interface, *e2e.ManagementServer, string, v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, func()) { t.Helper() // Spin up a xDS management server on a local port. @@ -275,7 +268,7 @@ func commonSetup(t *testing.T) (xdsClientWithWatch, *e2e.ManagementServer, strin t.Fatalf("failed to create xds client: %v", err) } oldNewXDSClient := newXDSClient - newXDSClient = func() xdsClient { return xdsC } + newXDSClient = func() xdsclient.Interface { return xdsC } // Initialize an gRPC server and register CSDS on it. server := grpc.NewServer() @@ -635,7 +628,7 @@ func protoToJSON(p proto.Message) string { func TestCSDSNoXDSClient(t *testing.T) { oldNewXDSClient := newXDSClient - newXDSClient = func() xdsClient { return nil } + newXDSClient = func() xdsclient.Interface { return nil } defer func() { newXDSClient = oldNewXDSClient }() // Initialize an gRPC server and register CSDS on it. diff --git a/xds/internal/xdsclient/attributes.go b/xds/internal/xdsclient/attributes.go index 858e66a16c59..4d1ea72aee50 100644 --- a/xds/internal/xdsclient/attributes.go +++ b/xds/internal/xdsclient/attributes.go @@ -31,10 +31,17 @@ const clientKey = clientKeyType("grpc.xds.internal.client.Client") // balancers. type Interface interface { WatchListener(string, func(ListenerUpdate, error)) func() + WatchRouteConfig(string, func(RouteConfigUpdate, error)) func() WatchCluster(string, func(ClusterUpdate, error)) func() WatchEndpoints(clusterName string, edsCb func(EndpointsUpdate, error)) (cancel func()) - BootstrapConfig() *bootstrap.Config ReportLoad(server string) (*load.Store, func()) + + DumpLDS() (string, map[string]UpdateWithMD) + DumpRDS() (string, map[string]UpdateWithMD) + DumpCDS() (string, map[string]UpdateWithMD) + DumpEDS() (string, map[string]UpdateWithMD) + + BootstrapConfig() *bootstrap.Config Close() } From c0462b6065ecca00fd21dc23539bcb4a990a9fba Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 4 Jun 2021 14:44:17 -0700 Subject: [PATCH 10/16] [xds_client_in_attributes] resolver --- xds/internal/resolver/watch_service.go | 4 +-- xds/internal/resolver/xds_resolver.go | 21 +++---------- xds/internal/resolver/xds_resolver_test.go | 34 ++++++++++----------- xds/internal/testutils/fakeclient/client.go | 2 ++ 4 files changed, 26 insertions(+), 35 deletions(-) diff --git a/xds/internal/resolver/watch_service.go b/xds/internal/resolver/watch_service.go index 591cc3833937..f1ac4222a5b0 100644 --- a/xds/internal/resolver/watch_service.go +++ b/xds/internal/resolver/watch_service.go @@ -54,7 +54,7 @@ type ldsConfig struct { // Note that during race (e.g. an xDS response is received while the user is // calling cancel()), there's a small window where the callback can be called // after the watcher is canceled. The caller needs to handle this case. -func watchService(c xdsClient, serviceName string, cb func(serviceUpdate, error), logger *grpclog.PrefixLogger) (cancel func()) { +func watchService(c xdsclient.Interface, serviceName string, cb func(serviceUpdate, error), logger *grpclog.PrefixLogger) (cancel func()) { w := &serviceUpdateWatcher{ logger: logger, c: c, @@ -70,7 +70,7 @@ func watchService(c xdsClient, serviceName string, cb func(serviceUpdate, error) // callback at the right time. type serviceUpdateWatcher struct { logger *grpclog.PrefixLogger - c xdsClient + c xdsclient.Interface serviceName string ldsCancel func() serviceCb func(serviceUpdate, error) diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index a6a013698ac4..0bd95d36b07c 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -27,10 +27,8 @@ import ( "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/pretty" - "google.golang.org/grpc/resolver" - "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" - iresolver "google.golang.org/grpc/internal/resolver" + "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal/xdsclient" ) @@ -41,21 +39,21 @@ const xdsScheme = "xds" // the same time. func NewBuilder(config []byte) (resolver.Builder, error) { return &xdsResolverBuilder{ - newXDSClient: func() (xdsClient, error) { + newXDSClient: func() (xdsclient.Interface, error) { return xdsclient.NewClientWithBootstrapContents(config) }, }, nil } // For overriding in unittests. -var newXDSClient = func() (xdsClient, error) { return xdsclient.New() } +var newXDSClient = func() (xdsclient.Interface, error) { return xdsclient.New() } func init() { resolver.Register(&xdsResolverBuilder{}) } type xdsResolverBuilder struct { - newXDSClient func() (xdsClient, error) + newXDSClient func() (xdsclient.Interface, error) } // Build helps implement the resolver.Builder interface. @@ -119,15 +117,6 @@ func (*xdsResolverBuilder) Scheme() string { return xdsScheme } -// xdsClient contains methods from xdsClient.Client which are used by -// the resolver. This will be faked out in unittests. -type xdsClient interface { - WatchListener(serviceName string, cb func(xdsclient.ListenerUpdate, error)) func() - WatchRouteConfig(routeName string, cb func(xdsclient.RouteConfigUpdate, error)) func() - BootstrapConfig() *bootstrap.Config - Close() -} - // suWithError wraps the ServiceUpdate and error received through a watch API // callback, so that it can pushed onto the update channel as a single entity. type suWithError struct { @@ -149,7 +138,7 @@ type xdsResolver struct { logger *grpclog.PrefixLogger // The underlying xdsClient which performs all xDS requests and responses. - client xdsClient + client xdsclient.Interface // A channel for the watch API callback to write service updates on to. The // updates are read by the run goroutine and passed on to the ClientConn. updateCh chan suWithError diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index d588ff157cd6..b8f42648e802 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -114,19 +114,19 @@ func newTestClientConn() *testClientConn { func (s) TestResolverBuilder(t *testing.T) { tests := []struct { name string - xdsClientFunc func() (xdsClient, error) + xdsClientFunc func() (xdsclient.Interface, error) wantErr bool }{ { name: "simple-good", - xdsClientFunc: func() (xdsClient, error) { + xdsClientFunc: func() (xdsclient.Interface, error) { return fakeclient.NewClient(), nil }, wantErr: false, }, { name: "newXDSClient-throws-error", - xdsClientFunc: func() (xdsClient, error) { + xdsClientFunc: func() (xdsclient.Interface, error) { return nil, errors.New("newXDSClient-throws-error") }, wantErr: true, @@ -167,7 +167,7 @@ func (s) TestResolverBuilder_xdsCredsBootstrapMismatch(t *testing.T) { // Fake out the xdsClient creation process by providing a fake, which does // not have any certificate provider configuration. oldClientMaker := newXDSClient - newXDSClient = func() (xdsClient, error) { + newXDSClient = func() (xdsclient.Interface, error) { fc := fakeclient.NewClient() fc.SetBootstrapConfig(&bootstrap.Config{}) return fc, nil @@ -194,7 +194,7 @@ func (s) TestResolverBuilder_xdsCredsBootstrapMismatch(t *testing.T) { } type setupOpts struct { - xdsClientFunc func() (xdsClient, error) + xdsClientFunc func() (xdsclient.Interface, error) } func testSetup(t *testing.T, opts setupOpts) (*xdsResolver, *testClientConn, func()) { @@ -254,7 +254,7 @@ func waitForWatchRouteConfig(ctx context.Context, t *testing.T, xdsC *fakeclient func (s) TestXDSResolverWatchCallbackAfterClose(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsClient, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, }) defer cancel() @@ -300,7 +300,7 @@ func (s) TestXDSResolverCloseClosesXDSClient(t *testing.T) { func (s) TestXDSResolverBadServiceUpdate(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsClient, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, }) defer xdsR.Close() defer cancel() @@ -326,7 +326,7 @@ func (s) TestXDSResolverBadServiceUpdate(t *testing.T) { func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsClient, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, }) defer xdsR.Close() defer cancel() @@ -460,7 +460,7 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { func (s) TestXDSResolverRemovedWithRPCs(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsClient, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, }) defer cancel() defer xdsR.Close() @@ -520,7 +520,7 @@ func (s) TestXDSResolverRemovedWithRPCs(t *testing.T) { func (s) TestXDSResolverRemovedResource(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsClient, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, }) defer cancel() defer xdsR.Close() @@ -628,7 +628,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) { func (s) TestXDSResolverWRR(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsClient, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, }) defer xdsR.Close() defer cancel() @@ -689,7 +689,7 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) { defer func(old bool) { env.TimeoutSupport = old }(env.TimeoutSupport) xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsClient, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, }) defer xdsR.Close() defer cancel() @@ -792,7 +792,7 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) { func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsClient, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, }) defer xdsR.Close() defer cancel() @@ -941,7 +941,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsClient, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, }) defer xdsR.Close() defer cancel() @@ -995,7 +995,7 @@ func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) { func (s) TestXDSResolverResourceNotFoundError(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsClient, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, }) defer xdsR.Close() defer cancel() @@ -1041,7 +1041,7 @@ func (s) TestXDSResolverResourceNotFoundError(t *testing.T) { func (s) TestXDSResolverMultipleLDSUpdates(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsClient, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, }) defer xdsR.Close() defer cancel() @@ -1216,7 +1216,7 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) { t.Run(tc.name, func(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsClient, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, }) defer xdsR.Close() defer cancel() diff --git a/xds/internal/testutils/fakeclient/client.go b/xds/internal/testutils/fakeclient/client.go index 37e84f998b99..0a120219d81d 100644 --- a/xds/internal/testutils/fakeclient/client.go +++ b/xds/internal/testutils/fakeclient/client.go @@ -32,6 +32,8 @@ import ( // Client is a fake implementation of an xds client. It exposes a bunch of // channels to signal the occurrence of various events. type Client struct { + xdsclient.Interface + name string ldsWatchCh *testutils.Channel rdsWatchCh *testutils.Channel From 2bd77d886c4df6840080bfc2eb6e632d9f1ce3cf Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 4 Jun 2021 14:48:24 -0700 Subject: [PATCH 11/16] [xds_client_in_attributes] rename types [xds_client_in_attributes] rename 1 [xds_client_in_attributes] rename 2 [xds_client_in_attributes] rename 3 [xds_client_in_attributes] fix tests --- xds/csds/csds.go | 4 +-- xds/csds/csds_test.go | 6 ++-- xds/googledirectpath/googlec2p.go | 4 +-- xds/googledirectpath/googlec2p_test.go | 4 +-- .../balancer/cdsbalancer/cdsbalancer.go | 15 ++++---- .../balancer/cdsbalancer/cdsbalancer_test.go | 8 +++-- .../balancer/cdsbalancer/cluster_handler.go | 4 +-- .../balancer/clusterimpl/clusterimpl.go | 2 +- xds/internal/balancer/edsbalancer/eds.go | 2 +- xds/internal/balancer/lrs/balancer.go | 4 +-- xds/internal/resolver/watch_service.go | 4 +-- xds/internal/resolver/xds_resolver.go | 17 +++------- xds/internal/resolver/xds_resolver_test.go | 34 +++++++++---------- xds/internal/testutils/fakeclient/client.go | 2 +- xds/internal/xdsclient/attributes.go | 13 +++---- xds/internal/xdsclient/client.go | 2 +- xds/internal/xdsclient/client_test.go | 10 +++--- xds/internal/xdsclient/singleton.go | 26 ++++++-------- xds/internal/xdsclient/tests/dump_test.go | 20 ++++++----- xds/server.go | 6 ++-- xds/server_test.go | 8 ++--- 21 files changed, 94 insertions(+), 101 deletions(-) diff --git a/xds/csds/csds.go b/xds/csds/csds.go index 44803f82e3e3..556075fa50ec 100644 --- a/xds/csds/csds.go +++ b/xds/csds/csds.go @@ -46,7 +46,7 @@ import ( var ( logger = grpclog.Component("xds") - newXDSClient = func() xdsclient.Interface { + newXDSClient = func() xdsclient.XDSClient { // FIXME: this is no longer necessary. c, err := xdsclient.New() if err != nil { @@ -65,7 +65,7 @@ var ( type ClientStatusDiscoveryServer struct { // xdsClient will always be the same in practice. But we keep a copy in each // server instance for testing. - xdsClient xdsclient.Interface + xdsClient xdsclient.XDSClient } // NewClientStatusDiscoveryServer returns an implementation of the CSDS server that can be diff --git a/xds/csds/csds_test.go b/xds/csds/csds_test.go index fdbe1f23173d..98dc93e86713 100644 --- a/xds/csds/csds_test.go +++ b/xds/csds/csds_test.go @@ -243,7 +243,7 @@ func TestCSDS(t *testing.T) { } } -func commonSetup(t *testing.T) (xdsclient.Interface, *e2e.ManagementServer, string, v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, func()) { +func commonSetup(t *testing.T) (xdsclient.XDSClient, *e2e.ManagementServer, string, v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, func()) { t.Helper() // Spin up a xDS management server on a local port. @@ -268,7 +268,7 @@ func commonSetup(t *testing.T) (xdsclient.Interface, *e2e.ManagementServer, stri t.Fatalf("failed to create xds client: %v", err) } oldNewXDSClient := newXDSClient - newXDSClient = func() xdsclient.Interface { return xdsC } + newXDSClient = func() xdsclient.XDSClient { return xdsC } // Initialize an gRPC server and register CSDS on it. server := grpc.NewServer() @@ -628,7 +628,7 @@ func protoToJSON(p proto.Message) string { func TestCSDSNoXDSClient(t *testing.T) { oldNewXDSClient := newXDSClient - newXDSClient = func() xdsclient.Interface { return nil } + newXDSClient = func() xdsclient.XDSClient { return nil } defer func() { newXDSClient = oldNewXDSClient }() // Initialize an gRPC server and register CSDS on it. diff --git a/xds/googledirectpath/googlec2p.go b/xds/googledirectpath/googlec2p.go index a19c28006527..0c2f984fbcb1 100644 --- a/xds/googledirectpath/googlec2p.go +++ b/xds/googledirectpath/googlec2p.go @@ -66,7 +66,7 @@ const ( var ( onGCE = googlecloud.OnGCE - newClientWithConfig = func(config *bootstrap.Config) (xdsclient.Interface, error) { + newClientWithConfig = func(config *bootstrap.Config) (xdsclient.XDSClient, error) { return xdsclient.NewWithConfig(config) } @@ -135,7 +135,7 @@ func (c2pResolverBuilder) Scheme() string { type c2pResolver struct { resolver.Resolver - client xdsclient.Interface + client xdsclient.XDSClient } func (r *c2pResolver) Close() { diff --git a/xds/googledirectpath/googlec2p_test.go b/xds/googledirectpath/googlec2p_test.go index 2028a65f9e59..8f98d3159d3a 100644 --- a/xds/googledirectpath/googlec2p_test.go +++ b/xds/googledirectpath/googlec2p_test.go @@ -131,7 +131,7 @@ func TestBuildNotOnGCE(t *testing.T) { } type testXDSClient struct { - xdsclient.Interface + xdsclient.XDSClient closed chan struct{} } @@ -179,7 +179,7 @@ func TestBuildXDS(t *testing.T) { configCh := make(chan *bootstrap.Config, 1) oldNewClient := newClientWithConfig - newClientWithConfig = func(config *bootstrap.Config) (xdsclient.Interface, error) { + newClientWithConfig = func(config *bootstrap.Config) (xdsclient.XDSClient, error) { configCh <- config return tXDSClient, nil } diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 2b1318a1fdcb..73c43c945d1f 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" + "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/connectivity" @@ -163,7 +164,6 @@ type cdsBalancer struct { ccw *ccWrapper // ClientConn interface passed to child LB. bOpts balancer.BuildOptions // BuildOptions passed to child LB. updateCh *buffer.Unbounded // Channel for gRPC and xdsClient updates. - xdsClient xdsclient.Interface // xDS client to watch Cluster resource. cancelWatch func() // Cluster watch cancel func. edsLB balancer.Balancer // EDS child policy. clusterToWatch string @@ -171,6 +171,9 @@ type cdsBalancer struct { closed *grpcsync.Event done *grpcsync.Event + xdsClient xdsclient.XDSClient // xDS client to watch Cluster resource. + attrsWithXDSClient *attributes.Attributes // Attributes with XDSClient attached, to pass on to the children. + // The certificate providers are cached here to that they can be closed when // a new provider is to be created. cachedRoot certprovider.Provider @@ -340,15 +343,8 @@ func (b *cdsBalancer) handleWatchUpdate(update *watchUpdate) { lbCfg.LrsLoadReportingServerName = new(string) } - resolverState := resolver.State{} - // Include the xds client for the child LB policies to use. For unit - // tests, b.xdsClient may not be a full *xdsclient.Client, but it will - // always be in production. - if c, ok := b.xdsClient.(*xdsclient.Client); ok { - resolverState = xdsclient.SetClient(resolverState, c) - } ccState := balancer.ClientConnState{ - ResolverState: resolverState, + ResolverState: resolver.State{Attributes: b.attrsWithXDSClient}, BalancerConfig: lbCfg, } if err := b.edsLB.UpdateClientConnState(ccState); err != nil { @@ -462,6 +458,7 @@ func (b *cdsBalancer) UpdateClientConnState(state balancer.ClientConnState) erro return balancer.ErrBadResolverState } b.xdsClient = c + b.attrsWithXDSClient = state.ResolverState.Attributes } b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(state.BalancerConfig)) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 5a798e414fe9..aebe048d48a9 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -28,7 +28,6 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal" @@ -129,7 +128,10 @@ func (tb *testEDSBalancer) waitForClientConnUpdate(ctx context.Context, wantCCS return err } gotCCS := ccs.(balancer.ClientConnState) - if !cmp.Equal(gotCCS, wantCCS, cmpopts.IgnoreUnexported(attributes.Attributes{})) { + if xdsclient.FromResolverState(gotCCS.ResolverState) == nil { + return fmt.Errorf("want resolver state with XDSClient attached, got nil") + } + if !cmp.Equal(gotCCS, wantCCS, cmpopts.IgnoreFields(resolver.State{}, "Attributes")) { return fmt.Errorf("received ClientConnState: %+v, want %+v", gotCCS, wantCCS) } return nil @@ -173,7 +175,7 @@ func (tb *testEDSBalancer) waitForClose(ctx context.Context) error { // cdsCCS is a helper function to construct a good update passed from the // xdsResolver to the cdsBalancer. -func cdsCCS(cluster string, xdsC xdsclient.Interface) balancer.ClientConnState { +func cdsCCS(cluster string, xdsC xdsclient.XDSClient) balancer.ClientConnState { const cdsLBConfig = `{ "loadBalancingConfig":[ { diff --git a/xds/internal/balancer/cdsbalancer/cluster_handler.go b/xds/internal/balancer/cdsbalancer/cluster_handler.go index 0ce91fb99eb1..09d945cd0c37 100644 --- a/xds/internal/balancer/cdsbalancer/cluster_handler.go +++ b/xds/internal/balancer/cdsbalancer/cluster_handler.go @@ -40,7 +40,7 @@ type clusterHandler struct { // CDS Balancer cares about is the most recent update. updateChannel chan clusterHandlerUpdate - xdsClient xdsclient.Interface + xdsClient xdsclient.XDSClient } func (ch *clusterHandler) updateRootCluster(rootClusterName string) { @@ -112,7 +112,7 @@ type clusterNode struct { // CreateClusterNode creates a cluster node from a given clusterName. This will // also start the watch for that cluster. -func createClusterNode(clusterName string, xdsClient xdsclient.Interface, topLevelHandler *clusterHandler) *clusterNode { +func createClusterNode(clusterName string, xdsClient xdsclient.XDSClient, topLevelHandler *clusterHandler) *clusterNode { c := &clusterNode{ clusterHandler: topLevelHandler, } diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index 29b35999ba97..f5fa7c12589b 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -95,7 +95,7 @@ type clusterImplBalancer struct { bOpts balancer.BuildOptions logger *grpclog.PrefixLogger - xdsClient xdsclient.Interface + xdsClient xdsclient.XDSClient config *LBConfig childLB balancer.Balancer diff --git a/xds/internal/balancer/edsbalancer/eds.go b/xds/internal/balancer/edsbalancer/eds.go index 4ecba5683e9a..ea11b2f8a257 100644 --- a/xds/internal/balancer/edsbalancer/eds.go +++ b/xds/internal/balancer/edsbalancer/eds.go @@ -124,7 +124,7 @@ type edsBalancer struct { xdsClientUpdate chan *edsUpdate childPolicyUpdate *buffer.Unbounded - xdsClient xdsclient.Interface + xdsClient xdsclient.XDSClient loadWrapper *loadstore.Wrapper config *EDSConfig // may change when passed a different service config edsImpl edsBalancerImplInterface diff --git a/xds/internal/balancer/lrs/balancer.go b/xds/internal/balancer/lrs/balancer.go index f19fc374b013..ed7fb38c8545 100644 --- a/xds/internal/balancer/lrs/balancer.go +++ b/xds/internal/balancer/lrs/balancer.go @@ -157,7 +157,7 @@ func (ccw *ccWrapper) UpdateState(s balancer.State) { } type xdsClientWrapper struct { - c xdsclient.Interface + c xdsclient.XDSClient cancelLoadReport func() clusterName string edsServiceName string @@ -167,7 +167,7 @@ type xdsClientWrapper struct { loadWrapper *loadstore.Wrapper } -func newXDSClientWrapper(c xdsclient.Interface) *xdsClientWrapper { +func newXDSClientWrapper(c xdsclient.XDSClient) *xdsClientWrapper { return &xdsClientWrapper{ c: c, loadWrapper: loadstore.NewWrapper(), diff --git a/xds/internal/resolver/watch_service.go b/xds/internal/resolver/watch_service.go index f1ac4222a5b0..bea5bbcda140 100644 --- a/xds/internal/resolver/watch_service.go +++ b/xds/internal/resolver/watch_service.go @@ -54,7 +54,7 @@ type ldsConfig struct { // Note that during race (e.g. an xDS response is received while the user is // calling cancel()), there's a small window where the callback can be called // after the watcher is canceled. The caller needs to handle this case. -func watchService(c xdsclient.Interface, serviceName string, cb func(serviceUpdate, error), logger *grpclog.PrefixLogger) (cancel func()) { +func watchService(c xdsclient.XDSClient, serviceName string, cb func(serviceUpdate, error), logger *grpclog.PrefixLogger) (cancel func()) { w := &serviceUpdateWatcher{ logger: logger, c: c, @@ -70,7 +70,7 @@ func watchService(c xdsclient.Interface, serviceName string, cb func(serviceUpda // callback at the right time. type serviceUpdateWatcher struct { logger *grpclog.PrefixLogger - c xdsclient.Interface + c xdsclient.XDSClient serviceName string ldsCancel func() serviceCb func(serviceUpdate, error) diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index 0bd95d36b07c..19ee01773e8c 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -39,21 +39,21 @@ const xdsScheme = "xds" // the same time. func NewBuilder(config []byte) (resolver.Builder, error) { return &xdsResolverBuilder{ - newXDSClient: func() (xdsclient.Interface, error) { + newXDSClient: func() (xdsclient.XDSClient, error) { return xdsclient.NewClientWithBootstrapContents(config) }, }, nil } // For overriding in unittests. -var newXDSClient = func() (xdsclient.Interface, error) { return xdsclient.New() } +var newXDSClient = func() (xdsclient.XDSClient, error) { return xdsclient.New() } func init() { resolver.Register(&xdsResolverBuilder{}) } type xdsResolverBuilder struct { - newXDSClient func() (xdsclient.Interface, error) + newXDSClient func() (xdsclient.XDSClient, error) } // Build helps implement the resolver.Builder interface. @@ -138,7 +138,7 @@ type xdsResolver struct { logger *grpclog.PrefixLogger // The underlying xdsClient which performs all xDS requests and responses. - client xdsclient.Interface + client xdsclient.XDSClient // A channel for the watch API callback to write service updates on to. The // updates are read by the run goroutine and passed on to the ClientConn. updateCh chan suWithError @@ -185,14 +185,7 @@ func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool { state := iresolver.SetConfigSelector(resolver.State{ ServiceConfig: r.cc.ParseServiceConfig(string(sc)), }, cs) - - // Include the xds client for the LB policies to use. For unit tests, - // r.client may not be a full *xdsclient.Client, but it will always be in - // production. - if c, ok := r.client.(*xdsclient.Client); ok { - state = xdsclient.SetClient(state, c) - } - r.cc.UpdateState(state) + r.cc.UpdateState(xdsclient.SetClient(state, r.client)) return true } diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index b8f42648e802..4edf34f41acf 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -114,19 +114,19 @@ func newTestClientConn() *testClientConn { func (s) TestResolverBuilder(t *testing.T) { tests := []struct { name string - xdsClientFunc func() (xdsclient.Interface, error) + xdsClientFunc func() (xdsclient.XDSClient, error) wantErr bool }{ { name: "simple-good", - xdsClientFunc: func() (xdsclient.Interface, error) { + xdsClientFunc: func() (xdsclient.XDSClient, error) { return fakeclient.NewClient(), nil }, wantErr: false, }, { name: "newXDSClient-throws-error", - xdsClientFunc: func() (xdsclient.Interface, error) { + xdsClientFunc: func() (xdsclient.XDSClient, error) { return nil, errors.New("newXDSClient-throws-error") }, wantErr: true, @@ -167,7 +167,7 @@ func (s) TestResolverBuilder_xdsCredsBootstrapMismatch(t *testing.T) { // Fake out the xdsClient creation process by providing a fake, which does // not have any certificate provider configuration. oldClientMaker := newXDSClient - newXDSClient = func() (xdsclient.Interface, error) { + newXDSClient = func() (xdsclient.XDSClient, error) { fc := fakeclient.NewClient() fc.SetBootstrapConfig(&bootstrap.Config{}) return fc, nil @@ -194,7 +194,7 @@ func (s) TestResolverBuilder_xdsCredsBootstrapMismatch(t *testing.T) { } type setupOpts struct { - xdsClientFunc func() (xdsclient.Interface, error) + xdsClientFunc func() (xdsclient.XDSClient, error) } func testSetup(t *testing.T, opts setupOpts) (*xdsResolver, *testClientConn, func()) { @@ -254,7 +254,7 @@ func waitForWatchRouteConfig(ctx context.Context, t *testing.T, xdsC *fakeclient func (s) TestXDSResolverWatchCallbackAfterClose(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.XDSClient, error) { return xdsC, nil }, }) defer cancel() @@ -300,7 +300,7 @@ func (s) TestXDSResolverCloseClosesXDSClient(t *testing.T) { func (s) TestXDSResolverBadServiceUpdate(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.XDSClient, error) { return xdsC, nil }, }) defer xdsR.Close() defer cancel() @@ -326,7 +326,7 @@ func (s) TestXDSResolverBadServiceUpdate(t *testing.T) { func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.XDSClient, error) { return xdsC, nil }, }) defer xdsR.Close() defer cancel() @@ -460,7 +460,7 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { func (s) TestXDSResolverRemovedWithRPCs(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.XDSClient, error) { return xdsC, nil }, }) defer cancel() defer xdsR.Close() @@ -520,7 +520,7 @@ func (s) TestXDSResolverRemovedWithRPCs(t *testing.T) { func (s) TestXDSResolverRemovedResource(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.XDSClient, error) { return xdsC, nil }, }) defer cancel() defer xdsR.Close() @@ -628,7 +628,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) { func (s) TestXDSResolverWRR(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.XDSClient, error) { return xdsC, nil }, }) defer xdsR.Close() defer cancel() @@ -689,7 +689,7 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) { defer func(old bool) { env.TimeoutSupport = old }(env.TimeoutSupport) xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.XDSClient, error) { return xdsC, nil }, }) defer xdsR.Close() defer cancel() @@ -792,7 +792,7 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) { func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.XDSClient, error) { return xdsC, nil }, }) defer xdsR.Close() defer cancel() @@ -941,7 +941,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.XDSClient, error) { return xdsC, nil }, }) defer xdsR.Close() defer cancel() @@ -995,7 +995,7 @@ func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) { func (s) TestXDSResolverResourceNotFoundError(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.XDSClient, error) { return xdsC, nil }, }) defer xdsR.Close() defer cancel() @@ -1041,7 +1041,7 @@ func (s) TestXDSResolverResourceNotFoundError(t *testing.T) { func (s) TestXDSResolverMultipleLDSUpdates(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.XDSClient, error) { return xdsC, nil }, }) defer xdsR.Close() defer cancel() @@ -1216,7 +1216,7 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) { t.Run(tc.name, func(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsclient.Interface, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.XDSClient, error) { return xdsC, nil }, }) defer xdsR.Close() defer cancel() diff --git a/xds/internal/testutils/fakeclient/client.go b/xds/internal/testutils/fakeclient/client.go index 0a120219d81d..638dc7e80abb 100644 --- a/xds/internal/testutils/fakeclient/client.go +++ b/xds/internal/testutils/fakeclient/client.go @@ -32,7 +32,7 @@ import ( // Client is a fake implementation of an xds client. It exposes a bunch of // channels to signal the occurrence of various events. type Client struct { - xdsclient.Interface + xdsclient.XDSClient name string ldsWatchCh *testutils.Channel diff --git a/xds/internal/xdsclient/attributes.go b/xds/internal/xdsclient/attributes.go index 4d1ea72aee50..d2357df0727c 100644 --- a/xds/internal/xdsclient/attributes.go +++ b/xds/internal/xdsclient/attributes.go @@ -27,9 +27,10 @@ type clientKeyType string const clientKey = clientKeyType("grpc.xds.internal.client.Client") -// Interface contains a subset of the *Client methods that are needed by the -// balancers. -type Interface interface { +// XDSClient is a full fledged gRPC client which queries a set of discovery APIs +// (collectively termed as xDS) on a remote management server, to discover +// various dynamic resources. +type XDSClient interface { WatchListener(string, func(ListenerUpdate, error)) func() WatchRouteConfig(string, func(RouteConfigUpdate, error)) func() WatchCluster(string, func(ClusterUpdate, error)) func() @@ -46,13 +47,13 @@ type Interface interface { } // FromResolverState returns the Client from state, or nil if not present. -func FromResolverState(state resolver.State) Interface { - cs, _ := state.Attributes.Value(clientKey).(Interface) +func FromResolverState(state resolver.State) XDSClient { + cs, _ := state.Attributes.Value(clientKey).(XDSClient) return cs } // SetClient sets c in state and returns the new state. -func SetClient(state resolver.State, c Interface) resolver.State { +func SetClient(state resolver.State, c XDSClient) resolver.State { state.Attributes = state.Attributes.WithValues(clientKey, c) return state } diff --git a/xds/internal/xdsclient/client.go b/xds/internal/xdsclient/client.go index ac832f205d59..13ef973807ce 100644 --- a/xds/internal/xdsclient/client.go +++ b/xds/internal/xdsclient/client.go @@ -579,7 +579,7 @@ func newWithConfig(config *bootstrap.Config, watchExpiryTimeout time.Duration) ( // BootstrapConfig returns the configuration read from the bootstrap file. // Callers must treat the return value as read-only. -func (c *Client) BootstrapConfig() *bootstrap.Config { +func (c *clientRefCounted) BootstrapConfig() *bootstrap.Config { return c.config } diff --git a/xds/internal/xdsclient/client_test.go b/xds/internal/xdsclient/client_test.go index c1d4b38e576a..b039e5808b00 100644 --- a/xds/internal/xdsclient/client_test.go +++ b/xds/internal/xdsclient/client_test.go @@ -267,7 +267,7 @@ func (s) TestClientNewSingleton(t *testing.T) { if err != nil { t.Fatalf("failed to create client: %v", err) } - clientImpl := client.clientImpl + clientImpl := client.(*clientRefCounted).clientImpl ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() c, err := apiClientCh.Receive(ctx) @@ -285,10 +285,10 @@ func (s) TestClientNewSingleton(t *testing.T) { client.Close() t.Fatalf("%d-th call to New() failed with error: %v", i, terr) } - if tc.clientImpl != clientImpl { + if tc.(*clientRefCounted).clientImpl != clientImpl { client.Close() tc.Close() - t.Fatalf("%d-th call to New() got a different client %p, want %p", i, tc.clientImpl, clientImpl) + t.Fatalf("%d-th call to New() got a different client %p, want %p", i, tc.(*clientRefCounted).clientImpl, clientImpl) } sctx, scancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) @@ -339,8 +339,8 @@ func (s) TestClientNewSingleton(t *testing.T) { if client2 != client { t.Fatalf("New() after Close() should return the same client wrapper, got different %p, %p", client2, client) } - if client2.clientImpl == clientImpl { - t.Fatalf("New() after Close() should return different client implementation, got the same %p", client2.clientImpl) + if client2.(*clientRefCounted).clientImpl == clientImpl { + t.Fatalf("New() after Close() should return different client implementation, got the same %p", client2.(*clientRefCounted).clientImpl) } if apiClient2 == apiClient { t.Fatalf("New() after Close() should return different API client, got the same %p", apiClient2) diff --git a/xds/internal/xdsclient/singleton.go b/xds/internal/xdsclient/singleton.go index 8d0e10f2c31a..927aa9a70887 100644 --- a/xds/internal/xdsclient/singleton.go +++ b/xds/internal/xdsclient/singleton.go @@ -32,18 +32,14 @@ const defaultWatchExpiryTimeout = 15 * time.Second // This is the Client returned by New(). It contains one client implementation, // and maintains the refcount. -var singletonClient = &Client{} +var singletonClient = &clientRefCounted{} // To override in tests. var bootstrapNewConfig = bootstrap.NewConfig -// Client is a full fledged gRPC client which queries a set of discovery APIs -// (collectively termed as xDS) on a remote management server, to discover -// various dynamic resources. -// -// The xds client is a singleton. It will be shared by the xds resolver and +// clientRefCounted is ref-counted, and to be shared by the xds resolver and // balancer implementations, across multiple ClientConns and Servers. -type Client struct { +type clientRefCounted struct { *clientImpl // This mu protects all the fields, including the embedded clientImpl above. @@ -60,7 +56,7 @@ type Client struct { // Note that the first invocation of New() or NewWithConfig() sets the client // singleton. The following calls will return the singleton xds client without // checking or using the config. -func New() (*Client, error) { +func New() (XDSClient, error) { singletonClient.mu.Lock() defer singletonClient.mu.Unlock() // If the client implementation was created, increment ref count and return @@ -96,7 +92,7 @@ func New() (*Client, error) { // // This function is internal only, for c2p resolver and testing to use. DO NOT // use this elsewhere. Use New() instead. -func NewWithConfig(config *bootstrap.Config) (*Client, error) { +func NewWithConfig(config *bootstrap.Config) (XDSClient, error) { singletonClient.mu.Lock() defer singletonClient.mu.Unlock() // If the client implementation was created, increment ref count and return @@ -120,7 +116,7 @@ func NewWithConfig(config *bootstrap.Config) (*Client, error) { // Close closes the client. It does ref count of the xds client implementation, // and closes the gRPC connection to the management server when ref count // reaches 0. -func (c *Client) Close() { +func (c *clientRefCounted) Close() { c.mu.Lock() defer c.mu.Unlock() c.refCount-- @@ -136,18 +132,18 @@ func (c *Client) Close() { // // Note that this function doesn't set the singleton, so that the testing states // don't leak. -func NewWithConfigForTesting(config *bootstrap.Config, watchExpiryTimeout time.Duration) (*Client, error) { +func NewWithConfigForTesting(config *bootstrap.Config, watchExpiryTimeout time.Duration) (XDSClient, error) { cl, err := newWithConfig(config, watchExpiryTimeout) if err != nil { return nil, err } - return &Client{clientImpl: cl, refCount: 1}, nil + return &clientRefCounted{clientImpl: cl, refCount: 1}, nil } // NewClientWithBootstrapContents returns an xds client for this config, // separate from the global singleton. This should be used for testing // purposes only. -func NewClientWithBootstrapContents(contents []byte) (*Client, error) { +func NewClientWithBootstrapContents(contents []byte) (XDSClient, error) { // Normalize the contents buf := bytes.Buffer{} err := json.Indent(&buf, contents, "", "") @@ -180,12 +176,12 @@ func NewClientWithBootstrapContents(contents []byte) (*Client, error) { return nil, err } - c := &Client{clientImpl: cImpl, refCount: 1} + c := &clientRefCounted{clientImpl: cImpl, refCount: 1} clients[string(contents)] = c return c, nil } var ( - clients = map[string]*Client{} + clients = map[string]*clientRefCounted{} clientsMu sync.Mutex ) diff --git a/xds/internal/xdsclient/tests/dump_test.go b/xds/internal/xdsclient/tests/dump_test.go index 541f5901c121..64c78f672858 100644 --- a/xds/internal/xdsclient/tests/dump_test.go +++ b/xds/internal/xdsclient/tests/dump_test.go @@ -85,6 +85,7 @@ func (s) TestLDSConfigDump(t *testing.T) { t.Fatalf("failed to create client: %v", err) } defer client.Close() + updateHandler := client.(xdsclient.UpdateHandler) // Expected unknown. if err := compareDump(client.DumpLDS, "", map[string]xdsclient.UpdateWithMD{}); err != nil { @@ -111,7 +112,7 @@ func (s) TestLDSConfigDump(t *testing.T) { Raw: r, } } - client.NewListeners(update0, xdsclient.UpdateMetadata{Version: testVersion}) + updateHandler.NewListeners(update0, xdsclient.UpdateMetadata{Version: testVersion}) // Expect ACK. if err := compareDump(client.DumpLDS, testVersion, want0); err != nil { @@ -120,7 +121,7 @@ func (s) TestLDSConfigDump(t *testing.T) { const nackVersion = "lds-version-nack" var nackErr = fmt.Errorf("lds nack error") - client.NewListeners( + updateHandler.NewListeners( map[string]xdsclient.ListenerUpdate{ ldsTargets[0]: {}, }, @@ -195,6 +196,7 @@ func (s) TestRDSConfigDump(t *testing.T) { t.Fatalf("failed to create client: %v", err) } defer client.Close() + updateHandler := client.(xdsclient.UpdateHandler) // Expected unknown. if err := compareDump(client.DumpRDS, "", map[string]xdsclient.UpdateWithMD{}); err != nil { @@ -221,7 +223,7 @@ func (s) TestRDSConfigDump(t *testing.T) { Raw: r, } } - client.NewRouteConfigs(update0, xdsclient.UpdateMetadata{Version: testVersion}) + updateHandler.NewRouteConfigs(update0, xdsclient.UpdateMetadata{Version: testVersion}) // Expect ACK. if err := compareDump(client.DumpRDS, testVersion, want0); err != nil { @@ -230,7 +232,7 @@ func (s) TestRDSConfigDump(t *testing.T) { const nackVersion = "rds-version-nack" var nackErr = fmt.Errorf("rds nack error") - client.NewRouteConfigs( + updateHandler.NewRouteConfigs( map[string]xdsclient.RouteConfigUpdate{ rdsTargets[0]: {}, }, @@ -305,6 +307,7 @@ func (s) TestCDSConfigDump(t *testing.T) { t.Fatalf("failed to create client: %v", err) } defer client.Close() + updateHandler := client.(xdsclient.UpdateHandler) // Expected unknown. if err := compareDump(client.DumpCDS, "", map[string]xdsclient.UpdateWithMD{}); err != nil { @@ -331,7 +334,7 @@ func (s) TestCDSConfigDump(t *testing.T) { Raw: r, } } - client.NewClusters(update0, xdsclient.UpdateMetadata{Version: testVersion}) + updateHandler.NewClusters(update0, xdsclient.UpdateMetadata{Version: testVersion}) // Expect ACK. if err := compareDump(client.DumpCDS, testVersion, want0); err != nil { @@ -340,7 +343,7 @@ func (s) TestCDSConfigDump(t *testing.T) { const nackVersion = "cds-version-nack" var nackErr = fmt.Errorf("cds nack error") - client.NewClusters( + updateHandler.NewClusters( map[string]xdsclient.ClusterUpdate{ cdsTargets[0]: {}, }, @@ -401,6 +404,7 @@ func (s) TestEDSConfigDump(t *testing.T) { t.Fatalf("failed to create client: %v", err) } defer client.Close() + updateHandler := client.(xdsclient.UpdateHandler) // Expected unknown. if err := compareDump(client.DumpEDS, "", map[string]xdsclient.UpdateWithMD{}); err != nil { @@ -427,7 +431,7 @@ func (s) TestEDSConfigDump(t *testing.T) { Raw: r, } } - client.NewEndpoints(update0, xdsclient.UpdateMetadata{Version: testVersion}) + updateHandler.NewEndpoints(update0, xdsclient.UpdateMetadata{Version: testVersion}) // Expect ACK. if err := compareDump(client.DumpEDS, testVersion, want0); err != nil { @@ -436,7 +440,7 @@ func (s) TestEDSConfigDump(t *testing.T) { const nackVersion = "eds-version-nack" var nackErr = fmt.Errorf("eds nack error") - client.NewEndpoints( + updateHandler.NewEndpoints( map[string]xdsclient.EndpointsUpdate{ edsTargets[0]: {}, }, diff --git a/xds/server.go b/xds/server.go index f013037ca020..cfbea1a1bca2 100644 --- a/xds/server.go +++ b/xds/server.go @@ -41,7 +41,7 @@ const serverPrefix = "[xds-server %p] " var ( // These new functions will be overridden in unit tests. - newXDSClient = func() (xdsclient.Interface, error) { + newXDSClient = func() (xdsclient.XDSClient, error) { return xdsclient.New() } newGRPCServer = func(opts ...grpc.ServerOption) grpcServer { @@ -81,7 +81,7 @@ type GRPCServer struct { // beginning of Serve(), where we have to decide if we have to create a // client or use an existing one. clientMu sync.Mutex - xdsC xdsclient.Interface + xdsC xdsclient.XDSClient } // NewGRPCServer creates an xDS-enabled gRPC server using the passed in opts. @@ -147,7 +147,7 @@ func (s *GRPCServer) initXDSClient() error { newXDSClient := newXDSClient if s.opts.bootstrapContents != nil { - newXDSClient = func() (xdsclient.Interface, error) { + newXDSClient = func() (xdsclient.XDSClient, error) { return xdsclient.NewClientWithBootstrapContents(s.opts.bootstrapContents) } } diff --git a/xds/server_test.go b/xds/server_test.go index e82fd182e51d..45df8b76fca9 100644 --- a/xds/server_test.go +++ b/xds/server_test.go @@ -247,7 +247,7 @@ func (p *fakeProvider) Close() { func setupOverrides() (*fakeGRPCServer, *testutils.Channel, func()) { clientCh := testutils.NewChannel() origNewXDSClient := newXDSClient - newXDSClient = func() (xdsclient.Interface, error) { + newXDSClient = func() (xdsclient.XDSClient, error) { c := fakeclient.NewClient() c.SetBootstrapConfig(&bootstrap.Config{ BalancerName: "dummyBalancer", @@ -277,7 +277,7 @@ func setupOverrides() (*fakeGRPCServer, *testutils.Channel, func()) { func setupOverridesForXDSCreds(includeCertProviderCfg bool) (*testutils.Channel, func()) { clientCh := testutils.NewChannel() origNewXDSClient := newXDSClient - newXDSClient = func() (xdsclient.Interface, error) { + newXDSClient = func() (xdsclient.XDSClient, error) { c := fakeclient.NewClient() bc := &bootstrap.Config{ BalancerName: "dummyBalancer", @@ -544,7 +544,7 @@ func (s) TestServeBootstrapConfigInvalid(t *testing.T) { // xdsClient with the specified bootstrap configuration. clientCh := testutils.NewChannel() origNewXDSClient := newXDSClient - newXDSClient = func() (xdsclient.Interface, error) { + newXDSClient = func() (xdsclient.XDSClient, error) { c := fakeclient.NewClient() c.SetBootstrapConfig(test.bootstrapConfig) clientCh.Send(c) @@ -587,7 +587,7 @@ func (s) TestServeBootstrapConfigInvalid(t *testing.T) { // verifies that Server() exits with a non-nil error. func (s) TestServeNewClientFailure(t *testing.T) { origNewXDSClient := newXDSClient - newXDSClient = func() (xdsclient.Interface, error) { + newXDSClient = func() (xdsclient.XDSClient, error) { return nil, errors.New("xdsClient creation failed") } defer func() { newXDSClient = origNewXDSClient }() From b0cb7025907c097ee82c92d54b69817ca7ffa302 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 4 Jun 2021 15:11:26 -0700 Subject: [PATCH 12/16] [xds_client_in_attributes] csds doc --- xds/csds/csds.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/xds/csds/csds.go b/xds/csds/csds.go index 556075fa50ec..1b54a3a4c6e3 100644 --- a/xds/csds/csds.go +++ b/xds/csds/csds.go @@ -47,13 +47,8 @@ import ( var ( logger = grpclog.Component("xds") newXDSClient = func() xdsclient.XDSClient { - // FIXME: this is no longer necessary. c, err := xdsclient.New() if err != nil { - // If err is not nil, c is a typed nil (of type *xdsclient.Client). - // If c is returned and assigned to the xdsClient field in the CSDS - // server, the nil checks in the handlers will not handle it - // properly. logger.Warningf("failed to create xds client: %v", err) return nil } From 590d05b51ec6b449c5a14587c26a711a3098406c Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 4 Jun 2021 15:13:47 -0700 Subject: [PATCH 13/16] [xds_client_in_attributes] n --- xds/internal/balancer/cdsbalancer/cdsbalancer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index aebe048d48a9..f36117620e68 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -129,7 +129,7 @@ func (tb *testEDSBalancer) waitForClientConnUpdate(ctx context.Context, wantCCS } gotCCS := ccs.(balancer.ClientConnState) if xdsclient.FromResolverState(gotCCS.ResolverState) == nil { - return fmt.Errorf("want resolver state with XDSClient attached, got nil") + return fmt.Errorf("want resolver state with XDSClient attached, got one without") } if !cmp.Equal(gotCCS, wantCCS, cmpopts.IgnoreFields(resolver.State{}, "Attributes")) { return fmt.Errorf("received ClientConnState: %+v, want %+v", gotCCS, wantCCS) From c88ed76a3ef7b661addeca0a26b6c1deda986ee9 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 4 Jun 2021 15:20:17 -0700 Subject: [PATCH 14/16] [xds_client_in_attributes] cds unnecessary change reverted --- xds/internal/balancer/cdsbalancer/cdsbalancer.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 73c43c945d1f..a710e4983161 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" - "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/connectivity" @@ -164,6 +163,7 @@ type cdsBalancer struct { ccw *ccWrapper // ClientConn interface passed to child LB. bOpts balancer.BuildOptions // BuildOptions passed to child LB. updateCh *buffer.Unbounded // Channel for gRPC and xdsClient updates. + xdsClient xdsclient.XDSClient // xDS client to watch Cluster resource. cancelWatch func() // Cluster watch cancel func. edsLB balancer.Balancer // EDS child policy. clusterToWatch string @@ -171,9 +171,6 @@ type cdsBalancer struct { closed *grpcsync.Event done *grpcsync.Event - xdsClient xdsclient.XDSClient // xDS client to watch Cluster resource. - attrsWithXDSClient *attributes.Attributes // Attributes with XDSClient attached, to pass on to the children. - // The certificate providers are cached here to that they can be closed when // a new provider is to be created. cachedRoot certprovider.Provider @@ -344,7 +341,7 @@ func (b *cdsBalancer) handleWatchUpdate(update *watchUpdate) { } ccState := balancer.ClientConnState{ - ResolverState: resolver.State{Attributes: b.attrsWithXDSClient}, + ResolverState: xdsclient.SetClient(resolver.State{}, b.xdsClient), BalancerConfig: lbCfg, } if err := b.edsLB.UpdateClientConnState(ccState); err != nil { @@ -458,7 +455,6 @@ func (b *cdsBalancer) UpdateClientConnState(state balancer.ClientConnState) erro return balancer.ErrBadResolverState } b.xdsClient = c - b.attrsWithXDSClient = state.ResolverState.Attributes } b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(state.BalancerConfig)) From 64141d6e9c71a11ac80ec5358b5127039d2c6ec4 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 8 Jun 2021 12:51:55 -0700 Subject: [PATCH 15/16] [xds_client_in_attributes] c1 --- xds/internal/testutils/fakeclient/client.go | 3 +++ xds/internal/xdsclient/client_test.go | 16 ++++++++-------- xds/internal/xdsclient/singleton.go | 11 +++++++++++ 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/xds/internal/testutils/fakeclient/client.go b/xds/internal/testutils/fakeclient/client.go index 638dc7e80abb..2538b59255cf 100644 --- a/xds/internal/testutils/fakeclient/client.go +++ b/xds/internal/testutils/fakeclient/client.go @@ -32,6 +32,9 @@ import ( // Client is a fake implementation of an xds client. It exposes a bunch of // channels to signal the occurrence of various events. type Client struct { + // Embed XDSClient so this fake client implements the interface, but it's + // never set (it's always nil). This may cause nil panic since not all the + // methods are implemented. xdsclient.XDSClient name string diff --git a/xds/internal/xdsclient/client_test.go b/xds/internal/xdsclient/client_test.go index b039e5808b00..12590408e6ca 100644 --- a/xds/internal/xdsclient/client_test.go +++ b/xds/internal/xdsclient/client_test.go @@ -263,11 +263,11 @@ func (s) TestClientNewSingleton(t *testing.T) { defer cleanup() // The first New(). Should create a Client and a new APIClient. - client, err := New() + client, err := newRefCounted() if err != nil { t.Fatalf("failed to create client: %v", err) } - clientImpl := client.(*clientRefCounted).clientImpl + clientImpl := client.clientImpl ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() c, err := apiClientCh.Receive(ctx) @@ -280,15 +280,15 @@ func (s) TestClientNewSingleton(t *testing.T) { // and should not create new API client. const count = 9 for i := 0; i < count; i++ { - tc, terr := New() + tc, terr := newRefCounted() if terr != nil { client.Close() t.Fatalf("%d-th call to New() failed with error: %v", i, terr) } - if tc.(*clientRefCounted).clientImpl != clientImpl { + if tc.clientImpl != clientImpl { client.Close() tc.Close() - t.Fatalf("%d-th call to New() got a different client %p, want %p", i, tc.(*clientRefCounted).clientImpl, clientImpl) + t.Fatalf("%d-th call to New() got a different client %p, want %p", i, tc.clientImpl, clientImpl) } sctx, scancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) @@ -324,7 +324,7 @@ func (s) TestClientNewSingleton(t *testing.T) { // Call New() again after the previous Client is actually closed. Should // create a Client and a new APIClient. - client2, err2 := New() + client2, err2 := newRefCounted() if err2 != nil { t.Fatalf("failed to create client: %v", err) } @@ -339,8 +339,8 @@ func (s) TestClientNewSingleton(t *testing.T) { if client2 != client { t.Fatalf("New() after Close() should return the same client wrapper, got different %p, %p", client2, client) } - if client2.(*clientRefCounted).clientImpl == clientImpl { - t.Fatalf("New() after Close() should return different client implementation, got the same %p", client2.(*clientRefCounted).clientImpl) + if client2.clientImpl == clientImpl { + t.Fatalf("New() after Close() should return different client implementation, got the same %p", client2.clientImpl) } if apiClient2 == apiClient { t.Fatalf("New() after Close() should return different API client, got the same %p", apiClient2) diff --git a/xds/internal/xdsclient/singleton.go b/xds/internal/xdsclient/singleton.go index 927aa9a70887..f045790e2a40 100644 --- a/xds/internal/xdsclient/singleton.go +++ b/xds/internal/xdsclient/singleton.go @@ -57,6 +57,17 @@ type clientRefCounted struct { // singleton. The following calls will return the singleton xds client without // checking or using the config. func New() (XDSClient, error) { + // This cannot just return newRefCounted(), because in error cases, the + // returned nil is a typed nil (*clientRefCounted), which may cause nil + // checks fail. + c, err := newRefCounted() + if err != nil { + return nil, err + } + return c, nil +} + +func newRefCounted() (*clientRefCounted, error) { singletonClient.mu.Lock() defer singletonClient.mu.Unlock() // If the client implementation was created, increment ref count and return From 5afc17c1807333fe9e97ab20b785799cdcdaefb8 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 8 Jun 2021 15:19:34 -0700 Subject: [PATCH 16/16] [xds_client_in_attributes] vet after rebasing on master --- xds/internal/resolver/xds_resolver_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 4edf34f41acf..a41920998272 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -286,7 +286,7 @@ func (s) TestXDSResolverWatchCallbackAfterClose(t *testing.T) { func (s) TestXDSResolverCloseClosesXDSClient(t *testing.T) { xdsC := fakeclient.NewClient() xdsR, _, cancel := testSetup(t, setupOpts{ - xdsClientFunc: func() (xdsClient, error) { return xdsC, nil }, + xdsClientFunc: func() (xdsclient.XDSClient, error) { return xdsC, nil }, }) defer cancel() xdsR.Close()