diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 392e157f60952..515992e18d62d 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -140,6 +140,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_ENRICH_OPERATOR_STATUS = def(8_600_00_0); public static final TransportVersion ESQL_SERIALIZE_ARRAY_VECTOR = def(8_601_00_0); public static final TransportVersion ESQL_SERIALIZE_ARRAY_BLOCK = def(8_602_00_0); + public static final TransportVersion ADD_DATA_STREAM_GLOBAL_RETENTION = def(8_603_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetention.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetention.java new file mode 100644 index 0000000000000..f3b88ba6083c3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetention.java @@ -0,0 +1,148 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.cluster.AbstractNamedDiffable; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Objects; + +/** + * A cluster state entry that contains global retention settings that are configurable by the user. These settings include: + * - default retention, applied on any data stream managed by DSL that does not have an explicit retention defined + * - max retention, applied on every data stream managed by DSL + */ +public final class DataStreamGlobalRetention extends AbstractNamedDiffable implements ClusterState.Custom { + + public static final String TYPE = "data-stream-global-retention"; + + public static final ParseField DEFAULT_RETENTION_FIELD = new ParseField("default_retention"); + public static final ParseField MAX_RETENTION_FIELD = new ParseField("max_retention"); + + public static final DataStreamGlobalRetention EMPTY = new DataStreamGlobalRetention(null, null); + + @Nullable + private final TimeValue defaultRetention; + @Nullable + private final TimeValue maxRetention; + + /** + * @param defaultRetention the default retention or null if it's undefined + * @param maxRetention the max retention or null if it's undefined + * @throws IllegalArgumentException when the default retention is greater than the max retention. + */ + public DataStreamGlobalRetention(TimeValue defaultRetention, TimeValue maxRetention) { + if (defaultRetention != null && maxRetention != null && defaultRetention.getMillis() > maxRetention.getMillis()) { + throw new IllegalArgumentException( + "Default global retention [" + + defaultRetention.getStringRep() + + "] cannot be greater than the max global retention [" + + maxRetention.getStringRep() + + "]." + ); + } + this.defaultRetention = defaultRetention; + this.maxRetention = maxRetention; + } + + public static DataStreamGlobalRetention read(StreamInput in) throws IOException { + return new DataStreamGlobalRetention(in.readOptionalTimeValue(), in.readOptionalTimeValue()); + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersions.ADD_DATA_STREAM_GLOBAL_RETENTION; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalTimeValue(defaultRetention); + out.writeOptionalTimeValue(maxRetention); + } + + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(ClusterState.Custom.class, TYPE, in); + } + + @Override + public Iterator toXContentChunked(ToXContent.Params ignored) { + return Iterators.single(this::toXContentFragment); + } + + /** + * Adds to the XContentBuilder the two fields when they are not null. + */ + public XContentBuilder toXContentFragment(XContentBuilder builder, ToXContent.Params params) throws IOException { + if (defaultRetention != null) { + builder.field(DEFAULT_RETENTION_FIELD.getPreferredName(), defaultRetention.getStringRep()); + } + if (maxRetention != null) { + builder.field(MAX_RETENTION_FIELD.getPreferredName(), maxRetention.getStringRep()); + } + return builder; + } + + /** + * Returns the metadata found in the cluster state or null. + */ + public static DataStreamGlobalRetention getFromClusterState(ClusterState clusterState) { + return clusterState.custom(DataStreamGlobalRetention.TYPE); + } + + @Nullable + public TimeValue getDefaultRetention() { + return defaultRetention; + } + + @Nullable + public TimeValue getMaxRetention() { + return maxRetention; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DataStreamGlobalRetention that = (DataStreamGlobalRetention) o; + return Objects.equals(defaultRetention, that.defaultRetention) && Objects.equals(maxRetention, that.maxRetention); + } + + @Override + public int hashCode() { + return Objects.hash(defaultRetention, maxRetention); + } + + @Override + public String toString() { + return "DataStreamGlobalRetention{" + + "defaultRetention=" + + (defaultRetention == null ? "null" : defaultRetention.getStringRep()) + + ", maxRetention=" + + (maxRetention == null ? "null" : maxRetention.getStringRep()) + + '}'; + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSerializationTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSerializationTests.java new file mode 100644 index 0000000000000..8c3d36464784e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSerializationTests.java @@ -0,0 +1,99 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.AbstractChunkedSerializingTestCase; +import org.elasticsearch.test.SimpleDiffableWireSerializationTestCase; + +import java.util.List; + +public class DataStreamGlobalRetentionSerializationTests extends SimpleDiffableWireSerializationTestCase { + + @Override + protected ClusterState.Custom makeTestChanges(ClusterState.Custom testInstance) { + if (randomBoolean()) { + return testInstance; + } + return mutateInstance(testInstance); + } + + @Override + protected Writeable.Reader> diffReader() { + return DataStreamGlobalRetention::readDiffFrom; + } + + @Override + protected Writeable.Reader instanceReader() { + return DataStreamGlobalRetention::read; + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry( + List.of( + new NamedWriteableRegistry.Entry(ClusterState.Custom.class, DataStreamGlobalRetention.TYPE, DataStreamGlobalRetention::read) + ) + ); + } + + @Override + protected ClusterState.Custom createTestInstance() { + return randomGlobalRetention(); + } + + @Override + protected ClusterState.Custom mutateInstance(ClusterState.Custom instance) { + DataStreamGlobalRetention metadata = (DataStreamGlobalRetention) instance; + var defaultRetention = metadata.getDefaultRetention(); + var maxRetention = metadata.getMaxRetention(); + switch (randomInt(1)) { + case 0 -> { + if (defaultRetention == null) { + defaultRetention = TimeValue.timeValueDays(randomIntBetween(1, 1000)); + } else { + defaultRetention = randomBoolean() ? null : TimeValue.timeValueDays(randomIntBetween(1, 1000)); + } + } + case 1 -> { + if (maxRetention == null) { + maxRetention = TimeValue.timeValueDays(randomIntBetween(1000, 2000)); + } else { + maxRetention = randomBoolean() ? null : TimeValue.timeValueDays(randomIntBetween(1000, 2000)); + } + } + } + return new DataStreamGlobalRetention(defaultRetention, maxRetention); + } + + public static DataStreamGlobalRetention randomGlobalRetention() { + return new DataStreamGlobalRetention( + randomBoolean() ? null : TimeValue.timeValueDays(randomIntBetween(1, 1000)), + randomBoolean() ? null : TimeValue.timeValueDays(randomIntBetween(1000, 2000)) + ); + } + + public void testChunking() { + AbstractChunkedSerializingTestCase.assertChunkCount(createTestInstance(), ignored -> 1); + } + + public void testValidation() { + expectThrows( + IllegalArgumentException.class, + () -> new DataStreamGlobalRetention( + TimeValue.timeValueDays(randomIntBetween(1001, 2000)), + TimeValue.timeValueDays(randomIntBetween(1, 1000)) + ) + ); + } +}