Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: Revert xds flow control change. #10784

Merged
merged 2 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 81 additions & 64 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@

private void start() {
shutdown = false;
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this, syncContext);
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this);
}

void shutdown() {
Expand All @@ -341,85 +341,102 @@
String.format("Unable to load CDS %s. xDS server returned: %s: %s",
name, error.getCode(), error.getDescription()))
.withCause(error.getCause());
if (shutdown) {
return;
}
// All watchers should receive the same error, so we only propagate it once.
if (ClusterState.this == root) {
handleClusterDiscoveryError(status);
}
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;

Check warning on line 348 in xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java#L348

Added line #L348 was not covered by tests
}
// All watchers should receive the same error, so we only propagate it once.
if (ClusterState.this == root) {
handleClusterDiscoveryError(status);
}
}
});
}

@Override
public void onResourceDoesNotExist(String resourceName) {
if (shutdown) {
return;
}
discovered = true;
result = null;
if (childClusterStates != null) {
for (ClusterState state : childClusterStates.values()) {
state.shutdown();
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;

Check warning on line 364 in xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java#L364

Added line #L364 was not covered by tests
}
discovered = true;
result = null;
if (childClusterStates != null) {
for (ClusterState state : childClusterStates.values()) {
state.shutdown();
}
childClusterStates = null;
}
handleClusterDiscovered();
}
childClusterStates = null;
}
handleClusterDiscovered();
});
}

@Override
public void onChanged(final CdsUpdate update) {
if (shutdown) {
return;
}
logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
discovered = true;
result = update;
if (update.clusterType() == ClusterType.AGGREGATE) {
isLeaf = false;
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
update.clusterName(), update.prioritizedClusterNames());
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
for (String cluster : update.prioritizedClusterNames()) {
if (newChildStates.containsKey(cluster)) {
logger.log(XdsLogLevel.WARNING,
String.format("duplicate cluster name %s in aggregate %s is being ignored",
cluster, update.clusterName()));
continue;
class ClusterDiscovered implements Runnable {
@Override
public void run() {
if (shutdown) {
return;

Check warning on line 385 in xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java#L385

Added line #L385 was not covered by tests
}
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
ClusterState childState;
if (clusterStates.containsKey(cluster)) {
childState = clusterStates.get(cluster);
if (childState.shutdown) {
childState.start();

logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
discovered = true;
result = update;
if (update.clusterType() == ClusterType.AGGREGATE) {
isLeaf = false;
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
update.clusterName(), update.prioritizedClusterNames());
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
for (String cluster : update.prioritizedClusterNames()) {
if (newChildStates.containsKey(cluster)) {
logger.log(XdsLogLevel.WARNING,
String.format("duplicate cluster name %s in aggregate %s is being ignored",
cluster, update.clusterName()));
continue;
}
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
ClusterState childState;
if (clusterStates.containsKey(cluster)) {
childState = clusterStates.get(cluster);
if (childState.shutdown) {
childState.start();

Check warning on line 408 in xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java#L408

Added line #L408 was not covered by tests
}
} else {
childState = new ClusterState(cluster);
clusterStates.put(cluster, childState);
childState.start();
}
newChildStates.put(cluster, childState);
} else {
newChildStates.put(cluster, childClusterStates.remove(cluster));
}
} else {
childState = new ClusterState(cluster);
clusterStates.put(cluster, childState);
childState.start();
}
newChildStates.put(cluster, childState);
} else {
newChildStates.put(cluster, childClusterStates.remove(cluster));
}
}
if (childClusterStates != null) { // stop subscribing to revoked child clusters
for (ClusterState watcher : childClusterStates.values()) {
watcher.shutdown();
if (childClusterStates != null) { // stop subscribing to revoked child clusters
for (ClusterState watcher : childClusterStates.values()) {
watcher.shutdown();
}
}
childClusterStates = newChildStates;
} else if (update.clusterType() == ClusterType.EDS) {
isLeaf = true;
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
update.clusterName(), update.edsServiceName());
} else { // logical DNS
isLeaf = true;
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
}
handleClusterDiscovered();
}
childClusterStates = newChildStates;
} else if (update.clusterType() == ClusterType.EDS) {
isLeaf = true;
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
update.clusterName(), update.edsServiceName());
} else { // logical DNS
isLeaf = true;
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
}
handleClusterDiscovered();
}

syncContext.execute(new ClusterDiscovered());
}
}
}
}
51 changes: 30 additions & 21 deletions xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,7 @@
void start() {
String resourceName = edsServiceName != null ? edsServiceName : name;
logger.log(XdsLogLevel.INFO, "Start watching EDS resource {0}", resourceName);
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),
resourceName, this, syncContext);
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), resourceName, this);
}

@Override
Expand Down Expand Up @@ -453,7 +452,7 @@
}
}

new EndpointsUpdated().run();
syncContext.execute(new EndpointsUpdated());
}

