diff --git a/internal/testutils/xds/e2e/logging.go b/internal/testutils/xds/e2e/logging.go index 8767ad0f5ed7..ed3a6032f65f 100644 --- a/internal/testutils/xds/e2e/logging.go +++ b/internal/testutils/xds/e2e/logging.go @@ -27,14 +27,14 @@ type serverLogger struct { } func (l serverLogger) Debugf(format string, args ...any) { - l.logger.Logf(format, args) + l.logger.Logf(format, args...) } func (l serverLogger) Infof(format string, args ...any) { - l.logger.Logf(format, args) + l.logger.Logf(format, args...) } func (l serverLogger) Warnf(format string, args ...any) { - l.logger.Logf(format, args) + l.logger.Logf(format, args...) } func (l serverLogger) Errorf(format string, args ...any) { - l.logger.Logf(format, args) + l.logger.Logf(format, args...) } diff --git a/internal/xds/bootstrap/bootstrap.go b/internal/xds/bootstrap/bootstrap.go index 7be267a81273..f73ac864b49f 100644 --- a/internal/xds/bootstrap/bootstrap.go +++ b/internal/xds/bootstrap/bootstrap.go @@ -34,7 +34,6 @@ import ( "google.golang.org/grpc/credentials/tls/certprovider" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/envconfig" - "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/xds/bootstrap" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/structpb" @@ -213,6 +212,9 @@ func (sc *ServerConfig) Equal(other *ServerConfig) bool { // content. It doesn't cover NodeProto because NodeProto isn't used by // federation. func (sc *ServerConfig) String() string { + if len(sc.serverFeatures) == 0 { + return fmt.Sprintf("%s-%s", sc.serverURI, sc.selectedCreds.String()) + } features := strings.Join(sc.serverFeatures, "-") return strings.Join([]string{sc.serverURI, sc.selectedCreds.String(), features}, "-") } @@ -418,6 +420,12 @@ func (c *Config) Equal(other *Config) bool { return true } +// String returns a string representation of the Config. +func (c *Config) String() string { + s, _ := c.MarshalJSON() + return string(s) +} + // The following fields correspond 1:1 with the JSON schema for Config. type configJSON struct { XDSServers []*ServerConfig `json:"xds_servers,omitempty"` @@ -438,7 +446,7 @@ func (c *Config) MarshalJSON() ([]byte, error) { Authorities: c.authorities, Node: c.node, } - return json.Marshal(config) + return json.MarshalIndent(config, " ", " ") } // UnmarshalJSON takes the json data (the complete bootstrap configuration) and @@ -566,9 +574,7 @@ func newConfigFromContents(data []byte) (*Config, error) { } if logger.V(2) { - logger.Infof("Bootstrap config for creating xds-client: %v", pretty.ToJSON(config)) - } else { - logger.Infof("Bootstrap config for creating xds-client: %+v", config) + logger.Infof("Bootstrap config for creating xds-client: %s", config) } return config, nil } @@ -632,7 +638,7 @@ func NewContentsForTesting(opts ConfigOptionsForTesting) ([]byte, error) { Authorities: authorities, Node: node{ID: opts.NodeID}, } - contents, err := json.Marshal(cfgJSON) + contents, err := json.MarshalIndent(cfgJSON, " ", " ") if err != nil { return nil, fmt.Errorf("failed to marshal bootstrap configuration for provided options %+v: %v", opts, err) } diff --git a/xds/csds/csds.go b/xds/csds/csds.go index 6266f60e86d9..3d8398a72ff0 100644 --- a/xds/csds/csds.go +++ b/xds/csds/csds.go @@ -27,7 +27,6 @@ import ( "context" "fmt" "io" - "sync" "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" @@ -55,22 +54,14 @@ func prefixLogger(s *ClientStatusDiscoveryServer) *internalgrpclog.PrefixLogger // https://github.com/grpc/proposal/blob/master/A40-csds-support.md. type ClientStatusDiscoveryServer struct { logger *internalgrpclog.PrefixLogger - - mu sync.Mutex - xdsClient xdsclient.XDSClient - xdsClientClose func() } // NewClientStatusDiscoveryServer returns an implementation of the CSDS server // that can be registered on a gRPC server. func NewClientStatusDiscoveryServer() (*ClientStatusDiscoveryServer, error) { - c, close, err := xdsclient.New() - if err != nil { - logger.Warningf("Failed to create xDS client: %v", err) - } - s := &ClientStatusDiscoveryServer{xdsClient: c, xdsClientClose: close} + s := &ClientStatusDiscoveryServer{} s.logger = prefixLogger(s) - s.logger.Infof("Created CSDS server, with xdsClient %p", c) + s.logger.Infof("Created CSDS server") return s, nil } @@ -104,24 +95,14 @@ func (s *ClientStatusDiscoveryServer) FetchClientStatus(_ context.Context, req * // // If it returns an error, the error is a status error. func (s *ClientStatusDiscoveryServer) buildClientStatusRespForReq(req *v3statuspb.ClientStatusRequest) (*v3statuspb.ClientStatusResponse, error) { - s.mu.Lock() - defer s.mu.Unlock() - - if s.xdsClient == nil { - return &v3statuspb.ClientStatusResponse{}, nil - } // Field NodeMatchers is unsupported, by design // https://github.com/grpc/proposal/blob/master/A40-csds-support.md#detail-node-matching. if len(req.NodeMatchers) != 0 { return nil, status.Errorf(codes.InvalidArgument, "node_matchers are not supported, request contains node_matchers: %v", req.NodeMatchers) } - return s.xdsClient.DumpResources() + return xdsclient.DumpResources(), nil } // Close cleans up the resources. -func (s *ClientStatusDiscoveryServer) Close() { - if s.xdsClientClose != nil { - s.xdsClientClose() - } -} +func (s *ClientStatusDiscoveryServer) Close() {} diff --git a/xds/csds/csds_e2e_test.go b/xds/csds/csds_e2e_test.go index ffa6a7354bc8..aad9b0d14c67 100644 --- a/xds/csds/csds_e2e_test.go +++ b/xds/csds/csds_e2e_test.go @@ -22,7 +22,8 @@ import ( "context" "fmt" "io" - "sort" + "runtime" + "slices" "strings" "testing" "time" @@ -32,6 +33,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/xds/csds" @@ -43,6 +45,7 @@ import ( v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3" v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" @@ -54,29 +57,6 @@ import ( const defaultTestTimeout = 5 * time.Second -var cmpOpts = cmp.Options{ - cmp.Transformer("sort", func(in []*v3statuspb.ClientConfig_GenericXdsConfig) []*v3statuspb.ClientConfig_GenericXdsConfig { - out := append([]*v3statuspb.ClientConfig_GenericXdsConfig(nil), in...) - sort.Slice(out, func(i, j int) bool { - a, b := out[i], out[j] - if a == nil { - return true - } - if b == nil { - return false - } - if strings.Compare(a.TypeUrl, b.TypeUrl) == 0 { - return strings.Compare(a.Name, b.Name) < 0 - } - return strings.Compare(a.TypeUrl, b.TypeUrl) < 0 - }) - return out - }), - protocmp.Transform(), - protocmp.IgnoreFields((*v3statuspb.ClientConfig_GenericXdsConfig)(nil), "last_updated"), - protocmp.IgnoreFields((*v3adminpb.UpdateFailureState)(nil), "last_update_attempt", "details"), -} - type s struct { grpctest.Tester } @@ -113,34 +93,23 @@ func (unimplementedEndpointsWatcher) OnUpdate(*xdsresource.EndpointsResourceData func (unimplementedEndpointsWatcher) OnError(error) {} func (unimplementedEndpointsWatcher) OnResourceDoesNotExist() {} -func (s) TestCSDS(t *testing.T) { - // Spin up a xDS management server on a local port. - mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) - - // Create a bootstrap file in a temporary directory. - nodeID := uuid.New().String() - bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - testutils.CreateBootstrapFileForTesting(t, bootstrapContents) - - // Create an xDS client. This will end up using the same singleton as used - // by the CSDS service. - xdsC, close, err := xdsclient.New() - if err != nil { - t.Fatalf("Failed to create xDS client: %v", err) - } - defer close() +// Creates a gRPC server and starts serving a CSDS service implementation on it. +// Returns the address of the newly created gRPC server. +// +// Registers cleanup functions on t to stop the gRPC server and the CSDS +// implemenation. +func startCSDSServer(t *testing.T) string { + t.Helper() - // Initialize an gRPC server and register CSDS on it. server := grpc.NewServer() + t.Cleanup(server.Stop) + csdss, err := csds.NewClientStatusDiscoveryServer() if err != nil { - t.Fatal(err) + t.Fatalf("Failed to create CSDS service implementation: %v", err) } v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss) - defer func() { - server.Stop() - csdss.Close() - }() + t.Cleanup(csdss.Close) // Create a local listener and pass it to Serve(). lis, err := testutils.LocalTCPListener() @@ -152,23 +121,100 @@ func (s) TestCSDS(t *testing.T) { t.Errorf("Serve() failed: %v", err) } }() + return lis.Addr().String() +} - // Create a client to the CSDS server. - conn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) +func startCSDSClientStream(ctx context.Context, t *testing.T, serverAddr string) v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient { + conn, err := grpc.NewClient(serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - t.Fatalf("Failed to dial CSDS server %q: %v", lis.Addr().String(), err) + t.Fatalf("Failed to dial CSDS server %q: %v", serverAddr, err) } - c := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true)) + + client := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn) + stream, err := client.StreamClientStatus(ctx, grpc.WaitForReady(true)) if err != nil { t.Fatalf("Failed to create a stream for CSDS: %v", err) } - defer conn.Close() + t.Cleanup(func() { conn.Close() }) + return stream +} + +// Tests CSDS functionality. The test performs the following: +// - Spins up a management server and creates two xDS clients talking to it. +// - Registers a set of watches on the xDS clients, and verifies that the CSDS +// response reports resources in REQUESTED state. +// - Configures resources on the management server corresponding to the ones +// being watched by the clients, and verifies that the CSDS response reports +// resources in ACKED state. +// - Modifies resources on the management server such that some of them are +// expected to be NACKed by the client. Verifies that the CSDS response +// contains some resources in ACKED state and some in NACKED state. +// +// For all of the above operations, the test also verifies that the client_scope +// field in the CSDS response is populated appropriately. +func (s) TestCSDS(t *testing.T) { + // TODO(easwars): Once https://github.com/grpc/grpc/issues/34099 is fixed + // for grpc-go, use resource watchers which are able to control how much we + // read from the management server, and thereby allowing this test to not + // starve in the presence of constant updates from the management server. + if runtime.GOARCH == "arm64" { + t.Skip("Skipping test on arm64 due to https://github.com/envoyproxy/go-control-plane/issues/962") + } + + // Spin up a xDS management server on a local port. + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) + + // Create a bootstrap contents pointing to the above management server. + nodeID := uuid.New().String() + bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + + // Create two xDS clients, with different names. These should end up + // creating two different xDS clients. + const xdsClient1Name = "xds-csds-client-1" + xdsClient1, xdsClose1, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: xdsClient1Name, + Contents: bootstrapContents, + }) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer xdsClose1() + const xdsClient2Name = "xds-csds-client-2" + xdsClient2, xdsClose2, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: xdsClient2Name, + Contents: bootstrapContents, + }) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer xdsClose2() + + // Start a CSDS server and create a client stream to it. + addr := startCSDSServer(t) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream := startCSDSClientStream(ctx, t, addr) // Verify that the xDS client reports an empty config. - if err := checkClientStatusResponse(stream, nil); err != nil { + wantNode := &v3corepb.Node{ + Id: nodeID, + UserAgentName: "gRPC Go", + UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version}, + ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"}, + } + wantResp := &v3statuspb.ClientStatusResponse{ + Config: []*v3statuspb.ClientConfig{ + { + Node: wantNode, + ClientScope: xdsClient1Name, + }, + { + Node: wantNode, + ClientScope: xdsClient2Name, + }, + }, + } + if err := checkClientStatusResponse(ctx, stream, wantResp); err != nil { t.Fatal(err) } @@ -204,43 +250,50 @@ func (s) TestCSDS(t *testing.T) { endpointAnys[i] = testutils.MarshalAny(t, endpoints[i]) } - // Register watches on the xDS client for two resources of each type. - for _, target := range ldsTargets { - xdsresource.WatchListener(xdsC, target, unimplementedListenerWatcher{}) - } - for _, target := range rdsTargets { - xdsresource.WatchRouteConfig(xdsC, target, unimplementedRouteConfigWatcher{}) - } - for _, target := range cdsTargets { - xdsresource.WatchCluster(xdsC, target, unimplementedClusterWatcher{}) - } - for _, target := range edsTargets { - xdsresource.WatchEndpoints(xdsC, target, unimplementedEndpointsWatcher{}) + // Register watches on the xDS clients for two resources of each type. + for _, xdsC := range []xdsclient.XDSClient{xdsClient1, xdsClient2} { + for _, target := range ldsTargets { + xdsresource.WatchListener(xdsC, target, unimplementedListenerWatcher{}) + } + for _, target := range rdsTargets { + xdsresource.WatchRouteConfig(xdsC, target, unimplementedRouteConfigWatcher{}) + } + for _, target := range cdsTargets { + xdsresource.WatchCluster(xdsC, target, unimplementedClusterWatcher{}) + } + for _, target := range edsTargets { + xdsresource.WatchEndpoints(xdsC, target, unimplementedEndpointsWatcher{}) + } } // Verify that the xDS client reports the resources as being in "Requested" - // state. - want := []*v3statuspb.ClientConfig_GenericXdsConfig{} - for i := range ldsTargets { - want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil)) - } - for i := range rdsTargets { - want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil)) - } - for i := range cdsTargets { - want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil)) - } - for i := range edsTargets { - want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil)) - } - for { - if err := ctx.Err(); err != nil { - t.Fatalf("Timeout when waiting for resources in \"Requested\" state: %v", err) - } - if err := checkClientStatusResponse(stream, want); err == nil { - break - } - time.Sleep(time.Millisecond * 100) + // state, and in version "0". + wantConfigs := []*v3statuspb.ClientConfig_GenericXdsConfig{ + makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[0], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[1], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[0], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[1], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[0], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[1], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[0], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[1], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + } + wantResp = &v3statuspb.ClientStatusResponse{ + Config: []*v3statuspb.ClientConfig{ + { + Node: wantNode, + GenericXdsConfigs: wantConfigs, + ClientScope: xdsClient1Name, + }, + { + Node: wantNode, + GenericXdsConfigs: wantConfigs, + ClientScope: xdsClient2Name, + }, + }, + } + if err := checkClientStatusResponse(ctx, stream, wantResp); err != nil { + t.Fatal(err) } // Configure the management server with two resources of each type, @@ -257,37 +310,40 @@ func (s) TestCSDS(t *testing.T) { // Verify that the xDS client reports the resources as being in "ACKed" // state, and in version "1". - want = nil - for i := range ldsTargets { - want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[i])) - } - for i := range rdsTargets { - want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, routeAnys[i])) - } - for i := range cdsTargets { - want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[i])) - } - for i := range edsTargets { - want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[i])) - } - for { - if err := ctx.Err(); err != nil { - t.Fatalf("Timeout when waiting for resources in \"ACKed\" state: %v", err) - } - err := checkClientStatusResponse(stream, want) - if err == nil { - break - } - time.Sleep(time.Millisecond * 100) + wantConfigs = []*v3statuspb.ClientConfig_GenericXdsConfig{ + makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[0], "1", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[0], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[1], "1", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[1], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[0], "1", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[0], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[1], "1", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[1], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[0], "1", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[0], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[1], "1", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[1], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[0], "1", v3adminpb.ClientResourceStatus_ACKED, routeAnys[0], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[1], "1", v3adminpb.ClientResourceStatus_ACKED, routeAnys[1], nil), + } + wantResp = &v3statuspb.ClientStatusResponse{ + Config: []*v3statuspb.ClientConfig{ + { + Node: wantNode, + GenericXdsConfigs: wantConfigs, + ClientScope: xdsClient1Name, + }, + { + Node: wantNode, + GenericXdsConfigs: wantConfigs, + ClientScope: xdsClient2Name, + }, + }, + } + if err := checkClientStatusResponse(ctx, stream, wantResp); err != nil { + t.Fatal(err) } // Update the first resource of each type in the management server to a // value which is expected to be NACK'ed by the xDS client. - const nackResourceIdx = 0 - listeners[nackResourceIdx].ApiListener = &v3listenerpb.ApiListener{} - routes[nackResourceIdx].VirtualHosts = []*v3routepb.VirtualHost{{Routes: []*v3routepb.Route{{}}}} - clusters[nackResourceIdx].ClusterDiscoveryType = &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC} - endpoints[nackResourceIdx].Endpoints = []*v3endpointpb.LocalityLbEndpoints{{}} + listeners[0].ApiListener = &v3listenerpb.ApiListener{} + routes[0].VirtualHosts = []*v3routepb.VirtualHost{{Routes: []*v3routepb.Route{{}}}} + clusters[0].ClusterDiscoveryType = &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC} + endpoints[0].Endpoints = []*v3endpointpb.LocalityLbEndpoints{{}} if err := mgmtServer.Update(ctx, e2e.UpdateOptions{ NodeID: nodeID, Listeners: listeners, @@ -304,91 +360,98 @@ func (s) TestCSDS(t *testing.T) { // "ACKed" state. The version for the ACKed resource would be "2", while // that for the NACKed resource would be "1". In the NACKed resource, the // version which is NACKed is stored in the ErrorState field. - want = nil - for i := range ldsTargets { - config := makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[i]) - if i == nackResourceIdx { - config.VersionInfo = "1" - config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED - config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"} - } - want = append(want, config) - } - for i := range rdsTargets { - config := makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, routeAnys[i]) - if i == nackResourceIdx { - config.VersionInfo = "1" - config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED - config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"} - } - want = append(want, config) - } - for i := range cdsTargets { - config := makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[i]) - if i == nackResourceIdx { - config.VersionInfo = "1" - config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED - config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"} - } - want = append(want, config) - } - for i := range edsTargets { - config := makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[i]) - if i == nackResourceIdx { - config.VersionInfo = "1" - config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED - config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"} - } - want = append(want, config) - } - for { - if err := ctx.Err(); err != nil { - t.Fatalf("Timeout when waiting for resources in \"NACKed\" state: %v", err) - } - err := checkClientStatusResponse(stream, want) - if err == nil { - break - } - time.Sleep(time.Millisecond * 100) + wantConfigs = []*v3statuspb.ClientConfig_GenericXdsConfig{ + makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[0], "1", v3adminpb.ClientResourceStatus_NACKED, clusterAnys[0], &v3adminpb.UpdateFailureState{VersionInfo: "2"}), + makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[1], "2", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[1], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[0], "1", v3adminpb.ClientResourceStatus_NACKED, endpointAnys[0], &v3adminpb.UpdateFailureState{VersionInfo: "2"}), + makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[1], "2", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[1], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[0], "1", v3adminpb.ClientResourceStatus_NACKED, listenerAnys[0], &v3adminpb.UpdateFailureState{VersionInfo: "2"}), + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[1], "2", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[1], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[0], "1", v3adminpb.ClientResourceStatus_NACKED, routeAnys[0], &v3adminpb.UpdateFailureState{VersionInfo: "2"}), + makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[1], "2", v3adminpb.ClientResourceStatus_ACKED, routeAnys[1], nil), + } + wantResp = &v3statuspb.ClientStatusResponse{ + Config: []*v3statuspb.ClientConfig{ + { + Node: wantNode, + GenericXdsConfigs: wantConfigs, + ClientScope: xdsClient1Name, + }, + { + Node: wantNode, + GenericXdsConfigs: wantConfigs, + ClientScope: xdsClient2Name, + }, + }, + } + if err := checkClientStatusResponse(ctx, stream, wantResp); err != nil { + t.Fatal(err) } } -func makeGenericXdsConfig(typeURL, name, version string, status v3adminpb.ClientResourceStatus, config *anypb.Any) *v3statuspb.ClientConfig_GenericXdsConfig { +func makeGenericXdsConfig(typeURL, name, version string, status v3adminpb.ClientResourceStatus, config *anypb.Any, failure *v3adminpb.UpdateFailureState) *v3statuspb.ClientConfig_GenericXdsConfig { return &v3statuspb.ClientConfig_GenericXdsConfig{ TypeUrl: typeURL, Name: name, VersionInfo: version, ClientStatus: status, XdsConfig: config, + ErrorState: failure, } } -func checkClientStatusResponse(stream v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, want []*v3statuspb.ClientConfig_GenericXdsConfig) error { - if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil { - if err != io.EOF { - return fmt.Errorf("failed to send ClientStatusRequest: %v", err) - } - // If the stream has closed, we call Recv() until it returns a non-nil - // error to get the actual error on the stream. - for { - if _, err := stream.Recv(); err != nil { - return fmt.Errorf("failed to recv ClientStatusResponse: %v", err) +// Repeatedly sends CSDS requests and receives CSDS responses on the provided +// stream and verifies that the response matches `want`. Returns an error if +// sending or receiving on the stream fails, or if the context expires before a +// response matching `want` is received. +// +// Expects client configs in `want` to be sorted on `client_scope` and the +// resource dump to be sorted on type_url and resource name. +func checkClientStatusResponse(ctx context.Context, stream v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, want *v3statuspb.ClientStatusResponse) error { + var cmpOpts = cmp.Options{ + protocmp.Transform(), + protocmp.IgnoreFields((*v3statuspb.ClientConfig_GenericXdsConfig)(nil), "last_updated"), + protocmp.IgnoreFields((*v3adminpb.UpdateFailureState)(nil), "last_update_attempt", "details"), + } + + var lastErr error + for ; ctx.Err() == nil; <-time.After(100 * time.Millisecond) { + if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil { + if err != io.EOF { + return fmt.Errorf("failed to send ClientStatusRequest: %v", err) + } + // If the stream has closed, we call Recv() until it returns a non-nil + // error to get the actual error on the stream. + for { + if _, err := stream.Recv(); err != nil { + return fmt.Errorf("failed to recv ClientStatusResponse: %v", err) + } } } + got, err := stream.Recv() + if err != nil { + return fmt.Errorf("failed to recv ClientStatusResponse: %v", err) + } + // Sort the client configs based on the `client_scope` field. + slices.SortFunc(got.GetConfig(), func(a, b *v3statuspb.ClientConfig) int { + return strings.Compare(a.ClientScope, b.ClientScope) + }) + // Sort the resource configs based on the type_url and name fields. + for _, cc := range got.GetConfig() { + slices.SortFunc(cc.GetGenericXdsConfigs(), func(a, b *v3statuspb.ClientConfig_GenericXdsConfig) int { + if strings.Compare(a.TypeUrl, b.TypeUrl) == 0 { + return strings.Compare(a.Name, b.Name) + } + return strings.Compare(a.TypeUrl, b.TypeUrl) + }) + } + diff := cmp.Diff(want, got, cmpOpts) + if diff == "" { + return nil + } + lastErr = fmt.Errorf("received unexpected resource dump, diff (-got, +want):\n%s, got: %s\n want:%s", diff, pretty.ToJSON(got), pretty.ToJSON(want)) } - resp, err := stream.Recv() - if err != nil { - return fmt.Errorf("failed to recv ClientStatusResponse: %v", err) - } - - if n := len(resp.Config); n != 1 { - return fmt.Errorf("got %d configs, want 1: %v", n, prototext.Format(resp)) - } - - if diff := cmp.Diff(resp.Config[0].GenericXdsConfigs, want, cmpOpts); diff != "" { - return fmt.Errorf(diff) - } - return nil + return fmt.Errorf("timeout when waiting for resource dump to reach expected state: %v", lastErr) } func (s) TestCSDSNoXDSClient(t *testing.T) { @@ -397,40 +460,11 @@ func (s) TestCSDSNoXDSClient(t *testing.T) { // `server_uri` field is unset. testutils.CreateBootstrapFileForTesting(t, []byte(``)) - // Initialize an gRPC server and register CSDS on it. - server := grpc.NewServer() - csdss, err := csds.NewClientStatusDiscoveryServer() - if err != nil { - t.Fatal(err) - } - defer csdss.Close() - v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss) - - // Create a local listener and pass it to Serve(). - lis, err := testutils.LocalTCPListener() - if err != nil { - t.Fatalf("testutils.LocalTCPListener() failed: %v", err) - } - go func() { - if err := server.Serve(lis); err != nil { - t.Errorf("Serve() failed: %v", err) - } - }() - defer server.Stop() - - // Create a client to the CSDS server. - conn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - t.Fatalf("Failed to dial CSDS server %q: %v", lis.Addr().String(), err) - } - defer conn.Close() - c := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn) + // Start a CSDS server and create a client stream to it. + addr := startCSDSServer(t) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true)) - if err != nil { - t.Fatalf("Failed to create a stream for CSDS: %v", err) - } + stream := startCSDSClientStream(ctx, t, addr) if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil { t.Fatalf("Failed to send ClientStatusRequest: %v", err) diff --git a/xds/googledirectpath/googlec2p.go b/xds/googledirectpath/googlec2p.go index 5ea7b60d9d9d..c4ed0506726c 100644 --- a/xds/googledirectpath/googlec2p.go +++ b/xds/googledirectpath/googlec2p.go @@ -62,8 +62,8 @@ var ( randInt = rand.Int - newClientWithConfig = func(config *bootstrap.Config) (xdsclient.XDSClient, func(), error) { - return xdsclient.NewWithConfig(config) + newClientWithConfig = func(name string, config *bootstrap.Config) (xdsclient.XDSClient, func(), error) { + return xdsclient.NewWithConfig(name, config) } logger = internalgrpclog.NewPrefixLogger(grpclog.Component("directpath"), logPrefix) @@ -119,7 +119,7 @@ func (c2pResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, opts // Create singleton xds client with this config. The xds client will be // used by the xds resolver later. - _, close, err := newClientWithConfig(config) + _, close, err := newClientWithConfig(t.String(), config) if err != nil { return nil, fmt.Errorf("failed to start xDS client: %v", err) } diff --git a/xds/googledirectpath/googlec2p_test.go b/xds/googledirectpath/googlec2p_test.go index 9934d3715d64..96db393729e0 100644 --- a/xds/googledirectpath/googlec2p_test.go +++ b/xds/googledirectpath/googlec2p_test.go @@ -99,7 +99,7 @@ func overrideWithTestXDSClient(t *testing.T) (*testXDSClient, chan *bootstrap.Co xdsC := &testXDSClient{closed: make(chan struct{}, 1)} configCh := make(chan *bootstrap.Config, 1) oldNewClient := newClientWithConfig - newClientWithConfig = func(config *bootstrap.Config) (xdsclient.XDSClient, func(), error) { + newClientWithConfig = func(name string, config *bootstrap.Config) (xdsclient.XDSClient, func(), error) { configCh <- config return xdsC, func() { xdsC.Close() }, nil } diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go index dda56f69cf89..2dbe59a74111 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go @@ -138,7 +138,10 @@ func registerWrappedCDSPolicyWithNewSubConnOverride(t *testing.T, ch chan *xdscr func setupForSecurityTests(t *testing.T, bootstrapContents []byte, clientCreds, serverCreds credentials.TransportCredentials) (*grpc.ClientConn, string) { t.Helper() - xdsClient, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) + xdsClient, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bootstrapContents, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 1cd91995a5a6..bd9cd5573805 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -233,7 +233,10 @@ func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *gr nodeID := uuid.New().String() bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - xdsC, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + xdsC, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -353,7 +356,10 @@ func (s) TestConfigurationUpdate_EmptyCluster(t *testing.T) { nodeID := uuid.New().String() bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - xdsClient, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + xdsClient, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } diff --git a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go index 32e8e3c9cd11..e3cf2a99d5f6 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go @@ -1139,7 +1139,11 @@ func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) { // Create an xDS client talking to the above management server, configured // with a short watch expiry timeout. - xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bootstrapContents, + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) } diff --git a/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go b/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go index 7f8851a93a23..4c125f344588 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go @@ -74,7 +74,10 @@ func setupAndDial(t *testing.T, bootstrapContents []byte) (*grpc.ClientConn, fun t.Helper() // Create an xDS client for use by the cluster_resolver LB policy. - xdsC, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) + xdsC, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bootstrapContents, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } diff --git a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go index fa4730a8b38c..8fddf0bb055a 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go @@ -150,7 +150,10 @@ func (s) TestEDS_OneLocality(t *testing.T) { } // Create an xDS client for use by the cluster_resolver LB policy. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bootstrapContents, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -282,7 +285,10 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) { } // Create an xDS client for use by the cluster_resolver LB policy. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bootstrapContents, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -443,7 +449,10 @@ func (s) TestEDS_EndpointsHealth(t *testing.T) { } // Create an xDS client for use by the cluster_resolver LB policy. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bootstrapContents, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -513,7 +522,10 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) { } // Create an xDS client for use by the cluster_resolver LB policy. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bootstrapContents, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -909,7 +921,10 @@ func (s) TestEDS_BadUpdateWithoutPreviousGoodUpdate(t *testing.T) { } // Create an xDS client for use by the cluster_resolver LB policy. - xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) + xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bootstrapContents, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -978,7 +993,10 @@ func (s) TestEDS_BadUpdateWithPreviousGoodUpdate(t *testing.T) { } // Create an xDS client for use by the cluster_resolver LB policy. - xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) + xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bootstrapContents, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -1047,7 +1065,11 @@ func (s) TestEDS_ResourceNotFound(t *testing.T) { // with a short watch expiry timeout. nodeID := uuid.New().String() bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) } diff --git a/xds/internal/resolver/internal/internal.go b/xds/internal/resolver/internal/internal.go index f505eeb4394e..d9c23278281f 100644 --- a/xds/internal/resolver/internal/internal.go +++ b/xds/internal/resolver/internal/internal.go @@ -26,5 +26,5 @@ var ( NewWRR any // func() wrr.WRR // NewXDSClient is the function used to create a new xDS client. - NewXDSClient any // func() (xdsclient.XDSClient, func(), error) + NewXDSClient any // func(string) (xdsclient.XDSClient, func(), error) ) diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index c9f4a6e58c0f..8d20d5882c38 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -49,8 +49,8 @@ const Scheme = "xds" // ClientConns at the same time. func newBuilderForTesting(config []byte) (resolver.Builder, error) { return &xdsResolverBuilder{ - newXDSClient: func() (xdsclient.XDSClient, func(), error) { - return xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: config}) + newXDSClient: func(name string) (xdsclient.XDSClient, func(), error) { + return xdsclient.NewForTesting(xdsclient.OptionsForTesting{Name: name, Contents: config}) }, }, nil } @@ -64,7 +64,7 @@ func init() { } type xdsResolverBuilder struct { - newXDSClient func() (xdsclient.XDSClient, func(), error) + newXDSClient func(string) (xdsclient.XDSClient, func(), error) } // Build helps implement the resolver.Builder interface. @@ -97,11 +97,11 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon r.serializerCancel = cancel // Initialize the xDS client. - newXDSClient := rinternal.NewXDSClient.(func() (xdsclient.XDSClient, func(), error)) + newXDSClient := rinternal.NewXDSClient.(func(string) (xdsclient.XDSClient, func(), error)) if b.newXDSClient != nil { newXDSClient = b.newXDSClient } - client, close, err := newXDSClient() + client, close, err := newXDSClient(target.String()) if err != nil { return nil, fmt.Errorf("xds: failed to create xds-client: %v", err) } diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 25207334efbd..01440f81c8b0 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -252,9 +252,13 @@ func (s) TestResolverCloseClosesXDSClient(t *testing.T) { // client is closed. origNewClient := rinternal.NewXDSClient closeCh := make(chan struct{}) - rinternal.NewXDSClient = func() (xdsclient.XDSClient, func(), error) { + rinternal.NewXDSClient = func(string) (xdsclient.XDSClient, func(), error) { bc := e2e.DefaultBootstrapContents(t, uuid.New().String(), "dummy-management-server-address") - c, cancel, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestTimeout}) + c, cancel, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + WatchExpiryTimeout: defaultTestTimeout, + }) return c, grpcsync.OnceFunc(func() { close(closeCh) cancel() diff --git a/xds/internal/server/rds_handler_test.go b/xds/internal/server/rds_handler_test.go index b159ef48c6a2..915fedadbbab 100644 --- a/xds/internal/server/rds_handler_test.go +++ b/xds/internal/server/rds_handler_test.go @@ -111,7 +111,10 @@ func xdsSetupForTests(t *testing.T) (*e2e.ManagementServer, string, chan []strin nodeID := uuid.New().String() bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - xdsC, cancel, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) + xdsC, cancel, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bootstrapContents, + }) if err != nil { t.Fatal(err) } diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index e6ffff2bbc65..75500b102466 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -597,7 +597,7 @@ func (a *authority) reportLoad() (*load.Store, func()) { return a.transport.ReportLoad() } -func (a *authority) dumpResources() ([]*v3statuspb.ClientConfig_GenericXdsConfig, error) { +func (a *authority) dumpResources() []*v3statuspb.ClientConfig_GenericXdsConfig { a.resourcesMu.Lock() defer a.resourcesMu.Unlock() @@ -627,7 +627,7 @@ func (a *authority) dumpResources() ([]*v3statuspb.ClientConfig_GenericXdsConfig ret = append(ret, config) } } - return ret, nil + return ret } func serviceStatusToProto(serviceStatus xdsresource.ServiceStatus) v3adminpb.ClientResourceStatus { diff --git a/xds/internal/xdsclient/client.go b/xds/internal/xdsclient/client.go index 468c5fb31b9b..144cb5bd7686 100644 --- a/xds/internal/xdsclient/client.go +++ b/xds/internal/xdsclient/client.go @@ -24,8 +24,6 @@ import ( "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/load" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" - - v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3" ) // XDSClient is a full fledged gRPC client which queries a set of discovery APIs @@ -48,10 +46,6 @@ type XDSClient interface { // the watcher is canceled. Callers need to handle this case. WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func()) - // DumpResources returns the status of the xDS resources. Returns a map of - // resource type URLs to a map of resource names to resource state. - DumpResources() (*v3statuspb.ClientStatusResponse, error) - ReportLoad(*bootstrap.ServerConfig) (*load.Store, func()) BootstrapConfig() *bootstrap.Config diff --git a/xds/internal/xdsclient/client_new.go b/xds/internal/xdsclient/client_new.go index 3e0acd5fc6d7..2a4f65199ff7 100644 --- a/xds/internal/xdsclient/client_new.go +++ b/xds/internal/xdsclient/client_new.go @@ -19,9 +19,7 @@ package xdsclient import ( - "bytes" "context" - "encoding/json" "fmt" "sync" "time" @@ -33,44 +31,42 @@ import ( "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) +// NameForServer represents the value to be passed as name when creating an xDS +// client from xDS-enabled gRPC servers. This is a well-known dedicated key +// value, and is defined in gRFC A71. +const NameForServer = "#server" + // New returns a new xDS client configured by the bootstrap file specified in env // variable GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG. // -// The returned client is a reference counted singleton instance. This function -// creates a new client only when one doesn't already exist. +// gRPC client implementations are expected to pass the channel's target URI for +// the name field, while server implementations are expected to pass a dedicated +// well-known value "#server", as specified in gRFC A71. The returned client is +// a reference counted implementation shared among callers using the same name. // // The second return value represents a close function which releases the // caller's reference on the returned client. The caller is expected to invoke // it once they are done using the client. The underlying client will be closed // only when all references are released, and it is safe for the caller to // invoke this close function multiple times. -func New() (XDSClient, func(), error) { - return newRefCountedWithConfig(nil) +func New(name string) (XDSClient, func(), error) { + return NewWithConfig(name, nil) } // NewWithConfig is similar to New, except that it uses the provided bootstrap // configuration to create the xDS client if and only if the bootstrap // environment variables are not defined. // -// The returned client is a reference counted singleton instance. This function -// creates a new client only when one doesn't already exist. -// -// The second return value represents a close function which releases the -// caller's reference on the returned client. The caller is expected to invoke -// it once they are done using the client. The underlying client will be closed -// only when all references are released, and it is safe for the caller to -// invoke this close function multiple times. -// // # Internal Only // // This function should ONLY be used by the internal google-c2p resolver. // DO NOT use this elsewhere. Use New() instead. -func NewWithConfig(config *bootstrap.Config) (XDSClient, func(), error) { - return newRefCountedWithConfig(config) +func NewWithConfig(name string, config *bootstrap.Config) (XDSClient, func(), error) { + return newRefCountedWithConfig(name, config, defaultWatchExpiryTimeout, defaultIdleAuthorityDeleteTimeout) } -// newWithConfig returns a new xdsClient with the given config. -func newWithConfig(config *bootstrap.Config, watchExpiryTimeout time.Duration, idleAuthorityDeleteTimeout time.Duration) (*clientImpl, error) { +// newClientImpl returns a new xdsClient with the given config. +func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, idleAuthorityDeleteTimeout time.Duration) (*clientImpl, error) { ctx, cancel := context.WithCancel(context.Background()) c := &clientImpl{ done: grpcsync.NewEvent(), @@ -84,13 +80,14 @@ func newWithConfig(config *bootstrap.Config, watchExpiryTimeout time.Duration, i } c.logger = prefixLogger(c) - c.logger.Infof("Created client to xDS management server: %s", config.XDSServers()[0]) return c, nil } // OptionsForTesting contains options to configure xDS client creation for // testing purposes only. type OptionsForTesting struct { + // Name is a unique name for this xDS client. + Name string // Contents contain a JSON representation of the bootstrap configuration to // be used when creating the xDS client. Contents []byte @@ -114,6 +111,9 @@ type OptionsForTesting struct { // // This function should ONLY be used for testing purposes. func NewForTesting(opts OptionsForTesting) (XDSClient, func(), error) { + if opts.Name == "" { + return nil, nil, fmt.Errorf("opts.Name field must be non-empty") + } if opts.WatchExpiryTimeout == 0 { opts.WatchExpiryTimeout = defaultWatchExpiryTimeout } @@ -121,49 +121,32 @@ func NewForTesting(opts OptionsForTesting) (XDSClient, func(), error) { opts.AuthorityIdleTimeout = defaultIdleAuthorityDeleteTimeout } - // Normalize the input configuration, as this is used as the key in the map - // of xDS clients created for testing. - buf := bytes.Buffer{} - err := json.Indent(&buf, opts.Contents, "", "") + cfg, err := bootstrap.NewConfigFromContents(opts.Contents) if err != nil { - return nil, nil, fmt.Errorf("xds: error normalizing JSON: %v", err) + return nil, nil, err } - opts.Contents = bytes.TrimSpace(buf.Bytes()) + return newRefCountedWithConfig(opts.Name, cfg, opts.WatchExpiryTimeout, opts.AuthorityIdleTimeout) +} +// GetForTesting returns an xDS client created earlier using the given name. +// +// The second return value represents a close function which the caller is +// expected to invoke once they are done using the client. It is safe for the +// caller to invoke this close function multiple times. +// +// # Testing Only +// +// This function should ONLY be used for testing purposes. +func GetForTesting(name string) (XDSClient, func(), error) { clientsMu.Lock() defer clientsMu.Unlock() - var client *clientRefCounted - closeFunc := grpcsync.OnceFunc(func() { - clientsMu.Lock() - defer clientsMu.Unlock() - if client.decrRef() == 0 { - client.close() - delete(clients, string(opts.Contents)) - } - }) - - // If an xDS client exists for the given configuration, increment its - // reference count and return it. - if c := clients[string(opts.Contents)]; c != nil { - c.incrRef() - client = c - return c, closeFunc, nil - } - - // Create a new xDS client for the given configuration - bcfg, err := bootstrap.NewConfigFromContents(opts.Contents) - if err != nil { - return nil, nil, fmt.Errorf("bootstrap config %s: %v", string(opts.Contents), err) - } - cImpl, err := newWithConfig(bcfg, opts.WatchExpiryTimeout, opts.AuthorityIdleTimeout) - if err != nil { - return nil, nil, fmt.Errorf("creating xDS client: %v", err) + c, ok := clients[name] + if !ok { + return nil, nil, fmt.Errorf("xDS client with name %q not found", name) } - client = &clientRefCounted{clientImpl: cImpl, refCount: 1} - clients[string(opts.Contents)] = client - - return client, closeFunc, nil + c.incrRef() + return c, grpcsync.OnceFunc(func() { clientRefCountedClose(name) }), nil } func init() { diff --git a/xds/internal/xdsclient/singleton.go b/xds/internal/xdsclient/client_refcounted.go similarity index 58% rename from xds/internal/xdsclient/singleton.go rename to xds/internal/xdsclient/client_refcounted.go index ce1b8a8f4eb5..72b43faf6be1 100644 --- a/xds/internal/xdsclient/singleton.go +++ b/xds/internal/xdsclient/client_refcounted.go @@ -20,7 +20,6 @@ package xdsclient import ( "fmt" - "sync" "sync/atomic" "time" @@ -35,40 +34,43 @@ const ( ) var ( - // This is the client returned by New(). It contains one client implementation, - // and maintains the refcount. - singletonMu sync.Mutex - singletonClient *clientRefCounted - // The following functions are no-ops in the actual code, but can be // overridden in tests to give them visibility into certain events. - singletonClientImplCreateHook = func() {} - singletonClientImplCloseHook = func() {} + xdsClientImplCreateHook = func(name string) {} + xdsClientImplCloseHook = func(name string) {} ) -// To override in tests. -var bootstrapNewConfig = bootstrap.NewConfig - -func clientRefCountedClose() { - singletonMu.Lock() - defer singletonMu.Unlock() +func clientRefCountedClose(name string) { + clientsMu.Lock() + defer clientsMu.Unlock() - if singletonClient.decrRef() != 0 { + client, ok := clients[name] + if !ok { + logger.Errorf("Attempt to close a non-existent xDS client with name %s", name) return } - singletonClient.clientImpl.close() - singletonClientImplCloseHook() - singletonClient = nil -} - -func newRefCountedWithConfig(fallbackConfig *bootstrap.Config) (XDSClient, func(), error) { - singletonMu.Lock() - defer singletonMu.Unlock() + if client.decrRef() != 0 { + return + } + client.clientImpl.close() + xdsClientImplCloseHook(name) + delete(clients, name) - if singletonClient != nil { - singletonClient.incrRef() - return singletonClient, grpcsync.OnceFunc(clientRefCountedClose), nil +} +// newRefCountedWithConfig creates a new reference counted xDS client +// implementation for name, if one does not exist already. If an xDS client for +// the given name exists, it gets a reference to it and returns it. +// +// The passed in fallback config is used when bootstrap environment variables +// are not defined. +func newRefCountedWithConfig(name string, fallbackConfig *bootstrap.Config, watchExpiryTimeout, idleAuthorityTimeout time.Duration) (XDSClient, func(), error) { + clientsMu.Lock() + defer clientsMu.Unlock() + + if c := clients[name]; c != nil { + c.incrRef() + return c, grpcsync.OnceFunc(func() { clientRefCountedClose(name) }), nil } // Use fallbackConfig only if bootstrap env vars are unspecified. @@ -80,22 +82,24 @@ func newRefCountedWithConfig(fallbackConfig *bootstrap.Config) (XDSClient, func( config = fallbackConfig } else { var err error - config, err = bootstrapNewConfig() + config, err = bootstrap.NewConfig() if err != nil { return nil, nil, fmt.Errorf("xds: failed to read bootstrap file: %v", err) } } // Create the new client implementation. - c, err := newWithConfig(config, defaultWatchExpiryTimeout, defaultIdleAuthorityDeleteTimeout) + c, err := newClientImpl(config, watchExpiryTimeout, idleAuthorityTimeout) if err != nil { return nil, nil, err } - singletonClient = &clientRefCounted{clientImpl: c, refCount: 1} - singletonClientImplCreateHook() + c.logger.Infof("Created client with name %q to primary xDS management server: %q", name, config.XDSServers()[0]) + client := &clientRefCounted{clientImpl: c, refCount: 1} + clients[name] = client + xdsClientImplCreateHook(name) logger.Infof("xDS node ID: %s", config.Node().GetId()) - return singletonClient, grpcsync.OnceFunc(clientRefCountedClose), nil + return client, grpcsync.OnceFunc(func() { clientRefCountedClose(name) }), nil } // clientRefCounted is ref-counted, and to be shared by the xds resolver and diff --git a/xds/internal/xdsclient/client_refcounted_test.go b/xds/internal/xdsclient/client_refcounted_test.go new file mode 100644 index 000000000000..0824ad385b26 --- /dev/null +++ b/xds/internal/xdsclient/client_refcounted_test.go @@ -0,0 +1,267 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsclient + +import ( + "context" + "sync" + "testing" + + "github.com/google/uuid" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" +) + +// Tests that multiple calls to New() with the same name returns the same +// client. Also verifies that only when all references to the newly created +// client are released, the underlying client is closed. +func (s) TestClientNew_Single(t *testing.T) { + // Create a bootstrap configuration, place it in a file in the temp + // directory, and set the bootstrap env vars to point to it. + nodeID := uuid.New().String() + contents := e2e.DefaultBootstrapContents(t, nodeID, "non-existent-server-address") + testutils.CreateBootstrapFileForTesting(t, contents) + + // Override the client creation hook to get notified. + origClientImplCreateHook := xdsClientImplCreateHook + clientImplCreateCh := testutils.NewChannel() + xdsClientImplCreateHook = func(name string) { + clientImplCreateCh.Replace(name) + } + defer func() { xdsClientImplCreateHook = origClientImplCreateHook }() + + // Override the client close hook to get notified. + origClientImplCloseHook := xdsClientImplCloseHook + clientImplCloseCh := testutils.NewChannel() + xdsClientImplCloseHook = func(name string) { + clientImplCloseCh.Replace(name) + } + defer func() { xdsClientImplCloseHook = origClientImplCloseHook }() + + // The first call to New() should create a new client. + _, closeFunc, err := New(t.Name()) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := clientImplCreateCh.Receive(ctx); err != nil { + t.Fatalf("Timeout when waiting for xDS client to be created: %v", err) + } + + // Calling New() again should not create new client implementations. + const count = 9 + closeFuncs := make([]func(), count) + for i := 0; i < count; i++ { + func() { + _, closeFuncs[i], err = New(t.Name()) + if err != nil { + t.Fatalf("%d-th call to New() failed with error: %v", i, err) + } + + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := clientImplCreateCh.Receive(sCtx); err == nil { + t.Fatalf("%d-th call to New() created a new client", i) + } + }() + } + + // Call Close() multiple times on each of the clients created in the above + // for loop. Close() calls are idempotent, and the underlying client + // implementation will not be closed until we release the first reference we + // acquired above, via the first call to New(). + for i := 0; i < count; i++ { + func() { + closeFuncs[i]() + closeFuncs[i]() + + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := clientImplCloseCh.Receive(sCtx); err == nil { + t.Fatal("Client implementation closed before all references are released") + } + }() + } + + // Call the last Close(). The underlying implementation should be closed. + closeFunc() + if _, err := clientImplCloseCh.Receive(ctx); err != nil { + t.Fatalf("Timeout waiting for client implementation to be closed: %v", err) + } + + // Calling New() again, after the previous Client was actually closed, + // should create a new one. + _, closeFunc, err = New(t.Name()) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer closeFunc() + if _, err := clientImplCreateCh.Receive(ctx); err != nil { + t.Fatalf("Timeout when waiting for xDS client to be created: %v", err) + } +} + +// Tests the scenario where there are multiple calls to New() with different +// names. Verifies that reference counts are tracked correctly for each client +// and that only when all references are released for a client, it is closed. +func (s) TestClientNew_Multiple(t *testing.T) { + // Create a bootstrap configuration, place it in a file in the temp + // directory, and set the bootstrap env vars to point to it. + nodeID := uuid.New().String() + contents := e2e.DefaultBootstrapContents(t, nodeID, "non-existent-server-address") + testutils.CreateBootstrapFileForTesting(t, contents) + + // Override the client creation hook to get notified. + origClientImplCreateHook := xdsClientImplCreateHook + clientImplCreateCh := testutils.NewChannel() + xdsClientImplCreateHook = func(name string) { + clientImplCreateCh.Replace(name) + } + defer func() { xdsClientImplCreateHook = origClientImplCreateHook }() + + // Override the client close hook to get notified. + origClientImplCloseHook := xdsClientImplCloseHook + clientImplCloseCh := testutils.NewChannel() + xdsClientImplCloseHook = func(name string) { + clientImplCloseCh.Replace(name) + } + defer func() { xdsClientImplCloseHook = origClientImplCloseHook }() + + // Create two xDS clients. + client1Name := t.Name() + "-1" + _, closeFunc1, err := New(client1Name) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + name, err := clientImplCreateCh.Receive(ctx) + if err != nil { + t.Fatalf("Timeout when waiting for xDS client to be created: %v", err) + } + if name.(string) != client1Name { + t.Fatalf("xDS client created for name %q, want %q", name.(string), client1Name) + } + + client2Name := t.Name() + "-2" + _, closeFunc2, err := New(client2Name) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + name, err = clientImplCreateCh.Receive(ctx) + if err != nil { + t.Fatalf("Timeout when waiting for xDS client to be created: %v", err) + } + if name.(string) != client2Name { + t.Fatalf("xDS client created for name %q, want %q", name.(string), client1Name) + } + + // Create N more references to each of these clients. + const count = 9 + closeFuncs1 := make([]func(), count) + closeFuncs2 := make([]func(), count) + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + for i := 0; i < count; i++ { + var err error + _, closeFuncs1[i], err = New(client1Name) + if err != nil { + t.Errorf("%d-th call to New() failed with error: %v", i, err) + } + } + }() + go func() { + defer wg.Done() + for i := 0; i < count; i++ { + var err error + _, closeFuncs2[i], err = New(client2Name) + if err != nil { + t.Errorf("%d-th call to New() failed with error: %v", i, err) + } + } + }() + wg.Wait() + if t.Failed() { + t.FailNow() + } + + // Ensure that none of the create hooks are called. + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := clientImplCreateCh.Receive(sCtx); err == nil { + t.Fatalf("New xDS client created when expected to reuse an existing one") + } + + // The close function returned by New() is idempotent and calling it + // multiple times should not decrement the reference count multiple times. + for i := 0; i < count; i++ { + closeFuncs1[i]() + closeFuncs1[i]() + } + sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := clientImplCloseCh.Receive(sCtx); err == nil { + t.Fatal("Client implementation closed before all references are released") + } + + // Release the last reference and verify that the client is closed + // completely. + closeFunc1() + name, err = clientImplCloseCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for xDS client to be closed completely") + } + if name.(string) != client1Name { + t.Fatalf("xDS client closed for name %q, want %q", name.(string), client1Name) + } + + // Ensure that the close hook is not called for the second client. + sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := clientImplCloseCh.Receive(sCtx); err == nil { + t.Fatal("Client implementation closed before all references are released") + } + + // The close function returned by New() is idempotent and calling it + // multiple times should not decrement the reference count multiple times. + for i := 0; i < count; i++ { + closeFuncs2[i]() + closeFuncs2[i]() + } + sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := clientImplCloseCh.Receive(sCtx); err == nil { + t.Fatal("Client implementation closed before all references are released") + } + + // Release the last reference and verify that the client is closed + // completely. + closeFunc2() + name, err = clientImplCloseCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for xDS client to be closed completely") + } + if name.(string) != client2Name { + t.Fatalf("xDS client closed for name %q, want %q", name.(string), client2Name) + } +} diff --git a/xds/internal/xdsclient/clientimpl_dump.go b/xds/internal/xdsclient/clientimpl_dump.go index 0fa75fc6bdc7..f4d7b0a0115c 100644 --- a/xds/internal/xdsclient/clientimpl_dump.go +++ b/xds/internal/xdsclient/clientimpl_dump.go @@ -22,27 +22,32 @@ import ( v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3" ) -// DumpResources returns the status and contents of all xDS resources. -func (c *clientImpl) DumpResources() (*v3statuspb.ClientStatusResponse, error) { +// dumpResources returns the status and contents of all xDS resources. +func (c *clientImpl) dumpResources() *v3statuspb.ClientConfig { c.authorityMu.Lock() defer c.authorityMu.Unlock() var retCfg []*v3statuspb.ClientConfig_GenericXdsConfig for _, a := range c.authorities { - cfg, err := a.dumpResources() - if err != nil { - return nil, err - } - retCfg = append(retCfg, cfg...) + retCfg = append(retCfg, a.dumpResources()...) + } + + return &v3statuspb.ClientConfig{ + Node: c.config.Node(), + GenericXdsConfigs: retCfg, } +} - return &v3statuspb.ClientStatusResponse{ - Config: []*v3statuspb.ClientConfig{ - { - // TODO: Populate ClientScope. Need to update go-control-plane dependency. - Node: c.config.Node(), - GenericXdsConfigs: retCfg, - }, - }, - }, nil +// DumpResources returns the status and contents of all xDS resources. +func DumpResources() *v3statuspb.ClientStatusResponse { + clientsMu.Lock() + defer clientsMu.Unlock() + + resp := &v3statuspb.ClientStatusResponse{} + for key, client := range clients { + cfg := client.dumpResources() + cfg.ClientScope = key + resp.Config = append(resp.Config, cfg) + } + return resp } diff --git a/xds/internal/xdsclient/clientimpl_loadreport.go b/xds/internal/xdsclient/clientimpl_loadreport.go index ff2f5e9d6728..b42e43a56976 100644 --- a/xds/internal/xdsclient/clientimpl_loadreport.go +++ b/xds/internal/xdsclient/clientimpl_loadreport.go @@ -32,7 +32,7 @@ func (c *clientImpl) ReportLoad(server *bootstrap.ServerConfig) (*load.Store, fu a, err := c.newAuthorityLocked(server) if err != nil { c.authorityMu.Unlock() - c.logger.Infof("xds: failed to connect to the control plane to do load reporting for authority %q: %v", server, err) + c.logger.Warningf("Failed to connect to the management server to report load for authority %q: %v", server, err) return nil, func() {} } // Hold the ref before starting load reporting. diff --git a/xds/internal/xdsclient/loadreport_test.go b/xds/internal/xdsclient/loadreport_test.go index 4d095b64be80..90e29f65a80a 100644 --- a/xds/internal/xdsclient/loadreport_test.go +++ b/xds/internal/xdsclient/loadreport_test.go @@ -49,7 +49,11 @@ func (s) TestLRSClient(t *testing.T) { t.Fatalf("Failed to create server config for testing: %v", err) } bc := e2e.DefaultBootstrapContents(t, nodeID, fs1.Address) - xdsC, close, err := NewForTesting(OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + xdsC, close, err := NewForTesting(OptionsForTesting{ + Name: t.Name(), + Contents: bc, + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) } diff --git a/xds/internal/xdsclient/singleton_test.go b/xds/internal/xdsclient/singleton_test.go deleted file mode 100644 index 9890528030b7..000000000000 --- a/xds/internal/xdsclient/singleton_test.go +++ /dev/null @@ -1,118 +0,0 @@ -/* - * - * Copyright 2022 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package xdsclient - -import ( - "context" - "testing" - - "github.com/google/uuid" - "google.golang.org/grpc/internal/testutils" - "google.golang.org/grpc/internal/testutils/xds/e2e" -) - -// Test that multiple New() returns the same Client. And only when the last -// client is closed, the underlying client is closed. -func (s) TestClientNewSingleton(t *testing.T) { - // Create a bootstrap configuration, place it in a file in the temp - // directory, and set the bootstrap env vars to point to it. - nodeID := uuid.New().String() - contents := e2e.DefaultBootstrapContents(t, nodeID, "non-existent-server-address") - testutils.CreateBootstrapFileForTesting(t, contents) - - // Override the singleton creation hook to get notified. - origSingletonClientImplCreateHook := singletonClientImplCreateHook - singletonCreationCh := testutils.NewChannel() - singletonClientImplCreateHook = func() { - singletonCreationCh.Replace(nil) - } - defer func() { singletonClientImplCreateHook = origSingletonClientImplCreateHook }() - - // Override the singleton close hook to get notified. - origSingletonClientImplCloseHook := singletonClientImplCloseHook - singletonCloseCh := testutils.NewChannel() - singletonClientImplCloseHook = func() { - singletonCloseCh.Replace(nil) - } - defer func() { singletonClientImplCloseHook = origSingletonClientImplCloseHook }() - - // The first call to New() should create a new singleton client. - _, closeFunc, err := New() - if err != nil { - t.Fatalf("failed to create xDS client: %v", err) - } - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - if _, err := singletonCreationCh.Receive(ctx); err != nil { - t.Fatalf("Timeout when waiting for singleton xDS client to be created: %v", err) - } - - // Calling New() again should not create new singleton client implementations. - const count = 9 - closeFuncs := make([]func(), 9) - for i := 0; i < count; i++ { - func() { - _, closeFuncs[i], err = New() - if err != nil { - t.Fatalf("%d-th call to New() failed with error: %v", i, err) - } - - sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer sCancel() - if _, err := singletonCreationCh.Receive(sCtx); err == nil { - t.Fatalf("%d-th call to New() created a new singleton client", i) - } - }() - } - - // Call Close() multiple times on each of the clients created in the above for - // loop. Close() calls are idempotent, and the underlying client - // implementation will not be closed until we release the first reference we - // acquired above, via the first call to New(). - for i := 0; i < count; i++ { - func() { - closeFuncs[i]() - closeFuncs[i]() - - sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer sCancel() - if _, err := singletonCloseCh.Receive(sCtx); err == nil { - t.Fatal("singleton client implementation closed before all references are released") - } - }() - } - - // Call the last Close(). The underlying implementation should be closed. - closeFunc() - if _, err := singletonCloseCh.Receive(ctx); err != nil { - t.Fatalf("Timeout waiting for singleton client implementation to be closed: %v", err) - } - - // Calling New() again, after the previous Client was actually closed, should - // create a new one. - _, closeFunc, err = New() - if err != nil { - t.Fatalf("failed to create client: %v", err) - } - defer closeFunc() - if _, err := singletonCreationCh.Receive(ctx); err != nil { - t.Fatalf("Timeout when waiting for singleton xDS client to be created: %v", err) - } -} diff --git a/xds/internal/xdsclient/tests/authority_test.go b/xds/internal/xdsclient/tests/authority_test.go index aa46e57888be..c2264e58a373 100644 --- a/xds/internal/xdsclient/tests/authority_test.go +++ b/xds/internal/xdsclient/tests/authority_test.go @@ -101,7 +101,12 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T, idleTimeout time. if err != nil { t.Fatalf("Failed to create bootstrap configuration: %v", err) } - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents, WatchExpiryTimeout: defaultTestWatchExpiryTimeout, AuthorityIdleTimeout: idleTimeout}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bootstrapContents, + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + AuthorityIdleTimeout: idleTimeout, + }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) } diff --git a/xds/internal/xdsclient/tests/cds_watchers_test.go b/xds/internal/xdsclient/tests/cds_watchers_test.go index fe362e66eea0..4537c33931ac 100644 --- a/xds/internal/xdsclient/tests/cds_watchers_test.go +++ b/xds/internal/xdsclient/tests/cds_watchers_test.go @@ -200,7 +200,10 @@ func (s) TestCDSWatch(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -347,7 +350,10 @@ func (s) TestCDSWatch_TwoWatchesForSameResourceName(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -447,7 +453,10 @@ func (s) TestCDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -541,7 +550,10 @@ func (s) TestCDSWatch_ResourceCaching(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -611,7 +623,11 @@ func (s) TestCDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) testutils.CreateBootstrapFileForTesting(t, bc) - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) } @@ -647,7 +663,11 @@ func (s) TestCDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) testutils.CreateBootstrapFileForTesting(t, bc) - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) } @@ -726,7 +746,10 @@ func (s) TestCDSWatch_ResourceRemoved(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -835,7 +858,10 @@ func (s) TestCDSWatch_NACKError(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -901,7 +927,10 @@ func (s) TestCDSWatch_PartialValid(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -990,7 +1019,10 @@ func (s) TestCDSWatch_PartialResponse(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } diff --git a/xds/internal/xdsclient/tests/dump_test.go b/xds/internal/xdsclient/tests/dump_test.go index 3f5a7f3c46c8..d046f9c7ca0f 100644 --- a/xds/internal/xdsclient/tests/dump_test.go +++ b/xds/internal/xdsclient/tests/dump_test.go @@ -20,24 +20,85 @@ package xdsclient_test import ( "context" + "encoding/json" + "fmt" + "slices" + "strings" "testing" + "time" + "github.com/google/go-cmp/cmp" + "github.com/google/uuid" + "google.golang.org/grpc" + "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" + "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/known/anypb" v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3" v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" + v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3" - "github.com/google/uuid" + "github.com/envoyproxy/go-control-plane/pkg/wellknown" ) -func (s) TestDumpResources(t *testing.T) { +func makeGenericXdsConfig(typeURL, name, version string, status v3adminpb.ClientResourceStatus, config *anypb.Any, failure *v3adminpb.UpdateFailureState) *v3statuspb.ClientConfig_GenericXdsConfig { + return &v3statuspb.ClientConfig_GenericXdsConfig{ + TypeUrl: typeURL, + Name: name, + VersionInfo: version, + ClientStatus: status, + XdsConfig: config, + ErrorState: failure, + } +} + +func checkResourceDump(ctx context.Context, want *v3statuspb.ClientStatusResponse) error { + var cmpOpts = cmp.Options{ + protocmp.Transform(), + protocmp.IgnoreFields((*v3statuspb.ClientConfig_GenericXdsConfig)(nil), "last_updated"), + protocmp.IgnoreFields((*v3adminpb.UpdateFailureState)(nil), "last_update_attempt", "details"), + } + + var lastErr error + for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { + got := xdsclient.DumpResources() + // Sort the client configs based on the `client_scope` field. + slices.SortFunc(got.GetConfig(), func(a, b *v3statuspb.ClientConfig) int { + return strings.Compare(a.ClientScope, b.ClientScope) + }) + // Sort the resource configs based on the type_url and name fields. + for _, cc := range got.GetConfig() { + slices.SortFunc(cc.GetGenericXdsConfigs(), func(a, b *v3statuspb.ClientConfig_GenericXdsConfig) int { + if strings.Compare(a.TypeUrl, b.TypeUrl) == 0 { + return strings.Compare(a.Name, b.Name) + } + return strings.Compare(a.TypeUrl, b.TypeUrl) + }) + } + diff := cmp.Diff(want, got, cmpOpts) + if diff == "" { + return nil + } + lastErr = fmt.Errorf("received unexpected resource dump, diff (-got, +want):\n%s, got: %s\n want:%s", diff, pretty.ToJSON(got), pretty.ToJSON(want)) + } + return fmt.Errorf("timeout when waiting for resource dump to reach expected state: %v", lastErr) +} + +// Tests the scenario where there are multiple xDS clients talking to the same +// management server, and requesting the same set of resources. Verifies that +// under all circumstances, both xDS clients receive the same configuration from +// the server. +func (s) TestDumpResources_ManyToOne(t *testing.T) { // Initialize the xDS resources to be used in this test. ldsTargets := []string{"lds.target.good:0000", "lds.target.good:1111"} rdsTargets := []string{"route-config-0", "route-config-1"} @@ -77,76 +138,91 @@ func (s) TestDumpResources(t *testing.T) { bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) testutils.CreateBootstrapFileForTesting(t, bc) - // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + // Create two xDS clients with the above bootstrap contents. + client1Name := t.Name() + "-1" + client1, close1, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: client1Name, + Contents: bc, + }) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer close1() + client2Name := t.Name() + "-2" + client2, close2, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: client2Name, + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } - defer close() + defer close2() // Dump resources and expect empty configs. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := compareUpdateMetadata(ctx, client.DumpResources, nil); err != nil { + wantNode := &v3corepb.Node{ + Id: nodeID, + UserAgentName: "gRPC Go", + UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version}, + ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"}, + } + wantResp := &v3statuspb.ClientStatusResponse{ + Config: []*v3statuspb.ClientConfig{ + { + Node: wantNode, + ClientScope: client1Name, + }, + { + Node: wantNode, + ClientScope: client2Name, + }, + }, + } + if err := checkResourceDump(ctx, wantResp); err != nil { t.Fatal(err) } // Register watches, dump resources and expect configs in requested state. - for _, target := range ldsTargets { - xdsresource.WatchListener(client, target, noopListenerWatcher{}) - } - for _, target := range rdsTargets { - xdsresource.WatchRouteConfig(client, target, noopRouteConfigWatcher{}) - } - for _, target := range cdsTargets { - xdsresource.WatchCluster(client, target, noopClusterWatcher{}) + for _, xdsC := range []xdsclient.XDSClient{client1, client2} { + for _, target := range ldsTargets { + xdsresource.WatchListener(xdsC, target, noopListenerWatcher{}) + } + for _, target := range rdsTargets { + xdsresource.WatchRouteConfig(xdsC, target, noopRouteConfigWatcher{}) + } + for _, target := range cdsTargets { + xdsresource.WatchCluster(xdsC, target, noopClusterWatcher{}) + } + for _, target := range edsTargets { + xdsresource.WatchEndpoints(xdsC, target, noopEndpointsWatcher{}) + } } - for _, target := range edsTargets { - xdsresource.WatchEndpoints(client, target, noopEndpointsWatcher{}) + wantConfigs := []*v3statuspb.ClientConfig_GenericXdsConfig{ + makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[0], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[1], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[0], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[1], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[0], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[1], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[0], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[1], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), } - want := []*v3statuspb.ClientConfig_GenericXdsConfig{ - { - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - Name: ldsTargets[0], - ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED, - }, - { - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - Name: ldsTargets[1], - ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED, - }, - { - TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", - Name: rdsTargets[0], - ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED, - }, - { - TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", - Name: rdsTargets[1], - ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED, - }, - { - TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", - Name: cdsTargets[0], - ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED, - }, - { - TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", - Name: cdsTargets[1], - ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED, - }, - { - TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", - Name: edsTargets[0], - ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED, - }, - { - TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", - Name: edsTargets[1], - ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED, + wantResp = &v3statuspb.ClientStatusResponse{ + Config: []*v3statuspb.ClientConfig{ + { + Node: wantNode, + GenericXdsConfigs: wantConfigs, + ClientScope: client1Name, + }, + { + Node: wantNode, + GenericXdsConfigs: wantConfigs, + ClientScope: client2Name, + }, }, } - if err := compareUpdateMetadata(ctx, client.DumpResources, want); err != nil { + if err := checkResourceDump(ctx, wantResp); err != nil { t.Fatal(err) } @@ -162,75 +238,55 @@ func (s) TestDumpResources(t *testing.T) { } // Dump resources and expect ACK configs. - want = []*v3statuspb.ClientConfig_GenericXdsConfig{ - { - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - Name: ldsTargets[0], - ClientStatus: v3adminpb.ClientResourceStatus_ACKED, - VersionInfo: "1", - XdsConfig: listenerAnys[0], - }, - { - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - Name: ldsTargets[1], - ClientStatus: v3adminpb.ClientResourceStatus_ACKED, - VersionInfo: "1", - XdsConfig: listenerAnys[1], - }, - { - TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", - Name: rdsTargets[0], - ClientStatus: v3adminpb.ClientResourceStatus_ACKED, - VersionInfo: "1", - XdsConfig: routeAnys[0], - }, - { - TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", - Name: rdsTargets[1], - ClientStatus: v3adminpb.ClientResourceStatus_ACKED, - VersionInfo: "1", - XdsConfig: routeAnys[1], - }, - { - TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", - Name: cdsTargets[0], - ClientStatus: v3adminpb.ClientResourceStatus_ACKED, - VersionInfo: "1", - XdsConfig: clusterAnys[0], - }, - { - TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", - Name: cdsTargets[1], - ClientStatus: v3adminpb.ClientResourceStatus_ACKED, - VersionInfo: "1", - XdsConfig: clusterAnys[1], - }, - { - TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", - Name: edsTargets[0], - ClientStatus: v3adminpb.ClientResourceStatus_ACKED, - VersionInfo: "1", - XdsConfig: endpointAnys[0], - }, - { - TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", - Name: edsTargets[1], - ClientStatus: v3adminpb.ClientResourceStatus_ACKED, - VersionInfo: "1", - XdsConfig: endpointAnys[1], + wantConfigs = []*v3statuspb.ClientConfig_GenericXdsConfig{ + makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[0], "1", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[0], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[1], "1", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[1], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[0], "1", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[0], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[1], "1", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[1], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[0], "1", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[0], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[1], "1", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[1], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[0], "1", v3adminpb.ClientResourceStatus_ACKED, routeAnys[0], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[1], "1", v3adminpb.ClientResourceStatus_ACKED, routeAnys[1], nil), + } + wantResp = &v3statuspb.ClientStatusResponse{ + Config: []*v3statuspb.ClientConfig{ + { + Node: wantNode, + GenericXdsConfigs: wantConfigs, + ClientScope: client1Name, + }, + { + Node: wantNode, + GenericXdsConfigs: wantConfigs, + ClientScope: client2Name, + }, }, } - if err := compareUpdateMetadata(ctx, client.DumpResources, want); err != nil { + if err := checkResourceDump(ctx, wantResp); err != nil { t.Fatal(err) } // Update the first resource of each type in the management server to a // value which is expected to be NACK'ed by the xDS client. - const nackResourceIdx = 0 - listeners[nackResourceIdx].ApiListener = &v3listenerpb.ApiListener{} - routes[nackResourceIdx].VirtualHosts = []*v3routepb.VirtualHost{{Routes: []*v3routepb.Route{{}}}} - clusters[nackResourceIdx].ClusterDiscoveryType = &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC} - endpoints[nackResourceIdx].Endpoints = []*v3endpointpb.LocalityLbEndpoints{{}} + listeners[0] = func() *v3listenerpb.Listener { + hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ + HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})}, + }) + return &v3listenerpb.Listener{ + Name: ldsTargets[0], + ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm}, + FilterChains: []*v3listenerpb.FilterChain{{ + Name: "filter-chain-name", + Filters: []*v3listenerpb.Filter{{ + Name: wellknown.HTTPConnectionManager, + ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm}, + }}, + }}, + } + }() + routes[0].VirtualHosts = []*v3routepb.VirtualHost{{Routes: []*v3routepb.Route{{}}}} + clusters[0].ClusterDiscoveryType = &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC} + endpoints[0].Endpoints = []*v3endpointpb.LocalityLbEndpoints{{}} if err := mgmtServer.Update(ctx, e2e.UpdateOptions{ NodeID: nodeID, Listeners: listeners, @@ -242,77 +298,275 @@ func (s) TestDumpResources(t *testing.T) { t.Fatal(err) } - want = []*v3statuspb.ClientConfig_GenericXdsConfig{ - { - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - Name: ldsTargets[0], - ClientStatus: v3adminpb.ClientResourceStatus_NACKED, - VersionInfo: "1", - ErrorState: &v3adminpb.UpdateFailureState{ - VersionInfo: "2", + wantConfigs = []*v3statuspb.ClientConfig_GenericXdsConfig{ + makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[0], "1", v3adminpb.ClientResourceStatus_NACKED, clusterAnys[0], &v3adminpb.UpdateFailureState{VersionInfo: "2"}), + makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[1], "2", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[1], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[0], "1", v3adminpb.ClientResourceStatus_NACKED, endpointAnys[0], &v3adminpb.UpdateFailureState{VersionInfo: "2"}), + makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[1], "2", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[1], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[0], "1", v3adminpb.ClientResourceStatus_NACKED, listenerAnys[0], &v3adminpb.UpdateFailureState{VersionInfo: "2"}), + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[1], "2", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[1], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[0], "1", v3adminpb.ClientResourceStatus_NACKED, routeAnys[0], &v3adminpb.UpdateFailureState{VersionInfo: "2"}), + makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[1], "2", v3adminpb.ClientResourceStatus_ACKED, routeAnys[1], nil), + } + wantResp = &v3statuspb.ClientStatusResponse{ + Config: []*v3statuspb.ClientConfig{ + { + Node: wantNode, + GenericXdsConfigs: wantConfigs, + ClientScope: client1Name, }, - XdsConfig: listenerAnys[0], - }, - { - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - Name: ldsTargets[1], - ClientStatus: v3adminpb.ClientResourceStatus_ACKED, - VersionInfo: "2", - XdsConfig: listenerAnys[1], - }, - { - TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", - Name: rdsTargets[0], - ClientStatus: v3adminpb.ClientResourceStatus_NACKED, - VersionInfo: "1", - ErrorState: &v3adminpb.UpdateFailureState{ - VersionInfo: "2", + { + Node: wantNode, + GenericXdsConfigs: wantConfigs, + ClientScope: client2Name, }, - XdsConfig: routeAnys[0], }, - { - TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", - Name: rdsTargets[1], - ClientStatus: v3adminpb.ClientResourceStatus_ACKED, - VersionInfo: "2", - XdsConfig: routeAnys[1], + } + if err := checkResourceDump(ctx, wantResp); err != nil { + t.Fatal(err) + } +} + +// Tests the scenario where there are multiple xDS client talking to different +// management server, and requesting different set of resources. +func (s) TestDumpResources_ManyToMany(t *testing.T) { + // Initialize the xDS resources to be used in this test: + // - The first xDS client watches old style resource names, and thereby + // requests these resources from the top-level xDS server. + // - The second xDS client watches new style resource names with a non-empty + // authority, and thereby requests these resources from the server + // configuration for that authority. + authority := strings.Join(strings.Split(t.Name(), "/"), "") + ldsTargets := []string{ + "lds.target.good:0000", + fmt.Sprintf("xdstp://%s/envoy.config.listener.v3.Listener/lds.targer.good:1111", authority), + } + rdsTargets := []string{ + "route-config-0", + fmt.Sprintf("xdstp://%s/envoy.config.route.v3.RouteConfiguration/route-config-1", authority), + } + cdsTargets := []string{ + "cluster-0", + fmt.Sprintf("xdstp://%s/envoy.config.cluster.v3.Cluster/cluster-1", authority), + } + edsTargets := []string{ + "endpoints-0", + fmt.Sprintf("xdstp://%s/envoy.config.endpoint.v3.ClusterLoadAssignment/endpoints-1", authority), + } + listeners := make([]*v3listenerpb.Listener, len(ldsTargets)) + listenerAnys := make([]*anypb.Any, len(ldsTargets)) + for i := range ldsTargets { + listeners[i] = e2e.DefaultClientListener(ldsTargets[i], rdsTargets[i]) + listenerAnys[i] = testutils.MarshalAny(t, listeners[i]) + } + routes := make([]*v3routepb.RouteConfiguration, len(rdsTargets)) + routeAnys := make([]*anypb.Any, len(rdsTargets)) + for i := range rdsTargets { + routes[i] = e2e.DefaultRouteConfig(rdsTargets[i], ldsTargets[i], cdsTargets[i]) + routeAnys[i] = testutils.MarshalAny(t, routes[i]) + } + clusters := make([]*v3clusterpb.Cluster, len(cdsTargets)) + clusterAnys := make([]*anypb.Any, len(cdsTargets)) + for i := range cdsTargets { + clusters[i] = e2e.DefaultCluster(cdsTargets[i], edsTargets[i], e2e.SecurityLevelNone) + clusterAnys[i] = testutils.MarshalAny(t, clusters[i]) + } + endpoints := make([]*v3endpointpb.ClusterLoadAssignment, len(edsTargets)) + endpointAnys := make([]*anypb.Any, len(edsTargets)) + ips := []string{"0.0.0.0", "1.1.1.1"} + ports := []uint32{123, 456} + for i := range edsTargets { + endpoints[i] = e2e.DefaultEndpoint(edsTargets[i], ips[i], ports[i:i+1]) + endpointAnys[i] = testutils.MarshalAny(t, endpoints[i]) + } + + // Start two management servers. + mgmtServer1 := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) + mgmtServer2 := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) + + // The first of the above management servers will be the top-level xDS + // server in the bootstrap configuration, and the second will be the xDS + // server corresponding to the test authority. + nodeID := uuid.New().String() + bc, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{ + Servers: []json.RawMessage{[]byte(fmt.Sprintf(`{ + "server_uri": %q, + "channel_creds": [{"type": "insecure"}] + }`, mgmtServer1.Address))}, + NodeID: nodeID, + Authorities: map[string]json.RawMessage{ + authority: []byte(fmt.Sprintf(`{ + "xds_servers": [{ + "server_uri": %q, + "channel_creds": [{"type": "insecure"}] + }]}`, mgmtServer2.Address)), }, - { - TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", - Name: cdsTargets[0], - ClientStatus: v3adminpb.ClientResourceStatus_NACKED, - VersionInfo: "1", - ErrorState: &v3adminpb.UpdateFailureState{ - VersionInfo: "2", + }) + if err != nil { + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + testutils.CreateBootstrapFileForTesting(t, bc) + + // Create two xDS clients with the above bootstrap contents. + client1Name := t.Name() + "-1" + client1, close1, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: client1Name, + Contents: bc, + }) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer close1() + client2Name := t.Name() + "-2" + client2, close2, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: client2Name, + Contents: bc, + }) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer close2() + + // Check the resource dump before configuring resources on the management server. + // Dump resources and expect empty configs. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + wantNode := &v3corepb.Node{ + Id: nodeID, + UserAgentName: "gRPC Go", + UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version}, + ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"}, + } + wantResp := &v3statuspb.ClientStatusResponse{ + Config: []*v3statuspb.ClientConfig{ + { + Node: wantNode, + ClientScope: client1Name, + }, + { + Node: wantNode, + ClientScope: client2Name, }, - XdsConfig: clusterAnys[0], }, - { - TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", - Name: cdsTargets[1], - ClientStatus: v3adminpb.ClientResourceStatus_ACKED, - VersionInfo: "2", - XdsConfig: clusterAnys[1], + } + if err := checkResourceDump(ctx, wantResp); err != nil { + t.Fatal(err) + } + + // Register watches, the first xDS client watches old style resource names, + // while the second xDS client watches new style resource names. + xdsresource.WatchListener(client1, ldsTargets[0], noopListenerWatcher{}) + xdsresource.WatchRouteConfig(client1, rdsTargets[0], noopRouteConfigWatcher{}) + xdsresource.WatchCluster(client1, cdsTargets[0], noopClusterWatcher{}) + xdsresource.WatchEndpoints(client1, edsTargets[0], noopEndpointsWatcher{}) + xdsresource.WatchListener(client2, ldsTargets[1], noopListenerWatcher{}) + xdsresource.WatchRouteConfig(client2, rdsTargets[1], noopRouteConfigWatcher{}) + xdsresource.WatchCluster(client2, cdsTargets[1], noopClusterWatcher{}) + xdsresource.WatchEndpoints(client2, edsTargets[1], noopEndpointsWatcher{}) + + // Check the resource dump. Both clients should have all resources in + // REQUESTED state. + wantConfigs1 := []*v3statuspb.ClientConfig_GenericXdsConfig{ + makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[0], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[0], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[0], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[0], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + } + wantConfigs2 := []*v3statuspb.ClientConfig_GenericXdsConfig{ + makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[1], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[1], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[1], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[1], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + } + wantResp = &v3statuspb.ClientStatusResponse{ + Config: []*v3statuspb.ClientConfig{ + { + Node: wantNode, + GenericXdsConfigs: wantConfigs1, + ClientScope: client1Name, + }, + { + Node: wantNode, + GenericXdsConfigs: wantConfigs2, + ClientScope: client2Name, + }, }, - { - TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", - Name: edsTargets[0], - ClientStatus: v3adminpb.ClientResourceStatus_NACKED, - VersionInfo: "1", - ErrorState: &v3adminpb.UpdateFailureState{ - VersionInfo: "2", + } + if err := checkResourceDump(ctx, wantResp); err != nil { + t.Fatal(err) + } + + // Configure resources on the first management server. + if err := mgmtServer1.Update(ctx, e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: listeners[:1], + Routes: routes[:1], + Clusters: clusters[:1], + Endpoints: endpoints[:1], + }); err != nil { + t.Fatal(err) + } + + // Check the resource dump. One client should have resources in ACKED state, + // while the other should still have resources in REQUESTED state. + wantConfigs1 = []*v3statuspb.ClientConfig_GenericXdsConfig{ + makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[0], "1", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[0], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[0], "1", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[0], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[0], "1", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[0], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[0], "1", v3adminpb.ClientResourceStatus_ACKED, routeAnys[0], nil), + } + wantResp = &v3statuspb.ClientStatusResponse{ + Config: []*v3statuspb.ClientConfig{ + { + Node: wantNode, + GenericXdsConfigs: wantConfigs1, + ClientScope: client1Name, + }, + { + Node: wantNode, + GenericXdsConfigs: wantConfigs2, + ClientScope: client2Name, }, - XdsConfig: endpointAnys[0], }, - { - TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", - Name: edsTargets[1], - ClientStatus: v3adminpb.ClientResourceStatus_ACKED, - VersionInfo: "2", - XdsConfig: endpointAnys[1], + } + if err := checkResourceDump(ctx, wantResp); err != nil { + t.Fatal(err) + } + + // Configure resources on the second management server. + if err := mgmtServer2.Update(ctx, e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: listeners[1:], + Routes: routes[1:], + Clusters: clusters[1:], + Endpoints: endpoints[1:], + }); err != nil { + t.Fatal(err) + } + + // Check the resource dump. Both clients should have appropriate resources + // in REQUESTED state. + wantConfigs2 = []*v3statuspb.ClientConfig_GenericXdsConfig{ + makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[1], "1", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[1], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[1], "1", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[1], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[1], "1", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[1], nil), + makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[1], "1", v3adminpb.ClientResourceStatus_ACKED, routeAnys[1], nil), + } + wantResp = &v3statuspb.ClientStatusResponse{ + Config: []*v3statuspb.ClientConfig{ + { + Node: wantNode, + GenericXdsConfigs: wantConfigs1, + ClientScope: client1Name, + }, + { + Node: wantNode, + GenericXdsConfigs: wantConfigs2, + ClientScope: client2Name, + }, }, } - if err := compareUpdateMetadata(ctx, client.DumpResources, want); err != nil { + if err := checkResourceDump(ctx, wantResp); err != nil { t.Fatal(err) } } diff --git a/xds/internal/xdsclient/tests/eds_watchers_test.go b/xds/internal/xdsclient/tests/eds_watchers_test.go index 15f06d66b25e..a4e03007ccb6 100644 --- a/xds/internal/xdsclient/tests/eds_watchers_test.go +++ b/xds/internal/xdsclient/tests/eds_watchers_test.go @@ -231,7 +231,10 @@ func (s) TestEDSWatch(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -418,7 +421,10 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -519,7 +525,10 @@ func (s) TestEDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -618,7 +627,10 @@ func (s) TestEDSWatch_ResourceCaching(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -699,7 +711,11 @@ func (s) TestEDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) testutils.CreateBootstrapFileForTesting(t, bc) - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) } @@ -736,7 +752,11 @@ func (s) TestEDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client talking to the above management server. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) } @@ -801,7 +821,10 @@ func (s) TestEDSWatch_NACKError(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -867,7 +890,10 @@ func (s) TestEDSWatch_PartialValid(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } diff --git a/xds/internal/xdsclient/tests/federation_watchers_test.go b/xds/internal/xdsclient/tests/federation_watchers_test.go index bac3e5281ea7..92135b62d88f 100644 --- a/xds/internal/xdsclient/tests/federation_watchers_test.go +++ b/xds/internal/xdsclient/tests/federation_watchers_test.go @@ -69,7 +69,10 @@ func setupForFederationWatchersTest(t *testing.T) (*e2e.ManagementServer, string t.Fatalf("Failed to create bootstrap configuration: %v", err) } // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bootstrapContents, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } diff --git a/xds/internal/xdsclient/tests/helpers_test.go b/xds/internal/xdsclient/tests/helpers_test.go index 1718cc8f5b47..0d6f222f1328 100644 --- a/xds/internal/xdsclient/tests/helpers_test.go +++ b/xds/internal/xdsclient/tests/helpers_test.go @@ -38,7 +38,7 @@ func Test(t *testing.T) { const ( defaultTestWatchExpiryTimeout = 500 * time.Millisecond defaultTestIdleAuthorityTimeout = 50 * time.Millisecond - defaultTestTimeout = 5 * time.Second + defaultTestTimeout = 10 * time.Second defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen. ldsName = "xdsclient-test-lds-resource" diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index bf18d91aac7c..1a6b101fc976 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -220,7 +220,10 @@ func (s) TestLDSWatch(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -367,7 +370,10 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -468,7 +474,10 @@ func (s) TestLDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -557,7 +566,10 @@ func (s) TestLDSWatch_ResourceCaching(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -627,7 +639,12 @@ func (s) TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) testutils.CreateBootstrapFileForTesting(t, bc) - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + // Create an xDS client talking to the above management server. + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) } @@ -663,7 +680,11 @@ func (s) TestLDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client talking to the above management server. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) } @@ -743,7 +764,10 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -849,7 +873,10 @@ func (s) TestLDSWatch_NACKError(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -915,7 +942,10 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -1005,7 +1035,10 @@ func (s) TestLDSWatch_PartialResponse(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } diff --git a/xds/internal/xdsclient/tests/misc_watchers_test.go b/xds/internal/xdsclient/tests/misc_watchers_test.go index 1b11c786790c..48f6e04f0720 100644 --- a/xds/internal/xdsclient/tests/misc_watchers_test.go +++ b/xds/internal/xdsclient/tests/misc_watchers_test.go @@ -103,7 +103,7 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) { nodeID := uuid.New().String() authority := makeAuthorityName(t.Name()) - bs, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{ + bc, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{ Servers: []json.RawMessage{[]byte(fmt.Sprintf(`{ "server_uri": %q, "channel_creds": [{"type": "insecure"}] @@ -120,10 +120,13 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) { if err != nil { t.Fatalf("Failed to create bootstrap configuration: %v", err) } - testutils.CreateBootstrapFileForTesting(t, bs) + testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bs}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -227,10 +230,13 @@ func (s) TestNodeProtoSentOnlyInFirstRequest(t *testing.T) { // Create a bootstrap file in a temporary directory. nodeID := uuid.New().String() - bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } diff --git a/xds/internal/xdsclient/tests/rds_watchers_test.go b/xds/internal/xdsclient/tests/rds_watchers_test.go index 9da3c5efd98d..28a6b20b7168 100644 --- a/xds/internal/xdsclient/tests/rds_watchers_test.go +++ b/xds/internal/xdsclient/tests/rds_watchers_test.go @@ -233,7 +233,10 @@ func (s) TestRDSWatch(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -420,7 +423,10 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -521,7 +527,10 @@ func (s) TestRDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -620,7 +629,10 @@ func (s) TestRDSWatch_ResourceCaching(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -701,7 +713,11 @@ func (s) TestRDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client talking to the above management server. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) } @@ -737,7 +753,11 @@ func (s) TestRDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client talking to the above management server. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) } @@ -802,7 +822,10 @@ func (s) TestRDSWatch_NACKError(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -868,7 +891,10 @@ func (s) TestRDSWatch_PartialValid(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } diff --git a/xds/internal/xdsclient/tests/resource_update_test.go b/xds/internal/xdsclient/tests/resource_update_test.go index 94b519b6f3d9..43a655a7f959 100644 --- a/xds/internal/xdsclient/tests/resource_update_test.go +++ b/xds/internal/xdsclient/tests/resource_update_test.go @@ -65,7 +65,7 @@ func startFakeManagementServer(t *testing.T) (*fakeserver.Server, func()) { return fs, sCleanup } -func compareUpdateMetadata(ctx context.Context, dumpFunc func() (*v3statuspb.ClientStatusResponse, error), want []*v3statuspb.ClientConfig_GenericXdsConfig) error { +func compareUpdateMetadata(ctx context.Context, dumpFunc func() *v3statuspb.ClientStatusResponse, want []*v3statuspb.ClientConfig_GenericXdsConfig) error { var cmpOpts = cmp.Options{ cmp.Transformer("sort", func(in []*v3statuspb.ClientConfig_GenericXdsConfig) []*v3statuspb.ClientConfig_GenericXdsConfig { out := append([]*v3statuspb.ClientConfig_GenericXdsConfig(nil), in...) @@ -91,11 +91,10 @@ func compareUpdateMetadata(ctx context.Context, dumpFunc func() (*v3statuspb.Cli var lastErr error for ; ctx.Err() == nil; <-time.After(100 * time.Millisecond) { - resp, err := dumpFunc() - if err != nil { - return err + var got []*v3statuspb.ClientConfig_GenericXdsConfig + for _, cfg := range dumpFunc().GetConfig() { + got = append(got, cfg.GetGenericXdsConfigs()...) } - got := resp.GetConfig()[0].GetGenericXdsConfigs() diff := cmp.Diff(want, got, cmpOpts) if diff == "" { return nil @@ -284,7 +283,11 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { // Create an xDS client talking to the above management server. nodeID := uuid.New().String() bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) } @@ -347,7 +350,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { if diff := cmp.Diff(test.wantUpdate, gotUpdate, cmpOpts...); diff != "" { t.Fatalf("Unexpected diff in metadata, diff (-want +got):\n%s", diff) } - if err := compareUpdateMetadata(ctx, client.DumpResources, test.wantGenericXDSConfig); err != nil { + if err := compareUpdateMetadata(ctx, xdsclient.DumpResources, test.wantGenericXDSConfig); err != nil { t.Fatal(err) } }) @@ -559,7 +562,11 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { // Create an xDS client talking to the above management server. nodeID := uuid.New().String() bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) } @@ -621,7 +628,7 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { if diff := cmp.Diff(test.wantUpdate, gotUpdate, cmpOpts...); diff != "" { t.Fatalf("Unexpected diff in metadata, diff (-want +got):\n%s", diff) } - if err := compareUpdateMetadata(ctx, client.DumpResources, test.wantGenericXDSConfig); err != nil { + if err := compareUpdateMetadata(ctx, xdsclient.DumpResources, test.wantGenericXDSConfig); err != nil { t.Fatal(err) } }) @@ -795,7 +802,11 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { // Create an xDS client talking to the above management server. nodeID := uuid.New().String() bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) } @@ -871,7 +882,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { if diff := cmp.Diff(test.wantUpdate, gotUpdate, cmpOpts...); diff != "" { t.Fatalf("Unexpected diff in metadata, diff (-want +got):\n%s", diff) } - if err := compareUpdateMetadata(ctx, client.DumpResources, test.wantGenericXDSConfig); err != nil { + if err := compareUpdateMetadata(ctx, xdsclient.DumpResources, test.wantGenericXDSConfig); err != nil { t.Fatal(err) } }) @@ -1143,7 +1154,11 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) { // Create an xDS client talking to the above management server. nodeID := uuid.New().String() bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) } @@ -1205,7 +1220,7 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) { if diff := cmp.Diff(test.wantUpdate, gotUpdate, cmpOpts...); diff != "" { t.Fatalf("Unexpected diff in metadata, diff (-want +got):\n%s", diff) } - if err := compareUpdateMetadata(ctx, client.DumpResources, test.wantGenericXDSConfig); err != nil { + if err := compareUpdateMetadata(ctx, xdsclient.DumpResources, test.wantGenericXDSConfig); err != nil { t.Fatal(err) } }) diff --git a/xds/server.go b/xds/server.go index 5b29dae495fc..1fea8c830936 100644 --- a/xds/server.go +++ b/xds/server.go @@ -43,8 +43,8 @@ const serverPrefix = "[xds-server %p] " var ( // These new functions will be overridden in unit tests. - newXDSClient = func() (xdsclient.XDSClient, func(), error) { - return xdsclient.New() + newXDSClient = func(name string) (xdsclient.XDSClient, func(), error) { + return xdsclient.New(name) } newGRPCServer = func(opts ...grpc.ServerOption) grpcServer { return grpc.NewServer(opts...) @@ -95,11 +95,14 @@ func NewGRPCServer(opts ...grpc.ServerOption) (*GRPCServer, error) { newXDSClient := newXDSClient if s.opts.bootstrapContentsForTesting != nil { // Bootstrap file contents may be specified as a server option for tests. - newXDSClient = func() (xdsclient.XDSClient, func(), error) { - return xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: s.opts.bootstrapContentsForTesting}) + newXDSClient = func(name string) (xdsclient.XDSClient, func(), error) { + return xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: name, + Contents: s.opts.bootstrapContentsForTesting, + }) } } - xdsClient, xdsClientClose, err := newXDSClient() + xdsClient, xdsClientClose, err := newXDSClient(xdsclient.NameForServer) if err != nil { return nil, fmt.Errorf("xDS client creation failed: %v", err) } diff --git a/xds/server_ext_test.go b/xds/server_ext_test.go index 67dee039c3d4..edf8a8adeb91 100644 --- a/xds/server_ext_test.go +++ b/xds/server_ext_test.go @@ -194,17 +194,15 @@ func (s) TestServingModeChanges(t *testing.T) { t.Fatalf("cc.FullDuplexCall failed: %f", err) } - // Invoke the lds resource not found - this should cause the server to - // switch to not serving. This should gracefully drain connections, and fail - // RPC's after. (how to assert accepted + closed) does this make it's way to - // application layer? (should work outside of resource not found... - - // Invoke LDS Resource not found here (tests graceful close) - xdsC, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) + // Lookup the xDS client in use based on the dedicated well-known key, as + // defined in A71, used by the xDS enabled gRPC server. + xdsC, close, err := xdsclient.GetForTesting(xdsclient.NameForServer) if err != nil { t.Fatalf("Failed to find xDS client for configuration: %v", string(bootstrapContents)) } defer close() + + // Invoke LDS Resource not found here (tests graceful close). triggerResourceNotFound := internal.TriggerXDSResourceNotFoundForTesting.(func(xdsclient.XDSClient, xdsresource.Type, string) error) listenerResourceType := xdsinternal.ResourceTypeMapForTesting[version.V3ListenerURL].(xdsresource.Type) if err := triggerResourceNotFound(xdsC, listenerResourceType, listener.GetName()); err != nil { @@ -293,9 +291,9 @@ func (s) TestResourceNotFoundRDS(t *testing.T) { waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose) - // Lookup the xDS client in use based on the bootstrap configuration. The - // client was created as part of creating the xDS enabled gRPC server. - xdsC, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) + // Lookup the xDS client in use based on the dedicated well-known key, as + // defined in A71, used by the xDS enabled gRPC server. + xdsC, close, err := xdsclient.GetForTesting(xdsclient.NameForServer) if err != nil { t.Fatalf("Failed to find xDS client for configuration: %v", string(bootstrapContents)) } diff --git a/xds/server_test.go b/xds/server_test.go index bd32bdf254df..9d8db8c76d6f 100644 --- a/xds/server_test.go +++ b/xds/server_test.go @@ -475,7 +475,7 @@ func (s) TestServeSuccess(t *testing.T) { // creation fails and verifies that the call to NewGRPCServer() fails. func (s) TestNewServer_ClientCreationFailure(t *testing.T) { origNewXDSClient := newXDSClient - newXDSClient = func() (xdsclient.XDSClient, func(), error) { + newXDSClient = func(string) (xdsclient.XDSClient, func(), error) { return nil, nil, errors.New("xdsClient creation failed") } defer func() { newXDSClient = origNewXDSClient }()