Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/84166.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 84166
summary: Remove LegacyCTRAL from `TransportRolloverAction`
area: Indices APIs
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
Expand Down Expand Up @@ -45,6 +44,7 @@
import org.elasticsearch.transport.TransportService;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand All @@ -61,7 +61,6 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
private static final Logger logger = LogManager.getLogger(TransportRolloverAction.class);

private final MetadataRolloverService rolloverService;
private final ActiveShardsObserver activeShardsObserver;
private final Client client;
private final RolloverExecutor rolloverTaskExecutor;

Expand Down Expand Up @@ -89,8 +88,11 @@ public TransportRolloverAction(
);
this.rolloverService = rolloverService;
this.client = client;
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
this.rolloverTaskExecutor = new RolloverExecutor(allocationService);
this.rolloverTaskExecutor = new RolloverExecutor(
allocationService,
rolloverService,
new ActiveShardsObserver(clusterService, threadPool)
);
}

@Override
Expand Down Expand Up @@ -134,8 +136,7 @@ protected void masterOperation(
statsRequest,

ActionListener.wrap(statsResponse -> {
// Now that we have the stats for the cluster, we need to know the
// names of the index for which we should evaluate
// Now that we have the stats for the cluster, we need to know the names of the index for which we should evaluate
// conditions, as well as what our newly created index *would* be.
final MetadataRolloverService.NameResolution trialRolloverNames = rolloverService.resolveRolloverNames(
oldState,
Expand Down Expand Up @@ -244,37 +245,66 @@ static Condition.Stats buildStats(@Nullable final IndexMetadata metadata, @Nulla
}
}

class RolloverTask implements ClusterStateTaskListener {

private final RolloverRequest rolloverRequest;
private final IndicesStatsResponse statsResponse;
private final RolloverResponse trialRolloverResponse;
private final ActionListener<RolloverResponse> listener;

private boolean clusterStateProcessed = false;
// Holders for what our final source and rolled over index names are as well as the
// conditions met to cause the rollover, these are needed so we wait on and report
// the correct indices and conditions in the clusterStateProcessed method
private final SetOnce<String> sourceIndex = new SetOnce<>();
private final SetOnce<String> rolloverIndex = new SetOnce<>();
private final SetOnce<Map<String, Boolean>> conditionResults = new SetOnce<>();

RolloverTask(
RolloverRequest rolloverRequest,
IndicesStatsResponse statsResponse,
RolloverResponse trialRolloverResponse,
ActionListener<RolloverResponse> listener
) {
this.rolloverRequest = rolloverRequest;
this.statsResponse = statsResponse;
this.trialRolloverResponse = trialRolloverResponse;
this.listener = listener;
record RolloverTask(
RolloverRequest rolloverRequest,
IndicesStatsResponse statsResponse,
RolloverResponse trialRolloverResponse,
ActionListener<RolloverResponse> listener
) implements ClusterStateTaskListener {

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "not called";
}
}

record RolloverExecutor(
AllocationService allocationService,
MetadataRolloverService rolloverService,
ActiveShardsObserver activeShardsObserver
) implements ClusterStateTaskExecutor<RolloverTask> {
@Override
public ClusterState execute(ClusterState currentState, List<TaskContext<RolloverTask>> taskContexts) throws Exception {
final var results = new ArrayList<MetadataRolloverService.RolloverResult>(taskContexts.size());
var state = currentState;
for (final var taskContext : taskContexts) {
try {
state = executeTask(state, results, taskContext);
} catch (Exception e) {
taskContext.onFailure(e);
}
}

if (state != currentState) {
var reason = new StringBuilder();
Strings.collectionToDelimitedStringWithLimit(
(Iterable<String>) () -> results.stream().map(t -> t.sourceIndexName() + "->" + t.rolloverIndexName()).iterator(),
",",
"bulk rollover [",
"]",
1024,
reason
);
state = allocationService.reroute(state, reason.toString());
}
return state;
}

ClusterState performRollover(ClusterState currentState) throws Exception {
// Regenerate the rollover names, as a rollover could have happened
// in between the pre-check and the cluster state update
final MetadataRolloverService.NameResolution rolloverNames = rolloverService.resolveRolloverNames(
public ClusterState executeTask(
ClusterState currentState,
List<MetadataRolloverService.RolloverResult> results,
TaskContext<RolloverTask> rolloverTaskContext
) throws Exception {
final var rolloverTask = rolloverTaskContext.getTask();
final var rolloverRequest = rolloverTask.rolloverRequest();

// Regenerate the rollover names, as a rollover could have happened in between the pre-check and the cluster state update
final var rolloverNames = rolloverService.resolveRolloverNames(
currentState,
rolloverRequest.getRolloverTarget(),
rolloverRequest.getNewIndexName(),
Expand All @@ -285,20 +315,18 @@ ClusterState performRollover(ClusterState currentState) throws Exception {
// Re-evaluate the conditions, now with our final source index name
final Map<String, Boolean> postConditionResults = evaluateConditions(
rolloverRequest.getConditions().values(),
buildStats(currentState.metadata().index(sourceIndexName), statsResponse)
buildStats(currentState.metadata().index(sourceIndexName), rolloverTask.statsResponse())
);
final List<Condition<?>> metConditions = rolloverRequest.getConditions()
.values()
.stream()
.filter(condition -> postConditionResults.get(condition.toString()))
.collect(Collectors.toList());
// Update the final condition results so they can be used when returning the response
conditionResults.set(postConditionResults);

if (postConditionResults.size() == 0 || metConditions.size() > 0) {
clusterStateProcessed = true;

// Perform the actual rollover
MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState(
final var rolloverResult = rolloverService.rolloverClusterState(
currentState,
rolloverRequest.getRolloverTarget(),
rolloverRequest.getNewIndexName(),
Expand All @@ -308,100 +336,41 @@ ClusterState performRollover(ClusterState currentState) throws Exception {
false,
false
);
results.add(rolloverResult);
logger.trace("rollover result [{}]", rolloverResult);

// Update the "final" source and resulting rollover index names.
// Note that we use the actual rollover result for these, because
// even though we're single threaded, it's possible for the
// rollover names generated before the actual rollover to be
// different due to things like date resolution
sourceIndex.set(rolloverResult.sourceIndexName());
rolloverIndex.set(rolloverResult.rolloverIndexName());

// Return the new rollover cluster state, which includes the changes that create the new index
return rolloverResult.clusterState();
} else {
// Upon re-evaluation of the conditions, none were met, so
// therefore do not perform a rollover, returning the current
// cluster state.
return currentState;
}
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
// Now assuming we have a new state and the name of the rolled over index, we need to wait for the
// configured number of active shards, as well as return the names of the indices that were rolled/created
if (clusterStateProcessed) {
assert sourceIndex.get() != null : "source index missing on successful rollover";
assert rolloverIndex.get() != null : "rollover index missing on successful rollover";
assert conditionResults.get() != null : "matching rollover conditions missing on successful rollover";

rolloverTaskContext.success(rolloverTask.listener().delegateFailure((delegate, ignored) ->
// Now assuming we have a new state and the name of the rolled over index, we need to wait for the configured number of
// active shards, as well as return the names of the indices that were rolled/created
activeShardsObserver.waitForActiveShards(
new String[] { rolloverIndex.get() },
new String[] { rolloverResult.rolloverIndexName() },
rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
rolloverRequest.masterNodeTimeout(),
isShardsAcknowledged -> listener.onResponse(
isShardsAcknowledged -> delegate.onResponse(
new RolloverResponse(
sourceIndex.get(),
rolloverIndex.get(),
conditionResults.get(),
// Note that we use the actual rollover result for these, because even though we're single threaded, it's
// possible for the rollover names generated before the actual rollover to be different due to things like date
// resolution
rolloverResult.sourceIndexName(),
rolloverResult.rolloverIndexName(),
postConditionResults,
false,
true,
true,
isShardsAcknowledged
)
),
listener::onFailure
);
} else {
// We did not roll over due to conditions not being met inside the cluster state update
listener.onResponse(trialRolloverResponse);
}
}
}
delegate::onFailure
)));

static class RolloverExecutor implements ClusterStateTaskExecutor<RolloverTask> {

private final AllocationService allocationService;

RolloverExecutor(AllocationService allocationService) {
this.allocationService = allocationService;
}

@Override
public ClusterState execute(ClusterState currentState, List<TaskContext<RolloverTask>> taskContexts) throws Exception {
ClusterState state = currentState;
for (final var taskContext : taskContexts) {
try {
final var task = taskContext.getTask();
state = task.performRollover(state);
taskContext.success(new LegacyClusterTaskResultActionListener(task, currentState));
} catch (Exception e) {
taskContext.onFailure(e);
}
}

if (state != currentState) {
var reason = new StringBuilder();
Strings.collectionToDelimitedStringWithLimit(
(Iterable<String>) () -> taskContexts.stream()
.map(t -> t.getTask().sourceIndex.get() + "->" + t.getTask().rolloverIndex.get())
.iterator(),
",",
"bulk rollover [",
"]",
1024,
reason
);
state = allocationService.reroute(state, reason.toString());
// Return the new rollover cluster state, which includes the changes that create the new index
return rolloverResult.clusterState();
} else {
// Upon re-evaluation of the conditions, none were met, so therefore do not perform a rollover, returning the current
// cluster state.
rolloverTaskContext.success(rolloverTask.listener().map(ignored -> rolloverTask.trialRolloverResponse()));
return currentState;
}
return state;
}
}
}