Skip to content

Correcting the index version filter in migration reindex logic #118487

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.FeatureFlag;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
Expand All @@ -39,10 +43,24 @@ public class ReindexDataStreamAction extends ActionType<ReindexDataStreamAction.
public static final ParseField SOURCE_FIELD = new ParseField("source");
public static final ParseField INDEX_FIELD = new ParseField("index");

/*
* The version before which we do not support writes in the _next_ major version of Elasticsearch. For example, Elasticsearch 10.x will
* not support writing to indices created before version 9.0.0.
*/
private static final IndexVersion MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE = IndexVersions.UPGRADE_TO_LUCENE_10_0_0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good explanation and naming 👍


public ReindexDataStreamAction() {
super(NAME);
}

/*
* This predicate allows through only indices that were created with a previous lucene version, meaning that they need to be reindexed
* in order to be writable in the _next_ lucene version.
*/
public static Predicate<Index> getOldIndexVersionPredicate(Metadata metadata) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe change predicate name to hasOldIndexVersion

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Err maybe not. I see that the current naming is meant to explain that you are making a predicate from the metadata

return index -> metadata.index(index).getCreationVersion().onOrBefore(MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE);
}

public enum Mode {
UPGRADE
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTaskParams;

import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.TASK_ID_PREFIX;
import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.getOldIndexVersionPredicate;

/*
* This transport action creates a new persistent task for reindexing the source data stream given in the request. On successful creation
Expand Down Expand Up @@ -67,10 +68,7 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList
return;
}
int totalIndices = dataStream.getIndices().size();
int totalIndicesToBeUpgraded = (int) dataStream.getIndices()
.stream()
.filter(index -> metadata.index(index).getCreationVersion().isLegacyIndexVersion())
.count();
int totalIndicesToBeUpgraded = (int) dataStream.getIndices().stream().filter(getOldIndexVersionPredicate(metadata)).count();
ReindexDataStreamTaskParams params = new ReindexDataStreamTaskParams(
sourceDataStreamName,
transportService.getThreadPool().absoluteTimeInMillis(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.List;
import java.util.Map;

import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.getOldIndexVersionPredicate;

public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExecutor<ReindexDataStreamTaskParams> {
private static final TimeValue TASK_KEEP_ALIVE_TIME = TimeValue.timeValueDays(1);
private final Client client;
Expand Down Expand Up @@ -72,7 +74,7 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
if (dataStreamInfos.size() == 1) {
List<Index> indices = dataStreamInfos.getFirst().getDataStream().getIndices();
List<Index> indicesToBeReindexed = indices.stream()
.filter(index -> clusterService.state().getMetadata().index(index).getCreationVersion().isLegacyIndexVersion())
.filter(getOldIndexVersionPredicate(clusterService.state().metadata()))
.toList();
reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size());
for (Index index : indicesToBeReindexed) {
Expand Down