private List<String> generatePriorityNames(String name,
Expand Down Expand Up @@ -492,28 +491,38 @@

@Override
public void onResourceDoesNotExist(final String resourceName) {
if (shutdown) {
return;
}
logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
status = Status.OK;
resolved = true;
result = null; // resource revoked
handleEndpointResourceUpdate();
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;

Check warning on line 498 in xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java#L498

Added line #L498 was not covered by tests
}
logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
status = Status.OK;
resolved = true;
result = null; // resource revoked
handleEndpointResourceUpdate();
}
});
}

@Override
public void onError(final Status error) {
if (shutdown) {
return;
}
String resourceName = edsServiceName != null ? edsServiceName : name;
status = Status.UNAVAILABLE
.withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
resourceName, error.getCode(), error.getDescription()))
.withCause(error.getCause());
logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
handleEndpointResolutionError();
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;

Check warning on line 515 in xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java#L515

Added line #L515 was not covered by tests
}
String resourceName = edsServiceName != null ? edsServiceName : name;
status = Status.UNAVAILABLE
.withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
resourceName, error.getCode(), error.getDescription()))
.withCause(error.getCause());
logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
handleEndpointResolutionError();
}
});
}
}

Expand Down
21 changes: 4 additions & 17 deletions xds/src/main/java/io/grpc/xds/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsClient.ProcessingTracker;
import io.grpc.xds.XdsClient.ResourceStore;
import io.grpc.xds.XdsClient.XdsResponseHandler;
import io.grpc.xds.XdsClientImpl.XdsChannelFactory;
Expand Down Expand Up @@ -288,8 +288,6 @@ private abstract class AbstractAdsStream {

abstract boolean isReady();

abstract void request(int count);

/**
* Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
* {@code errorDetail}. Used for reacting to a specific discovery response. For
Expand All @@ -316,10 +314,7 @@ final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<A
}
responseReceived = true;
respNonces.put(type, nonce);
ProcessingTracker processingTracker = new ProcessingTracker(() -> request(1), syncContext);
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce,
processingTracker);
processingTracker.onComplete();
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce);
}

final void handleRpcError(Throwable t) {
Expand Down Expand Up @@ -377,15 +372,14 @@ private void cleanUp() {
}

private final class AdsStreamV3 extends AbstractAdsStream {
private ClientCallStreamObserver<DiscoveryRequest> requestWriter;
private StreamObserver<DiscoveryRequest> requestWriter;

@Override
public boolean isReady() {
return requestWriter != null && ((ClientCallStreamObserver<?>) requestWriter).isReady();
}

@Override
@SuppressWarnings("unchecked")
void start() {
AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
AggregatedDiscoveryServiceGrpc.newStub(channel);
Expand All @@ -395,7 +389,6 @@ final class AdsClientResponseObserver

@Override
public void beforeStart(ClientCallStreamObserver<DiscoveryRequest> requestStream) {
requestStream.disableAutoRequestWithInitial(1);
requestStream.setOnReadyHandler(ControlPlaneClient.this::readyHandler);
}

Expand Down Expand Up @@ -444,8 +437,7 @@ public void run() {
}
}

requestWriter = (ClientCallStreamObserver) stub.streamAggregatedResources(
new AdsClientResponseObserver());
requestWriter = stub.streamAggregatedResources(new AdsClientResponseObserver());
}

@Override
Expand Down Expand Up @@ -475,11 +467,6 @@ void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
}
}

@Override
void request(int count) {
requestWriter.request(count);
}

@Override
void sendError(Exception error) {
requestWriter.onError(error);
Expand Down
34 changes: 2 additions & 32 deletions xds/src/main/java/io/grpc/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.common.base.Splitter;
import com.google.common.net.UrlEscapers;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import io.grpc.Status;
import io.grpc.xds.Bootstrapper.ServerInfo;
Expand All @@ -37,8 +36,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -307,15 +304,9 @@
/**
* Registers a data watcher for the given Xds resource.
*/
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher,
Executor executor) {
throw new UnsupportedOperationException();
}

<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher) {
watchXdsResource(type, resourceName, watcher, MoreExecutors.directExecutor());
throw new UnsupportedOperationException();

Check warning on line 309 in xds/src/main/java/io/grpc/xds/XdsClient.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsClient.java#L309

Added line #L309 was not covered by tests
}

/**
Expand Down Expand Up @@ -362,32 +353,11 @@
throw new UnsupportedOperationException();
}

static final class ProcessingTracker {
private final AtomicInteger pendingTask = new AtomicInteger(1);
private final Executor executor;
private final Runnable completionListener;

ProcessingTracker(Runnable completionListener, Executor executor) {
this.executor = executor;
this.completionListener = completionListener;
}

void startTask() {
pendingTask.incrementAndGet();
}

void onComplete() {
if (pendingTask.decrementAndGet() == 0) {
executor.execute(completionListener);
}
}
}

interface XdsResponseHandler {
/** Called when a xds response is received. */
void handleResourceResponse(
XdsResourceType<?> resourceType, ServerInfo serverInfo, String versionInfo,
List<Any> resources, String nonce, ProcessingTracker processingTracker);
List<Any> resources, String nonce);

/** Called when the ADS stream is closed passively. */
// Must be synchronized.
Expand Down
Loading