diff --git a/docs/changelog/84166.yaml b/docs/changelog/84166.yaml new file mode 100644 index 0000000000000..163f297b0c5db --- /dev/null +++ b/docs/changelog/84166.yaml @@ -0,0 +1,5 @@ +pr: 84166 +summary: Remove LegacyCTRAL from `TransportRolloverAction` +area: Indices APIs +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java index 751d5b0f1cc03..541ddabc9cb05 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; @@ -45,6 +44,7 @@ import org.elasticsearch.transport.TransportService; import java.time.Instant; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -61,7 +61,6 @@ public class TransportRolloverAction extends TransportMasterNodeAction { - // Now that we have the stats for the cluster, we need to know the - // names of the index for which we should evaluate + // Now that we have the stats for the cluster, we need to know the names of the index for which we should evaluate // conditions, as well as what our newly created index *would* be. final MetadataRolloverService.NameResolution trialRolloverNames = rolloverService.resolveRolloverNames( oldState, @@ -244,37 +245,66 @@ static Condition.Stats buildStats(@Nullable final IndexMetadata metadata, @Nulla } } - class RolloverTask implements ClusterStateTaskListener { - - private final RolloverRequest rolloverRequest; - private final IndicesStatsResponse statsResponse; - private final RolloverResponse trialRolloverResponse; - private final ActionListener listener; - - private boolean clusterStateProcessed = false; - // Holders for what our final source and rolled over index names are as well as the - // conditions met to cause the rollover, these are needed so we wait on and report - // the correct indices and conditions in the clusterStateProcessed method - private final SetOnce sourceIndex = new SetOnce<>(); - private final SetOnce rolloverIndex = new SetOnce<>(); - private final SetOnce> conditionResults = new SetOnce<>(); - - RolloverTask( - RolloverRequest rolloverRequest, - IndicesStatsResponse statsResponse, - RolloverResponse trialRolloverResponse, - ActionListener listener - ) { - this.rolloverRequest = rolloverRequest; - this.statsResponse = statsResponse; - this.trialRolloverResponse = trialRolloverResponse; - this.listener = listener; + record RolloverTask( + RolloverRequest rolloverRequest, + IndicesStatsResponse statsResponse, + RolloverResponse trialRolloverResponse, + ActionListener listener + ) implements ClusterStateTaskListener { + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { + assert false : "not called"; + } + } + + record RolloverExecutor( + AllocationService allocationService, + MetadataRolloverService rolloverService, + ActiveShardsObserver activeShardsObserver + ) implements ClusterStateTaskExecutor { + @Override + public ClusterState execute(ClusterState currentState, List> taskContexts) throws Exception { + final var results = new ArrayList(taskContexts.size()); + var state = currentState; + for (final var taskContext : taskContexts) { + try { + state = executeTask(state, results, taskContext); + } catch (Exception e) { + taskContext.onFailure(e); + } + } + + if (state != currentState) { + var reason = new StringBuilder(); + Strings.collectionToDelimitedStringWithLimit( + (Iterable) () -> results.stream().map(t -> t.sourceIndexName() + "->" + t.rolloverIndexName()).iterator(), + ",", + "bulk rollover [", + "]", + 1024, + reason + ); + state = allocationService.reroute(state, reason.toString()); + } + return state; } - ClusterState performRollover(ClusterState currentState) throws Exception { - // Regenerate the rollover names, as a rollover could have happened - // in between the pre-check and the cluster state update - final MetadataRolloverService.NameResolution rolloverNames = rolloverService.resolveRolloverNames( + public ClusterState executeTask( + ClusterState currentState, + List results, + TaskContext rolloverTaskContext + ) throws Exception { + final var rolloverTask = rolloverTaskContext.getTask(); + final var rolloverRequest = rolloverTask.rolloverRequest(); + + // Regenerate the rollover names, as a rollover could have happened in between the pre-check and the cluster state update + final var rolloverNames = rolloverService.resolveRolloverNames( currentState, rolloverRequest.getRolloverTarget(), rolloverRequest.getNewIndexName(), @@ -285,20 +315,18 @@ ClusterState performRollover(ClusterState currentState) throws Exception { // Re-evaluate the conditions, now with our final source index name final Map postConditionResults = evaluateConditions( rolloverRequest.getConditions().values(), - buildStats(currentState.metadata().index(sourceIndexName), statsResponse) + buildStats(currentState.metadata().index(sourceIndexName), rolloverTask.statsResponse()) ); final List> metConditions = rolloverRequest.getConditions() .values() .stream() .filter(condition -> postConditionResults.get(condition.toString())) .collect(Collectors.toList()); - // Update the final condition results so they can be used when returning the response - conditionResults.set(postConditionResults); if (postConditionResults.size() == 0 || metConditions.size() > 0) { - clusterStateProcessed = true; + // Perform the actual rollover - MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( + final var rolloverResult = rolloverService.rolloverClusterState( currentState, rolloverRequest.getRolloverTarget(), rolloverRequest.getNewIndexName(), @@ -308,100 +336,41 @@ ClusterState performRollover(ClusterState currentState) throws Exception { false, false ); + results.add(rolloverResult); logger.trace("rollover result [{}]", rolloverResult); - // Update the "final" source and resulting rollover index names. - // Note that we use the actual rollover result for these, because - // even though we're single threaded, it's possible for the - // rollover names generated before the actual rollover to be - // different due to things like date resolution - sourceIndex.set(rolloverResult.sourceIndexName()); - rolloverIndex.set(rolloverResult.rolloverIndexName()); - - // Return the new rollover cluster state, which includes the changes that create the new index - return rolloverResult.clusterState(); - } else { - // Upon re-evaluation of the conditions, none were met, so - // therefore do not perform a rollover, returning the current - // cluster state. - return currentState; - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - - @Override - public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { - // Now assuming we have a new state and the name of the rolled over index, we need to wait for the - // configured number of active shards, as well as return the names of the indices that were rolled/created - if (clusterStateProcessed) { - assert sourceIndex.get() != null : "source index missing on successful rollover"; - assert rolloverIndex.get() != null : "rollover index missing on successful rollover"; - assert conditionResults.get() != null : "matching rollover conditions missing on successful rollover"; - + rolloverTaskContext.success(rolloverTask.listener().delegateFailure((delegate, ignored) -> + // Now assuming we have a new state and the name of the rolled over index, we need to wait for the configured number of + // active shards, as well as return the names of the indices that were rolled/created activeShardsObserver.waitForActiveShards( - new String[] { rolloverIndex.get() }, + new String[] { rolloverResult.rolloverIndexName() }, rolloverRequest.getCreateIndexRequest().waitForActiveShards(), rolloverRequest.masterNodeTimeout(), - isShardsAcknowledged -> listener.onResponse( + isShardsAcknowledged -> delegate.onResponse( new RolloverResponse( - sourceIndex.get(), - rolloverIndex.get(), - conditionResults.get(), + // Note that we use the actual rollover result for these, because even though we're single threaded, it's + // possible for the rollover names generated before the actual rollover to be different due to things like date + // resolution + rolloverResult.sourceIndexName(), + rolloverResult.rolloverIndexName(), + postConditionResults, false, true, true, isShardsAcknowledged ) ), - listener::onFailure - ); - } else { - // We did not roll over due to conditions not being met inside the cluster state update - listener.onResponse(trialRolloverResponse); - } - } - } + delegate::onFailure + ))); - static class RolloverExecutor implements ClusterStateTaskExecutor { - - private final AllocationService allocationService; - - RolloverExecutor(AllocationService allocationService) { - this.allocationService = allocationService; - } - - @Override - public ClusterState execute(ClusterState currentState, List> taskContexts) throws Exception { - ClusterState state = currentState; - for (final var taskContext : taskContexts) { - try { - final var task = taskContext.getTask(); - state = task.performRollover(state); - taskContext.success(new LegacyClusterTaskResultActionListener(task, currentState)); - } catch (Exception e) { - taskContext.onFailure(e); - } - } - - if (state != currentState) { - var reason = new StringBuilder(); - Strings.collectionToDelimitedStringWithLimit( - (Iterable) () -> taskContexts.stream() - .map(t -> t.getTask().sourceIndex.get() + "->" + t.getTask().rolloverIndex.get()) - .iterator(), - ",", - "bulk rollover [", - "]", - 1024, - reason - ); - state = allocationService.reroute(state, reason.toString()); + // Return the new rollover cluster state, which includes the changes that create the new index + return rolloverResult.clusterState(); + } else { + // Upon re-evaluation of the conditions, none were met, so therefore do not perform a rollover, returning the current + // cluster state. + rolloverTaskContext.success(rolloverTask.listener().map(ignored -> rolloverTask.trialRolloverResponse())); + return currentState; } - return state; } } }