Skip to content

Commit a346e23

Browse files
committed
Revert "xds: Revert xds flow control change. (grpc#10784)"
This reverts commit 846e008.
1 parent 0f21574 commit a346e23

14 files changed

+520
-342
lines changed

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

+64-81
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ private ClusterState(String name) {
320320

321321
private void start() {
322322
shutdown = false;
323-
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this);
323+
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this, syncContext);
324324
}
325325

326326
void shutdown() {
@@ -341,102 +341,85 @@ public void onError(Status error) {
341341
String.format("Unable to load CDS %s. xDS server returned: %s: %s",
342342
name, error.getCode(), error.getDescription()))
343343
.withCause(error.getCause());
344-
syncContext.execute(new Runnable() {
345-
@Override
346-
public void run() {
347-
if (shutdown) {
348-
return;
349-
}
350-
// All watchers should receive the same error, so we only propagate it once.
351-
if (ClusterState.this == root) {
352-
handleClusterDiscoveryError(status);
353-
}
354-
}
355-
});
344+
if (shutdown) {
345+
return;
346+
}
347+
// All watchers should receive the same error, so we only propagate it once.
348+
if (ClusterState.this == root) {
349+
handleClusterDiscoveryError(status);
350+
}
356351
}
357352

358353
@Override
359354
public void onResourceDoesNotExist(String resourceName) {
360-
syncContext.execute(new Runnable() {
361-
@Override
362-
public void run() {
363-
if (shutdown) {
364-
return;
365-
}
366-
discovered = true;
367-
result = null;
368-
if (childClusterStates != null) {
369-
for (ClusterState state : childClusterStates.values()) {
370-
state.shutdown();
371-
}
372-
childClusterStates = null;
373-
}
374-
handleClusterDiscovered();
355+
if (shutdown) {
356+
return;
357+
}
358+
discovered = true;
359+
result = null;
360+
if (childClusterStates != null) {
361+
for (ClusterState state : childClusterStates.values()) {
362+
state.shutdown();
375363
}
376-
});
364+
childClusterStates = null;
365+
}
366+
handleClusterDiscovered();
377367
}
378368

379369
@Override
380370
public void onChanged(final CdsUpdate update) {
381-
class ClusterDiscovered implements Runnable {
382-
@Override
383-
public void run() {
384-
if (shutdown) {
385-
return;
371+
if (shutdown) {
372+
return;
373+
}
374+
logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
375+
discovered = true;
376+
result = update;
377+
if (update.clusterType() == ClusterType.AGGREGATE) {
378+
isLeaf = false;
379+
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
380+
update.clusterName(), update.prioritizedClusterNames());
381+
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
382+
for (String cluster : update.prioritizedClusterNames()) {
383+
if (newChildStates.containsKey(cluster)) {
384+
logger.log(XdsLogLevel.WARNING,
385+
String.format("duplicate cluster name %s in aggregate %s is being ignored",
386+
cluster, update.clusterName()));
387+
continue;
386388
}
387-
388-
logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
389-
discovered = true;
390-
result = update;
391-
if (update.clusterType() == ClusterType.AGGREGATE) {
392-
isLeaf = false;
393-
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
394-
update.clusterName(), update.prioritizedClusterNames());
395-
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
396-
for (String cluster : update.prioritizedClusterNames()) {
397-
if (newChildStates.containsKey(cluster)) {
398-
logger.log(XdsLogLevel.WARNING,
399-
String.format("duplicate cluster name %s in aggregate %s is being ignored",
400-
cluster, update.clusterName()));
401-
continue;
402-
}
403-
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
404-
ClusterState childState;
405-
if (clusterStates.containsKey(cluster)) {
406-
childState = clusterStates.get(cluster);
407-
if (childState.shutdown) {
408-
childState.start();
409-
}
410-
} else {
411-
childState = new ClusterState(cluster);
412-
clusterStates.put(cluster, childState);
413-
childState.start();
414-
}
415-
newChildStates.put(cluster, childState);
416-
} else {
417-
newChildStates.put(cluster, childClusterStates.remove(cluster));
418-
}
419-
}
420-
if (childClusterStates != null) { // stop subscribing to revoked child clusters
421-
for (ClusterState watcher : childClusterStates.values()) {
422-
watcher.shutdown();
389+
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
390+
ClusterState childState;
391+
if (clusterStates.containsKey(cluster)) {
392+
childState = clusterStates.get(cluster);
393+
if (childState.shutdown) {
394+
childState.start();
423395
}
396+
} else {
397+
childState = new ClusterState(cluster);
398+
clusterStates.put(cluster, childState);
399+
childState.start();
424400
}
425-
childClusterStates = newChildStates;
426-
} else if (update.clusterType() == ClusterType.EDS) {
427-
isLeaf = true;
428-
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
429-
update.clusterName(), update.edsServiceName());
430-
} else { // logical DNS
431-
isLeaf = true;
432-
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
401+
newChildStates.put(cluster, childState);
402+
} else {
403+
newChildStates.put(cluster, childClusterStates.remove(cluster));
404+
}
405+
}
406+
if (childClusterStates != null) { // stop subscribing to revoked child clusters
407+
for (ClusterState watcher : childClusterStates.values()) {
408+
watcher.shutdown();
433409
}
434-
handleClusterDiscovered();
435410
}
411+
childClusterStates = newChildStates;
412+
} else if (update.clusterType() == ClusterType.EDS) {
413+
isLeaf = true;
414+
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
415+
update.clusterName(), update.edsServiceName());
416+
} else { // logical DNS
417+
isLeaf = true;
418+
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
436419
}
437-
438-
syncContext.execute(new ClusterDiscovered());
420+
handleClusterDiscovered();
439421
}
422+
440423
}
441424
}
442425
}

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

