Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.MasterNodeChangePredicate;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -30,6 +32,8 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -105,6 +109,21 @@ protected boolean localExecute(Request request) {

protected abstract ClusterBlockException checkBlock(Request request, ClusterState state);

private ClusterBlockException checkBlockIfStateRecovered(Request request, ClusterState state) {
try {
return checkBlock(request, state);
} catch (IndexNotFoundException e) {
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// no index metadata is exposed yet, but checkBlock depends on an index, so keep trying until the cluster forms
assert GatewayService.STATE_NOT_RECOVERED_BLOCK.contains(ClusterBlockLevel.METADATA_READ);
assert state.blocks().global(ClusterBlockLevel.METADATA_READ).stream().allMatch(ClusterBlock::retryable);
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
} else {
throw e;
}
}
}

@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
ClusterState state = clusterService.state();
Expand Down Expand Up @@ -139,15 +158,15 @@ protected void doStart(ClusterState clusterState) {
final DiscoveryNodes nodes = clusterState.nodes();
if (nodes.isLocalNodeElectedMaster() || localExecute(request)) {
// check for block, if blocked, retry, else, execute locally
final ClusterBlockException blockException = checkBlock(request, clusterState);
final ClusterBlockException blockException = checkBlockIfStateRecovered(request, clusterState);
if (blockException != null) {
if (blockException.retryable() == false) {
listener.onFailure(blockException);
} else {
logger.debug("can't execute due to a cluster block, retrying", blockException);
retry(clusterState, blockException, newState -> {
try {
ClusterBlockException newException = checkBlock(request, newState);
ClusterBlockException newException = checkBlockIfStateRecovered(request, newState);
return (newException == null || newException.retryable() == false);
} catch (Exception e) {
// accept state as block will be rechecked by doStart() and listener.onFailure() then called
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
Expand All @@ -25,15 +27,19 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.node.NodeClosedException;
Expand Down Expand Up @@ -65,6 +71,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -124,7 +131,9 @@ void assertListenerThrows(String msg, ActionFuture<?> listener, Class<?> klass)
}
}

public static class Request extends MasterNodeRequest<Request> {
public static class Request extends MasterNodeRequest<Request> implements IndicesRequest.Replaceable {
private String[] indices = Strings.EMPTY_ARRAY;

Request() {}

Request(StreamInput in) throws IOException {
Expand All @@ -140,6 +149,22 @@ public ActionRequestValidationException validate() {
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
}

@Override
public String[] indices() {
return indices;
}

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictExpandOpen();
}

@Override
public IndicesRequest indices(String... indices) {
this.indices = indices;
return this;
}
}

class Response extends ActionResponse {
Expand Down Expand Up @@ -567,6 +592,76 @@ public void testTaskCancellationOnceActionItIsDispatchedToMaster() throws Except
expectThrows(CancellationException.class, listener::actionGet);
}

public void testGlobalBlocksAreCheckedAfterIndexNotFoundException() throws Exception {
Request request = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(60));
String indexRequestName = "my-index";
request.indices(indexRequestName);

ClusterState stateWithBlockWithoutIndexMetadata =
ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
.blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
.build();
setState(clusterService, stateWithBlockWithoutIndexMetadata);

Action action = new Action("internal:testAction", transportService, clusterService, threadPool, ThreadPool.Names.SAME) {
final IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance();

@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ,
indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(state, request));
}

};

PlainActionFuture<Response> listener = new PlainActionFuture<>();
ActionTestUtils.execute(action, null, request, listener);

assertFalse(listener.isDone());
IndexMetadata.Builder indexMetadataBuilder =
IndexMetadata.builder(indexRequestName)
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0);
ClusterState clusterStateWithoutBlocks = ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
.metadata(Metadata.builder().put(indexMetadataBuilder).build())
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK)
.build();
setState(clusterService, clusterStateWithoutBlocks);
assertTrue(listener.isDone());
listener.get();
}

public void testGlobalBlocksAreCheckedAfterIndexNotFoundExceptionTimesOutIfIndexIsNotFound() {
Request request = new Request().masterNodeTimeout(TimeValue.timeValueMillis(50));
String indexRequestName = "my-index";
request.indices(indexRequestName);

ClusterState stateWithBlockWithoutIndexMetadata =
ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
.blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
.build();
setState(clusterService, stateWithBlockWithoutIndexMetadata);

Action action = new Action("internal:testAction", transportService, clusterService, threadPool, ThreadPool.Names.SAME) {
final IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance();

@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ,
indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(state, request));
}

};

PlainActionFuture<Response> listener = new PlainActionFuture<>();
ActionTestUtils.execute(action, null, request, listener);

ExecutionException ex = expectThrows(ExecutionException.class, listener::get);
assertThat(ex.getCause(), instanceOf(MasterNotDiscoveredException.class));
assertThat(ex.getCause().getCause(), instanceOf(ClusterBlockException.class));
}

private Runnable blockAllThreads(String executorName) throws Exception {
final int numberOfThreads = threadPool.info(executorName).getMax();
final EsThreadPoolExecutor executor = (EsThreadPoolExecutor) threadPool.executor(executorName);
Expand Down