Skip to content

Commit ea7d3f9

Browse files
authored
Check for global blocks after IndexNotFoundException in TransportMasterNodeAction (#78128)
Today we try to resolve index patterns to check cluster blocks in certain TransportMasterNodeActions, in some scenarios where a master node is recovering we could end up throwing a false IndexNotFoundException. This commit adds an extra check for global blocks when a IndexNotFoundException is thrown to ensure that we cover that case. Closes #70572
1 parent d2de1af commit ea7d3f9

File tree

2 files changed

+118
-4
lines changed

2 files changed

+118
-4
lines changed

server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import org.elasticsearch.cluster.ClusterStateObserver;
2222
import org.elasticsearch.cluster.MasterNodeChangePredicate;
2323
import org.elasticsearch.cluster.NotMasterException;
24+
import org.elasticsearch.cluster.block.ClusterBlock;
2425
import org.elasticsearch.cluster.block.ClusterBlockException;
26+
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2527
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
2628
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2729
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -30,6 +32,8 @@
3032
import org.elasticsearch.common.io.stream.Writeable;
3133
import org.elasticsearch.core.TimeValue;
3234
import org.elasticsearch.discovery.MasterNotDiscoveredException;
35+
import org.elasticsearch.gateway.GatewayService;
36+
import org.elasticsearch.index.IndexNotFoundException;
3337
import org.elasticsearch.node.NodeClosedException;
3438
import org.elasticsearch.tasks.CancellableTask;
3539
import org.elasticsearch.tasks.Task;
@@ -99,6 +103,21 @@ protected boolean localExecute(Request request) {
99103

100104
protected abstract ClusterBlockException checkBlock(Request request, ClusterState state);
101105

106+
private ClusterBlockException checkBlockIfStateRecovered(Request request, ClusterState state) {
107+
try {
108+
return checkBlock(request, state);
109+
} catch (IndexNotFoundException e) {
110+
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
111+
// no index metadata is exposed yet, but checkBlock depends on an index, so keep trying until the cluster forms
112+
assert GatewayService.STATE_NOT_RECOVERED_BLOCK.contains(ClusterBlockLevel.METADATA_READ);
113+
assert state.blocks().global(ClusterBlockLevel.METADATA_READ).stream().allMatch(ClusterBlock::retryable);
114+
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
115+
} else {
116+
throw e;
117+
}
118+
}
119+
}
120+
102121
@Override
103122
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
104123
ClusterState state = clusterService.state();
@@ -133,7 +152,7 @@ protected void doStart(ClusterState clusterState) {
133152
final DiscoveryNodes nodes = clusterState.nodes();
134153
if (nodes.isLocalNodeElectedMaster() || localExecute(request)) {
135154
// check for block, if blocked, retry, else, execute locally
136-
final ClusterBlockException blockException = checkBlock(request, clusterState);
155+
final ClusterBlockException blockException = checkBlockIfStateRecovered(request, clusterState);
137156
if (blockException != null) {
138157
if (blockException.retryable() == false) {
139158
logger.trace("can't execute due to a non-retryable cluster block", blockException);
@@ -142,7 +161,7 @@ protected void doStart(ClusterState clusterState) {
142161
logger.debug("can't execute due to a cluster block, retrying", blockException);
143162
retry(clusterState, blockException, newState -> {
144163
try {
145-
ClusterBlockException newException = checkBlock(request, newState);
164+
ClusterBlockException newException = checkBlockIfStateRecovered(request, newState);
146165
return (newException == null || newException.retryable() == false);
147166
} catch (Exception e) {
148167
// accept state as block will be rechecked by doStart() and listener.onFailure() then called

server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java

Lines changed: 97 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@
1313
import org.elasticsearch.action.ActionListener;
1414
import org.elasticsearch.action.ActionRequestValidationException;
1515
import org.elasticsearch.action.ActionResponse;
16+
import org.elasticsearch.action.IndicesRequest;
1617
import org.elasticsearch.action.support.ActionFilters;
1718
import org.elasticsearch.action.support.ActionTestUtils;
19+
import org.elasticsearch.action.support.IndicesOptions;
1820
import org.elasticsearch.action.support.PlainActionFuture;
1921
import org.elasticsearch.action.support.ThreadedActionListener;
2022
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
@@ -25,15 +27,19 @@
2527
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2628
import org.elasticsearch.cluster.block.ClusterBlocks;
2729
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
30+
import org.elasticsearch.cluster.metadata.IndexMetadata;
31+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
32+
import org.elasticsearch.cluster.metadata.Metadata;
2833
import org.elasticsearch.cluster.node.DiscoveryNode;
2934
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
3035
import org.elasticsearch.cluster.node.DiscoveryNodes;
3136
import org.elasticsearch.cluster.service.ClusterService;
37+
import org.elasticsearch.common.Strings;
3238
import org.elasticsearch.common.io.stream.StreamInput;
3339
import org.elasticsearch.common.io.stream.StreamOutput;
3440
import org.elasticsearch.common.settings.Settings;
35-
import org.elasticsearch.core.TimeValue;
3641
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
42+
import org.elasticsearch.core.TimeValue;
3743
import org.elasticsearch.discovery.MasterNotDiscoveredException;
3844
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
3945
import org.elasticsearch.node.NodeClosedException;
@@ -65,6 +71,7 @@
6571
import java.util.concurrent.ExecutionException;
6672
import java.util.concurrent.TimeUnit;
6773

74+
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
6875
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
6976
import static org.elasticsearch.test.ClusterServiceUtils.setState;
7077
import static org.hamcrest.Matchers.equalTo;
@@ -124,7 +131,9 @@ void assertListenerThrows(String msg, ActionFuture<?> listener, Class<?> klass)
124131
}
125132
}
126133

127-
public static class Request extends MasterNodeRequest<Request> {
134+
public static class Request extends MasterNodeRequest<Request> implements IndicesRequest.Replaceable {
135+
private String[] indices = Strings.EMPTY_ARRAY;
136+
128137
Request() {}
129138

130139
Request(StreamInput in) throws IOException {
@@ -140,6 +149,22 @@ public ActionRequestValidationException validate() {
140149
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
141150
return new CancellableTask(id, type, action, "", parentTaskId, headers);
142151
}
152+
153+
@Override
154+
public String[] indices() {
155+
return indices;
156+
}
157+
158+
@Override
159+
public IndicesOptions indicesOptions() {
160+
return IndicesOptions.strictExpandOpen();
161+
}
162+
163+
@Override
164+
public IndicesRequest indices(String... indices) {
165+
this.indices = indices;
166+
return this;
167+
}
143168
}
144169

145170
class Response extends ActionResponse {
@@ -568,6 +593,76 @@ public void testTaskCancellationOnceActionItIsDispatchedToMaster() throws Except
568593
expectThrows(CancellationException.class, listener::actionGet);
569594
}
570595

596+
public void testGlobalBlocksAreCheckedAfterIndexNotFoundException() throws Exception {
597+
Request request = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(60));
598+
String indexRequestName = "my-index";
599+
request.indices(indexRequestName);
600+
601+
ClusterState stateWithBlockWithoutIndexMetadata =
602+
ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
603+
.blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
604+
.build();
605+
setState(clusterService, stateWithBlockWithoutIndexMetadata);
606+
607+
Action action = new Action("internal:testAction", transportService, clusterService, threadPool, ThreadPool.Names.SAME) {
608+
final IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance();
609+
610+
@Override
611+
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
612+
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ,
613+
indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(state, request));
614+
}
615+
616+
};
617+
618+
PlainActionFuture<Response> listener = new PlainActionFuture<>();
619+
ActionTestUtils.execute(action, null, request, listener);
620+
621+
assertFalse(listener.isDone());
622+
IndexMetadata.Builder indexMetadataBuilder =
623+
IndexMetadata.builder(indexRequestName)
624+
.settings(settings(Version.CURRENT))
625+
.numberOfShards(1)
626+
.numberOfReplicas(0);
627+
ClusterState clusterStateWithoutBlocks = ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
628+
.metadata(Metadata.builder().put(indexMetadataBuilder).build())
629+
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK)
630+
.build();
631+
setState(clusterService, clusterStateWithoutBlocks);
632+
assertTrue(listener.isDone());
633+
listener.get();
634+
}
635+
636+
public void testGlobalBlocksAreCheckedAfterIndexNotFoundExceptionTimesOutIfIndexIsNotFound() {
637+
Request request = new Request().masterNodeTimeout(TimeValue.timeValueMillis(50));
638+
String indexRequestName = "my-index";
639+
request.indices(indexRequestName);
640+
641+
ClusterState stateWithBlockWithoutIndexMetadata =
642+
ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
643+
.blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
644+
.build();
645+
setState(clusterService, stateWithBlockWithoutIndexMetadata);
646+
647+
Action action = new Action("internal:testAction", transportService, clusterService, threadPool, ThreadPool.Names.SAME) {
648+
final IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance();
649+
650+
@Override
651+
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
652+
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ,
653+
indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(state, request));
654+
}
655+
656+
};
657+
658+
PlainActionFuture<Response> listener = new PlainActionFuture<>();
659+
ActionTestUtils.execute(action, null, request, listener);
660+
661+
ExecutionException ex = expectThrows(ExecutionException.class, listener::get);
662+
assertThat(ex.getCause(), instanceOf(MasterNotDiscoveredException.class));
663+
assertThat(ex.getCause().getCause(), instanceOf(ClusterBlockException.class));
664+
}
665+
571666
private Runnable blockAllThreads(String executorName) throws Exception {
572667
final int numberOfThreads = threadPool.info(executorName).getMax();
573668
final EsThreadPoolExecutor executor = (EsThreadPoolExecutor) threadPool.executor(executorName);

0 commit comments

Comments
 (0)