Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ protected ClusterBlockException checkBlock(ClusterSearchShardsRequest request, C
indexNameExpressionResolver.concreteIndexNames(state, request));
}

/**
 * Reports the global cluster block, if any, that applies to this action at the
 * {@link ClusterBlockLevel#METADATA_READ} level.
 *
 * @param clusterState the cluster state whose global blocks are inspected
 * @return the result of {@code globalBlockedException} for that level; presumably {@code null}
 *         when no global block applies (confirm against {@code ClusterBlocks})
 */
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState clusterState) {
return clusterState.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}

@Override
protected void masterOperation(Task task, final ClusterSearchShardsRequest request, final ClusterState state,
final ActionListener<ClusterSearchShardsResponse> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ protected ClusterBlockException checkBlock(GetAliasesRequest request, ClusterSta
indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(state, request));
}

/**
 * Reports the global cluster block, if any, that applies to this action at the
 * {@link ClusterBlockLevel#METADATA_READ} level.
 *
 * @param clusterState the cluster state whose global blocks are inspected
 * @return the result of {@code globalBlockedException} for that level; presumably {@code null}
 *         when no global block applies (confirm against {@code ClusterBlocks})
 */
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState clusterState) {
return clusterState.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}

@Override
protected void masterOperation(Task task, GetAliasesRequest request, ClusterState state, ActionListener<GetAliasesResponse> listener) {
assert Transports.assertNotTransportThread("no need to avoid the context switch and may be expensive if there are many aliases");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ protected ClusterBlockException checkBlock(CloseIndexRequest request, ClusterSta
indexNameExpressionResolver.concreteIndexNames(state, request));
}

/**
 * Reports the global cluster block, if any, that applies to this action at the
 * {@link ClusterBlockLevel#METADATA_WRITE} level.
 *
 * @param clusterState the cluster state whose global blocks are inspected
 * @return the result of {@code globalBlockedException} for that level; presumably {@code null}
 *         when no global block applies (confirm against {@code ClusterBlocks})
 */
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState clusterState) {
return clusterState.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

@Override
protected void masterOperation(final Task task,
final CloseIndexRequest request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -63,6 +64,11 @@ protected ClusterBlockException checkBlock(DeleteIndexRequest request, ClusterSt
return state.blocks().indicesAllowReleaseResources(indexNameExpressionResolver.concreteIndexNames(state, request));
}

/**
 * Reports the global cluster block, if any, that applies to this action at the
 * {@link ClusterBlockLevel#METADATA_WRITE} level.
 *
 * @param clusterState the cluster state whose global blocks are inspected
 * @return the result of {@code globalBlockedException} for that level; presumably {@code null}
 *         when no global block applies (confirm against {@code ClusterBlocks})
 */
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState clusterState) {
return clusterState.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

@Override
protected void masterOperation(Task task, final DeleteIndexRequest request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ protected ClusterBlockException checkBlock(PutMappingRequest request, ClusterSta
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, indices);
}

/**
 * Reports the global cluster block, if any, that applies to this action at the
 * {@link ClusterBlockLevel#METADATA_WRITE} level.
 *
 * @param clusterState the cluster state whose global blocks are inspected
 * @return the result of {@code globalBlockedException} for that level; presumably {@code null}
 *         when no global block applies (confirm against {@code ClusterBlocks})
 */
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState clusterState) {
return clusterState.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

@Override
protected void masterOperation(Task task, final PutMappingRequest request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ protected ClusterBlockException checkBlock(OpenIndexRequest request, ClusterStat
indexNameExpressionResolver.concreteIndexNames(state, request));
}

/**
 * Reports the global cluster block, if any, that applies to this action at the
 * {@link ClusterBlockLevel#METADATA_WRITE} level.
 *
 * @param clusterState the cluster state whose global blocks are inspected
 * @return the result of {@code globalBlockedException} for that level; presumably {@code null}
 *         when no global block applies (confirm against {@code ClusterBlocks})
 */
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState clusterState) {
return clusterState.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

@Override
protected void masterOperation(Task task, final OpenIndexRequest request, final ClusterState state,
final ActionListener<OpenIndexResponse> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ protected ClusterBlockException checkBlock(AddIndexBlockRequest request, Cluster
indexNameExpressionResolver.concreteIndexNames(state, request));
}

/**
 * Reports the global cluster block, if any, that applies to this action at the
 * {@link ClusterBlockLevel#METADATA_WRITE} level.
 *
 * @param clusterState the cluster state whose global blocks are inspected
 * @return the result of {@code globalBlockedException} for that level; presumably {@code null}
 *         when no global block applies (confirm against {@code ClusterBlocks})
 */
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState clusterState) {
return clusterState.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

@Override
protected void masterOperation(final Task task,
final AddIndexBlockRequest request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ protected ClusterBlockException checkBlock(RolloverRequest request, ClusterState
indexNameExpressionResolver.concreteIndexNames(state, indicesOptions, request));
}

/**
 * Reports the global cluster block, if any, that applies to this action at the
 * {@link ClusterBlockLevel#METADATA_WRITE} level.
 *
 * @param clusterState the cluster state whose global blocks are inspected
 * @return the result of {@code globalBlockedException} for that level; presumably {@code null}
 *         when no global block applies (confirm against {@code ClusterBlocks})
 */
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState clusterState) {
return clusterState.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

@Override
protected void masterOperation(Task task, final RolloverRequest rolloverRequest, final ClusterState oldState,
final ActionListener<RolloverResponse> listener) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ protected ClusterBlockException checkBlock(GetSettingsRequest request, ClusterSt
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ,
indexNameExpressionResolver.concreteIndexNames(state, request));
}
//

private static boolean isFilteredRequest(GetSettingsRequest request) {
return CollectionUtils.isEmpty(request.names()) == false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ protected ClusterBlockException checkBlock(IndicesShardStoresRequest request, Cl
indexNameExpressionResolver.concreteIndexNames(state, request));
}

/**
 * Reports the global cluster block, if any, that applies to this action at the
 * {@link ClusterBlockLevel#METADATA_READ} level.
 *
 * @param clusterState the cluster state whose global blocks are inspected
 * @return the result of {@code globalBlockedException} for that level; presumably {@code null}
 *         when no global block applies (confirm against {@code ClusterBlocks})
 */
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState clusterState) {
return clusterState.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}

