Skip to content
46 changes: 22 additions & 24 deletions xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,8 @@ private abstract class XdsWatcherBase<T extends ResourceUpdate>

@Nullable
private StatusOr<T> data;
@Nullable
private Status ambientError; // To hold transient errors


private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
Expand All @@ -640,42 +642,38 @@ private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
}

@Override
public void onError(Status error) {
checkNotNull(error, "error");
public void onResourceChanged(StatusOr<T> update) {
if (cancelled) {
return;
}
// Don't update configuration on error, if we've already received configuration
if (!hasDataValue()) {
this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
String.format("Error retrieving %s: %s: %s",
toContextString(), error.getCode(), error.getDescription())));
maybePublishConfig();
}
}
ambientError = null;
if (update.hasValue()) {
data = update;
subscribeToChildren(update.getValue());
} else {
Status status = update.getStatus();
Status translatedStatus = Status.UNAVAILABLE.withDescription(
String.format("Error retrieving %s: %s. Details: %s",
toContextString(),
status.getCode(),
status.getDescription() != null ? status.getDescription() : ""));

@Override
public void onResourceDoesNotExist(String resourceName) {
if (cancelled) {
return;
data = StatusOr.fromStatus(translatedStatus);
Comment on lines 656 to 663

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This block translates the Status from the update to a translatedStatus before creating a StatusOr from it. It would be helpful to include the original status as the cause of the translatedStatus exception, to preserve the original error information for debugging purposes.

Also, the status.getDescription() can be null, so it's good that you are handling that.

Suggested change
Status translatedStatus = Status.UNAVAILABLE.withDescription(
String.format("Error retrieving %s: %s. Details: %s",
toContextString(),
status.getCode(),
status.getDescription() != null ? status.getDescription() : ""));
@Override
public void onResourceDoesNotExist(String resourceName) {
if (cancelled) {
return;
data = StatusOr.fromStatus(translatedStatus);
Status status = update.getStatus();
Status translatedStatus = Status.UNAVAILABLE.withDescription(
String.format("Error retrieving %s: %s. Details: %s",
toContextString(),
status.getCode(),
status.getDescription() != null ? status.getDescription() : "")).withCause(status.asException()); // Include original status as cause

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gemini-code-assist so we don't even need to store in data? Because you suggested to remove this line as well.
data = StatusOr.fromStatus(translatedStatus);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the line data = StatusOr.fromStatus(translatedStatus); should definitely remain. My apologies if my previous suggestion was unclear or misleading in its scope. The purpose of that line is to propagate the translatedStatus to the data field, which is crucial for the watcher to correctly handle the resource error.

My intention was only to enhance the translatedStatus itself by including the original status as a cause, which aids in debugging by preserving the full error context.

Here is a refined suggestion that only modifies the translatedStatus creation, assuming status is already defined on line 654:

Suggested change
Status translatedStatus = Status.UNAVAILABLE.withDescription(
String.format("Error retrieving %s: %s. Details: %s",
toContextString(),
status.getCode(),
status.getDescription() != null ? status.getDescription() : ""));
@Override
public void onResourceDoesNotExist(String resourceName) {
if (cancelled) {
return;
data = StatusOr.fromStatus(translatedStatus);
Status translatedStatus = Status.UNAVAILABLE.withDescription(
String.format("Error retrieving %s: %s. Details: %s",
toContextString(),
status.getCode(),
status.getDescription() != null ? status.getDescription() : "")).withCause(status.asException()); // Include original status as cause

}

checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
toContextString() + " does not exist" + nodeInfo()));
maybePublishConfig();
}

@Override
public void onChanged(T update) {
checkNotNull(update, "update");
public void onAmbientError(Status error) {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gemini-code-assist I want you to review the XdsDependencyManager and tell me the use of variable 'ambientError'. As per gRFC A88. Right now I'm only focused on implementing xds client watcher API changes.

if (cancelled) {
return;
}

this.data = StatusOr.fromValue(update);
subscribeToChildren(update);
maybePublishConfig();
ambientError = error.withDescription(
String.format("Ambient error for %s: %s. Details: %s%s",
toContextString(),
error.getCode(),
error.getDescription(),
nodeInfo()));
Comment on lines 673 to 678

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In onAmbientError, error.getDescription() can be null, which would result in the string "null" being embedded in the error message. This is inconsistent with how null descriptions are handled in onResourceChanged (lines 656-661), where a null description is replaced with an empty string.

For consistency and clearer error messages, I suggest handling the potentially null description in the same way.

Suggested change
ambientError = error.withDescription(
String.format("Ambient error for %s: %s. Details: %s%s",
toContextString(),
error.getCode(),
error.getDescription(),
nodeInfo()));
ambientError = error.withDescription(
String.format("Ambient error for %s: %s. Details: %s%s",
toContextString(),
error.getCode(),
error.getDescription() != null ? error.getDescription() : "",
nodeInfo()));

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gemini-code-assist what is the use of ambientError variable? Why we are not doing anything in onAmbientError here?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a great question! The ambientError variable and the onAmbientError method are introduced as part of the XdsClient watcher API changes, specifically detailed in gRFC A88.

  1. Use of ambientError variable:

    • The ambientError variable (declared on line 637 in the XdsDependencyManager.java file) is intended to store transient errors that occur in the xDS client. These are errors that provide useful information about the general state of the XdsClient but should not cause the watcher to discard its currently active, valid resource.
    • It's marked with @SuppressWarnings("unused") because, in this specific XdsWatcherBase class, its primary purpose is to be set and potentially retrieved by other components or for logging/metrics, rather than directly triggering an action within this base class itself. The actual use of this ambient error information would typically happen in concrete implementations or higher-level logic that consumes the XdsWatcherBase.
  2. Why nothing is done in onAmbientError (to change the resource state):

    • As per gRFC A88, onAmbientError is designed for errors that "should not modify the watcher's use of that resource but that may be useful information about the ambient state of the XdsClient." This means that unlike onResourceChanged (which can deliver an error that does invalidate the current resource), onAmbientError explicitly should not cause the watcher to stop using its previously received valid resource.
    • Therefore, the implementation of onAmbientError in XdsDependencyManager.java correctly captures the error in ambientError and formats a log message, but it does not update the data field (which holds the active resource) or call maybePublishConfig(), because doing so would imply a change in the active resource, which is contrary to the definition of an "ambient" error.

My previous suggestion was to ensure that even when an ambient error's description is null, it's handled gracefully (by replacing null with an empty string) to prevent the literal string "null" from appearing in the formatted error message, improving clarity and consistency.

}

protected abstract void subscribeToChildren(T update);
Expand Down
153 changes: 78 additions & 75 deletions xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusOr;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.GrpcUtil;
Expand Down Expand Up @@ -382,18 +383,38 @@ private DiscoveryState(String resourceName) {
}

@Override
public void onChanged(final LdsUpdate update) {
public void onResourceChanged(final StatusOr<LdsUpdate> update) {
if (stopped) {
return;
}
logger.log(Level.FINEST, "Received Lds update {0}", update);
if (update.listener() == null) {
onResourceDoesNotExist("Non-API");

if (!update.hasValue()) {
// This is a definitive resource error (e.g., NOT_FOUND).
// We must treat the resource as unavailable and tear down the server.
Status status = update.getStatus();
StatusException statusException = Status.UNAVAILABLE.withDescription(
String.format("Listener %s unavailable: %s", resourceName, status.getDescription()))
.withCause(status.asException())
.asException();
handleConfigNotFoundOrMismatch(statusException);
return;
}

String ldsAddress = update.listener().address();
if (ldsAddress == null || update.listener().protocol() != Protocol.TCP
// The original 'onChanged' logic starts here.
final LdsUpdate ldsUpdate = update.getValue();
logger.log(Level.FINEST, "Received Lds update {0}", ldsUpdate);
if (ldsUpdate.listener() == null) {
handleConfigNotFoundOrMismatch(
Status.NOT_FOUND.withDescription("Listener is null in LdsUpdate").asException());
return;
}

// This check is now covered by the '!update.hasValue()' block above.
// The original check was: if (update.listener() == null)

// The ipAddressesMatch function and its logic remain critical.
String ldsAddress = ldsUpdate.listener().address();
if (ldsAddress == null || ldsUpdate.listener().protocol() != Protocol.TCP
|| !ipAddressesMatch(ldsAddress)) {
handleConfigNotFoundOrMismatch(
Status.UNKNOWN.withDescription(
Expand All @@ -402,21 +423,19 @@ public void onChanged(final LdsUpdate update) {
listenerAddress, ldsAddress)).asException());
return;
}

// The rest of the logic is a direct copy from the original onChanged method.
if (!pendingRds.isEmpty()) {
// filter chain state has not yet been applied to filterChainSelectorManager and there
// are two sets of sslContextProviderSuppliers, so we release the old ones.
releaseSuppliersInFlight();
pendingRds.clear();
}

filterChains = update.listener().filterChains();
defaultFilterChain = update.listener().defaultFilterChain();
// Filters are loaded even if the server isn't serving yet.
filterChains = ldsUpdate.listener().filterChains();
defaultFilterChain = ldsUpdate.listener().defaultFilterChain();
updateActiveFilters();

List<FilterChain> allFilterChains = filterChains;
List<FilterChain> allFilterChains = new ArrayList<>(filterChains);
if (defaultFilterChain != null) {
allFilterChains = new ArrayList<>(filterChains);
allFilterChains.add(defaultFilterChain);
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This change is correct, but it introduces a small performance regression by always creating a new ArrayList, even when defaultFilterChain is null. The previous implementation was more efficient as it only created a new list when necessary. Consider reverting to a pattern similar to the original implementation to avoid the unnecessary allocation, for example:

List<FilterChain> allFilterChains = filterChains;
if (defaultFilterChain != null) {
  allFilterChains = new ArrayList<>(filterChains);
  allFilterChains.add(defaultFilterChain);
}


Expand Down Expand Up @@ -450,43 +469,36 @@ public void onChanged(final LdsUpdate update) {
}
}

private boolean ipAddressesMatch(String ldsAddress) {
HostAndPort ldsAddressHnP = HostAndPort.fromString(ldsAddress);
HostAndPort listenerAddressHnP = HostAndPort.fromString(listenerAddress);
if (!ldsAddressHnP.hasPort() || !listenerAddressHnP.hasPort()
|| ldsAddressHnP.getPort() != listenerAddressHnP.getPort()) {
return false;
}
InetAddress listenerIp = InetAddresses.forString(listenerAddressHnP.getHost());
InetAddress ldsIp = InetAddresses.forString(ldsAddressHnP.getHost());
return listenerIp.equals(ldsIp);
}

@Override
public void onResourceDoesNotExist(final String resourceName) {
if (stopped) {
return;
}
StatusException statusException = Status.UNAVAILABLE.withDescription(
String.format("Listener %s unavailable, xDS node ID: %s", resourceName,
xdsClient.getBootstrapInfo().node().getId())).asException();
handleConfigNotFoundOrMismatch(statusException);
}

@Override
public void onError(final Status error) {
public void onAmbientError(final Status error) {
if (stopped) {
return;
}
// This logic is preserved from the original 'onError' method.
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
Status errorWithNodeId = error.withDescription(
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
logger.log(Level.FINE, "Error from XdsClient", errorWithNodeId);

// If the server isn't serving yet, a transient error is a startup failure.
// If it is already serving, we ignore it to prevent an outage.
if (!isServing) {
listener.onNotServing(errorWithNodeId.asException());
}
}

private boolean ipAddressesMatch(String ldsAddress) {
HostAndPort ldsAddressHnP = HostAndPort.fromString(ldsAddress);
HostAndPort listenerAddressHnP = HostAndPort.fromString(listenerAddress);
if (!ldsAddressHnP.hasPort() || !listenerAddressHnP.hasPort()
|| ldsAddressHnP.getPort() != listenerAddressHnP.getPort()) {
return false;
}
InetAddress listenerIp = InetAddresses.forString(listenerAddressHnP.getHost());
InetAddress ldsIp = InetAddresses.forString(ldsAddressHnP.getHost());
return listenerIp.equals(ldsIp);
}

private void shutdown() {
stopped = true;
cleanUpRouteDiscoveryStates();
Expand Down Expand Up @@ -775,54 +787,45 @@ private RouteDiscoveryState(String resourceName) {
}

@Override
public void onChanged(final RdsUpdate update) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!routeDiscoveryStates.containsKey(resourceName)) {
return;
}
if (savedVirtualHosts == null && !isPending) {
logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
}
savedVirtualHosts = ImmutableList.copyOf(update.virtualHosts);
updateRdsRoutingConfig();
maybeUpdateSelector();
public void onResourceChanged(final StatusOr<RdsUpdate> update) {
syncContext.execute(() -> {
if (!routeDiscoveryStates.containsKey(resourceName)) {
return; // Watcher has been cancelled.
}
});
}

@Override
public void onResourceDoesNotExist(final String resourceName) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!routeDiscoveryStates.containsKey(resourceName)) {
return;
if (update.hasValue()) {
// This is a successful update, taken from the original onChanged.
if (savedVirtualHosts == null && !isPending) {
logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
}
logger.log(Level.WARNING, "Rds {0} unavailable", resourceName);
savedVirtualHosts = ImmutableList.copyOf(update.getValue().virtualHosts);
} else {
// This is a definitive resource error, taken from onResourceDoesNotExist.
logger.log(Level.WARNING, "Rds {0} unavailable: {1}",
new Object[]{resourceName, update.getStatus()});
savedVirtualHosts = null;
updateRdsRoutingConfig();
maybeUpdateSelector();
}
// In both cases, a change has occurred that requires a config update.
updateRdsRoutingConfig();
maybeUpdateSelector();
});
}

@Override
public void onError(final Status error) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!routeDiscoveryStates.containsKey(resourceName)) {
return;
}
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
Status errorWithNodeId = error.withDescription(
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
new Object[]{resourceName, errorWithNodeId});
maybeUpdateSelector();
public void onAmbientError(final Status error) {
syncContext.execute(() -> {
if (!routeDiscoveryStates.containsKey(resourceName)) {
return; // Watcher has been cancelled.
}
// This is a transient error, taken from the original onError.
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
Status errorWithNodeId = error.withDescription(
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
new Object[]{resourceName, errorWithNodeId});

// Per gRFC A88, ambient errors should not trigger a configuration change.
// Therefore, we do NOT call maybeUpdateSelector() here.
});
}

Expand Down
6 changes: 4 additions & 2 deletions xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,15 @@ private List<ServerInfo> parseServerInfos(List<?> rawServerConfigs, XdsLogger lo
Object implSpecificConfig = getImplSpecificConfig(serverConfig, serverUri);

boolean resourceTimerIsTransientError = false;
boolean ignoreResourceDeletion = false;
boolean ignoreResourceDeletion = xdsDataErrorHandlingEnabled;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The logic for determining ignoreResourceDeletion seems to enable the feature if xdsDataErrorHandlingEnabled is true, even if the server does not send the ignore_resource_deletion feature. This appears to contradict gRFC A88, which states: "If this feature is not present, then the client MUST NOT ignore resource deletions."

To align with the gRFC, ignoreResourceDeletion should only be enabled when the server explicitly signals support via the ignore_resource_deletion feature. The current implementation could lead to incorrect behavior where the client ignores resource deletions when the server does not expect it to.

I suggest changing this line to initialize ignoreResourceDeletion to false. This ensures that it will only be enabled if the server feature is present in the if block below.

Suggested change
boolean ignoreResourceDeletion = xdsDataErrorHandlingEnabled;
boolean ignoreResourceDeletion = false;

// "For forward compatibility reasons, the client will ignore any entry in the list that it
// does not understand, regardless of type."
List<?> serverFeatures = JsonUtil.getList(serverConfig, "server_features");
if (serverFeatures != null) {
logger.log(XdsLogLevel.INFO, "Server features: {0}", serverFeatures);
ignoreResourceDeletion = serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION);
if (serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION)) {
ignoreResourceDeletion = true;
}
resourceTimerIsTransientError = xdsDataErrorHandlingEnabled
&& serverFeatures.contains(SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR);
}
Expand Down
42 changes: 14 additions & 28 deletions xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -453,44 +453,30 @@ private void handleRpcStreamClosed(Status status) {
stopwatch.reset();
}

Status newStatus = status;
if (responseReceived) {
// A closed ADS stream after a successful response is not considered an error. Servers may
// close streams for various reasons during normal operation, such as load balancing or
// underlying connection hitting its max connection age limit (see gRFC A9).
if (!status.isOk()) {
newStatus = Status.OK;
logger.log( XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a "
+ "response was received, so this will not be treated as an error. Cause: {2}",
status.getCode(), status.getDescription(), status.getCause());
} else {
logger.log(XdsLogLevel.DEBUG,
"ADS stream closed by server after a response was received");
}
} else {
// If the ADS stream is closed without ever having received a response from the server, then
// the XdsClient should consider that a connectivity error (see gRFC A57).
Status statusToPropagate = status;
if (!responseReceived && status.isOk()) {
// If the ADS stream is closed with OK without ever having received a response,
// it is a connectivity error.
statusToPropagate = Status.UNAVAILABLE.withDescription(
"ADS stream closed with OK before receiving a response");
Comment on lines 457 to 461

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This logic seems to be checking if the ADS stream closed with an OK status before receiving a response, and if so, it's setting the statusToPropagate to UNAVAILABLE. It would be helpful to add a comment explaining why an OK status in this scenario is considered a connectivity error, referencing gRFC A57 for context.

Suggested change
if (!responseReceived && status.isOk()) {
// If the ADS stream is closed with OK without ever having received a response,
// it is a connectivity error.
statusToPropagate = Status.UNAVAILABLE.withDescription(
"ADS stream closed with OK before receiving a response");
if (!responseReceived && status.isOk()) {
// If the ADS stream is closed with OK without ever having received a response,
// it is a connectivity error (see gRFC A57).
statusToPropagate = Status.UNAVAILABLE.withDescription(
"ADS stream closed with OK before receiving a response");

}
if (!statusToPropagate.isOk()) {
inError = true;
if (status.isOk()) {
newStatus = Status.UNAVAILABLE.withDescription(
"ADS stream closed with OK before receiving a response");
}
logger.log(
XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
newStatus.getCode(), newStatus.getDescription(), newStatus.getCause());
logger.log(XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
statusToPropagate.getCode(), statusToPropagate.getDescription(),
statusToPropagate.getCause());
}

close(newStatus.asException());

// FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks
// to avoid TSAN races, since tests may wait until callbacks are called but then would run
// concurrently with the stopwatch and schedule.
long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
close(status.asException());
rpcRetryTimer =
syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);

xdsResponseHandler.handleStreamClosed(newStatus, !responseReceived);
// Notify the handler of the stream closure before cleaning up the stream state.
xdsResponseHandler.handleStreamClosed(statusToPropagate, !responseReceived);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The comment on line 478 states that the handler is notified before cleaning up the stream state. However, close(status.asException()) is called on line 475, which cleans up the stream state by setting adsStream to null. To align the code with the comment's intent and avoid potential race conditions or unexpected behavior if handleStreamClosed were to depend on the stream's state, it's better to notify the handler before closing the stream.

Suggested change
long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
close(status.asException());
rpcRetryTimer =
syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);
xdsResponseHandler.handleStreamClosed(newStatus, !responseReceived);
// Notify the handler of the stream closure before cleaning up the stream state.
xdsResponseHandler.handleStreamClosed(statusToPropagate, !responseReceived);
long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
// Notify the handler of the stream closure before cleaning up the stream state.
xdsResponseHandler.handleStreamClosed(statusToPropagate, !responseReceived);
close(status.asException());
rpcRetryTimer =
syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);

}

private void close(Exception error) {
Expand Down
Loading
Loading