+21-30
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,8 @@ private EdsClusterState(String name, @Nullable String edsServiceName,
366366
void start() {
367367
String resourceName = edsServiceName != null ? edsServiceName : name;
368368
logger.log(XdsLogLevel.INFO, "Start watching EDS resource {0}", resourceName);
369-
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), resourceName, this);
369+
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),
370+
resourceName, this, syncContext);
370371
}
371372

372373
@Override
@@ -452,7 +453,7 @@ public void run() {
452453
}
453454
}
454455

455-
syncContext.execute(new EndpointsUpdated());
456+
new EndpointsUpdated().run();
456457
}
457458

458459
private List<String> generatePriorityNames(String name,
@@ -491,38 +492,28 @@ private List<String> generatePriorityNames(String name,
491492

492493
@Override
493494
public void onResourceDoesNotExist(final String resourceName) {
494-
syncContext.execute(new Runnable() {
495-
@Override
496-
public void run() {
497-
if (shutdown) {
498-
return;
499-
}
500-
logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
501-
status = Status.OK;
502-
resolved = true;
503-
result = null; // resource revoked
504-
handleEndpointResourceUpdate();
505-
}
506-
});
495+
if (shutdown) {
496+
return;
497+
}
498+
logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
499+
status = Status.OK;
500+
resolved = true;
501+
result = null; // resource revoked
502+
handleEndpointResourceUpdate();
507503
}
508504

509505
@Override
510506
public void onError(final Status error) {
511-
syncContext.execute(new Runnable() {
512-
@Override
513-
public void run() {
514-
if (shutdown) {
515-
return;
516-
}
517-
String resourceName = edsServiceName != null ? edsServiceName : name;
518-
status = Status.UNAVAILABLE
519-
.withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
520-
resourceName, error.getCode(), error.getDescription()))
521-
.withCause(error.getCause());
522-
logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
523-
handleEndpointResolutionError();
524-
}
525-
});
507+
if (shutdown) {
508+
return;
509+
}
510+
String resourceName = edsServiceName != null ? edsServiceName : name;
511+
status = Status.UNAVAILABLE
512+
.withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
513+
resourceName, error.getCode(), error.getDescription()))
514+
.withCause(error.getCause());
515+
logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
516+
handleEndpointResolutionError();
526517
}
527518
}
528519

xds/src/main/java/io/grpc/xds/ControlPlaneClient.java

