diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml index a8ab29d9feb97..8bc8ce6c4c871 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml @@ -79,3 +79,40 @@ - is_true: acknowledged - match: { acknowledged: true } - match: { shards_acknowledged: true } +--- +"Close index response with result per index": + - skip: + version: " - 7.99.99" + reason: "close index response reports result per index starting version 8.0.0" + + - do: + indices.create: + index: index_1 + body: + settings: + number_of_replicas: 0 + + - do: + indices.create: + index: index_2 + body: + settings: + number_of_replicas: 0 + + - do: + indices.create: + index: index_3 + body: + settings: + number_of_replicas: 0 + + - do: + indices.close: + index: "index_*" + + - match: { acknowledged: true } + - match: { shards_acknowledged: true } + - match: { indices.index_1.closed: true } + - match: { indices.index_2.closed: true } + - match: { indices.index_3.closed: true } + diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java index ea44ba7a8e46b..ea7d14655c594 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java @@ -18,20 +18,40 @@ */ package org.elasticsearch.action.admin.indices.close; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.action.support.master.ShardsAcknowledgedResponse; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; import java.io.IOException; +import java.util.List; +import java.util.Objects; + +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; public class CloseIndexResponse extends ShardsAcknowledgedResponse { + private List indices; + CloseIndexResponse() { } - public CloseIndexResponse(final boolean acknowledged, final boolean shardsAcknowledged) { + public CloseIndexResponse(final boolean acknowledged, final boolean shardsAcknowledged, final List indices) { super(acknowledged, shardsAcknowledged); + this.indices = unmodifiableList(Objects.requireNonNull(indices)); + } + + public List getIndices() { + return indices; } @Override @@ -40,6 +60,11 @@ public void readFrom(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_7_2_0)) { readShardsAcknowledged(in); } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + indices = unmodifiableList(in.readList(IndexResult::new)); + } else { + indices = unmodifiableList(emptyList()); + } } @Override @@ -48,5 +73,225 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_2_0)) { writeShardsAcknowledged(out); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeList(indices); + } + } + + protected void addCustomFields(final XContentBuilder builder, final Params params) throws IOException { + super.addCustomFields(builder, params); + builder.startObject("indices"); + for (IndexResult index : indices) { + index.toXContent(builder, params); + } + builder.endObject(); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + public static class IndexResult implements Writeable, ToXContentFragment { + + private final Index index; + private final @Nullable Exception exception; + private final @Nullable ShardResult[] shards; + + public IndexResult(final Index index) { + this(index, null, null); + } + + public IndexResult(final Index index, final Exception failure) { + this(index, Objects.requireNonNull(failure), null); + } + + public IndexResult(final Index index, final ShardResult[] shards) { + this(index, null, Objects.requireNonNull(shards)); + } + + private IndexResult(final Index index, @Nullable final Exception exception, @Nullable final ShardResult[] shards) { + this.index = Objects.requireNonNull(index); + this.exception = exception; + this.shards = shards; + } + + IndexResult(final StreamInput in) throws IOException { + this.index = new Index(in); + this.exception = in.readException(); + this.shards = in.readOptionalArray(ShardResult::new, ShardResult[]::new); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + index.writeTo(out); + out.writeException(exception); + out.writeOptionalArray(shards); + } + + public Index getIndex() { + return index; + } + + public Exception getException() { + return exception; + } + + public ShardResult[] getShards() { + return shards; + } + + public boolean hasFailures() { + if (exception != null) { + return true; + } + if (shards != null) { + for (ShardResult shard : shards) { + if (shard.hasFailures()) { + return true; + } + } + } + return false; + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(index.getName()); + { + if (hasFailures()) { + builder.field("closed", false); + if (exception != null) { + builder.startObject("exception"); + ElasticsearchException.generateFailureXContent(builder, params, exception, true); + builder.endObject(); + } else { + builder.startObject("failedShards"); + for (ShardResult shard : shards) { + if (shard.hasFailures()) { + shard.toXContent(builder, params); + } + } + builder.endObject(); + } + } else { + builder.field("closed", true); + } + } + return builder.endObject(); + } + + @Override + public String toString() { + return Strings.toString(this); + } + } + + public static class ShardResult implements Writeable, ToXContentFragment { + + private final int id; + private final ShardResult.Failure[] failures; + + public ShardResult(final int id, final Failure[] failures) { + this.id = id; + this.failures = failures; + } + + ShardResult(final StreamInput in) throws IOException { + this.id = in.readVInt(); + this.failures = in.readOptionalArray(Failure::readFailure, ShardResult.Failure[]::new); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeVInt(id); + out.writeOptionalArray(failures); + } + + public boolean hasFailures() { + return failures != null && failures.length > 0; + } + + public int getId() { + return id; + } + + public Failure[] getFailures() { + return failures; + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(String.valueOf(id)); + { + builder.startArray("failures"); + if (failures != null) { + for (Failure failure : failures) { + builder.startObject(); + failure.toXContent(builder, params); + builder.endObject(); + } + } + builder.endArray(); + } + return builder.endObject(); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + public static class Failure extends DefaultShardOperationFailedException implements Writeable { + + private @Nullable String nodeId; + + private Failure() { + } + + public Failure(final String index, final int shardId, final Throwable reason) { + this(index, shardId, reason, null); + } + + public Failure(final String index, final int shardId, final Throwable reason, final String nodeId) { + super(index, shardId, reason); + this.nodeId = nodeId; + } + + public String getNodeId() { + return nodeId; + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + super.readFrom(in); + nodeId = in.readOptionalString(); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalString(nodeId); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + if (nodeId != null) { + builder.field("node", nodeId); + } + return super.toXContent(builder, params); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + static Failure readFailure(final StreamInput in) throws IOException { + final Failure failure = new Failure(); + failure.readFrom(in); + return failure; + } + } } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java index a6f4b6f3d0c4a..3c231d13845b2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java @@ -40,6 +40,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.Collections; + /** * Close index action */ @@ -109,7 +111,7 @@ protected void masterOperation(final Task task, final ActionListener listener) throws Exception { final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); if (concreteIndices == null || concreteIndices.length == 0) { - listener.onResponse(new CloseIndexResponse(true, false)); + listener.onResponse(new CloseIndexResponse(true, false, Collections.emptyList())); return; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index 80be71dadd3d6..ef4583e98e544 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -29,10 +29,11 @@ import org.elasticsearch.action.NotifyOnceListener; import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.IndexResult; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.ShardResult; import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction; import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest; import org.elasticsearch.action.support.ActiveShardsObserver; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; @@ -52,6 +53,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.collect.ImmutableOpenIntMap; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -72,6 +74,8 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -144,27 +148,22 @@ public ClusterState execute(final ClusterState currentState) { public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { if (oldState == newState) { assert blockedIndices.isEmpty() : "List of blocked indices is not empty but cluster state wasn't changed"; - listener.onResponse(new CloseIndexResponse(true, false)); + listener.onResponse(new CloseIndexResponse(true, false, Collections.emptyList())); } else { assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed"; threadPool.executor(ThreadPool.Names.MANAGEMENT) .execute(new WaitForClosedBlocksApplied(blockedIndices, request, - ActionListener.wrap(results -> + ActionListener.wrap(verifyResults -> clusterService.submitStateUpdateTask("close-indices", new ClusterStateUpdateTask(Priority.URGENT) { - - boolean acknowledged = true; + private final List indices = new ArrayList<>(); @Override public ClusterState execute(final ClusterState currentState) throws Exception { - final ClusterState updatedState = closeRoutingTable(currentState, blockedIndices, results); - for (Map.Entry result : results.entrySet()) { - IndexMetaData updatedMetaData = updatedState.metaData().index(result.getKey()); - if (updatedMetaData != null && updatedMetaData.getState() != IndexMetaData.State.CLOSE) { - acknowledged = false; - break; - } - } - return allocationService.reroute(updatedState, "indices closed"); + Tuple> closingResult = + closeRoutingTable(currentState, blockedIndices, verifyResults); + assert verifyResults.size() == closingResult.v2().size(); + indices.addAll(closingResult.v2()); + return allocationService.reroute(closingResult.v1(), "indices closed"); } @Override @@ -176,27 +175,28 @@ public void onFailure(final String source, final Exception e) { public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { - final String[] indices = results.entrySet().stream() - .filter(result -> result.getValue().isAcknowledged()) - .map(result -> result.getKey().getName()) - .filter(index -> newState.routingTable().hasIndex(index)) + final boolean acknowledged = indices.stream().noneMatch(IndexResult::hasFailures); + final String[] waitForIndices = indices.stream() + .filter(result -> result.hasFailures() == false) + .filter(result -> newState.routingTable().hasIndex(result.getIndex())) + .map(result -> result.getIndex().getName()) .toArray(String[]::new); - if (indices.length > 0) { - activeShardsObserver.waitForActiveShards(indices, request.waitForActiveShards(), + if (waitForIndices.length > 0) { + activeShardsObserver.waitForActiveShards(waitForIndices, request.waitForActiveShards(), request.ackTimeout(), shardsAcknowledged -> { if (shardsAcknowledged == false) { logger.debug("[{}] indices closed, but the operation timed out while waiting " + - "for enough shards to be started.", Arrays.toString(indices)); + "for enough shards to be started.", Arrays.toString(waitForIndices)); } // acknowledged maybe be false but some indices may have been correctly closed, so // we maintain a kind of coherency by overriding the shardsAcknowledged value // (see ShardsAcknowledgedResponse constructor) boolean shardsAcked = acknowledged ? shardsAcknowledged : false; - listener.onResponse(new CloseIndexResponse(acknowledged, shardsAcked)); + listener.onResponse(new CloseIndexResponse(acknowledged, shardsAcked, indices)); }, listener::onFailure); } else { - listener.onResponse(new CloseIndexResponse(acknowledged, false)); + listener.onResponse(new CloseIndexResponse(acknowledged, false, indices)); } } }), @@ -292,11 +292,11 @@ class WaitForClosedBlocksApplied extends AbstractRunnable { private final Map blockedIndices; private final CloseIndexClusterStateUpdateRequest request; - private final ActionListener> listener; + private final ActionListener> listener; private WaitForClosedBlocksApplied(final Map blockedIndices, final CloseIndexClusterStateUpdateRequest request, - final ActionListener> listener) { + final ActionListener> listener) { if (blockedIndices == null || blockedIndices.isEmpty()) { throw new IllegalArgumentException("Cannot wait for closed blocks to be applied, list of blocked indices is empty or null"); } @@ -312,7 +312,7 @@ public void onFailure(final Exception e) { @Override protected void doRun() throws Exception { - final Map results = ConcurrentCollections.newConcurrentMap(); + final Map results = ConcurrentCollections.newConcurrentMap(); final CountDown countDown = new CountDown(blockedIndices.size()); final ClusterState state = clusterService.state(); blockedIndices.forEach((index, block) -> { @@ -325,47 +325,51 @@ protected void doRun() throws Exception { }); } - private void waitForShardsReadyForClosing(final Index index, final ClusterBlock closingBlock, - final ClusterState state, final Consumer onResponse) { + private void waitForShardsReadyForClosing(final Index index, + final ClusterBlock closingBlock, + final ClusterState state, + final Consumer onResponse) { final IndexMetaData indexMetaData = state.metaData().index(index); if (indexMetaData == null) { logger.debug("index {} has been blocked before closing and is now deleted, ignoring", index); - onResponse.accept(new AcknowledgedResponse(true)); + onResponse.accept(new IndexResult(index)); return; } final IndexRoutingTable indexRoutingTable = state.routingTable().index(index); if (indexRoutingTable == null || indexMetaData.getState() == IndexMetaData.State.CLOSE) { assert state.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK); logger.debug("index {} has been blocked before closing and is already closed, ignoring", index); - onResponse.accept(new AcknowledgedResponse(true)); + onResponse.accept(new IndexResult(index)); return; } final ImmutableOpenIntMap shards = indexRoutingTable.getShards(); - final AtomicArray results = new AtomicArray<>(shards.size()); + final AtomicArray results = new AtomicArray<>(shards.size()); final CountDown countDown = new CountDown(shards.size()); for (IntObjectCursor shard : shards) { final IndexShardRoutingTable shardRoutingTable = shard.value; - final ShardId shardId = shardRoutingTable.shardId(); + final int shardId = shardRoutingTable.shardId().id(); sendVerifyShardBeforeCloseRequest(shardRoutingTable, closingBlock, new NotifyOnceListener() { @Override public void innerOnResponse(final ReplicationResponse replicationResponse) { - ReplicationResponse.ShardInfo shardInfo = replicationResponse.getShardInfo(); - results.setOnce(shardId.id(), new AcknowledgedResponse(shardInfo.getFailed() == 0)); + ShardResult.Failure[] failures = Arrays.stream(replicationResponse.getShardInfo().getFailures()) + .map(f -> new ShardResult.Failure(f.index(), f.shardId(), f.getCause(), f.nodeId())) + .toArray(ShardResult.Failure[]::new); + results.setOnce(shardId, new ShardResult(shardId, failures)); processIfFinished(); } @Override public void innerOnFailure(final Exception e) { - results.setOnce(shardId.id(), new AcknowledgedResponse(false)); + ShardResult.Failure failure = new ShardResult.Failure(index.getName(), shardId, e); + results.setOnce(shardId, new ShardResult(shardId, new ShardResult.Failure[]{failure})); processIfFinished(); } private void processIfFinished() { if (countDown.countDown()) { - final boolean acknowledged = results.asList().stream().allMatch(AcknowledgedResponse::isAcknowledged); - onResponse.accept(new AcknowledgedResponse(acknowledged)); + onResponse.accept(new IndexResult(index, results.toArray(new ShardResult[results.length()]))); } } }); @@ -396,9 +400,9 @@ private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shar /** * Step 3 - Move index states from OPEN to CLOSE in cluster state for indices that are ready for closing. */ - static ClusterState closeRoutingTable(final ClusterState currentState, - final Map blockedIndices, - final Map results) { + static Tuple> closeRoutingTable(final ClusterState currentState, + final Map blockedIndices, + final Map verifyResult) { // Remove the index routing table of closed indices if the cluster is in a mixed version // that does not support the replication of closed indices @@ -409,9 +413,10 @@ static ClusterState closeRoutingTable(final ClusterState currentState, final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); final Set closedIndices = new HashSet<>(); - for (Map.Entry result : results.entrySet()) { + Map closingResults = new HashMap<>(verifyResult); + for (Map.Entry result : verifyResult.entrySet()) { final Index index = result.getKey(); - final boolean acknowledged = result.getValue().isAcknowledged(); + final boolean acknowledged = result.getValue().hasFailures() == false; try { if (acknowledged == false) { logger.debug("verification of shards before closing {} failed", index); @@ -424,7 +429,11 @@ static ClusterState closeRoutingTable(final ClusterState currentState, continue; } final ClusterBlock closingBlock = blockedIndices.get(index); + assert closingBlock != null; if (currentState.blocks().hasIndexBlock(index.getName(), closingBlock) == false) { + // we should report error in this case as the index can be left as open. + closingResults.put(result.getKey(), new IndexResult(result.getKey(), new IllegalStateException( + "verification of shards before closing " + index + " succeeded but block has been removed in the meantime"))); logger.debug("verification of shards before closing {} succeeded but block has been removed in the meantime", index); continue; } @@ -450,9 +459,9 @@ static ClusterState closeRoutingTable(final ClusterState currentState, logger.debug("index {} has been deleted since it was blocked before closing, ignoring", index); } } - logger.info("completed closing of indices {}", closedIndices); - return ClusterState.builder(currentState).blocks(blocks).metaData(metadata).routingTable(routingTable.build()).build(); + return Tuple.tuple(ClusterState.builder(currentState).blocks(blocks).metaData(metadata).routingTable(routingTable.build()).build(), + closingResults.values()); } public void openIndex(final OpenIndexClusterStateUpdateRequest request, diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java index f86beff7738e3..40c34af51598d 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java @@ -19,14 +19,30 @@ package org.elasticsearch.action.admin.indices.close; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.ActionNotFoundTransportException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.elasticsearch.test.VersionUtils.getPreviousVersion; import static org.elasticsearch.test.VersionUtils.randomVersionBetween; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; public class CloseIndexResponseTests extends ESTestCase { @@ -47,11 +63,12 @@ public void testBwcSerialization() throws Exception { { final CloseIndexResponse response = randomResponse(); try (BytesStreamOutput out = new BytesStreamOutput()) { - out.setVersion(randomVersionBetween(random(), Version.V_7_0_0, Version.V_7_1_0)); + out.setVersion(randomVersionBetween(random(), Version.V_7_0_0, getPreviousVersion(Version.V_7_2_0))); response.writeTo(out); final AcknowledgedResponse deserializedResponse = new AcknowledgedResponse(); try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(out.getVersion()); deserializedResponse.readFrom(in); } assertThat(deserializedResponse.isAcknowledged(), equalTo(response.isAcknowledged())); @@ -64,22 +81,136 @@ public void testBwcSerialization() throws Exception { final CloseIndexResponse deserializedResponse = new CloseIndexResponse(); try (StreamInput in = out.bytes().streamInput()) { - in.setVersion(randomVersionBetween(random(), Version.V_7_0_0, Version.V_7_1_0)); + in.setVersion(randomVersionBetween(random(), Version.V_7_0_0, getPreviousVersion(Version.V_7_2_0))); deserializedResponse.readFrom(in); } assertThat(deserializedResponse.isAcknowledged(), equalTo(response.isAcknowledged())); } } + { + final CloseIndexResponse response = randomResponse(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + Version version = randomVersionBetween(random(), Version.V_7_2_0, Version.CURRENT); + out.setVersion(version); + response.writeTo(out); + final CloseIndexResponse deserializedResponse = new CloseIndexResponse(); + try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(version); + deserializedResponse.readFrom(in); + } + assertThat(deserializedResponse.isAcknowledged(), equalTo(response.isAcknowledged())); + assertThat(deserializedResponse.isShardsAcknowledged(), equalTo(response.isShardsAcknowledged())); + if (version.onOrAfter(Version.V_8_0_0)) { + assertThat(deserializedResponse.getIndices(), hasSize(response.getIndices().size())); + } else { + assertThat(deserializedResponse.getIndices(), empty()); + } + } + } } private CloseIndexResponse randomResponse() { - final boolean acknowledged = randomBoolean(); + boolean acknowledged = true; + final String[] indicesNames = generateRandomStringArray(10, 10, false, true); + + final List indexResults = new ArrayList<>(); + for (String indexName : indicesNames) { + final Index index = new Index(indexName, "_na_"); + if (randomBoolean()) { + indexResults.add(new CloseIndexResponse.IndexResult(index)); + } else { + if (randomBoolean()) { + acknowledged = false; + indexResults.add(new CloseIndexResponse.IndexResult(index, randomException(index, 0))); + } else { + final int nbShards = randomIntBetween(1, 5); + CloseIndexResponse.ShardResult[] shards = new CloseIndexResponse.ShardResult[nbShards]; + for (int i = 0; i < nbShards; i++) { + CloseIndexResponse.ShardResult.Failure[] failures = null; + if (randomBoolean()) { + acknowledged = false; + failures = new CloseIndexResponse.ShardResult.Failure[randomIntBetween(1, 3)]; + for (int j = 0; j < failures.length; j++) { + String nodeId = randomAlphaOfLength(5); + failures[j] = new CloseIndexResponse.ShardResult.Failure(indexName, i, randomException(index, i), nodeId); + } + } + shards[i] = new CloseIndexResponse.ShardResult(i, failures); + } + indexResults.add(new CloseIndexResponse.IndexResult(index, shards)); + } + } + + } + final boolean shardsAcknowledged = acknowledged ? randomBoolean() : false; - return new CloseIndexResponse(acknowledged, shardsAcknowledged); + return new CloseIndexResponse(acknowledged, shardsAcknowledged, indexResults); + } + + private static ElasticsearchException randomException(final Index index, final int id) { + return randomFrom( + new IndexNotFoundException(index), + new ActionNotFoundTransportException("test"), + new NoShardAvailableActionException(new ShardId(index, id))); } private static void assertCloseIndexResponse(final CloseIndexResponse actual, final CloseIndexResponse expected) { assertThat(actual.isAcknowledged(), equalTo(expected.isAcknowledged())); assertThat(actual.isShardsAcknowledged(), equalTo(expected.isShardsAcknowledged())); + + for (int i = 0; i < expected.getIndices().size(); i++) { + CloseIndexResponse.IndexResult expectedIndexResult = expected.getIndices().get(i); + CloseIndexResponse.IndexResult actualIndexResult = actual.getIndices().get(i); + assertThat(actualIndexResult.getIndex(), equalTo(expectedIndexResult.getIndex())); + assertThat(actualIndexResult.hasFailures(), equalTo(expectedIndexResult.hasFailures())); + + if (expectedIndexResult.hasFailures() == false) { + assertThat(actualIndexResult.getException(), nullValue()); + if (actualIndexResult.getShards() != null) { + assertThat(Arrays.stream(actualIndexResult.getShards()) + .allMatch(shardResult -> shardResult.hasFailures() == false), is(true)); + } + } + + if (expectedIndexResult.getException() != null) { + assertThat(actualIndexResult.getShards(), nullValue()); + assertThat(actualIndexResult.getException(), notNullValue()); + assertThat(actualIndexResult.getException().getMessage(), equalTo(expectedIndexResult.getException().getMessage())); + assertThat(actualIndexResult.getException().getClass(), equalTo(expectedIndexResult.getException().getClass())); + assertArrayEquals(actualIndexResult.getException().getStackTrace(), expectedIndexResult.getException().getStackTrace()); + } else { + assertThat(actualIndexResult.getException(), nullValue()); + } + + if (expectedIndexResult.getShards() != null) { + assertThat(actualIndexResult.getShards().length, equalTo(expectedIndexResult.getShards().length)); + + for (int j = 0; j < expectedIndexResult.getShards().length; j++) { + CloseIndexResponse.ShardResult expectedShardResult = expectedIndexResult.getShards()[j]; + CloseIndexResponse.ShardResult actualShardResult = actualIndexResult.getShards()[j]; + assertThat(actualShardResult.getId(), equalTo(expectedShardResult.getId())); + assertThat(actualShardResult.hasFailures(), equalTo(expectedShardResult.hasFailures())); + + if (expectedShardResult.hasFailures()) { + assertThat(actualShardResult.getFailures().length, equalTo(expectedShardResult.getFailures().length)); + + for (int k = 0; k < expectedShardResult.getFailures().length; k++) { + CloseIndexResponse.ShardResult.Failure expectedFailure = expectedShardResult.getFailures()[k]; + CloseIndexResponse.ShardResult.Failure actualFailure = actualShardResult.getFailures()[k]; + assertThat(actualFailure.getNodeId(), equalTo(expectedFailure.getNodeId())); + assertThat(actualFailure.index(), equalTo(expectedFailure.index())); + assertThat(actualFailure.shardId(), equalTo(expectedFailure.shardId())); + assertThat(actualFailure.getCause().getMessage(), equalTo(expectedFailure.getCause().getMessage())); + assertThat(actualFailure.getCause().getClass(), equalTo(expectedFailure.getCause().getClass())); + assertArrayEquals(actualFailure.getCause().getStackTrace(), expectedFailure.getCause().getStackTrace()); + } + } else { + assertThat(actualShardResult.getFailures(), nullValue()); + } + } + } else { + assertThat(actualIndexResult.getShards(), nullValue()); + } + } } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java index 36bca0be1c2d5..b655a98379553 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java @@ -20,7 +20,8 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.Version; -import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.IndexResult; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.RestoreInProgress; @@ -50,6 +51,7 @@ import org.elasticsearch.test.ESTestCase; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -69,6 +71,7 @@ import static org.elasticsearch.cluster.shards.ClusterShardLimitIT.ShardCounts.forDataNodeCount; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -80,7 +83,7 @@ public class MetaDataIndexStateServiceTests extends ESTestCase { public void testCloseRoutingTable() { final Set nonBlockedIndices = new HashSet<>(); final Map blockedIndices = new HashMap<>(); - final Map results = new HashMap<>(); + final Map results = new HashMap<>(); ClusterState state = ClusterState.builder(new ClusterName("testCloseRoutingTable")).build(); for (int i = 0; i < randomIntBetween(1, 25); i++) { @@ -92,12 +95,17 @@ public void testCloseRoutingTable() { } else { final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosingBlock(); state = addBlockedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state, closingBlock); - blockedIndices.put(state.metaData().index(indexName).getIndex(), closingBlock); - results.put(state.metaData().index(indexName).getIndex(), new AcknowledgedResponse(randomBoolean())); + final Index index = state.metaData().index(indexName).getIndex(); + blockedIndices.put(index, closingBlock); + if (randomBoolean()) { + results.put(index, new CloseIndexResponse.IndexResult(index)); + } else { + results.put(index, new CloseIndexResponse.IndexResult(index, new Exception("test"))); + } } } - final ClusterState updatedState = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results); + final ClusterState updatedState = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results).v1(); assertThat(updatedState.metaData().indices().size(), equalTo(nonBlockedIndices.size() + blockedIndices.size())); for (Index nonBlockedIndex : nonBlockedIndices) { @@ -105,7 +113,7 @@ public void testCloseRoutingTable() { assertThat(updatedState.blocks().hasIndexBlockWithId(nonBlockedIndex.getName(), INDEX_CLOSED_BLOCK_ID), is(false)); } for (Index blockedIndex : blockedIndices.keySet()) { - if (results.get(blockedIndex).isAcknowledged()) { + if (results.get(blockedIndex).hasFailures() == false) { assertIsClosed(blockedIndex.getName(), updatedState); } else { assertIsOpened(blockedIndex.getName(), updatedState); @@ -117,7 +125,7 @@ public void testCloseRoutingTable() { public void testCloseRoutingTableRemovesRoutingTable() { final Set nonBlockedIndices = new HashSet<>(); final Map blockedIndices = new HashMap<>(); - final Map results = new HashMap<>(); + final Map results = new HashMap<>(); final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosingBlock(); ClusterState state = ClusterState.builder(new ClusterName("testCloseRoutingTableRemovesRoutingTable")).build(); @@ -129,8 +137,13 @@ public void testCloseRoutingTableRemovesRoutingTable() { nonBlockedIndices.add(state.metaData().index(indexName).getIndex()); } else { state = addBlockedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state, closingBlock); - blockedIndices.put(state.metaData().index(indexName).getIndex(), closingBlock); - results.put(state.metaData().index(indexName).getIndex(), new AcknowledgedResponse(randomBoolean())); + final Index index = state.metaData().index(indexName).getIndex(); + blockedIndices.put(index, closingBlock); + if (randomBoolean()) { + results.put(index, new CloseIndexResponse.IndexResult(index)); + } else { + results.put(index, new CloseIndexResponse.IndexResult(index, new Exception("test"))); + } } } @@ -142,7 +155,7 @@ public void testCloseRoutingTableRemovesRoutingTable() { new HashSet<>(Arrays.asList(DiscoveryNode.Role.values())), Version.V_7_2_0))) .build(); - state = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results); + state = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results).v1(); assertThat(state.metaData().indices().size(), equalTo(nonBlockedIndices.size() + blockedIndices.size())); for (Index nonBlockedIndex : nonBlockedIndices) { @@ -150,7 +163,7 @@ public void testCloseRoutingTableRemovesRoutingTable() { assertThat(state.blocks().hasIndexBlockWithId(nonBlockedIndex.getName(), INDEX_CLOSED_BLOCK_ID), is(false)); } for (Index blockedIndex : blockedIndices.keySet()) { - if (results.get(blockedIndex).isAcknowledged()) { + if (results.get(blockedIndex).hasFailures() == false) { IndexMetaData indexMetaData = state.metaData().index(blockedIndex); assertThat(indexMetaData.getState(), is(IndexMetaData.State.CLOSE)); Settings indexSettings = indexMetaData.getSettings(); @@ -329,6 +342,33 @@ public void testIsIndexVerifiedBeforeClosed() { } } + public void testCloseFailedIfBlockDisappeared() { + ClusterState state = ClusterState.builder(new ClusterName("failedIfBlockDisappeared")).build(); + Map blockedIndices = new HashMap<>(); + int numIndices = between(1, 10); + Set disappearedIndices = new HashSet<>(); + Map verifyResults = new HashMap<>(); + for (int i = 0; i < numIndices; i++) { + String indexName = "test-" + i; + state = addOpenedIndex(indexName, randomIntBetween(1, 3), randomIntBetween(0, 3), state); + Index index = state.metaData().index(indexName).getIndex(); + state = MetaDataIndexStateService.addIndexClosedBlocks(new Index[]{index}, blockedIndices, state); + if (randomBoolean()) { + state = ClusterState.builder(state) + .blocks(ClusterBlocks.builder().blocks(state.blocks()).removeIndexBlocks(indexName).build()) + .build(); + disappearedIndices.add(index); + } + verifyResults.put(index, new IndexResult(index)); + } + Collection closingResults = + MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, unmodifiableMap(verifyResults)).v2(); + assertThat(closingResults, hasSize(numIndices)); + Set failedIndices = closingResults.stream().filter(IndexResult::hasFailures) + .map(IndexResult::getIndex).collect(Collectors.toSet()); + assertThat(failedIndices, equalTo(disappearedIndices)); + } + public static ClusterState createClusterForShardLimitTest(int nodesInCluster, int openIndexShards, int openIndexReplicas, int closedIndexShards, int closedIndexReplicas, Settings clusterSettings) { ImmutableOpenMap.Builder dataNodes = ImmutableOpenMap.builder(); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java index 5ee6a7c60da3d..7c94a42bd0cb5 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java @@ -18,7 +18,7 @@ */ package org.elasticsearch.cluster.metadata; -import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.index.Index; @@ -43,7 +43,7 @@ public static ClusterState addIndexClosedBlocks(final Index[] indices, final Map */ public static ClusterState closeRoutingTable(final ClusterState state, final Map blockedIndices, - final Map results) { - return MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results); + final Map results) { + return MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results).v1(); } } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index a8c47f5d3ef39..433662f95d4e0 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest; import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; @@ -40,7 +41,6 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeActionUtils; @@ -227,8 +227,8 @@ public ClusterState closeIndices(ClusterState state, CloseIndexRequest request) final Map blockedIndices = new HashMap<>(); ClusterState newState = MetaDataIndexStateServiceUtils.addIndexClosedBlocks(concreteIndices, blockedIndices, state); - newState = MetaDataIndexStateServiceUtils.closeRoutingTable(newState, blockedIndices, blockedIndices.keySet().stream() - .collect(Collectors.toMap(Function.identity(), index -> new AcknowledgedResponse(true)))); + newState = MetaDataIndexStateServiceUtils.closeRoutingTable(newState, blockedIndices, + blockedIndices.keySet().stream().collect(Collectors.toMap(Function.identity(), CloseIndexResponse.IndexResult::new))); return allocationService.reroute(newState, "indices closed"); } diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index 6f666483b18d0..b39a008de5f4f 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -20,6 +20,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.admin.indices.close.CloseIndexRequestBuilder; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; @@ -45,6 +46,7 @@ import org.elasticsearch.test.InternalTestCluster; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.concurrent.CountDownLatch; @@ -64,6 +66,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; public class CloseIndexIT extends ESIntegTestCase { @@ -115,7 +118,7 @@ public void testCloseIndex() throws Exception { indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs) .mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList())); - assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName))); + assertBusy(() -> closeIndices(indexName)); assertIndexIsClosed(indexName); assertAcked(client().admin().indices().prepareOpen(indexName)); @@ -130,13 +133,17 @@ public void testCloseAlreadyClosedIndex() throws Exception { indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(1, 10)) .mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList())); } - // First close should be acked - assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName))); + // First close should be fully acked + assertBusy(() -> closeIndices(indexName)); assertIndexIsClosed(indexName); // Second close should be acked too final ActiveShardCount activeShardCount = randomFrom(ActiveShardCount.NONE, ActiveShardCount.DEFAULT, ActiveShardCount.ALL); - assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName).setWaitForActiveShards(activeShardCount))); + assertBusy(() -> { + CloseIndexResponse response = client().admin().indices().prepareClose(indexName).setWaitForActiveShards(activeShardCount).get(); + assertAcked(response); + assertTrue(response.getIndices().isEmpty()); + }); assertIndexIsClosed(indexName); } @@ -150,7 +157,7 @@ public void testCloseUnassignedIndex() throws Exception { assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.OPEN)); assertThat(clusterState.routingTable().allShards().stream().allMatch(ShardRouting::unassigned), is(true)); - assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName).setWaitForActiveShards(ActiveShardCount.NONE))); + assertBusy(() -> closeIndices(client().admin().indices().prepareClose(indexName).setWaitForActiveShards(ActiveShardCount.NONE))); assertIndexIsClosed(indexName); } @@ -198,7 +205,7 @@ public void testCloseWhileIndexingDocuments() throws Exception { indexer.setAssertNoFailuresOnStop(false); waitForDocs(randomIntBetween(10, 50), indexer); - assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName))); + assertBusy(() -> closeIndices(indexName)); indexer.stop(); nbDocs += indexer.totalIndexedDocs(); @@ -345,6 +352,9 @@ public void testCloseIndexWaitForActiveShards() throws Exception { assertThat(client().admin().cluster().prepareHealth(indexName).get().getStatus(), is(ClusterHealthStatus.GREEN)); assertTrue(closeIndexResponse.isAcknowledged()); assertTrue(closeIndexResponse.isShardsAcknowledged()); + assertThat(closeIndexResponse.getIndices().get(0), notNullValue()); + assertThat(closeIndexResponse.getIndices().get(0).hasFailures(), is(false)); + assertThat(closeIndexResponse.getIndices().get(0).getIndex().getName(), equalTo(indexName)); assertIndexIsClosed(indexName); } @@ -448,6 +458,36 @@ public void testResyncPropagatePrimaryTerm() throws Exception { } } + private static void closeIndices(final String... indices) { + closeIndices(client().admin().indices().prepareClose(indices)); + } + + private static void closeIndices(final CloseIndexRequestBuilder requestBuilder) { + final CloseIndexResponse response = requestBuilder.get(); + assertThat(response.isAcknowledged(), is(true)); + assertThat(response.isShardsAcknowledged(), is(true)); + + final String[] indices = requestBuilder.request().indices(); + if (indices != null) { + assertThat(response.getIndices().size(), equalTo(indices.length)); + for (String index : indices) { + CloseIndexResponse.IndexResult indexResult = response.getIndices().stream() + .filter(result -> index.equals(result.getIndex().getName())).findFirst().get(); + assertThat(indexResult, notNullValue()); + assertThat(indexResult.hasFailures(), is(false)); + assertThat(indexResult.getException(), nullValue()); + assertThat(indexResult.getShards(), notNullValue()); + Arrays.stream(indexResult.getShards()).forEach(shardResult -> { + assertThat(shardResult.hasFailures(), is(false)); + assertThat(shardResult.getFailures(), notNullValue()); + assertThat(shardResult.getFailures().length, equalTo(0)); + }); + } + } else { + assertThat(response.getIndices().size(), equalTo(0)); + } + } + static void assertIndexIsClosed(final String... indices) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); for (String index : indices) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStepTests.java index 368afaa26d0cc..4c00485e631e2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStepTests.java @@ -44,7 +44,7 @@ public void testCloseFollowingIndex() { assertThat(closeIndexRequest.indices()[0], equalTo("follower-index")); @SuppressWarnings("unchecked") ActionListener listener = (ActionListener) invocation.getArguments()[1]; - listener.onResponse(new CloseIndexResponse(true, true)); + listener.onResponse(new CloseIndexResponse(true, true, Collections.emptyList())); return null; }).when(indicesClient).close(Mockito.any(), Mockito.any());