Skip to content

Commit 664c12c

Browse files
authored
Preventing exceptions on node shutdown in integration tests (#88827)
1 parent 32c33ed commit 664c12c

File tree

1 file changed

+30
-12
lines changed

1 file changed

+30
-12
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.common.io.stream.StreamOutput;
2525
import org.elasticsearch.common.io.stream.Writeable;
2626
import org.elasticsearch.common.settings.Setting;
27+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2728
import org.elasticsearch.core.Nullable;
2829
import org.elasticsearch.core.Releasable;
2930
import org.elasticsearch.core.Releasables;
@@ -650,12 +651,20 @@ void beginPollingClusterFormationInfo(
650651
) {
651652
masterEligibleNodes.forEach(masterEligibleNode -> {
652653
Consumer<ClusterFormationStateOrException> responseConsumer = result -> nodeResponseConsumer.accept(masterEligibleNode, result);
653-
cancellableConsumer.accept(
654-
fetchClusterFormationInfo(
655-
masterEligibleNode,
656-
responseConsumer.andThen(rescheduleFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer))
657-
)
658-
);
654+
try {
655+
cancellableConsumer.accept(
656+
fetchClusterFormationInfo(
657+
masterEligibleNode,
658+
responseConsumer.andThen(rescheduleFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer))
659+
)
660+
);
661+
} catch (EsRejectedExecutionException e) {
662+
if (e.isExecutorShutdown()) {
663+
logger.trace("Not rescheduling request for cluster coordination info because this node is being shutdown", e);
664+
} else {
665+
throw e;
666+
}
667+
}
659668
});
660669
}
661670

@@ -673,12 +682,20 @@ private Consumer<CoordinationDiagnosticsService.ClusterFormationStateOrException
673682
Consumer<Scheduler.Cancellable> cancellableConsumer
674683
) {
675684
return response -> {
676-
cancellableConsumer.accept(
677-
fetchClusterFormationInfo(
678-
masterEligibleNode,
679-
responseConsumer.andThen(rescheduleFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer))
680-
)
681-
);
685+
try {
686+
cancellableConsumer.accept(
687+
fetchClusterFormationInfo(
688+
masterEligibleNode,
689+
responseConsumer.andThen(rescheduleFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer))
690+
)
691+
);
692+
} catch (EsRejectedExecutionException e) {
693+
if (e.isExecutorShutdown()) {
694+
logger.trace("Not rescheduling request for cluster coordination info because this node is being shutdown", e);
695+
} else {
696+
throw e;
697+
}
698+
}
682699
};
683700
}
684701

@@ -697,6 +714,7 @@ private void cancelPollingClusterFormationInfo() {
697714
* @param node The node to poll for cluster formation information
698715
* @param responseConsumer The consumer of the cluster formation info for the node, or the exception encountered while contacting it
699716
* @return A Cancellable for the task that is scheduled to fetch cluster formation information
717+
* @throws EsRejectedExecutionException If the task cannot be scheduled, possibly because the node is shutting down.
700718
*/
701719
private Scheduler.Cancellable fetchClusterFormationInfo(
702720
DiscoveryNode node,

0 commit comments

Comments
 (0)