+17-4
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@
3838
import io.grpc.internal.BackoffPolicy;
3939
import io.grpc.stub.ClientCallStreamObserver;
4040
import io.grpc.stub.ClientResponseObserver;
41-
import io.grpc.stub.StreamObserver;
4241
import io.grpc.xds.Bootstrapper.ServerInfo;
4342
import io.grpc.xds.EnvoyProtoData.Node;
43+
import io.grpc.xds.XdsClient.ProcessingTracker;
4444
import io.grpc.xds.XdsClient.ResourceStore;
4545
import io.grpc.xds.XdsClient.XdsResponseHandler;
4646
import io.grpc.xds.XdsClientImpl.XdsChannelFactory;
@@ -288,6 +288,8 @@ private abstract class AbstractAdsStream {
288288

289289
abstract boolean isReady();
290290

291+
abstract void request(int count);
292+
291293
/**
292294
* Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
293295
* {@code errorDetail}. Used for reacting to a specific discovery response. For
@@ -314,7 +316,10 @@ final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<A
314316
}
315317
responseReceived = true;
316318
respNonces.put(type, nonce);
317-
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce);
319+
ProcessingTracker processingTracker = new ProcessingTracker(() -> request(1), syncContext);
320+
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce,
321+
processingTracker);
322+
processingTracker.onComplete();
318323
}
319324

320325
final void handleRpcError(Throwable t) {
@@ -372,14 +377,15 @@ private void cleanUp() {
372377
}
373378

374379
private final class AdsStreamV3 extends AbstractAdsStream {
375-
private StreamObserver<DiscoveryRequest> requestWriter;
380+
private ClientCallStreamObserver<DiscoveryRequest> requestWriter;
376381

377382
@Override
378383
public boolean isReady() {
379384
return requestWriter != null && ((ClientCallStreamObserver<?>) requestWriter).isReady();
380385
}
381386

382387
@Override
388+
@SuppressWarnings("unchecked")
383389
void start() {
384390
AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
385391
AggregatedDiscoveryServiceGrpc.newStub(channel);
@@ -389,6 +395,7 @@ final class AdsClientResponseObserver
389395

390396
@Override
391397
public void beforeStart(ClientCallStreamObserver<DiscoveryRequest> requestStream) {
398+
requestStream.disableAutoRequestWithInitial(1);
392399
requestStream.setOnReadyHandler(ControlPlaneClient.this::readyHandler);
393400
}
394401

@@ -437,7 +444,8 @@ public void run() {
437444
}
438445
}
439446

440-
requestWriter = stub.streamAggregatedResources(new AdsClientResponseObserver());
447+
requestWriter = (ClientCallStreamObserver) stub.streamAggregatedResources(
448+
new AdsClientResponseObserver());
441449
}
442450

443451
@Override
@@ -467,6 +475,11 @@ void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
467475
}
468476
}
469477

478+
@Override
479+
void request(int count) {
480+
requestWriter.request(count);
481+
}
482+
470483
@Override
471484
void sendError(Exception error) {
472485
requestWriter.onError(error);

xds/src/main/java/io/grpc/xds/XdsClient.java

+32-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.common.base.Splitter;
2525
import com.google.common.net.UrlEscapers;
2626
import com.google.common.util.concurrent.ListenableFuture;
27+
import com.google.common.util.concurrent.MoreExecutors;
2728
import com.google.protobuf.Any;
2829
import io.grpc.Status;
2930
import io.grpc.xds.Bootstrapper.ServerInfo;
@@ -36,6 +37,8 @@
3637
import java.util.Collections;
3738
import java.util.List;
3839
import java.util.Map;
40+
import java.util.concurrent.Executor;
41+
import java.util.concurrent.atomic.AtomicInteger;
3942
import javax.annotation.Nullable;
4043

4144
/**
@@ -305,10 +308,16 @@ TlsContextManager getTlsContextManager() {
305308
* Registers a data watcher for the given Xds resource.
306309
*/
307310
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
308-
ResourceWatcher<T> watcher) {
311+
ResourceWatcher<T> watcher,
312+
Executor executor) {
309313
throw new UnsupportedOperationException();
310314
}
311315

316+
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
317+
ResourceWatcher<T> watcher) {
318+
watchXdsResource(type, resourceName, watcher, MoreExecutors.directExecutor());
319+
}
320+
312321
/**
313322
* Unregisters the given resource watcher.
314323
*/
@@ -353,11 +362,32 @@ Map<ServerInfo, LoadReportClient> getServerLrsClientMap() {
353362
throw new UnsupportedOperationException();
354363
}
355364

365+
static final class ProcessingTracker {
366+
private final AtomicInteger pendingTask = new AtomicInteger(1);
367+
private final Executor executor;
368+
private final Runnable completionListener;
369+
370+
ProcessingTracker(Runnable completionListener, Executor executor) {
371+
this.executor = executor;
372+
this.completionListener = completionListener;
373+
}
374+
375+
void startTask() {
376+
pendingTask.incrementAndGet();
377+
}
378+
379+
void onComplete() {
380+
if (pendingTask.decrementAndGet() == 0) {
381+
executor.execute(completionListener);
382+
}
383+
}
384+
}
385+
356386
interface XdsResponseHandler {
357387
/** Called when a xds response is received. */
358388
void handleResourceResponse(
359389
XdsResourceType<?> resourceType, ServerInfo serverInfo, String versionInfo,
360-
List<Any> resources, String nonce);
390+
List<Any> resources, String nonce, ProcessingTracker processingTracker);
361391

362392
/** Called when the ADS stream is closed passively. */
363393
// Must be synchronized.

0 commit comments

Comments
 (0)