Skip to content

Commit 597101c

Browse files
authored
xds: fix flow control message not delivered when previous message type is unknown (#10785)
1 parent 0f21574 commit 597101c

14 files changed

+531
-342
lines changed

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

+64-81
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ private ClusterState(String name) {
320320

321321
private void start() {
322322
shutdown = false;
323-
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this);
323+
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this, syncContext);
324324
}
325325

326326
void shutdown() {
@@ -341,102 +341,85 @@ public void onError(Status error) {
341341
String.format("Unable to load CDS %s. xDS server returned: %s: %s",
342342
name, error.getCode(), error.getDescription()))
343343
.withCause(error.getCause());
344-
syncContext.execute(new Runnable() {
345-
@Override
346-
public void run() {
347-
if (shutdown) {
348-
return;
349-
}
350-
// All watchers should receive the same error, so we only propagate it once.
351-
if (ClusterState.this == root) {
352-
handleClusterDiscoveryError(status);
353-
}
354-
}
355-
});
344+
if (shutdown) {
345+
return;
346+
}
347+
// All watchers should receive the same error, so we only propagate it once.
348+
if (ClusterState.this == root) {
349+
handleClusterDiscoveryError(status);
350+
}
356351
}
357352

358353
@Override
359354
public void onResourceDoesNotExist(String resourceName) {
360-
syncContext.execute(new Runnable() {
361-
@Override
362-
public void run() {
363-
if (shutdown) {
364-
return;
365-
}
366-
discovered = true;
367-
result = null;
368-
if (childClusterStates != null) {
369-
for (ClusterState state : childClusterStates.values()) {
370-
state.shutdown();
371-
}
372-
childClusterStates = null;
373-
}
374-
handleClusterDiscovered();
355+
if (shutdown) {
356+
return;
357+
}
358+
discovered = true;
359+
result = null;
360+
if (childClusterStates != null) {
361+
for (ClusterState state : childClusterStates.values()) {
362+
state.shutdown();
375363
}
376-
});
364+
childClusterStates = null;
365+
}
366+
handleClusterDiscovered();
377367
}
378368

379369
@Override
380370
public void onChanged(final CdsUpdate update) {
381-
class ClusterDiscovered implements Runnable {
382-
@Override
383-
public void run() {
384-
if (shutdown) {
385-
return;
371+
if (shutdown) {
372+
return;
373+
}
374+
logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
375+
discovered = true;
376+
result = update;
377+
if (update.clusterType() == ClusterType.AGGREGATE) {
378+
isLeaf = false;
379+
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
380+
update.clusterName(), update.prioritizedClusterNames());
381+
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
382+
for (String cluster : update.prioritizedClusterNames()) {
383+
if (newChildStates.containsKey(cluster)) {
384+
logger.log(XdsLogLevel.WARNING,
385+
String.format("duplicate cluster name %s in aggregate %s is being ignored",
386+
cluster, update.clusterName()));
387+
continue;
386388
}
387-
388-
logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
389-
discovered = true;
390-
result = update;
391-
if (update.clusterType() == ClusterType.AGGREGATE) {
392-
isLeaf = false;
393-
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
394-
update.clusterName(), update.prioritizedClusterNames());
395-
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
396-
for (String cluster : update.prioritizedClusterNames()) {
397-
if (newChildStates.containsKey(cluster)) {
398-
logger.log(XdsLogLevel.WARNING,
399-
String.format("duplicate cluster name %s in aggregate %s is being ignored",
400-
cluster, update.clusterName()));
401-
continue;
402-
}
403-
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
404-
ClusterState childState;
405-
if (clusterStates.containsKey(cluster)) {
406-
childState = clusterStates.get(cluster);
407-
if (childState.shutdown) {
408-
childState.start();
409-
}
410-
} else {
411-
childState = new ClusterState(cluster);
412-
clusterStates.put(cluster, childState);
413-
childState.start();
414-
}
415-
newChildStates.put(cluster, childState);
416-
} else {
417-
newChildStates.put(cluster, childClusterStates.remove(cluster));
418-
}
419-
}
420-
if (childClusterStates != null) { // stop subscribing to revoked child clusters
421-
for (ClusterState watcher : childClusterStates.values()) {
422-
watcher.shutdown();
389+
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
390+
ClusterState childState;
391+
if (clusterStates.containsKey(cluster)) {
392+
childState = clusterStates.get(cluster);
393+
if (childState.shutdown) {
394+
childState.start();
423395
}
396+
} else {
397+
childState = new ClusterState(cluster);
398+
clusterStates.put(cluster, childState);
399+
childState.start();
424400
}
425-
childClusterStates = newChildStates;
426-
} else if (update.clusterType() == ClusterType.EDS) {
427-
isLeaf = true;
428-
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
429-
update.clusterName(), update.edsServiceName());
430-
} else { // logical DNS
431-
isLeaf = true;
432-
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
401+
newChildStates.put(cluster, childState);
402+
} else {
403+
newChildStates.put(cluster, childClusterStates.remove(cluster));
404+
}
405+
}
406+
if (childClusterStates != null) { // stop subscribing to revoked child clusters
407+
for (ClusterState watcher : childClusterStates.values()) {
408+
watcher.shutdown();
433409
}
434-
handleClusterDiscovered();
435410
}
411+
childClusterStates = newChildStates;
412+
} else if (update.clusterType() == ClusterType.EDS) {
413+
isLeaf = true;
414+
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
415+
update.clusterName(), update.edsServiceName());
416+
} else { // logical DNS
417+
isLeaf = true;
418+
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
436419
}
437-
438-
syncContext.execute(new ClusterDiscovered());
420+
handleClusterDiscovered();
439421
}
422+
440423
}
441424
}
442425
}

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