private class AsyncShardStoresInfoFetches {
private final DiscoveryNodes nodes;
private final RoutingNodes routingNodes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -99,6 +100,10 @@ protected boolean localExecute(Request request) {

protected abstract ClusterBlockException checkBlock(Request request, ClusterState state);

protected ClusterBlockException checkGlobalBlock(ClusterState clusterState) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this some more, I think we only really care about IndexNotFoundExceptions (INFEs) thrown while the STATE_NOT_RECOVERED_BLOCK is in place; other blocks don't really matter. Rather than introducing the checkGlobalBlock method (which largely duplicates checkBlock), I think it'd be better to wrap checkBlock like this:

    private ClusterBlockException checkBlockIfStateRecovered(Request request, ClusterState state) {
        try {
            return checkBlock(request, state);
        } catch (IndexNotFoundException e) {
            if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
                // no index metadata is exposed yet, but checkBlock depends on an index, so keep trying until the cluster forms
                assert GatewayService.STATE_NOT_RECOVERED_BLOCK.contains(ClusterBlockLevel.METADATA_READ);
                assert state.blocks().global().stream().allMatch(ClusterBlock::retryable);
                return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
            } else {
                throw e;
            }
        }
    }

return null;
}

@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
ClusterState state = clusterService.state();
Expand Down Expand Up @@ -135,22 +140,7 @@ protected void doStart(ClusterState clusterState) {
// check for block, if blocked, retry, else, execute locally
final ClusterBlockException blockException = checkBlock(request, clusterState);
if (blockException != null) {
if (blockException.retryable() == false) {
logger.trace("can't execute due to a non-retryable cluster block", blockException);
listener.onFailure(blockException);
} else {
logger.debug("can't execute due to a cluster block, retrying", blockException);
retry(clusterState, blockException, newState -> {
try {
ClusterBlockException newException = checkBlock(request, newState);
return (newException == null || newException.retryable() == false);
} catch (Exception e) {
// accept state as block will be rechecked by doStart() and listener.onFailure() then called
logger.debug("exception occurred during cluster block checking, accepting state", e);
return true;
}
});
}
handleClusterBlockException(clusterState, blockException);
} else {
ActionListener<Response> delegate = listener.delegateResponse((delegatedListener, t) -> {
if (t instanceof FailedToCommitClusterStateException || t instanceof NotMasterException) {
Expand Down Expand Up @@ -193,12 +183,41 @@ public void handleException(final TransportException exp) {
});
}
}
} catch (IndexNotFoundException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about catching an IndexNotFoundException (INFE) thrown by any of this code; could we just protect the call to checkBlock()?

// An IndexNotFoundException can be spurious: while the STATE_NOT_RECOVERED_BLOCK is in place, no index metadata is
// exposed yet. Before treating it as a legitimate "index not found", check whether a global cluster block applies
// and, if there is one, handle that block instead of failing the request.
ClusterBlockException clusterBlockException = checkGlobalBlock(clusterState);
if (clusterBlockException != null) {
handleClusterBlockException(clusterState, clusterBlockException);
} else {
listener.onFailure(e);
}
} catch (Exception e) {
logger.trace("top-level failure", e);
listener.onFailure(e);
}
}

