Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: support fallback within an authority #7701

Merged
merged 4 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 198 additions & 20 deletions xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
cache xdsresource.ResourceData // Most recent ACKed update for this resource.
md xdsresource.UpdateMetadata // Metadata for the most recent update.
deletionIgnored bool // True, if resource deletion was ignored for a prior update.
xdsChannelConfigs []*xdsChannelWithConfig // List of xdsChannels where this resource is subscribed.
xdsChannelConfigs map[*xdsChannelWithConfig]bool // Set of xdsChannels where this resource is subscribed.
}

// xdsChannelForADS is used to acquire a reference to an xdsChannel. This
Expand All @@ -59,9 +59,9 @@
// xdsChannelWithConfig is a struct that holds an xdsChannel and its associated
// ServerConfig, along with a cleanup function to release the xdsChannel.
type xdsChannelWithConfig struct {
xc *xdsChannel
sc *bootstrap.ServerConfig
cleanup func()
channel *xdsChannel
serverConfig *bootstrap.ServerConfig
cleanup func()
}

// authority provides the functionality required to communicate with a
Expand Down Expand Up @@ -149,7 +149,7 @@
// first watch is registered, and channels to other server configurations
// are created as needed to support fallback.
for _, sc := range args.serverConfigs {
ret.xdsChannelConfigs = append(ret.xdsChannelConfigs, &xdsChannelWithConfig{sc: sc})
ret.xdsChannelConfigs = append(ret.xdsChannelConfigs, &xdsChannelWithConfig{serverConfig: sc})
}
return ret
}
Expand Down Expand Up @@ -200,8 +200,101 @@
}
}

// TODO(easwars-fallback): Trigger fallback here if conditions for fallback
// are met.
// Two conditions need to be met for fallback to be triggered:
// 1. There is a connectivity failure on the ADS stream, as described in
// gRFC A57. For us, this means that the ADS stream was closed before the
// first server response was received. We already checked that condition
// earlier in this method.
// 2. There is at least one watcher for a resource that is not cached.
// Cached resources include ones that
// - have been successfully received and can be used.
// - are considered non-existent according to xDS Protocol Specification.
if !a.watcherExistsForUncachedResource() {
if a.logger.V(2) {
a.logger.Infof("No watchers for uncached resources. Not triggering fallback")
zasweq marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 215 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L214-L215

Added lines #L214 - L215 were not covered by tests
return
}
a.fallbackToNextServerIfPossible(serverConfig)
}

// serverIndexForConfig returns the index of the xdsChannelConfig that matches
// the provided ServerConfig. If no match is found, it returns the length of the
// xdsChannelConfigs slice, which represents the index of a non-existent config.
func (a *authority) serverIndexForConfig(sc *bootstrap.ServerConfig) int {
for i, cfg := range a.xdsChannelConfigs {
if cfg.serverConfig.Equal(sc) {
return i
}
}
return len(a.xdsChannelConfigs)

Check warning on line 230 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L230

Added line #L230 was not covered by tests
}

