Skip to content

Commit 7af83d9

Browse files
committed
Correcting the index version filter in migration reindex logic (elastic#118487)
1 parent e1d83e9 commit 7af83d9

File tree

3 files changed

+23
-5
lines changed

3 files changed

+23
-5
lines changed

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java

+18
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,14 @@
1313
import org.elasticsearch.action.ActionType;
1414
import org.elasticsearch.action.IndicesRequest;
1515
import org.elasticsearch.action.support.IndicesOptions;
16+
import org.elasticsearch.cluster.metadata.Metadata;
1617
import org.elasticsearch.common.io.stream.StreamInput;
1718
import org.elasticsearch.common.io.stream.StreamOutput;
1819
import org.elasticsearch.common.util.FeatureFlag;
1920
import org.elasticsearch.features.NodeFeature;
21+
import org.elasticsearch.index.Index;
22+
import org.elasticsearch.index.IndexVersion;
23+
import org.elasticsearch.index.IndexVersions;
2024
import org.elasticsearch.xcontent.ConstructingObjectParser;
2125
import org.elasticsearch.xcontent.ParseField;
2226
import org.elasticsearch.xcontent.ToXContent;
@@ -39,10 +43,24 @@ public class ReindexDataStreamAction extends ActionType<ReindexDataStreamAction.
3943
public static final ParseField SOURCE_FIELD = new ParseField("source");
4044
public static final ParseField INDEX_FIELD = new ParseField("index");
4145

46+
/*
47+
* The version before which we do not support writes in the _next_ major version of Elasticsearch. For example, Elasticsearch 10.x will
48+
* not support writing to indices created before version 9.0.0.
49+
*/
50+
private static final IndexVersion MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE = IndexVersions.UPGRADE_TO_LUCENE_10_0_0;
51+
4252
public ReindexDataStreamAction() {
4353
super(NAME);
4454
}
4555

56+
/*
57+
* This predicate allows through only indices that were created with a previous lucene version, meaning that they need to be reindexed
58+
* in order to be writable in the _next_ lucene version.
59+
*/
60+
public static Predicate<Index> getOldIndexVersionPredicate(Metadata metadata) {
61+
return index -> metadata.index(index).getCreationVersion().onOrBefore(MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE);
62+
}
63+
4664
public enum Mode {
4765
UPGRADE
4866
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTaskParams;
2828

2929
import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.TASK_ID_PREFIX;
30+
import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.getOldIndexVersionPredicate;
3031

3132
/*
3233
* This transport action creates a new persistent task for reindexing the source data stream given in the request. On successful creation
@@ -67,10 +68,7 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList
6768
return;
6869
}
6970
int totalIndices = dataStream.getIndices().size();
70-
int totalIndicesToBeUpgraded = (int) dataStream.getIndices()
71-
.stream()
72-
.filter(index -> metadata.index(index).getCreationVersion().isLegacyIndexVersion())
73-
.count();
71+
int totalIndicesToBeUpgraded = (int) dataStream.getIndices().stream().filter(getOldIndexVersionPredicate(metadata)).count();
7472
ReindexDataStreamTaskParams params = new ReindexDataStreamTaskParams(
7573
sourceDataStreamName,
7674
transportService.getThreadPool().absoluteTimeInMillis(),

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.util.List;
2525
import java.util.Map;
2626

27+
import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.getOldIndexVersionPredicate;
28+
2729
public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExecutor<ReindexDataStreamTaskParams> {
2830
private static final TimeValue TASK_KEEP_ALIVE_TIME = TimeValue.timeValueDays(1);
2931
private final Client client;
@@ -72,7 +74,7 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
7274
if (dataStreamInfos.size() == 1) {
7375
List<Index> indices = dataStreamInfos.get(0).getDataStream().getIndices();
7476
List<Index> indicesToBeReindexed = indices.stream()
75-
.filter(index -> clusterService.state().getMetadata().index(index).getCreationVersion().isLegacyIndexVersion())
77+
.filter(getOldIndexVersionPredicate(clusterService.state().metadata()))
7678
.toList();
7779
reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size());
7880
for (Index index : indicesToBeReindexed) {

0 commit comments

Comments
 (0)