From 4909aa4eef91cbc0d9616ec653fc35eed356e1fd Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 29 Aug 2024 18:18:40 +0000 Subject: [PATCH 1/4] xdsclient: complete refactor and fallback support --- xds/internal/xdsclient/authority.go | 177 ++++- xds/internal/xdsclient/client_new.go | 2 +- xds/internal/xdsclient/tests/fallback_test.go | 626 ++++++++++++++++++ 3 files changed, 800 insertions(+), 5 deletions(-) create mode 100644 xds/internal/xdsclient/tests/fallback_test.go diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index bd1662e8bca7..ec60088fc161 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -200,8 +200,90 @@ func (a *authority) handleADSStreamFailure(serverConfig *bootstrap.ServerConfig, } } - // TODO(easwars-fallback): Trigger fallback here if conditions for fallback - // are met. + // Two conditions need to be met for fallback to be triggered: + // 1. There is a connectivity failure on the ADS stream, as described in + // gRFC A57. For us, this means that the ADS stream was closed before the + // first server response was received. We already checked that condition + // earlier in this method. + // 2. There is at least one watcher for a resource that is not cached. + // Cached resources include ones that + // - have been successfully received and can be used. + // - are considered non-existent according to xDS Protocol Specification. + if !a.uncachedWatcherExists() { + if a.logger.V(2) { + a.logger.Infof("No watchers for uncached resources. Not triggering fallback") + } + return + } + a.triggerFallbackOnStreamFailure(serverConfig) +} + +// Determines the server to fallback to and triggers fallback to the same. If +// required, creates an xdsChannel to that server, and re-subscribes to all +// existing resources. +// +// Only executed in the context of a serializer callback. +func (a *authority) triggerFallbackOnStreamFailure(failingServerConfig *bootstrap.ServerConfig) { + if a.logger.V(2) { + a.logger.Infof("Attempting to initiate fallback after failure from server %q", failingServerConfig) + } + + // The server to fallback to is the next server on the list. If the current + // server is the last server, then there is nothing that can be done. + currentServerIdx := 0 + for _, cfg := range a.xdsChannelConfigs { + if cfg.sc.Equal(failingServerConfig) { + break + } + currentServerIdx++ + } + if currentServerIdx == len(a.xdsChannelConfigs)-1 { + if a.logger.V(2) { + a.logger.Infof("No more servers to fallback to") + } + return + } + fallbackServerIdx := currentServerIdx + 1 + fallbackChannel := a.xdsChannelConfigs[fallbackServerIdx] + + // If the server to fallback to already has an xdsChannel, it means that + // this connectivity error is from a server with a higher priority. There + // is not much we can do here. + if fallbackChannel.xc != nil { + if a.logger.V(2) { + a.logger.Infof("Channel to the next server in the list %q already exists", fallbackChannel.sc) + } + return + } + + // Create an xdsChannel for the fallback server. + if a.logger.V(2) { + a.logger.Infof("Initiating fallback to server %s", fallbackChannel.sc) + } + xc, cleanup, err := a.getChannelForADS(fallbackChannel.sc, a) + if err != nil { + a.logger.Errorf("Failed to create XDS channel: %v", err) + return + } + fallbackChannel.xc = xc + fallbackChannel.cleanup = cleanup + a.activeXDSChannel = fallbackChannel + + // Subscribe to all existing resources from the new management server. 
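A standalone restatement of the fallback decision added in this hunk may help readers skimming the diff. The sketch below is not code from this patch: serverEntry and nextFallbackServer are hypothetical stand-ins for xdsChannelWithConfig and the index arithmetic in triggerFallbackOnStreamFailure, with channel creation and logging elided.

package sketch

// serverEntry is a simplified stand-in for xdsChannelWithConfig: one entry
// per server in the bootstrap-ordered list, with a non-nil channel once a
// connection to that server exists.
type serverEntry struct {
	name    string
	channel any
}

// nextFallbackServer mirrors the selection logic in the hunk above: fall
// back to the next server in the list, unless the failing server is the
// last one or the next server already has a channel (i.e. the failure came
// from a higher-priority server while a lower-priority one is already in
// use). It returns -1 when no fallback should be attempted.
func nextFallbackServer(servers []serverEntry, failingIdx int) int {
	if failingIdx < 0 || failingIdx >= len(servers)-1 {
		return -1
	}
	if servers[failingIdx+1].channel != nil {
		return -1
	}
	return failingIdx + 1
}

The hunk resumes below with the loop that re-subscribes every watched resource on the newly created fallback channel.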
+ for typ, resources := range a.resources { + for name, state := range resources { + if a.logger.V(2) { + a.logger.Infof("Resubscribing to resource of type %q and name %q", typ.TypeName(), name) + } + xc.subscribe(typ, name) + + // Add the fallback channel to the list of xdsChannels from which + // this resource has been requested from. Retain the cached resource + // and the set of existing watchers (and other metadata fields) in + // the resource state. + state.xdsChannelConfigs = append(state.xdsChannelConfigs, fallbackChannel) + } + } } // adsResourceUpdate is called to notify the authority about a resource update @@ -218,13 +300,15 @@ func (a *authority) adsResourceUpdate(serverConfig *bootstrap.ServerConfig, rTyp // handleADSResourceUpdate processes an update from the xDS client, updating the // resource cache and notifying any registered watchers of the update. // +// If the update is received from a higher priority xdsChannel that was +// previously down, we revert to it and close all lower priority xdsChannels. +// // Once the update has been processed by all watchers, the authority is expected // to invoke the onDone callback. // // Only executed in the context of a serializer callback. func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig, rType xdsresource.Type, updates map[string]ads.DataAndErrTuple, md xdsresource.UpdateMetadata, onDone func()) { - // TODO(easwars-fallback): Trigger reverting to a higher priority server if - // the update is from one. + a.handleRevertingToPrimaryOnUpdate(serverConfig) // We build a list of callback funcs to invoke, and invoke them at the end // of this method instead of inline (when handling the update for a @@ -416,6 +500,76 @@ func (a *authority) handleADSResourceDoesNotExist(rType xdsresource.Type, resour } } +// handleRevertingToPrimaryOnUpdate is called when a resource update is received +// from a server that is not the current active server. This method ensures that +// all lower priority servers are closed and the active server is reverted to +// the highest priority server that has sent an update. +// +// This method is only executed in the context of a serializer callback. +func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *bootstrap.ServerConfig) { + if a.activeXDSChannel != nil && a.activeXDSChannel.sc.Equal(serverConfig) { + // If the resource update is from the current active server, nothing + // needs to be done from fallback point of view. + return + } + + if a.logger.V(2) { + a.logger.Infof("Received update from non-active server %q", serverConfig) + } + + // If the resource update is not from the current active server, it means + // that we have received an update from a higher priority server and we need + // to revert back to it. This method guarantees that when an update is + // received from a server, all lower priority servers are closed. + serverIdx := 0 + for _, cfg := range a.xdsChannelConfigs { + if cfg.sc.Equal(serverConfig) { + break + } + serverIdx++ + } + if serverIdx == len(a.xdsChannelConfigs) { + // This can never happen. + a.logger.Errorf("Received update from an unknown server: %v", serverConfig) + return + } + a.activeXDSChannel = a.xdsChannelConfigs[serverIdx] + + // Close all lower priority channel. + for i := serverIdx + 1; i < len(a.xdsChannelConfigs); i++ { + cfg := a.xdsChannelConfigs[i] + + // Unsubscribe any resources that were subscribed to, on this channel + // and remove it from the resource cache. 
When a ref to a channel is + // being released, there should be no more references to it from the + // resource cache. + for rType, rState := range a.resources { + for resourceName, state := range rState { + idx := 0 + for _, xc := range state.xdsChannelConfigs { + if xc != cfg { + state.xdsChannelConfigs[idx] = xc + idx++ + continue + } + xc.xc.unsubscribe(rType, resourceName) + } + state.xdsChannelConfigs = state.xdsChannelConfigs[:idx] + } + } + + // Release the reference to the channel. + if cfg.cleanup != nil { + if a.logger.V(2) { + a.logger.Infof("Closing lower priority server %q", cfg.sc) + } + cfg.cleanup() + cfg.cleanup = nil + } + cfg.xc = nil + } +} + // watchResource registers a new watcher for the specified resource type and // name. It returns a function that can be called to cancel the watch. // @@ -580,6 +734,21 @@ func (a *authority) closeXDSChannels() { a.activeXDSChannel = nil } +// uncachedWatcherExists returns true if there is at least one watcher for a +// resource that has not yet been cached. +// +// Only executed in the context of a serializer callback. +func (a *authority) uncachedWatcherExists() bool { + for _, resourceStates := range a.resources { + for _, state := range resourceStates { + if state.md.Status == xdsresource.ServiceStatusRequested { + return true + } + } + } + return false +} + // dumpResources returns a dump of the resource configuration cached by this // authority, for CSDS purposes. func (a *authority) dumpResources() []*v3statuspb.ClientConfig_GenericXdsConfig { diff --git a/xds/internal/xdsclient/client_new.go b/xds/internal/xdsclient/client_new.go index 839cf23833b3..82e549fda53a 100644 --- a/xds/internal/xdsclient/client_new.go +++ b/xds/internal/xdsclient/client_new.go @@ -108,6 +108,7 @@ func newClientImpl(config *bootstrap.Config, watchExpiryTimeout, idleChannelExpi type OptionsForTesting struct { // Name is a unique name for this xDS client. Name string + // Contents contain a JSON representation of the bootstrap configuration to // be used when creating the xDS client. Contents []byte @@ -180,7 +181,6 @@ func GetForTesting(name string) (XDSClient, func(), error) { func init() { internal.TriggerXDSResourceNotFoundForTesting = triggerXDSResourceNotFoundForTesting xdsclientinternal.ResourceWatchStateForTesting = resourceWatchStateForTesting - } func triggerXDSResourceNotFoundForTesting(client XDSClient, typ xdsresource.Type, name string) error { diff --git a/xds/internal/xdsclient/tests/fallback_test.go b/xds/internal/xdsclient/tests/fallback_test.go new file mode 100644 index 000000000000..b9f933783726 --- /dev/null +++ b/xds/internal/xdsclient/tests/fallback_test.go @@ -0,0 +1,626 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package xdsclient_test + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/google/uuid" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/envconfig" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/xds/bootstrap" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/status" + "google.golang.org/grpc/xds/internal/xdsclient" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" + + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" +) + +// Give the fallback tests additional time to complete because they need to +// first identify failed connections before establishing new ones. +const defaultFallbackTestTimeout = 2 * defaultTestTimeout + +func waitForRPCsToReachBackend(ctx context.Context, client testgrpc.TestServiceClient, backend string) error { + var lastErr error + for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { + var peer peer.Peer + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil { + lastErr = err + continue + } + // Veirfy the peer when the RPC succeeds. + if peer.Addr.String() == backend { + break + } + } + if ctx.Err() != nil { + return fmt.Errorf("timeout when waiting for RPCs to reach expected backend. Last error: %v", lastErr) + } + return nil +} + +// Tests fallback on startup where the xDS client is unable to establish a +// connection to the primary server. The test verifies that the xDS client falls +// back to the secondary server, and when the primary comes back up, it reverts +// to it. The test also verifies that when all requested resources are cached +// from the primary, fallback is not triggered when the connection goes down. +func (s) TestFallback_OnStartup(t *testing.T) { + // Enable fallback env var. + origFallbackEnv := envconfig.XDSFallbackSupport + envconfig.XDSFallbackSupport = true + defer func() { envconfig.XDSFallbackSupport = origFallbackEnv }() + + ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTestTimeout) + defer cancel() + + // Create two listeners for the two management servers. The test can + // start/stop these listeners and can also get notified when the listener + // receives a connection request. + primaryWrappedLis := testutils.NewListenerWrapper(t, nil) + primaryLis := testutils.NewRestartableListener(primaryWrappedLis) + fallbackWrappedLis := testutils.NewListenerWrapper(t, nil) + fallbackLis := testutils.NewRestartableListener(fallbackWrappedLis) + + // Start two management servers, primary and fallback, with the above + // listeners. 
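The Stop and Restart calls used throughout these tests come from testutils.NewRestartableListener, whose implementation is not part of this patch. The following is a rough sketch of the idea only: pausableListener is a hypothetical name, and the real helper also needs to tear down connections accepted before Stop, which is omitted here.

package sketch

import (
	"net"
	"sync"
)

// pausableListener illustrates the concept behind a restartable listener:
// while stopped, newly accepted connections are closed immediately so a
// client observes a connectivity failure, but the listener keeps its
// address so the "server" can come back later via Restart.
type pausableListener struct {
	net.Listener
	mu      sync.Mutex
	stopped bool
}

func (l *pausableListener) Accept() (net.Conn, error) {
	for {
		conn, err := l.Listener.Accept()
		if err != nil {
			return nil, err
		}
		l.mu.Lock()
		stopped := l.stopped
		l.mu.Unlock()
		if stopped {
			conn.Close() // simulate an unreachable management server
			continue
		}
		return conn, nil
	}
}

func (l *pausableListener) Stop()    { l.mu.Lock(); defer l.mu.Unlock(); l.stopped = true }
func (l *pausableListener) Restart() { l.mu.Lock(); defer l.mu.Unlock(); l.stopped = false }

The e2e.StartManagementServer calls that follow attach the primary and fallback servers to these listeners.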
+ primaryManagementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: primaryLis}) + fallbackManagementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: fallbackLis}) + + // Start two test service backends. + backend1 := stubserver.StartTestService(t, nil) + defer backend1.Stop() + backend2 := stubserver.StartTestService(t, nil) + defer backend2.Stop() + + // Configure xDS resource on the primary management server, with a cluster + // resource that contains an endpoint for backend1. + nodeID := uuid.New().String() + const serviceName = "my-service-fallback-xds" + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: serviceName, + NodeID: nodeID, + Host: "localhost", + Port: testutils.ParsePort(t, backend1.Address), + SecLevel: e2e.SecurityLevelNone, + }) + if err := primaryManagementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Configure xDS resource on the secondary management server, with a cluster + // resource that contains an endpoint for backend2. Only the listener + // resource has the same name on both servers. + fallbackRouteConfigName := "fallback-route-" + serviceName + fallbackClusterName := "fallback-cluster-" + serviceName + fallbackEndpointsName := "fallback-endpoints-" + serviceName + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, fallbackRouteConfigName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(fallbackRouteConfigName, serviceName, fallbackClusterName)}, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(fallbackClusterName, fallbackEndpointsName, e2e.SecurityLevelNone)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(fallbackEndpointsName, "localhost", []uint32{testutils.ParsePort(t, backend2.Address)})}, + } + if err := fallbackManagementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Shut both management servers down before starting the gRPC client to + // trigger fallback on startup. + primaryLis.Stop() + fallbackLis.Stop() + + // Generate bootstrap configuration with the above two servers. + bootstrapContents, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{ + Servers: []byte(fmt.Sprintf(`[ + { + "server_uri": %q, + "channel_creds": [{"type": "insecure"}] + }, + { + "server_uri": %q, + "channel_creds": [{"type": "insecure"}] + }]`, primaryManagementServer.Address, fallbackManagementServer.Address)), + Node: []byte(fmt.Sprintf(`{"id": "%s"}`, nodeID)), + }) + if err != nil { + t.Fatalf("Failed to create bootstrap file: %v", err) + } + + // Create an xDS client with the above bootstrap configuration and a short + // idle channel expiry timeout. This ensures that connections to lower + // priority servers get closed quickly, for the test to verify. + xdsC, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bootstrapContents, + IdleChannelExpiryTimeout: defaultTestIdleChannelExpiryTimeout, + }) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer close() + + // Get the xDS resolver to use the above xDS client. + resolverBuilder := internal.NewXDSResolverWithClientForTesting.(func(xdsclient.XDSClient) (resolver.Builder, error)) + resolver, err := resolverBuilder(xdsC) + if err != nil { + t.Fatalf("Failed to create xDS resolver for testing: %v", err) + } + + // Start a gRPC client that uses the above xDS resolver. 
+ cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver)) + if err != nil { + t.Fatalf("Failed to create gRPC client: %v", err) + } + defer cc.Close() + cc.Connect() + + // Ensure that a connection is attempted to the primary. + if _, err := primaryWrappedLis.NewConnCh.Receive(ctx); err != nil { + t.Fatalf("Failure when waiting for a connection to be opened to the primary management server: %v", err) + } + + // Ensure that a connection is attempted to the fallback. + if _, err := fallbackWrappedLis.NewConnCh.Receive(ctx); err != nil { + t.Fatalf("Failure when waiting for a connection to be opened to the primary management server: %v", err) + } + + // Make an RPC with a shortish deadline and expect it to fail. + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(sCtx, &testpb.Empty{}, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.DeadlineExceeded { + t.Fatalf("EmptyCall() = %v, want DeadlineExceeded", err) + } + + // Start the fallback server. Ensure that an RPC can succeed, and that it + // reaches backend2. + fallbackLis.Restart() + if err := waitForRPCsToReachBackend(ctx, client, backend2.Address); err != nil { + t.Fatal(err) + } + + // Start the primary server. It can take a while before the xDS client + // notices this, since the ADS stream implementation uses a backoff before + // retrying the stream. + primaryLis.Restart() + + // Wait for the connection to the secondary to be closed and ensure that an + // RPC can succeed, and that it reaches backend1. + c, err := fallbackWrappedLis.NewConnCh.Receive(ctx) + if err != nil { + t.Fatalf("Failure when retrieving the most recent connection to the fallback management server: %v", err) + } + conn := c.(*testutils.ConnWrapper) + if _, err := conn.CloseCh.Receive(ctx); err != nil { + t.Fatalf("Connection to fallback server not closed once primary becomes ready: %v", err) + } + if err := waitForRPCsToReachBackend(ctx, client, backend1.Address); err != nil { + t.Fatal(err) + } + + // Stop the primary servers. Since all xDS resources were received from the + // primary (and RPCs were succeeding to the clusters returned by the + // primary), we will not trigger fallback. + primaryLis.Stop() + sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := fallbackWrappedLis.NewConnCh.Receive(sCtx); err == nil { + t.Fatalf("Fallback attempted when not expected to. There are no uncached resources from the primary server at this point.") + } + + // Ensure that RPCs still succeed, and that they use the configuration + // received from the primary. + if err := waitForRPCsToReachBackend(ctx, client, backend1.Address); err != nil { + t.Fatal(err) + } +} + +// Tests fallback when the primary management server fails during an update. +func (s) TestFallback_MidUpdate(t *testing.T) { + // Enable fallback env var. + origFallbackEnv := envconfig.XDSFallbackSupport + envconfig.XDSFallbackSupport = true + defer func() { envconfig.XDSFallbackSupport = origFallbackEnv }() + + ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTestTimeout) + defer cancel() + + // Create two listeners for the two management servers. The test can + // start/stop these listeners and can also get notified when the listener + // receives a connection request. 
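The final assertion in TestFallback_OnStartup above exercises the second fallback condition from A71: with every watched resource already cached from the primary, losing that connection must not trigger fallback. That gate is the uncachedWatcherExists helper added earlier in this patch (later renamed watcherExistsForUncachedResource); restated over a simplified view of the cache, with hypothetical names, it amounts to:

package sketch

// resourceStatus is a simplified stand-in for xdsresource.ServiceStatus.
type resourceStatus int

const (
	statusRequested resourceStatus = iota // watched, but nothing cached yet
	statusAcked                           // cached: a good update was received
	statusNotExist                        // cached: the server said it does not exist
)

// shouldFallback reports whether an ADS stream failure should trigger
// fallback: only when at least one watched resource is still uncached.
func shouldFallback(statuses []resourceStatus) bool {
	for _, s := range statuses {
		if s == statusRequested {
			return true
		}
	}
	return false
}

TestFallback_MidUpdate, whose setup continues below, constructs exactly the opposite situation: a watcher for a brand-new cluster resource that the primary never answers.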
+ primaryWrappedLis := testutils.NewListenerWrapper(t, nil) + primaryLis := testutils.NewRestartableListener(primaryWrappedLis) + fallbackWrappedLis := testutils.NewListenerWrapper(t, nil) + fallbackLis := testutils.NewRestartableListener(fallbackWrappedLis) + + // This boolean helps with triggering fallback mid update. When this boolean + // is set and the below defined cluster resource is requested, the primary + // management server shuts down the connection, forcing the client to + // fallback to the secondary server. + var closeConnOnMidUpdateClusterResource atomic.Bool + const ( + serviceName = "my-service-fallback-xds" + routeConfigName = "route-" + serviceName + clusterName = "cluster-" + serviceName + endpointsName = "endpoints-" + serviceName + midUpdateRouteConfigName = "mid-update-route-" + serviceName + midUpdateClusterName = "mid-update-cluster-" + serviceName + midUpdateEndpointsName = "mid-update-endpoints-" + serviceName + ) + + // Start two management servers, primary and fallback, with the above + // listeners. + primaryManagementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + Listener: primaryLis, + OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { + if closeConnOnMidUpdateClusterResource.Load() == false { + return nil + } + if req.GetTypeUrl() != version.V3ClusterURL { + return nil + } + for _, name := range req.GetResourceNames() { + if name == midUpdateClusterName { + primaryLis.Stop() + return fmt.Errorf("closing ADS stream because %q resource was requested", midUpdateClusterName) + } + } + return nil + }, + AllowResourceSubset: true, + }) + fallbackManagementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: fallbackLis}) + + // Start three test service backends. + backend1 := stubserver.StartTestService(t, nil) + defer backend1.Stop() + backend2 := stubserver.StartTestService(t, nil) + defer backend2.Stop() + backend3 := stubserver.StartTestService(t, nil) + defer backend3.Stop() + + // Configure xDS resource on the primary management server, with a cluster + // resource that contains an endpoint for backend1. + nodeID := uuid.New().String() + primaryResources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, routeConfigName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, serviceName, clusterName)}, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, endpointsName, e2e.SecurityLevelNone)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(endpointsName, "localhost", []uint32{testutils.ParsePort(t, backend1.Address)})}, + } + if err := primaryManagementServer.Update(ctx, primaryResources); err != nil { + t.Fatal(err) + } + + // Configure xDS resource on the secondary management server, with a cluster + // resource that contains an endpoint for backend2. Only the listener + // resource has the same name on both servers. 
+ const ( + fallbackRouteConfigName = "fallback-route-" + serviceName + fallbackClusterName = "fallback-cluster-" + serviceName + fallbackEndpointsName = "fallback-endpoints-" + serviceName + ) + fallbackResources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, fallbackRouteConfigName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(fallbackRouteConfigName, serviceName, fallbackClusterName)}, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(fallbackClusterName, fallbackEndpointsName, e2e.SecurityLevelNone)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(fallbackEndpointsName, "localhost", []uint32{testutils.ParsePort(t, backend2.Address)})}, + } + if err := fallbackManagementServer.Update(ctx, fallbackResources); err != nil { + t.Fatal(err) + } + + // Generate bootstrap configuration with the above two servers. + bootstrapContents, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{ + Servers: []byte(fmt.Sprintf(`[ + { + "server_uri": %q, + "channel_creds": [{"type": "insecure"}] + }, + { + "server_uri": %q, + "channel_creds": [{"type": "insecure"}] + }]`, primaryManagementServer.Address, fallbackManagementServer.Address)), + Node: []byte(fmt.Sprintf(`{"id": "%s"}`, nodeID)), + }) + if err != nil { + t.Fatalf("Failed to create bootstrap file: %v", err) + } + + // Create an xDS client with the above bootstrap configuration and a short + // idle channel expiry timeout. This ensures that connections to lower + // priority servers get closed quickly, for the test to verify. + xdsC, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bootstrapContents, + IdleChannelExpiryTimeout: defaultTestIdleChannelExpiryTimeout, + }) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer close() + + // Get the xDS resolver to use the above xDS client. + resolverBuilder := internal.NewXDSResolverWithClientForTesting.(func(xdsclient.XDSClient) (resolver.Builder, error)) + resolver, err := resolverBuilder(xdsC) + if err != nil { + t.Fatalf("Failed to create xDS resolver for testing: %v", err) + } + + // Start a gRPC client that uses the above xDS resolver. + cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver)) + if err != nil { + t.Fatalf("Failed to create gRPC client: %v", err) + } + defer cc.Close() + cc.Connect() + + // Ensure that RPCs reach the cluster specified by the primary server and + // that no connection is attempted to the fallback server. + client := testgrpc.NewTestServiceClient(cc) + if err := waitForRPCsToReachBackend(ctx, client, backend1.Address); err != nil { + t.Fatal(err) + } + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := fallbackWrappedLis.NewConnCh.Receive(sCtx); err != context.DeadlineExceeded { + t.Fatalf("Connection attempt made to fallback server when none expected: %v", err) + } + + // Instruct the primary server to close the connection if below defined + // cluster resource is requested. + closeConnOnMidUpdateClusterResource.Store(true) + + // Update the listener resource on the primary server to point to a new + // route configuration that points to a new cluster that points to a new + // endpoints resource that contains backend3. 
+ primaryResources = e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, midUpdateRouteConfigName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(midUpdateRouteConfigName, serviceName, midUpdateClusterName)}, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(midUpdateClusterName, midUpdateEndpointsName, e2e.SecurityLevelNone)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(midUpdateEndpointsName, "localhost", []uint32{testutils.ParsePort(t, backend3.Address)})}, + } + if err := primaryManagementServer.Update(ctx, primaryResources); err != nil { + t.Fatal(err) + } + + // Ensure that a connection is attempted to the fallback, and that RPCs are + // routed to the cluster returned by the fallback server. + c, err := fallbackWrappedLis.NewConnCh.Receive(ctx) + if err != nil { + t.Fatalf("Failure when waiting for a connection to be opened to the fallback management server: %v", err) + } + fallbackConn := c.(*testutils.ConnWrapper) + if err := waitForRPCsToReachBackend(ctx, client, backend2.Address); err != nil { + t.Fatal(err) + } + + // Set the primary management server to not close the connection anymore if + // the mid-update cluster resource is requested, and get it to start serving + // again. + closeConnOnMidUpdateClusterResource.Store(false) + primaryLis.Restart() + + // A new snapshot, with the same resources, is pushed to the management + // server to get it to respond for already requested resource names. + if err := primaryManagementServer.Update(ctx, primaryResources); err != nil { + t.Fatal(err) + } + + // Ensure that RPCs reach the backend pointed to by the new cluster. + if err := waitForRPCsToReachBackend(ctx, client, backend3.Address); err != nil { + t.Fatal(err) + } + + // Wait for the connection to the secondary to be closed since we have + // reverted back to the primary. + if _, err := fallbackConn.CloseCh.Receive(ctx); err != nil { + t.Fatalf("Connection to fallback server not closed once primary becomes ready: %v", err) + } +} + +// Tests fallback when the primary management server fails during startup. +func (s) TestFallback_MidStartup(t *testing.T) { + // Enable fallback env var. + origFallbackEnv := envconfig.XDSFallbackSupport + envconfig.XDSFallbackSupport = true + defer func() { envconfig.XDSFallbackSupport = origFallbackEnv }() + + ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTestTimeout) + defer cancel() + + // Create two listeners for the two management servers. The test can + // start/stop these listeners and can also get notified when the listener + // receives a connection request. + primaryWrappedLis := testutils.NewListenerWrapper(t, nil) + primaryLis := testutils.NewRestartableListener(primaryWrappedLis) + fallbackWrappedLis := testutils.NewListenerWrapper(t, nil) + fallbackLis := testutils.NewRestartableListener(fallbackWrappedLis) + + // This boolean helps with triggering fallback during startup. When this + // boolean is set and a cluster resource is requested, the primary + // management server shuts down the connection, forcing the client to + // fallback to the secondary server. + var closeConnOnClusterResource atomic.Bool + closeConnOnClusterResource.Store(true) + + // Start two management servers, primary and fallback, with the above + // listeners. 
+ primaryManagementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + Listener: primaryLis, + OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { + if closeConnOnClusterResource.Load() == false { + return nil + } + if req.GetTypeUrl() != version.V3ClusterURL { + return nil + } + primaryLis.Stop() + return fmt.Errorf("closing ADS stream because cluster resource was requested") + }, + AllowResourceSubset: true, + }) + fallbackManagementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: fallbackLis}) + + // Start two test service backends. + backend1 := stubserver.StartTestService(t, nil) + defer backend1.Stop() + backend2 := stubserver.StartTestService(t, nil) + defer backend2.Stop() + + // Configure xDS resource on the primary management server, with a cluster + // resource that contains an endpoint for backend1. + nodeID := uuid.New().String() + const serviceName = "my-service-fallback-xds" + primaryResources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: serviceName, + NodeID: nodeID, + Host: "localhost", + Port: testutils.ParsePort(t, backend1.Address), + SecLevel: e2e.SecurityLevelNone, + }) + if err := primaryManagementServer.Update(ctx, primaryResources); err != nil { + t.Fatal(err) + } + + // Configure xDS resource on the secondary management server, with a cluster + // resource that contains an endpoint for backend2. Only the listener + // resource has the same name on both servers. + fallbackRouteConfigName := "fallback-route-" + serviceName + fallbackClusterName := "fallback-cluster-" + serviceName + fallbackEndpointsName := "fallback-endpoints-" + serviceName + fallbackResources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, fallbackRouteConfigName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(fallbackRouteConfigName, serviceName, fallbackClusterName)}, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(fallbackClusterName, fallbackEndpointsName, e2e.SecurityLevelNone)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(fallbackEndpointsName, "localhost", []uint32{testutils.ParsePort(t, backend2.Address)})}, + } + if err := fallbackManagementServer.Update(ctx, fallbackResources); err != nil { + t.Fatal(err) + } + + // Generate bootstrap configuration with the above two servers. + bootstrapContents, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{ + Servers: []byte(fmt.Sprintf(`[ + { + "server_uri": %q, + "channel_creds": [{"type": "insecure"}] + }, + { + "server_uri": %q, + "channel_creds": [{"type": "insecure"}] + }]`, primaryManagementServer.Address, fallbackManagementServer.Address)), + Node: []byte(fmt.Sprintf(`{"id": "%s"}`, nodeID)), + }) + if err != nil { + t.Fatalf("Failed to create bootstrap file: %v", err) + } + + // Create an xDS client with the above bootstrap configuration and a short + // idle channel expiry timeout. This ensures that connections to lower + // priority servers get closed quickly, for the test to verify. + xdsC, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bootstrapContents, + IdleChannelExpiryTimeout: defaultTestIdleChannelExpiryTimeout, + }) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer close() + + // Get the xDS resolver to use the above xDS client. 
+ resolverBuilder := internal.NewXDSResolverWithClientForTesting.(func(xdsclient.XDSClient) (resolver.Builder, error)) + resolver, err := resolverBuilder(xdsC) + if err != nil { + t.Fatalf("Failed to create xDS resolver for testing: %v", err) + } + + // Start a gRPC client that uses the above xDS resolver. + cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver)) + if err != nil { + t.Fatalf("Failed to create gRPC client: %v", err) + } + defer cc.Close() + cc.Connect() + + // Ensure that a connection is attempted to the primary. + if _, err := primaryWrappedLis.NewConnCh.Receive(ctx); err != nil { + t.Fatalf("Failure when waiting for a connection to be opened to the primary management server: %v", err) + } + + // Ensure that a connection is attempted to the fallback. + c, err := fallbackWrappedLis.NewConnCh.Receive(ctx) + if err != nil { + t.Fatalf("Failure when waiting for a connection to be opened to the primary management server: %v", err) + } + fallbackConn := c.(*testutils.ConnWrapper) + + // Ensure that RPCs are routed to the cluster returned by the fallback + // management server. + client := testgrpc.NewTestServiceClient(cc) + if err := waitForRPCsToReachBackend(ctx, client, backend2.Address); err != nil { + t.Fatal(err) + } + + // Get the primary management server to no longer close the connection when + // the cluster resource is requested. + closeConnOnClusterResource.Store(false) + primaryLis.Restart() + + // A new snapshot, with the same resources, is pushed to the management + // server to get it to respond for already requested resource names. + if err := primaryManagementServer.Update(ctx, primaryResources); err != nil { + t.Fatal(err) + } + + // Ensure that RPCs are routed to the cluster returned by the primary + // management server. + if err := waitForRPCsToReachBackend(ctx, client, backend1.Address); err != nil { + t.Fatal(err) + } + + // Wait for the connection to the secondary to be closed since we have + // reverted back to the primary. + if _, err := fallbackConn.CloseCh.Receive(ctx); err != nil { + t.Fatalf("Connection to fallback server not closed once primary becomes ready: %v", err) + } +} From 2741b76167d8219bb7df5f2ccade2dc937d3cb3f Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 5 Nov 2024 00:22:24 +0000 Subject: [PATCH 2/4] first round of review comments from zasweq --- xds/internal/xdsclient/authority.go | 68 ++++++++++++++++------------- 1 file changed, 38 insertions(+), 30 deletions(-) diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index ec60088fc161..3ee84e77c208 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -218,6 +218,18 @@ func (a *authority) handleADSStreamFailure(serverConfig *bootstrap.ServerConfig, a.triggerFallbackOnStreamFailure(serverConfig) } +// serverIndexForConfig returns the index of the xdsChannelConfig that matches +// the provided ServerConfig. If no match is found, it returns the length of the +// xdsChannelConfigs slice, which represents the index of a non-existent config. +func (a *authority) serverIndexForConfig(sc *bootstrap.ServerConfig) int { + for i, cfg := range a.xdsChannelConfigs { + if cfg.sc.Equal(sc) { + return i + } + } + return len(a.xdsChannelConfigs) +} + // Determines the server to fallback to and triggers fallback to the same. 
If // required, creates an xdsChannel to that server, and re-subscribes to all // existing resources. @@ -230,12 +242,11 @@ func (a *authority) triggerFallbackOnStreamFailure(failingServerConfig *bootstra // The server to fallback to is the next server on the list. If the current // server is the last server, then there is nothing that can be done. - currentServerIdx := 0 - for _, cfg := range a.xdsChannelConfigs { - if cfg.sc.Equal(failingServerConfig) { - break - } - currentServerIdx++ + currentServerIdx := a.serverIndexForConfig(failingServerConfig) + if currentServerIdx == len(a.xdsChannelConfigs) { + // This can never happen. + a.logger.Errorf("Received error from an unknown server: %s", failingServerConfig) + return } if currentServerIdx == len(a.xdsChannelConfigs)-1 { if a.logger.V(2) { @@ -501,9 +512,11 @@ func (a *authority) handleADSResourceDoesNotExist(rType xdsresource.Type, resour } // handleRevertingToPrimaryOnUpdate is called when a resource update is received -// from a server that is not the current active server. This method ensures that -// all lower priority servers are closed and the active server is reverted to -// the highest priority server that has sent an update. +// from the xDS client. +// +// If the update is from the currently active server, nothing is done. Else, all +// lower priority servers are closed and the active server is reverted to the +// highest priority server that sent the update. // // This method is only executed in the context of a serializer callback. func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *bootstrap.ServerConfig) { @@ -521,40 +534,35 @@ func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *bootstrap.Ser // that we have received an update from a higher priority server and we need // to revert back to it. This method guarantees that when an update is // received from a server, all lower priority servers are closed. - serverIdx := 0 - for _, cfg := range a.xdsChannelConfigs { - if cfg.sc.Equal(serverConfig) { - break - } - serverIdx++ - } + serverIdx := a.serverIndexForConfig(serverConfig) if serverIdx == len(a.xdsChannelConfigs) { // This can never happen. - a.logger.Errorf("Received update from an unknown server: %v", serverConfig) + a.logger.Errorf("Received update from an unknown server: %s", serverConfig) return } a.activeXDSChannel = a.xdsChannelConfigs[serverIdx] - // Close all lower priority channel. + // Close all lower priority channels. + // + // But before closing any channel, we need to unsubscribe from any resources + // that were subscribed to on this channel. Resources could be subscribed to + // from multiple channels as we fallback to lower priority servers. But when + // a higher priority one comes back up, we need to unsubscribe from all + // lower priority ones before releasing the reference to them. for i := serverIdx + 1; i < len(a.xdsChannelConfigs); i++ { cfg := a.xdsChannelConfigs[i] - // Unsubscribe any resources that were subscribed to, on this channel - // and remove it from the resource cache. When a ref to a channel is - // being released, there should be no more references to it from the - // resource cache. 
for rType, rState := range a.resources { for resourceName, state := range rState { - idx := 0 - for _, xc := range state.xdsChannelConfigs { - if xc != cfg { - state.xdsChannelConfigs[idx] = xc - idx++ - continue + for idx, xc := range state.xdsChannelConfigs { + // If the current resource is subscribed to on this channel, + // unsubscribe, and remove the channel from the list of + // channels that this resource is subscribed to. + if xc == cfg { + state.xdsChannelConfigs = append(state.xdsChannelConfigs[:idx], state.xdsChannelConfigs[idx+1:]...) + xc.xc.unsubscribe(rType, resourceName) } - xc.xc.unsubscribe(rType, resourceName) } - state.xdsChannelConfigs = state.xdsChannelConfigs[:idx] } } From e23a323a7c974d58291e4b5f76b42eb5d179721f Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 5 Nov 2024 22:23:49 +0000 Subject: [PATCH 3/4] more review comments from zasweq and purnesh42H --- xds/internal/xdsclient/authority.go | 24 +++++++++---------- xds/internal/xdsclient/tests/fallback_test.go | 6 +++-- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index 3ee84e77c208..a7c0a2bc24df 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -40,7 +40,7 @@ type resourceState struct { cache xdsresource.ResourceData // Most recent ACKed update for this resource. md xdsresource.UpdateMetadata // Metadata for the most recent update. deletionIgnored bool // True, if resource deletion was ignored for a prior update. - xdsChannelConfigs []*xdsChannelWithConfig // List of xdsChannels where this resource is subscribed. + xdsChannelConfigs map[*xdsChannelWithConfig]bool // Set of xdsChannels where this resource is subscribed. } // xdsChannelForADS is used to acquire a reference to an xdsChannel. This @@ -209,13 +209,13 @@ func (a *authority) handleADSStreamFailure(serverConfig *bootstrap.ServerConfig, // Cached resources include ones that // - have been successfully received and can be used. // - are considered non-existent according to xDS Protocol Specification. - if !a.uncachedWatcherExists() { + if !a.watcherExistsForUncachedResource() { if a.logger.V(2) { a.logger.Infof("No watchers for uncached resources. Not triggering fallback") } return } - a.triggerFallbackOnStreamFailure(serverConfig) + a.fallbackToNextServerIfPossible(serverConfig) } // serverIndexForConfig returns the index of the xdsChannelConfig that matches @@ -235,7 +235,7 @@ func (a *authority) serverIndexForConfig(sc *bootstrap.ServerConfig) int { // existing resources. // // Only executed in the context of a serializer callback. -func (a *authority) triggerFallbackOnStreamFailure(failingServerConfig *bootstrap.ServerConfig) { +func (a *authority) fallbackToNextServerIfPossible(failingServerConfig *bootstrap.ServerConfig) { if a.logger.V(2) { a.logger.Infof("Attempting to initiate fallback after failure from server %q", failingServerConfig) } @@ -292,7 +292,7 @@ func (a *authority) triggerFallbackOnStreamFailure(failingServerConfig *bootstra // this resource has been requested from. Retain the cached resource // and the set of existing watchers (and other metadata fields) in // the resource state. 
- state.xdsChannelConfigs = append(state.xdsChannelConfigs, fallbackChannel) + state.xdsChannelConfigs[fallbackChannel] = true } } } @@ -554,13 +554,13 @@ func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *bootstrap.Ser for rType, rState := range a.resources { for resourceName, state := range rState { - for idx, xc := range state.xdsChannelConfigs { + for xc := range state.xdsChannelConfigs { // If the current resource is subscribed to on this channel, // unsubscribe, and remove the channel from the list of // channels that this resource is subscribed to. if xc == cfg { - state.xdsChannelConfigs = append(state.xdsChannelConfigs[:idx], state.xdsChannelConfigs[idx+1:]...) xc.xc.unsubscribe(rType, resourceName) + delete(state.xdsChannelConfigs, xc) } } } @@ -624,7 +624,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w state = &resourceState{ watchers: make(map[xdsresource.ResourceWatcher]bool), md: xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusRequested}, - xdsChannelConfigs: []*xdsChannelWithConfig{xdsChannel}, + xdsChannelConfigs: map[*xdsChannelWithConfig]bool{xdsChannel: true}, } resources[resourceName] = state xdsChannel.xc.subscribe(rType, resourceName) @@ -678,7 +678,7 @@ func (a *authority) unwatchResource(rType xdsresource.Type, resourceName string, if a.logger.V(2) { a.logger.Infof("Removing last watch for resource name %q", resourceName) } - for _, xc := range state.xdsChannelConfigs { + for xc := range state.xdsChannelConfigs { xc.xc.unsubscribe(rType, resourceName) } delete(resources, resourceName) @@ -742,11 +742,11 @@ func (a *authority) closeXDSChannels() { a.activeXDSChannel = nil } -// uncachedWatcherExists returns true if there is at least one watcher for a -// resource that has not yet been cached. +// watcherExistsForUncachedResource returns true if there is at least one +// watcher for a resource that has not yet been cached. // // Only executed in the context of a serializer callback. -func (a *authority) uncachedWatcherExists() bool { +func (a *authority) watcherExistsForUncachedResource() bool { for _, resourceStates := range a.resources { for _, state := range resourceStates { if state.md.Status == xdsresource.ServiceStatusRequested { diff --git a/xds/internal/xdsclient/tests/fallback_test.go b/xds/internal/xdsclient/tests/fallback_test.go index b9f933783726..514945f833d0 100644 --- a/xds/internal/xdsclient/tests/fallback_test.go +++ b/xds/internal/xdsclient/tests/fallback_test.go @@ -421,7 +421,9 @@ func (s) TestFallback_MidUpdate(t *testing.T) { t.Fatal(err) } - // Ensure that a connection is attempted to the fallback, and that RPCs are + // Ensure that a connection is attempted to the fallback (because both + // conditions mentioned for fallback in A71 are satisfied: connectivity + // failure and a watcher for an uncached resource), and that RPCs are // routed to the cluster returned by the fallback server. c, err := fallbackWrappedLis.NewConnCh.Receive(ctx) if err != nil { @@ -590,7 +592,7 @@ func (s) TestFallback_MidStartup(t *testing.T) { // Ensure that a connection is attempted to the fallback. 
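One note on the data-structure change in this commit: per-resource channel bookkeeping moves from a []*xdsChannelWithConfig to a map[*xdsChannelWithConfig]bool. Beyond cheaper membership checks, this lets handleRevertingToPrimaryOnUpdate delete entries while ranging over them, which the Go spec permits for maps, whereas the previous commit's in-place slice surgery during a range is easy to get wrong. A tiny illustration, deliberately unrelated to the xDS types:

package sketch

// dropClosing removes every channel that is about to be closed from the set
// of channels a resource is subscribed on. Deleting map entries during a
// range over the same map is well defined in Go: deleted entries are simply
// not produced later in the iteration.
func dropClosing(subscribedOn map[string]bool, closing map[string]bool) {
	for ch := range subscribedOn {
		if closing[ch] {
			delete(subscribedOn, ch)
		}
	}
}

The test-side hunks that follow in this commit only reword a comment and an error message.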
c, err := fallbackWrappedLis.NewConnCh.Receive(ctx) if err != nil { - t.Fatalf("Failure when waiting for a connection to be opened to the primary management server: %v", err) + t.Fatalf("Failure when waiting for a connection to be opened to the secondary management server: %v", err) } fallbackConn := c.(*testutils.ConnWrapper) From 72930ff6d509f02d8fe6ba152196d4a85166b44f Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 6 Nov 2024 19:30:18 +0000 Subject: [PATCH 4/4] use more descriptive field names --- xds/internal/xdsclient/authority.go | 57 +++++++++++++++-------------- 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index a7c0a2bc24df..b9052ffe8d14 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -59,9 +59,9 @@ type xdsChannelForADS func(*bootstrap.ServerConfig, *authority) (*xdsChannel, fu // xdsChannelWithConfig is a struct that holds an xdsChannel and its associated // ServerConfig, along with a cleanup function to release the xdsChannel. type xdsChannelWithConfig struct { - xc *xdsChannel - sc *bootstrap.ServerConfig - cleanup func() + channel *xdsChannel + serverConfig *bootstrap.ServerConfig + cleanup func() } // authority provides the functionality required to communicate with a @@ -149,7 +149,7 @@ func newAuthority(args authorityBuildOptions) *authority { // first watch is registered, and channels to other server configurations // are created as needed to support fallback. for _, sc := range args.serverConfigs { - ret.xdsChannelConfigs = append(ret.xdsChannelConfigs, &xdsChannelWithConfig{sc: sc}) + ret.xdsChannelConfigs = append(ret.xdsChannelConfigs, &xdsChannelWithConfig{serverConfig: sc}) } return ret } @@ -223,7 +223,7 @@ func (a *authority) handleADSStreamFailure(serverConfig *bootstrap.ServerConfig, // xdsChannelConfigs slice, which represents the index of a non-existent config. func (a *authority) serverIndexForConfig(sc *bootstrap.ServerConfig) int { for i, cfg := range a.xdsChannelConfigs { - if cfg.sc.Equal(sc) { + if cfg.serverConfig.Equal(sc) { return i } } @@ -260,23 +260,23 @@ func (a *authority) fallbackToNextServerIfPossible(failingServerConfig *bootstra // If the server to fallback to already has an xdsChannel, it means that // this connectivity error is from a server with a higher priority. There // is not much we can do here. - if fallbackChannel.xc != nil { + if fallbackChannel.channel != nil { if a.logger.V(2) { - a.logger.Infof("Channel to the next server in the list %q already exists", fallbackChannel.sc) + a.logger.Infof("Channel to the next server in the list %q already exists", fallbackChannel.serverConfig) } return } // Create an xdsChannel for the fallback server. if a.logger.V(2) { - a.logger.Infof("Initiating fallback to server %s", fallbackChannel.sc) + a.logger.Infof("Initiating fallback to server %s", fallbackChannel.serverConfig) } - xc, cleanup, err := a.getChannelForADS(fallbackChannel.sc, a) + xc, cleanup, err := a.getChannelForADS(fallbackChannel.serverConfig, a) if err != nil { a.logger.Errorf("Failed to create XDS channel: %v", err) return } - fallbackChannel.xc = xc + fallbackChannel.channel = xc fallbackChannel.cleanup = cleanup a.activeXDSChannel = fallbackChannel @@ -520,7 +520,7 @@ func (a *authority) handleADSResourceDoesNotExist(rType xdsresource.Type, resour // // This method is only executed in the context of a serializer callback. 
func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *bootstrap.ServerConfig) { - if a.activeXDSChannel != nil && a.activeXDSChannel.sc.Equal(serverConfig) { + if a.activeXDSChannel != nil && a.activeXDSChannel.serverConfig.Equal(serverConfig) { // If the resource update is from the current active server, nothing // needs to be done from fallback point of view. return @@ -554,14 +554,15 @@ func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *bootstrap.Ser for rType, rState := range a.resources { for resourceName, state := range rState { - for xc := range state.xdsChannelConfigs { + for xcc := range state.xdsChannelConfigs { + if xcc != cfg { + continue + } // If the current resource is subscribed to on this channel, // unsubscribe, and remove the channel from the list of // channels that this resource is subscribed to. - if xc == cfg { - xc.xc.unsubscribe(rType, resourceName) - delete(state.xdsChannelConfigs, xc) - } + xcc.channel.unsubscribe(rType, resourceName) + delete(state.xdsChannelConfigs, xcc) } } } @@ -569,12 +570,12 @@ func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *bootstrap.Ser // Release the reference to the channel. if cfg.cleanup != nil { if a.logger.V(2) { - a.logger.Infof("Closing lower priority server %q", cfg.sc) + a.logger.Infof("Closing lower priority server %q", cfg.serverConfig) } cfg.cleanup() cfg.cleanup = nil } - cfg.xc = nil + cfg.channel = nil } } @@ -627,7 +628,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w xdsChannelConfigs: map[*xdsChannelWithConfig]bool{xdsChannel: true}, } resources[resourceName] = state - xdsChannel.xc.subscribe(rType, resourceName) + xdsChannel.channel.subscribe(rType, resourceName) } // Always add the new watcher to the set of watchers. state.watchers[watcher] = true @@ -678,8 +679,8 @@ func (a *authority) unwatchResource(rType xdsresource.Type, resourceName string, if a.logger.V(2) { a.logger.Infof("Removing last watch for resource name %q", resourceName) } - for xc := range state.xdsChannelConfigs { - xc.xc.unsubscribe(rType, resourceName) + for xcc := range state.xdsChannelConfigs { + xcc.channel.unsubscribe(rType, resourceName) } delete(resources, resourceName) @@ -715,13 +716,13 @@ func (a *authority) xdsChannelToUse() *xdsChannelWithConfig { return a.activeXDSChannel } - sc := a.xdsChannelConfigs[0].sc + sc := a.xdsChannelConfigs[0].serverConfig xc, cleanup, err := a.getChannelForADS(sc, a) if err != nil { a.logger.Warningf("Failed to create xDS channel: %v", err) return nil } - a.xdsChannelConfigs[0].xc = xc + a.xdsChannelConfigs[0].channel = xc a.xdsChannelConfigs[0].cleanup = cleanup a.activeXDSChannel = a.xdsChannelConfigs[0] return a.activeXDSChannel @@ -732,12 +733,12 @@ func (a *authority) xdsChannelToUse() *xdsChannelWithConfig { // // Only executed in the context of a serializer callback. func (a *authority) closeXDSChannels() { - for _, xc := range a.xdsChannelConfigs { - if xc.cleanup != nil { - xc.cleanup() - xc.cleanup = nil + for _, xcc := range a.xdsChannelConfigs { + if xcc.cleanup != nil { + xcc.cleanup() + xcc.cleanup = nil } - xc.xc = nil + xcc.channel = nil } a.activeXDSChannel = nil }
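Taken together, the series implements a small state machine over the bootstrap-ordered server list: the active channel only moves to a lower-priority server on a stream failure while some watcher is still uncached, and snaps back to whichever server delivers an update, closing everything below it. The model below is a deliberately simplified summary of that behavior, not code from the patch; channels, subscriptions and the actual ServerConfig comparisons are elided.

package sketch

// fallbackState models only the movement of the active server index across
// the bootstrap-ordered list; channel creation, resource resubscription and
// watcher notification are elided.
type fallbackState struct {
	active int // index of the currently active server
	total  int // number of servers listed in the bootstrap configuration
}

// onStreamFailure moves to the next server when fallback is warranted: the
// failure is on the active server, a watcher exists for an uncached
// resource, and there is a lower-priority server left to try.
func (s *fallbackState) onStreamFailure(failedIdx int, uncachedWatcher bool) {
	if failedIdx != s.active || !uncachedWatcher || s.active >= s.total-1 {
		return
	}
	s.active++
}

// onUpdate reverts to the server that produced the update; in authority.go
// this is where all lower-priority channels are unsubscribed and released.
func (s *fallbackState) onUpdate(fromIdx int) {
	s.active = fromIdx
}

This is the trajectory the tests above observe: the active index moves from the primary to the fallback when the primary goes down with an uncached watch outstanding, and back again once the primary serves a fresh snapshot.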