diff --git a/xds/internal/clients/xdsclient/authority.go b/xds/internal/clients/xdsclient/authority.go index 4c8fd545f2df..f50e0ba5f1db 100644 --- a/xds/internal/clients/xdsclient/authority.go +++ b/xds/internal/clients/xdsclient/authority.go @@ -333,7 +333,9 @@ func (a *authority) adsResourceUpdate(serverConfig *ServerConfig, rType Resource // // Only executed in the context of a serializer callback. func (a *authority) handleADSResourceUpdate(serverConfig *ServerConfig, rType ResourceType, updates map[string]dataAndErrTuple, md xdsresource.UpdateMetadata, onDone func()) { - a.handleRevertingToPrimaryOnUpdate(serverConfig) + if !a.handleRevertingToPrimaryOnUpdate(serverConfig) { + return + } // We build a list of callback funcs to invoke, and invoke them at the end // of this method instead of inline (when handling the update for a @@ -551,12 +553,23 @@ func (a *authority) handleADSResourceDoesNotExist(rType ResourceType, resourceNa // lower priority servers are closed and the active server is reverted to the // highest priority server that sent the update. // +// The return value indicates whether subsequent processing of the resource +// update should continue or not. +// // This method is only executed in the context of a serializer callback. -func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *ServerConfig) { - if a.activeXDSChannel != nil && isServerConfigEqual(serverConfig, a.activeXDSChannel.serverConfig) { +func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *ServerConfig) bool { + if a.activeXDSChannel == nil { + // This can happen only when all watches on this authority have been + // removed, and the xdsChannels have been closed. This update should + // have been received prior to closing of the channel, and therefore + // must be ignored. + return false + } + + if isServerConfigEqual(serverConfig, a.activeXDSChannel.serverConfig) { // If the resource update is from the current active server, nothing // needs to be done from fallback point of view. - return + return true } if a.logger.V(2) { @@ -564,10 +577,23 @@ func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *ServerConfig) } // If the resource update is not from the current active server, it means - // that we have received an update from a higher priority server and we need - // to revert back to it. This method guarantees that when an update is - // received from a server, all lower priority servers are closed. + // that we have received an update either from: + // - a server that has a higher priority than the current active server and + // therefore we need to revert back to it and close all lower priority + // servers, or, + // - a server that has a lower priority than the current active server. This + // can happen when the server close and the response race against each + // other. We can safely ignore this update, since we have already reverted + // to the higher priority server, and closed all lower priority servers. serverIdx := a.serverIndexForConfig(serverConfig) + activeServerIdx := a.serverIndexForConfig(a.activeXDSChannel.serverConfig) + if activeServerIdx < serverIdx { + return false + } + + // At this point, we are guaranteed that we have received a response from a + // higher priority server compared to the current active server. So, we + // revert to the higher priorty server and close all lower priority ones. a.activeXDSChannel = a.xdsChannelConfigs[serverIdx] // Close all lower priority channels. @@ -605,6 +631,7 @@ func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *ServerConfig) } cfg.channel = nil } + return true } // watchResource registers a new watcher for the specified resource type and