+21-30
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,8 @@ private EdsClusterState(String name, @Nullable String edsServiceName,
366366
void start() {
367367
String resourceName = edsServiceName != null ? edsServiceName : name;
368368
logger.log(XdsLogLevel.INFO, "Start watching EDS resource {0}", resourceName);
369-
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), resourceName, this);
369+
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),
370+
resourceName, this, syncContext);
370371
}
371372

372373
@Override
@@ -452,7 +453,7 @@ public void run() {
452453
}
453454
}
454455

455-
syncContext.execute(new EndpointsUpdated());
456+
new EndpointsUpdated().run();
456457
}
457458

458459
private List<String> generatePriorityNames(String name,
@@ -491,38 +492,28 @@ private List<String> generatePriorityNames(String name,
491492

492493
@Override
493494
public void onResourceDoesNotExist(final String resourceName) {
494-
syncContext.execute(new Runnable() {
495-
@Override
496-
public void run() {
497-
if (shutdown) {
498-
return;
499-
}
500-
logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
501-
status = Status.OK;
502-
resolved = true;
503-
result = null; // resource revoked
504-
handleEndpointResourceUpdate();
505-
}
506-
});
495+
if (shutdown) {
496+
return;
497+
}
498+
logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
499+
status = Status.OK;
500+
resolved = true;
501+
result = null; // resource revoked
502+
handleEndpointResourceUpdate();
507503
}
508504

509505
@Override
510506
public void onError(final Status error) {
511-
syncContext.execute(new Runnable() {
512-
@Override
513-
public void run() {
514-
if (shutdown) {
515-
return;
516-
}
517-
String resourceName = edsServiceName != null ? edsServiceName : name;
518-
status = Status.UNAVAILABLE
519-
.withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
520-
resourceName, error.getCode(), error.getDescription()))
521-
.withCause(error.getCause());
522-
logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
523-
handleEndpointResolutionError();
524-
}
525-
});
507+
if (shutdown) {
508+
return;
509+
}
510+
String resourceName = edsServiceName != null ? edsServiceName : name;
511+
status = Status.UNAVAILABLE
512+
.withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
513+
resourceName, error.getCode(), error.getDescription()))
514+
.withCause(error.getCause());
515+
logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
516+
handleEndpointResolutionError();
526517
}
527518
}
528519

xds/src/main/java/io/grpc/xds/ControlPlaneClient.java

+18-4
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@
3838
import io.grpc.internal.BackoffPolicy;
3939
import io.grpc.stub.ClientCallStreamObserver;
4040
import io.grpc.stub.ClientResponseObserver;
41-
import io.grpc.stub.StreamObserver;
4241
import io.grpc.xds.Bootstrapper.ServerInfo;
4342
import io.grpc.xds.EnvoyProtoData.Node;
43+
import io.grpc.xds.XdsClient.ProcessingTracker;
4444
import io.grpc.xds.XdsClient.ResourceStore;
4545
import io.grpc.xds.XdsClient.XdsResponseHandler;
4646
import io.grpc.xds.XdsClientImpl.XdsChannelFactory;
@@ -288,6 +288,8 @@ private abstract class AbstractAdsStream {
288288

289289
abstract boolean isReady();
290290

291+
abstract void request(int count);
292+
291293
/**
292294
* Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
293295
* {@code errorDetail}. Used for reacting to a specific discovery response. For
@@ -314,7 +316,10 @@ final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<A
314316
}
315317
responseReceived = true;
316318
respNonces.put(type, nonce);
317-
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce);
319+
ProcessingTracker processingTracker = new ProcessingTracker(() -> request(1), syncContext);
320+
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce,
321+
processingTracker);
322+
processingTracker.onComplete();
318323
}
319324

320325
final void handleRpcError(Throwable t) {
@@ -372,14 +377,15 @@ private void cleanUp() {
372377
}
373378

374379
private final class AdsStreamV3 extends AbstractAdsStream {
375-
private StreamObserver<DiscoveryRequest> requestWriter;
380+
private ClientCallStreamObserver<DiscoveryRequest> requestWriter;
376381

377382
@Override
378383
public boolean isReady() {
379384
return requestWriter != null && ((ClientCallStreamObserver<?>) requestWriter).isReady();
380385
}
381386

382387
@Override
388+
@SuppressWarnings("unchecked")
383389
void start() {
384390
AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
385391
AggregatedDiscoveryServiceGrpc.newStub(channel);
@@ -389,6 +395,7 @@ final class AdsClientResponseObserver
389395

390396
@Override
391397
public void beforeStart(ClientCallStreamObserver<DiscoveryRequest> requestStream) {
398+
requestStream.disableAutoRequestWithInitial(1);
392399
requestStream.setOnReadyHandler(ControlPlaneClient.this::readyHandler);
393400
}
394401

@@ -408,6 +415,7 @@ public void run() {
408415
XdsLogLevel.WARNING,
409416
"Ignore an unknown type of DiscoveryResponse: {0}",
410417
response.getTypeUrl());
418+
request(1);
411419
return;
412420
}
413421
handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(),
@@ -437,7 +445,8 @@ public void run() {
437445
}
438446
}
439447

440-
requestWriter = stub.streamAggregatedResources(new AdsClientResponseObserver());
448+
requestWriter = (ClientCallStreamObserver) stub.streamAggregatedResources(
449+
new AdsClientResponseObserver());
441450
}
442451

443452
@Override
@@ -467,6 +476,11 @@ void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
467476
}
468477
}
469478

479+
@Override
480+
void request(int count) {
481+
requestWriter.request(count);
482+
}
483+
470484
@Override
471485
void sendError(Exception error) {
472486
requestWriter.onError(error);

0 commit comments

Comments
 (0)