// fallbackToNextServerIfPossible determines the server to fall back to and
// triggers fallback to it. If required, it creates an xdsChannel to that
// server and re-subscribes to all existing resources.
//
// Only executed in the context of a serializer callback.
func (a *authority) fallbackToNextServerIfPossible(failingServerConfig *bootstrap.ServerConfig) {
if a.logger.V(2) {
a.logger.Infof("Attempting to initiate fallback after failure from server %q", failingServerConfig)
}

Check warning on line 241 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L240-L241

Added lines #L240 - L241 were not covered by tests

// The server to fallback to is the next server on the list. If the current
// server is the last server, then there is nothing that can be done.
currentServerIdx := a.serverIndexForConfig(failingServerConfig)
if currentServerIdx == len(a.xdsChannelConfigs) {
// This can never happen.
a.logger.Errorf("Received error from an unknown server: %s", failingServerConfig)
return
}

Check warning on line 250 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L247-L250

Added lines #L247 - L250 were not covered by tests
if currentServerIdx == len(a.xdsChannelConfigs)-1 {
if a.logger.V(2) {
a.logger.Infof("No more servers to fallback to")
}

Check warning on line 254 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L253-L254

Added lines #L253 - L254 were not covered by tests
return
}
fallbackServerIdx := currentServerIdx + 1
fallbackChannel := a.xdsChannelConfigs[fallbackServerIdx]

// If the server to fallback to already has an xdsChannel, it means that
// this connectivity error is from a server with a higher priority. There
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't quite understand this comment. If there is an error from a higher priority server, we should fall back to a lower priority server. How does having an existing channel for the fallback server make any difference?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's say the authority has two servers: primary and fallback. We start off with the primary, and say it fails, so we switch to the fallback. Let's also say the connection to the fallback works, we get all resources from it, and we are happy. Now we get another error from the primary (in fact, we will keep getting stream errors from it, since we retry the stream with backoff). At this point, we will see that we already have a channel to the next server in the list, which is the fallback server, and therefore we have nothing to do here.

// is not much we can do here.
if fallbackChannel.channel != nil {
if a.logger.V(2) {
a.logger.Infof("Channel to the next server in the list %q already exists", fallbackChannel.serverConfig)
}

Check warning on line 266 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L265-L266

Added lines #L265 - L266 were not covered by tests
return
}

// Create an xdsChannel for the fallback server.
if a.logger.V(2) {
a.logger.Infof("Initiating fallback to server %s", fallbackChannel.serverConfig)
}

Check warning on line 273 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L272-L273

Added lines #L272 - L273 were not covered by tests
xc, cleanup, err := a.getChannelForADS(fallbackChannel.serverConfig, a)
if err != nil {
a.logger.Errorf("Failed to create XDS channel: %v", err)
return
}

Check warning on line 278 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L276-L278

Added lines #L276 - L278 were not covered by tests
fallbackChannel.channel = xc
fallbackChannel.cleanup = cleanup
a.activeXDSChannel = fallbackChannel

// Subscribe to all existing resources from the new management server.
for typ, resources := range a.resources {
for name, state := range resources {
if a.logger.V(2) {
a.logger.Infof("Resubscribing to resource of type %q and name %q", typ.TypeName(), name)
}

Check warning on line 288 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L287-L288

Added lines #L287 - L288 were not covered by tests
xc.subscribe(typ, name)

// Add the fallback channel to the list of xdsChannels from which
// this resource has been requested. Retain the cached resource
// and the set of existing watchers (and other metadata fields) in
// the resource state.
state.xdsChannelConfigs[fallbackChannel] = true
}
}
}

// adsResourceUpdate is called to notify the authority about a resource update
Expand All @@ -218,13 +311,15 @@
// handleADSResourceUpdate processes an update from the xDS client, updating the
// resource cache and notifying any registered watchers of the update.
//
// If the update is received from a higher priority xdsChannel that was
// previously down, we revert to it and close all lower priority xdsChannels.
//
// Once the update has been processed by all watchers, the authority is expected
// to invoke the onDone callback.
//
// Only executed in the context of a serializer callback.
func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig, rType xdsresource.Type, updates map[string]ads.DataAndErrTuple, md xdsresource.UpdateMetadata, onDone func()) {
// TODO(easwars-fallback): Trigger reverting to a higher priority server if
// the update is from one.
a.handleRevertingToPrimaryOnUpdate(serverConfig)

// We build a list of callback funcs to invoke, and invoke them at the end
// of this method instead of inline (when handling the update for a
Expand Down Expand Up @@ -416,6 +511,74 @@
}
}

// handleRevertingToPrimaryOnUpdate is called when a resource update is received
// from the xDS client.
//
// If the update is from the currently active server, nothing is done. Else, all
// lower priority servers are closed and the active server is reverted to the
// highest priority server that sent the update.
//
// This method is only executed in the context of a serializer callback.
func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *bootstrap.ServerConfig) {
if a.activeXDSChannel != nil && a.activeXDSChannel.serverConfig.Equal(serverConfig) {
// If the resource update is from the current active server, nothing
// needs to be done from fallback point of view.
return
}

if a.logger.V(2) {
a.logger.Infof("Received update from non-active server %q", serverConfig)
}

Check warning on line 531 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L530-L531

Added lines #L530 - L531 were not covered by tests

// If the resource update is not from the current active server, it means
// that we have received an update from a higher priority server and we need
// to revert back to it. This method guarantees that when an update is
// received from a server, all lower priority servers are closed.
serverIdx := a.serverIndexForConfig(serverConfig)
if serverIdx == len(a.xdsChannelConfigs) {
// This can never happen.
a.logger.Errorf("Received update from an unknown server: %s", serverConfig)
return
}

Check warning on line 542 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L539-L542

Added lines #L539 - L542 were not covered by tests
a.activeXDSChannel = a.xdsChannelConfigs[serverIdx]

// Close all lower priority channels.
//
// But before closing any channel, we need to unsubscribe from any resources
// that were subscribed to on this channel. Resources could be subscribed to
// from multiple channels as we fallback to lower priority servers. But when
// a higher priority one comes back up, we need to unsubscribe from all
// lower priority ones before releasing the reference to them.
for i := serverIdx + 1; i < len(a.xdsChannelConfigs); i++ {
cfg := a.xdsChannelConfigs[i]

for rType, rState := range a.resources {
for resourceName, state := range rState {
for xcc := range state.xdsChannelConfigs {
if xcc != cfg {
continue
}
// If the current resource is subscribed to on this channel,
// unsubscribe, and remove the channel from the list of
// channels that this resource is subscribed to.
xcc.channel.unsubscribe(rType, resourceName)
delete(state.xdsChannelConfigs, xcc)
}
}
}

// Release the reference to the channel.
if cfg.cleanup != nil {
if a.logger.V(2) {
a.logger.Infof("Closing lower priority server %q", cfg.serverConfig)
}

Check warning on line 574 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L573-L574

Added lines #L573 - L574 were not covered by tests
cfg.cleanup()
cfg.cleanup = nil
}
cfg.channel = nil
}
}

