diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java index f121d858360e2..e6d86705ed40f 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java @@ -1034,7 +1034,6 @@ private Index maybeExecuteRollover( } try { if (dataStream.isIndexManagedByDataStreamLifecycle(currentRunWriteIndex, project::index)) { - DataStreamLifecycle lifecycle = rolloverFailureStore ? dataStream.getFailuresLifecycle() : dataStream.getDataLifecycle(); RolloverRequest rolloverRequest = getDefaultRolloverRequest( rolloverConfiguration, dataStream.getName(), @@ -1100,13 +1099,13 @@ Set maybeExecuteRetention( if (dataRetention == null && failureRetention == null) { return Set.of(); } - List backingIndicesOlderThanRetention = dataStream.getIndicesOlderThan( + Set backingIndicesOlderThanRetention = dataStream.getIndicesOlderThan( project::index, nowSupplier, dataRetention, BACKING_INDICES ); - List failureIndicesOlderThanRetention = dataStream.getIndicesOlderThan( + Set failureIndicesOlderThanRetention = dataStream.getIndicesOlderThan( project::index, nowSupplier, failureRetention, diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java index cdccbb62a8cfc..957e51719766a 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java @@ -241,9 +241,17 @@ public void testOperationsExecutedOnce() { .stream() .map(transportRequest -> (DeleteIndexRequest) transportRequest) .toList(); - assertThat(deleteRequests.get(0).indices()[0], is(dataStream.getIndices().get(0).getName())); - assertThat(deleteRequests.get(1).indices()[0], is(dataStream.getIndices().get(1).getName())); - assertThat(deleteRequests.get(2).indices()[0], is(dataStream.getFailureIndices().get(0).getName())); + Set indicesToDelete = Set.of( + deleteRequests.get(0).indices()[0], + deleteRequests.get(1).indices()[0], + deleteRequests.get(2).indices()[0] + ); + Set indicesInDataStreamToDelete = Set.of( + dataStream.getIndices().get(0).getName(), + dataStream.getIndices().get(1).getName(), + dataStream.getFailureIndices().get(0).getName() + ); + assertThat(indicesToDelete, equalTo(indicesInDataStreamToDelete)); // on the second run the rollover and delete requests should not execute anymore // i.e. the count should *remain* 1 for rollover and 2 for deletes diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 99af4ca352e20..615aa9c085820 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -63,6 +63,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -1264,17 +1265,17 @@ private static boolean isAnyIndexMissing(List indices, ProjectMetadata.Bu * NOTE that this specifically does not return the write index of the data stream as usually retention * is treated differently for the write index (i.e. they first need to be rolled over) */ - public List getIndicesOlderThan( + public Set getIndicesOlderThan( Function indexMetadataSupplier, LongSupplier nowSupplier, TimeValue effectiveRetention, DatastreamIndexTypes types ) { if (effectiveRetention == null) { - return List.of(); + return Set.of(); } - List indices = new ArrayList<>(); + Set indices = new HashSet<>(); if (types == DatastreamIndexTypes.ALL || types == DatastreamIndexTypes.BACKING_INDICES) { indices.addAll(getDataStreamIndices(false).getIndices()); } @@ -1335,23 +1336,23 @@ public List getDownsamplingRoundsFor( * be filtered according to the predicate definition. This is useful for things like "return only * the indices that are managed by the data stream lifecycle". */ - private List getNonWriteIndicesOlderThan( - List indices, + private Set getNonWriteIndicesOlderThan( + Set indices, TimeValue retentionPeriod, Function indexMetadataSupplier, @Nullable Predicate indicesPredicate, LongSupplier nowSupplier ) { if (indices.isEmpty()) { - return List.of(); + return Set.of(); } - List olderIndices = new ArrayList<>(); + Set olderIndices = new HashSet<>(); for (Index index : indices) { if (isIndexOlderThan(index, retentionPeriod.getMillis(), nowSupplier.getAsLong(), indicesPredicate, indexMetadataSupplier)) { olderIndices.add(index); } } - return olderIndices; + return Set.copyOf(olderIndices); } private boolean isIndexOlderThan( diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 57dc73318d09e..0e6e12540a039 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -1455,7 +1455,7 @@ public void testGetIndicesOlderThanWithoutFailureStoreParam() { builder.put(dataStream); Metadata metadata = builder.build(); - List indicesPastRetention = dataStream.getIndicesOlderThan( + Set indicesPastRetention = dataStream.getIndicesOlderThan( metadata.getProject()::index, () -> now, timeValueSeconds(2500), @@ -1528,7 +1528,7 @@ private void testIndicesPastRetention(boolean failureStore) { { // Mix of indices younger and older than retention - List indicesPastRetention = dataStream.getIndicesOlderThan( + Set indicesPastRetention = dataStream.getIndicesOlderThan( metadata.getProject()::index, () -> now, TimeValue.timeValueSeconds(2500), @@ -1536,14 +1536,12 @@ private void testIndicesPastRetention(boolean failureStore) { ); assertThat(indicesPastRetention.size(), is(2)); - for (int i = 0; i < indicesPastRetention.size(); i++) { - assertThat(indicesPastRetention.get(i).getName(), is(indicesSupplier.get().get(i).getName())); - } + assertThat(indicesPastRetention, equalTo(Set.of(indicesSupplier.get().get(0), indicesSupplier.get().get(1)))); } { // All indices past retention, but we keep the write index - List indicesPastRetention = dataStream.getIndicesOlderThan( + Set indicesPastRetention = dataStream.getIndicesOlderThan( metadata.getProject()::index, () -> now, TimeValue.ZERO, @@ -1551,14 +1549,22 @@ private void testIndicesPastRetention(boolean failureStore) { ); assertThat(indicesPastRetention.size(), is(4)); - for (int i = 0; i < indicesPastRetention.size(); i++) { - assertThat(indicesPastRetention.get(i).getName(), is(indicesSupplier.get().get(i).getName())); - } + assertThat( + indicesPastRetention, + equalTo( + Set.of( + indicesSupplier.get().get(0), + indicesSupplier.get().get(1), + indicesSupplier.get().get(2), + indicesSupplier.get().get(3) + ) + ) + ); } { // All indices younger than retention - List indicesPastRetention = dataStream.getIndicesOlderThan( + Set indicesPastRetention = dataStream.getIndicesOlderThan( metadata.getProject()::index, () -> now, TimeValue.timeValueSeconds(6000), @@ -1579,7 +1585,7 @@ private void testIndicesPastRetention(boolean failureStore) { .settings(Settings.builder().put(indexMetadata.getSettings()).put(IndexMetadata.LIFECYCLE_NAME, "some-policy").build()) .build(); }; - List indicesPastRetention = dataStream.getIndicesOlderThan( + Set indicesPastRetention = dataStream.getIndicesOlderThan( indexMetadataWithSomeLifecycleSupplier, () -> now, TimeValue.ZERO, @@ -1587,7 +1593,7 @@ private void testIndicesPastRetention(boolean failureStore) { ); assertThat(indicesPastRetention.size(), is(1)); - assertThat(indicesPastRetention.get(0).getName(), is(indicesSupplier.get().get(2).getName())); + assertThat(indicesPastRetention, equalTo(Set.of(indicesSupplier.get().get(2)))); } } @@ -1637,7 +1643,7 @@ private void testIndicesOlderThanWithOriginationDate(boolean failureStore) { { // retention period where first and second index is too old, and 5th has old origination date. - List indicesPastRetention = dataStream.getIndicesOlderThan( + Set indicesPastRetention = dataStream.getIndicesOlderThan( metadata.getProject()::index, () -> now, TimeValue.timeValueMillis(2500), @@ -1645,14 +1651,15 @@ private void testIndicesOlderThanWithOriginationDate(boolean failureStore) { ); assertThat(indicesPastRetention.size(), is(3)); - assertThat(indicesPastRetention.get(0).getName(), is(indicesSupplier.get().get(0).getName())); - assertThat(indicesPastRetention.get(1).getName(), is(indicesSupplier.get().get(1).getName())); - assertThat(indicesPastRetention.get(2).getName(), is(indicesSupplier.get().get(5).getName())); + assertThat( + indicesPastRetention, + equalTo(Set.of(indicesSupplier.get().get(0), indicesSupplier.get().get(1), indicesSupplier.get().get(5))) + ); } { // no index matches the retention age - List indicesPastRetention = dataStream.getIndicesOlderThan( + Set indicesPastRetention = dataStream.getIndicesOlderThan( metadata.getProject()::index, () -> now, TimeValue.timeValueMillis(9000),