Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,6 @@ private Index maybeExecuteRollover(
}
try {
if (dataStream.isIndexManagedByDataStreamLifecycle(currentRunWriteIndex, project::index)) {
DataStreamLifecycle lifecycle = rolloverFailureStore ? dataStream.getFailuresLifecycle() : dataStream.getDataLifecycle();
RolloverRequest rolloverRequest = getDefaultRolloverRequest(
rolloverConfiguration,
dataStream.getName(),
Expand Down Expand Up @@ -1100,13 +1099,13 @@ Set<Index> maybeExecuteRetention(
if (dataRetention == null && failureRetention == null) {
return Set.of();
}
List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesOlderThan(
Set<Index> backingIndicesOlderThanRetention = dataStream.getIndicesOlderThan(
project::index,
nowSupplier,
dataRetention,
BACKING_INDICES
);
List<Index> failureIndicesOlderThanRetention = dataStream.getIndicesOlderThan(
Set<Index> failureIndicesOlderThanRetention = dataStream.getIndicesOlderThan(
project::index,
nowSupplier,
failureRetention,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,17 @@ public void testOperationsExecutedOnce() {
.stream()
.map(transportRequest -> (DeleteIndexRequest) transportRequest)
.toList();
assertThat(deleteRequests.get(0).indices()[0], is(dataStream.getIndices().get(0).getName()));
assertThat(deleteRequests.get(1).indices()[0], is(dataStream.getIndices().get(1).getName()));
assertThat(deleteRequests.get(2).indices()[0], is(dataStream.getFailureIndices().get(0).getName()));
Set<String> indicesToDelete = Set.of(
deleteRequests.get(0).indices()[0],
deleteRequests.get(1).indices()[0],
deleteRequests.get(2).indices()[0]
);
Set<String> indicesInDataStreamToDelete = Set.of(
dataStream.getIndices().get(0).getName(),
dataStream.getIndices().get(1).getName(),
dataStream.getFailureIndices().get(0).getName()
);
assertThat(indicesToDelete, equalTo(indicesInDataStreamToDelete));

// on the second run the rollover and delete requests should not execute anymore
// i.e. the count should *remain* 1 for rollover and 2 for deletes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -1264,17 +1265,17 @@ private static boolean isAnyIndexMissing(List<Index> indices, ProjectMetadata.Bu
* NOTE that this specifically does not return the write index of the data stream as usually retention
* is treated differently for the write index (i.e. they first need to be rolled over)
*/
public List<Index> getIndicesOlderThan(
public Set<Index> getIndicesOlderThan(
Function<String, IndexMetadata> indexMetadataSupplier,
LongSupplier nowSupplier,
TimeValue effectiveRetention,
DatastreamIndexTypes types
) {
if (effectiveRetention == null) {
return List.of();
return Set.of();
}

List<Index> indices = new ArrayList<>();
Set<Index> indices = new HashSet<>();
if (types == DatastreamIndexTypes.ALL || types == DatastreamIndexTypes.BACKING_INDICES) {
indices.addAll(getDataStreamIndices(false).getIndices());
}
Expand Down Expand Up @@ -1335,23 +1336,23 @@ public List<DownsamplingRound> getDownsamplingRoundsFor(
* be filtered according to the predicate definition. This is useful for things like "return only
* the indices that are managed by the data stream lifecycle".
*/
private List<Index> getNonWriteIndicesOlderThan(
List<Index> indices,
private Set<Index> getNonWriteIndicesOlderThan(
Set<Index> indices,
TimeValue retentionPeriod,
Function<String, IndexMetadata> indexMetadataSupplier,
@Nullable Predicate<IndexMetadata> indicesPredicate,
LongSupplier nowSupplier
) {
if (indices.isEmpty()) {
return List.of();
return Set.of();
}
List<Index> olderIndices = new ArrayList<>();
Set<Index> olderIndices = new HashSet<>();
for (Index index : indices) {
if (isIndexOlderThan(index, retentionPeriod.getMillis(), nowSupplier.getAsLong(), indicesPredicate, indexMetadataSupplier)) {
olderIndices.add(index);
}
}
return olderIndices;
return Set.copyOf(olderIndices);
}

private boolean isIndexOlderThan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1455,7 +1455,7 @@ public void testGetIndicesOlderThanWithoutFailureStoreParam() {
builder.put(dataStream);
Metadata metadata = builder.build();

List<Index> indicesPastRetention = dataStream.getIndicesOlderThan(
Set<Index> indicesPastRetention = dataStream.getIndicesOlderThan(
metadata.getProject()::index,
() -> now,
timeValueSeconds(2500),
Expand Down Expand Up @@ -1528,37 +1528,43 @@ private void testIndicesPastRetention(boolean failureStore) {

{
// Mix of indices younger and older than retention
List<Index> indicesPastRetention = dataStream.getIndicesOlderThan(
Set<Index> indicesPastRetention = dataStream.getIndicesOlderThan(
metadata.getProject()::index,
() -> now,
TimeValue.timeValueSeconds(2500),
failureStore ? FAILURE_INDICES : BACKING_INDICES

);
assertThat(indicesPastRetention.size(), is(2));
for (int i = 0; i < indicesPastRetention.size(); i++) {
assertThat(indicesPastRetention.get(i).getName(), is(indicesSupplier.get().get(i).getName()));
}
assertThat(indicesPastRetention, equalTo(Set.of(indicesSupplier.get().get(0), indicesSupplier.get().get(1))));
}

{
// All indices past retention, but we keep the write index
List<Index> indicesPastRetention = dataStream.getIndicesOlderThan(
Set<Index> indicesPastRetention = dataStream.getIndicesOlderThan(
metadata.getProject()::index,
() -> now,
TimeValue.ZERO,
failureStore ? FAILURE_INDICES : BACKING_INDICES

);
assertThat(indicesPastRetention.size(), is(4));
for (int i = 0; i < indicesPastRetention.size(); i++) {
assertThat(indicesPastRetention.get(i).getName(), is(indicesSupplier.get().get(i).getName()));
}
assertThat(
indicesPastRetention,
equalTo(
Set.of(
indicesSupplier.get().get(0),
indicesSupplier.get().get(1),
indicesSupplier.get().get(2),
indicesSupplier.get().get(3)
)
)
);
}

{
// All indices younger than retention
List<Index> indicesPastRetention = dataStream.getIndicesOlderThan(
Set<Index> indicesPastRetention = dataStream.getIndicesOlderThan(
metadata.getProject()::index,
() -> now,
TimeValue.timeValueSeconds(6000),
Expand All @@ -1579,15 +1585,15 @@ private void testIndicesPastRetention(boolean failureStore) {
.settings(Settings.builder().put(indexMetadata.getSettings()).put(IndexMetadata.LIFECYCLE_NAME, "some-policy").build())
.build();
};
List<Index> indicesPastRetention = dataStream.getIndicesOlderThan(
Set<Index> indicesPastRetention = dataStream.getIndicesOlderThan(
indexMetadataWithSomeLifecycleSupplier,
() -> now,
TimeValue.ZERO,
failureStore ? FAILURE_INDICES : BACKING_INDICES

);
assertThat(indicesPastRetention.size(), is(1));
assertThat(indicesPastRetention.get(0).getName(), is(indicesSupplier.get().get(2).getName()));
assertThat(indicesPastRetention, equalTo(Set.of(indicesSupplier.get().get(2))));
}
}

Expand Down Expand Up @@ -1637,22 +1643,23 @@ private void testIndicesOlderThanWithOriginationDate(boolean failureStore) {

{
// retention period where first and second index is too old, and 5th has old origination date.
List<Index> indicesPastRetention = dataStream.getIndicesOlderThan(
Set<Index> indicesPastRetention = dataStream.getIndicesOlderThan(
metadata.getProject()::index,
() -> now,
TimeValue.timeValueMillis(2500),
failureStore ? FAILURE_INDICES : BACKING_INDICES

);
assertThat(indicesPastRetention.size(), is(3));
assertThat(indicesPastRetention.get(0).getName(), is(indicesSupplier.get().get(0).getName()));
assertThat(indicesPastRetention.get(1).getName(), is(indicesSupplier.get().get(1).getName()));
assertThat(indicesPastRetention.get(2).getName(), is(indicesSupplier.get().get(5).getName()));
assertThat(
indicesPastRetention,
equalTo(Set.of(indicesSupplier.get().get(0), indicesSupplier.get().get(1), indicesSupplier.get().get(5)))
);
}

{
// no index matches the retention age
List<Index> indicesPastRetention = dataStream.getIndicesOlderThan(
Set<Index> indicesPastRetention = dataStream.getIndicesOlderThan(
metadata.getProject()::index,
() -> now,
TimeValue.timeValueMillis(9000),
Expand Down
Loading