// watchResource registers a new watcher for the specified resource type and
// name. It returns a function that can be called to cancel the watch.
//
Expand Down Expand Up @@ -462,10 +625,10 @@
state = &resourceState{
watchers: make(map[xdsresource.ResourceWatcher]bool),
md: xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusRequested},
xdsChannelConfigs: []*xdsChannelWithConfig{xdsChannel},
xdsChannelConfigs: map[*xdsChannelWithConfig]bool{xdsChannel: true},
}
resources[resourceName] = state
xdsChannel.xc.subscribe(rType, resourceName)
xdsChannel.channel.subscribe(rType, resourceName)
}
// Always add the new watcher to the set of watchers.
state.watchers[watcher] = true
Expand Down Expand Up @@ -516,8 +679,8 @@
if a.logger.V(2) {
a.logger.Infof("Removing last watch for resource name %q", resourceName)
}
for _, xc := range state.xdsChannelConfigs {
xc.xc.unsubscribe(rType, resourceName)
for xcc := range state.xdsChannelConfigs {
xcc.channel.unsubscribe(rType, resourceName)
}
delete(resources, resourceName)

Expand Down Expand Up @@ -553,13 +716,13 @@
return a.activeXDSChannel
}

sc := a.xdsChannelConfigs[0].sc
sc := a.xdsChannelConfigs[0].serverConfig
xc, cleanup, err := a.getChannelForADS(sc, a)
if err != nil {
a.logger.Warningf("Failed to create xDS channel: %v", err)
return nil
}
a.xdsChannelConfigs[0].xc = xc
a.xdsChannelConfigs[0].channel = xc
a.xdsChannelConfigs[0].cleanup = cleanup
a.activeXDSChannel = a.xdsChannelConfigs[0]
return a.activeXDSChannel
Expand All @@ -570,16 +733,31 @@
//
// Only executed in the context of a serializer callback.
func (a *authority) closeXDSChannels() {
for _, xc := range a.xdsChannelConfigs {
if xc.cleanup != nil {
xc.cleanup()
xc.cleanup = nil
for _, xcc := range a.xdsChannelConfigs {
if xcc.cleanup != nil {
xcc.cleanup()
xcc.cleanup = nil
}
xc.xc = nil
xcc.channel = nil
}
a.activeXDSChannel = nil
}

// watcherExistsForUncachedResource reports whether any resource tracked by
// this authority still has its metadata status set to
// xdsresource.ServiceStatusRequested, i.e. it has been requested but no
// result has been recorded for it yet.
//
// The check looks only at the status field; it does not inspect the watcher
// set. NOTE(review): this presumably relies on resource entries being removed
// when their last watcher goes away — confirm against the unwatch path.
//
// Only executed in the context of a serializer callback.
func (a *authority) watcherExistsForUncachedResource() bool {
	for _, byName := range a.resources {
		for _, rs := range byName {
			if rs.md.Status != xdsresource.ServiceStatusRequested {
				continue
			}
			// A single still-requested resource is enough to answer true.
			return true
		}
	}
	return false
}

// dumpResources returns a dump of the resource configuration cached by this
// authority, for CSDS purposes.
func (a *authority) dumpResources() []*v3statuspb.ClientConfig_GenericXdsConfig {
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/client_new.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func newClientImpl(config *bootstrap.Config, watchExpiryTimeout, idleChannelExpi
type OptionsForTesting struct {
// Name is a unique name for this xDS client.
Name string

// Contents contain a JSON representation of the bootstrap configuration to
// be used when creating the xDS client.
Contents []byte
Expand Down Expand Up @@ -180,7 +181,6 @@ func GetForTesting(name string) (XDSClient, func(), error) {
func init() {
internal.TriggerXDSResourceNotFoundForTesting = triggerXDSResourceNotFoundForTesting
xdsclientinternal.ResourceWatchStateForTesting = resourceWatchStateForTesting

}

func triggerXDSResourceNotFoundForTesting(client XDSClient, typ xdsresource.Type, name string) error {
Expand Down
Loading