diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java index a0a6d31d37cac..a53ff9133cda3 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java @@ -46,6 +46,8 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; + private boolean performReroute = true; + public CreateIndexClusterStateUpdateRequest(String cause, String index, String providedName) { this.cause = cause; this.index = index; @@ -175,6 +177,15 @@ public CreateIndexClusterStateUpdateRequest dataStreamName(String dataStreamName return this; } + public boolean performReroute() { + return performReroute; + } + + public CreateIndexClusterStateUpdateRequest performReroute(boolean performReroute) { + this.performReroute = performReroute; + return this; + } + @Override public String toString() { return "CreateIndexClusterStateUpdateRequest{" diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java index 8797848c11dfa..e7b148c9a179d 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java @@ -367,7 +367,8 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest( .settings(b.build()) .aliases(createIndexRequest.aliases()) .waitForActiveShards(ActiveShardCount.NONE) // not waiting for shards here, will wait on the alias switch operation - .mappings(createIndexRequest.mappings()); + .mappings(createIndexRequest.mappings()) + .performReroute(false); } /** diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java index feda87c3dca98..2ed4e047de3ca 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java @@ -23,13 +23,18 @@ import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.Nullable; @@ -44,6 +49,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** @@ -52,9 +58,12 @@ public class TransportRolloverAction extends TransportMasterNodeAction { private static final Logger logger = LogManager.getLogger(TransportRolloverAction.class); + private static final ClusterStateTaskConfig ROLLOVER_TASK_CONFIG = ClusterStateTaskConfig.build(Priority.NORMAL); + private final MetadataRolloverService rolloverService; private final ActiveShardsObserver activeShardsObserver; private final Client client; + private final ClusterStateTaskExecutor rolloverTaskExecutor; @Inject public TransportRolloverAction( @@ -64,7 +73,8 @@ public TransportRolloverAction( ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, MetadataRolloverService rolloverService, - Client client + Client client, + AllocationService allocationService ) { super( RolloverAction.NAME, @@ -80,6 +90,33 @@ public TransportRolloverAction( this.rolloverService = rolloverService; this.client = client; this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool); + this.rolloverTaskExecutor = (currentState, tasks) -> { + ClusterStateTaskExecutor.ClusterTasksResult.Builder builder = ClusterStateTaskExecutor.ClusterTasksResult + .builder(); + ClusterState state = currentState; + for (RolloverTask task : tasks) { + try { + state = task.performRollover(state); + builder.success(task); + } catch (Exception e) { + builder.failure(task, e); + } + } + + if (state != currentState) { + var reason = new StringBuilder(); + Strings.collectionToDelimitedStringWithLimit( + (Iterable) () -> tasks.stream().map(t -> t.sourceIndex.get() + "->" + t.rolloverIndex.get()).iterator(), + ",", + "bulk rollover [", + "]", + 1024, + reason + ); + state = allocationService.reroute(state, reason.toString()); + } + return builder.build(state); + }; } @Override @@ -150,144 +187,30 @@ protected void masterOperation( return; } - // Holders for what our final source and rolled over index names are as well as the - // conditions met to cause the rollover, these are needed so we wait on and report - // the correct indices and conditions in the clusterStateProcessed method - final SetOnce sourceIndex = new SetOnce<>(); - final SetOnce rolloverIndex = new SetOnce<>(); - final SetOnce> conditionResults = new SetOnce<>(); - final List> trialMetConditions = rolloverRequest.getConditions() .values() .stream() .filter(condition -> trialConditionResults.get(condition.toString())) .collect(Collectors.toList()); + final RolloverResponse trailRolloverResponse = new RolloverResponse( + trialSourceIndexName, + trialRolloverIndexName, + trialConditionResults, + false, + false, + false, + false + ); + // Pre-check the conditions to see whether we should submit a new cluster state task if (trialConditionResults.size() == 0 || trialMetConditions.size() > 0) { - - // Submit the cluster state, this can be thought of as a "synchronized" - // block in that it is single-threaded on the master node - clusterService.submitStateUpdateTask( - "rollover_index source [" + trialRolloverIndexName + "] to target [" + trialRolloverIndexName + "]", - new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - // Regenerate the rollover names, as a rollover could have happened - // in between the pre-check and the cluster state update - final MetadataRolloverService.NameResolution rolloverNames = rolloverService.resolveRolloverNames( - currentState, - rolloverRequest.getRolloverTarget(), - rolloverRequest.getNewIndexName(), - rolloverRequest.getCreateIndexRequest() - ); - final String sourceIndexName = rolloverNames.sourceName; - - // Re-evaluate the conditions, now with our final source index name - final Map postConditionResults = evaluateConditions( - rolloverRequest.getConditions().values(), - buildStats(metadata.index(sourceIndexName), statsResponse) - ); - final List> metConditions = rolloverRequest.getConditions() - .values() - .stream() - .filter(condition -> postConditionResults.get(condition.toString())) - .collect(Collectors.toList()); - // Update the final condition results so they can be used when returning the response - conditionResults.set(postConditionResults); - - if (postConditionResults.size() == 0 || metConditions.size() > 0) { - // Perform the actual rollover - MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( - currentState, - rolloverRequest.getRolloverTarget(), - rolloverRequest.getNewIndexName(), - rolloverRequest.getCreateIndexRequest(), - metConditions, - false, - false - ); - logger.trace("rollover result [{}]", rolloverResult); - - // Update the "final" source and resulting rollover index names. - // Note that we use the actual rollover result for these, because - // even though we're single threaded, it's possible for the - // rollover names generated before the actual rollover to be - // different due to things like date resolution - sourceIndex.set(rolloverResult.sourceIndexName); - rolloverIndex.set(rolloverResult.rolloverIndexName); - - // Return the new rollover cluster state, which includes the changes that create the new index - return rolloverResult.clusterState; - } else { - // Upon re-evaluation of the conditions, none were met, so - // therefore do not perform a rollover, returning the current - // cluster state. - return currentState; - } - } - - @Override - public void onFailure(String source, Exception e) { - listener.onFailure(e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - // Now assuming we have a new state and the name of the rolled over index, we need to wait for the - // configured number of active shards, as well as return the names of the indices that were rolled/created - if (newState.equals(oldState) == false) { - assert sourceIndex.get() != null : "source index missing on successful rollover"; - assert rolloverIndex.get() != null : "rollover index missing on successful rollover"; - assert conditionResults.get() != null : "matching rollover conditions missing on successful rollover"; - - activeShardsObserver.waitForActiveShards( - new String[] { rolloverIndex.get() }, - rolloverRequest.getCreateIndexRequest().waitForActiveShards(), - rolloverRequest.masterNodeTimeout(), - isShardsAcknowledged -> listener.onResponse( - new RolloverResponse( - sourceIndex.get(), - rolloverIndex.get(), - conditionResults.get(), - false, - true, - true, - isShardsAcknowledged - ) - ), - listener::onFailure - ); - } else { - // We did not roll over due to conditions not being met inside the cluster state update - listener.onResponse( - new RolloverResponse( - trialSourceIndexName, - trialRolloverIndexName, - trialConditionResults, - false, - false, - false, - false - ) - ); - } - } - } - ); + String source = "rollover_index source [" + trialRolloverIndexName + "] to target [" + trialRolloverIndexName + "]"; + RolloverTask rolloverTask = new RolloverTask(rolloverRequest, statsResponse, trailRolloverResponse, listener); + clusterService.submitStateUpdateTask(source, rolloverTask, ROLLOVER_TASK_CONFIG, rolloverTaskExecutor, rolloverTask); } else { // conditions not met - listener.onResponse( - new RolloverResponse( - trialSourceIndexName, - trialRolloverIndexName, - trialConditionResults, - false, - false, - false, - false - ) - ); + listener.onResponse(trailRolloverResponse); } }, listener::onFailure) ); @@ -333,4 +256,125 @@ static Condition.Stats buildStats(@Nullable final IndexMetadata metadata, @Nulla ); } } + + class RolloverTask implements ClusterStateTaskListener { + + private final RolloverRequest rolloverRequest; + private final IndicesStatsResponse statsResponse; + private final RolloverResponse trialRolloverResponse; + private final ActionListener listener; + + private final AtomicBoolean conditionsMet = new AtomicBoolean(false); + // Holders for what our final source and rolled over index names are as well as the + // conditions met to cause the rollover, these are needed so we wait on and report + // the correct indices and conditions in the clusterStateProcessed method + private final SetOnce sourceIndex = new SetOnce<>(); + private final SetOnce rolloverIndex = new SetOnce<>(); + private final SetOnce> conditionResults = new SetOnce<>(); + + RolloverTask( + RolloverRequest rolloverRequest, + IndicesStatsResponse statsResponse, + RolloverResponse trialRolloverResponse, + ActionListener listener + ) { + this.rolloverRequest = rolloverRequest; + this.statsResponse = statsResponse; + this.trialRolloverResponse = trialRolloverResponse; + this.listener = listener; + } + + ClusterState performRollover(ClusterState currentState) throws Exception { + // Regenerate the rollover names, as a rollover could have happened + // in between the pre-check and the cluster state update + final MetadataRolloverService.NameResolution rolloverNames = rolloverService.resolveRolloverNames( + currentState, + rolloverRequest.getRolloverTarget(), + rolloverRequest.getNewIndexName(), + rolloverRequest.getCreateIndexRequest() + ); + final String sourceIndexName = rolloverNames.sourceName; + + // Re-evaluate the conditions, now with our final source index name + final Map postConditionResults = evaluateConditions( + rolloverRequest.getConditions().values(), + buildStats(currentState.metadata().index(sourceIndexName), statsResponse) + ); + final List> metConditions = rolloverRequest.getConditions() + .values() + .stream() + .filter(condition -> postConditionResults.get(condition.toString())) + .collect(Collectors.toList()); + // Update the final condition results so they can be used when returning the response + conditionResults.set(postConditionResults); + + if (postConditionResults.size() == 0 || metConditions.size() > 0) { + conditionsMet.set(true); + // Perform the actual rollover + MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( + currentState, + rolloverRequest.getRolloverTarget(), + rolloverRequest.getNewIndexName(), + rolloverRequest.getCreateIndexRequest(), + metConditions, + false, + false + ); + logger.trace("rollover result [{}]", rolloverResult); + + // Update the "final" source and resulting rollover index names. + // Note that we use the actual rollover result for these, because + // even though we're single threaded, it's possible for the + // rollover names generated before the actual rollover to be + // different due to things like date resolution + sourceIndex.set(rolloverResult.sourceIndexName); + rolloverIndex.set(rolloverResult.rolloverIndexName); + + // Return the new rollover cluster state, which includes the changes that create the new index + return rolloverResult.clusterState; + } else { + // Upon re-evaluation of the conditions, none were met, so + // therefore do not perform a rollover, returning the current + // cluster state. + return currentState; + } + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + // Now assuming we have a new state and the name of the rolled over index, we need to wait for the + // configured number of active shards, as well as return the names of the indices that were rolled/created + if (conditionsMet.get()) { + assert sourceIndex.get() != null : "source index missing on successful rollover"; + assert rolloverIndex.get() != null : "rollover index missing on successful rollover"; + assert conditionResults.get() != null : "matching rollover conditions missing on successful rollover"; + + activeShardsObserver.waitForActiveShards( + new String[] { rolloverIndex.get() }, + rolloverRequest.getCreateIndexRequest().waitForActiveShards(), + rolloverRequest.masterNodeTimeout(), + isShardsAcknowledged -> listener.onResponse( + new RolloverResponse( + sourceIndex.get(), + rolloverIndex.get(), + conditionResults.get(), + false, + true, + true, + isShardsAcknowledged + ) + ), + listener::onFailure + ); + } else { + // We did not roll over due to conditions not being met inside the cluster state update + listener.onResponse(trialRolloverResponse); + } + } + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java index 3767899f226de..bdde0debc5ded 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java @@ -478,7 +478,10 @@ private ClusterState applyCreateIndexWithTemporaryService( ); indexService.getIndexEventListener().beforeIndexAddedToCluster(indexMetadata.getIndex(), indexMetadata.getSettings()); - return clusterStateCreateIndex(currentState, request.blocks(), indexMetadata, allocationService::reroute, metadataTransformer); + BiFunction rerouteFunction = request.performReroute() + ? allocationService::reroute + : (cs, reason) -> cs; + return clusterStateCreateIndex(currentState, request.blocks(), indexMetadata, rerouteFunction, metadataTransformer); }); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java index ecc76a05dd500..229b775430146 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; @@ -231,6 +232,7 @@ public void testConditionEvaluationWhenAliasToWriteAndReadIndicesConsidersOnlyPr final MetadataIndexAliasesService mdIndexAliasesService = mock(MetadataIndexAliasesService.class); final Client mockClient = mock(Client.class); + final AllocationService mockAllocationService = mock(AllocationService.class); final Map indexStats = new HashMap<>(); int total = randomIntBetween(500, 1000); @@ -279,7 +281,8 @@ public void testConditionEvaluationWhenAliasToWriteAndReadIndicesConsidersOnlyPr mockActionFilters, mockIndexNameExpressionResolver, rolloverService, - mockClient + mockClient, + mockAllocationService ); // For given alias, verify that condition evaluation fails when the condition doc count is greater than the primaries doc count