private void handleClusterBlockException(ClusterState clusterState, ClusterBlockException blockException) {
if (blockException.retryable() == false) {
logger.trace("can't execute due to a non-retryable cluster block", blockException);
listener.onFailure(blockException);
} else {
logger.debug("can't execute due to a cluster block, retrying", blockException);
retry(clusterState, blockException, newState -> {
try {
ClusterBlockException newException = checkBlock(request, newState);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this call to checkBlock also throws an IndexNotFoundException (INFE) and the STATE_NOT_RECOVERED_BLOCK is in place, then I think we could reasonably reject the state.

return (newException == null || newException.retryable() == false);
} catch (Exception e) {
// accept state as block will be rechecked by doStart() and listener.onFailure() then called
logger.debug("exception occurred during cluster block checking, accepting state", e);
return true;
}
});
}
}

private void retryOnMasterChange(ClusterState state, Throwable failure) {
retry(state, failure, MasterNodeChangePredicate.build(state));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
indexNameExpressionResolver.concreteIndexNames(state, request));
}

/**
 * Reports the global cluster block, if any, that applies to this action at the
 * {@link ClusterBlockLevel#METADATA_READ} level.
 *
 * @param clusterState the cluster state whose global blocks are inspected
 * @return the result of {@code globalBlockedException} for that level; presumably {@code null}
 *         when no global block applies (confirm against {@code ClusterBlocks})
 */
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState clusterState) {
return clusterState.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}

@Override
protected final void masterOperation(Task task, final Request request, final ClusterState state,
final ActionListener<Response> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
Expand All @@ -25,15 +27,19 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.node.NodeClosedException;
Expand Down Expand Up @@ -65,6 +71,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -124,7 +131,9 @@ void assertListenerThrows(String msg, ActionFuture<?> listener, Class<?> klass)
}
}

public static class Request extends MasterNodeRequest<Request> {
public static class Request extends MasterNodeRequest<Request> implements IndicesRequest.Replaceable {
private String[] indices = Strings.EMPTY_ARRAY;

Request() {}

Request(StreamInput in) throws IOException {
Expand All @@ -140,6 +149,22 @@ public ActionRequestValidationException validate() {
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
}

/**
 * Returns the index names stored on this test request; the backing field is initialised to
 * {@code Strings.EMPTY_ARRAY}.
 */
@Override
public String[] indices() {
return indices;
}

/**
 * Returns the options used when resolving this request's indices; always
 * {@code IndicesOptions.strictExpandOpen()}.
 */
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictExpandOpen();
}

/**
 * Replaces the index names stored on this request.
 *
 * @param indices the new index names; the array is stored as-is, without copying
 * @return this request, so calls can be chained
 */
@Override
public IndicesRequest indices(String... indices) {
this.indices = indices;
return this;
}
}

class Response extends ActionResponse {
Expand Down Expand Up @@ -568,6 +593,84 @@ public void testTaskCancellationOnceActionItIsDispatchedToMaster() throws Except
expectThrows(CancellationException.class, listener::actionGet);
}

/**
 * Executes an index-scoped action while the cluster state still carries {@code STATE_NOT_RECOVERED_BLOCK}
 * and contains no metadata for the requested index, asserts that the request is left pending, and then
 * asserts that it completes once a state without blocks that contains the index is applied.
 */
public void testGlobalBlocksAreCheckedAfterIndexNotFoundException() throws Exception {
    final String indexName = "my-index";
    final Request indexRequest = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(60));
    indexRequest.indices(indexName);

    // Initial state: global STATE_NOT_RECOVERED_BLOCK in place, no index metadata added.
    final ClusterState notRecoveredState = ClusterState
        .builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
        .blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
        .build();
    setState(clusterService, notRecoveredState);

    final Action blockCheckingAction =
        new Action("internal:testAction", transportService, clusterService, threadPool, ThreadPool.Names.SAME) {
            final IndexNameExpressionResolver resolver = TestIndexNameExpressionResolver.newInstance();

            @Override
            protected ClusterBlockException checkBlock(Request req, ClusterState current) {
                // Index-level check: resolves the request's index names against the given state first.
                return current.blocks().indicesBlockedException(
                    ClusterBlockLevel.METADATA_READ,
                    resolver.concreteIndexNamesWithSystemIndexAccess(current, req)
                );
            }

            @Override
            protected ClusterBlockException checkGlobalBlock(ClusterState current) {
                return current.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
            }
        };

    final PlainActionFuture<Response> future = new PlainActionFuture<>();
    ActionTestUtils.execute(blockCheckingAction, null, indexRequest, future);

    // The request must still be pending while the block is in place.
    assertFalse(future.isDone());

    // Apply a state that contains the index and has no blocks at all.
    final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(indexName)
        .settings(settings(Version.CURRENT))
        .numberOfShards(1)
        .numberOfReplicas(0);
    final ClusterState recoveredState = ClusterState
        .builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
        .metadata(Metadata.builder().put(indexMetadata).build())
        .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK)
        .build();
    setState(clusterService, recoveredState);

    assertTrue(future.isDone());
    future.get();
}

/**
 * Executes an index-scoped action with a 50ms master-node timeout while the cluster state carries
 * {@code STATE_NOT_RECOVERED_BLOCK} and contains no metadata for the requested index, and never
 * applies a newer state. Asserts that the request fails with a
 * {@code MasterNotDiscoveredException} whose cause is a {@code ClusterBlockException}.
 */
public void testGlobalBlocksAreCheckedAfterIndexNotFoundExceptionTimesOutIfIndexIsNotFound() {
    final String indexName = "my-index";
    final Request indexRequest = new Request().masterNodeTimeout(TimeValue.timeValueMillis(50));
    indexRequest.indices(indexName);

    // Only state ever applied: global STATE_NOT_RECOVERED_BLOCK in place, no index metadata added.
    final ClusterState notRecoveredState = ClusterState
        .builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
        .blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
        .build();
    setState(clusterService, notRecoveredState);

    final Action blockCheckingAction =
        new Action("internal:testAction", transportService, clusterService, threadPool, ThreadPool.Names.SAME) {
            final IndexNameExpressionResolver resolver = TestIndexNameExpressionResolver.newInstance();

            @Override
            protected ClusterBlockException checkBlock(Request req, ClusterState current) {
                // Index-level check: resolves the request's index names against the given state first.
                return current.blocks().indicesBlockedException(
                    ClusterBlockLevel.METADATA_READ,
                    resolver.concreteIndexNamesWithSystemIndexAccess(current, req)
                );
            }

            @Override
            protected ClusterBlockException checkGlobalBlock(ClusterState current) {
                return current.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
            }
        };

    final PlainActionFuture<Response> future = new PlainActionFuture<>();
    ActionTestUtils.execute(blockCheckingAction, null, indexRequest, future);

    // The failure is expected to wrap the cluster block exception that caused it.
    final ExecutionException failure = expectThrows(ExecutionException.class, future::get);
    assertThat(failure.getCause(), instanceOf(MasterNotDiscoveredException.class));
    assertThat(failure.getCause().getCause(), instanceOf(ClusterBlockException.class));
}

private Runnable blockAllThreads(String executorName) throws Exception {
final int numberOfThreads = threadPool.info(executorName).getMax();
final EsThreadPoolExecutor executor = (EsThreadPoolExecutor) threadPool.executor(executorName);
Expand Down