diff --git a/docs/changelog/105682.yaml b/docs/changelog/105682.yaml new file mode 100644 index 0000000000000..f1713357ace80 --- /dev/null +++ b/docs/changelog/105682.yaml @@ -0,0 +1,20 @@ +pr: 105682 +summary: Introduce global retention in data stream lifecycle. +area: Data streams +type: feature +issues: + - 106169 +highlight: + title: Add global retention in data stream lifecycle + body: |- + Data stream lifecycle now supports configuring retention on a cluster level, namely global retention. Global retention + allows us to configure two different retentions: + + - `default_retention` is applied to all data streams managed by the data stream lifecycle that do not have retention + defined on the data stream level. + - `max_retention` is applied to all data streams managed by the data stream lifecycle and it allows any data stream + data to be deleted after the `max_retention` has passed. + + Furthermore, we introduce the term `effective_retention` which is the retention applied at a certain moment to a data + stream considering all the available retention configurations. + notable: true \ No newline at end of file diff --git a/docs/reference/data-streams/data-stream-apis.asciidoc b/docs/reference/data-streams/data-stream-apis.asciidoc index 3c2e703d264ff..d5a81a485af03 100644 --- a/docs/reference/data-streams/data-stream-apis.asciidoc +++ b/docs/reference/data-streams/data-stream-apis.asciidoc @@ -25,7 +25,13 @@ preview:[] preview:[] * <> preview:[] -* <> +* <> +preview:[] +* <> +preview:[] +* <> +preview:[] +* <> preview:[] The following API is available for <>: @@ -59,4 +65,10 @@ include::{es-repo-dir}/data-streams/lifecycle/apis/explain-lifecycle.asciidoc[] include::{es-repo-dir}/data-streams/lifecycle/apis/get-lifecycle-stats.asciidoc[] +include::{es-repo-dir}/data-streams/lifecycle/apis/put-global-retention.asciidoc[] + +include::{es-repo-dir}/data-streams/lifecycle/apis/get-global-retention.asciidoc[] + +include::{es-repo-dir}/data-streams/lifecycle/apis/delete-global-retention.asciidoc[] + include::{es-repo-dir}/indices/downsample-data-stream.asciidoc[] diff --git a/docs/reference/data-streams/lifecycle/apis/delete-global-retention.asciidoc b/docs/reference/data-streams/lifecycle/apis/delete-global-retention.asciidoc new file mode 100644 index 0000000000000..5b211eaf09e13 --- /dev/null +++ b/docs/reference/data-streams/lifecycle/apis/delete-global-retention.asciidoc @@ -0,0 +1,121 @@ +[[data-streams-delete-global-retention]] +=== Delete the global retention of data streams +++++ +Delete Data Stream Global Retention +++++ + +preview::[] + +Deletes the global retention configuration that applies on every data stream managed by <>. + +[[delete-global-retention-api-prereqs]] +==== {api-prereq-title} + +** If the {es} {security-features} are enabled, you must have the `manage_data_stream_global_retention` <> to use this API. + +[[data-streams-delete-global-retention-request]] +==== {api-request-title} + +`DELETE _data_stream/_global_retention` + +[[data-streams-delete-global-retention-desc]] +==== {api-description-title} + +Deletes the global retention configuration that is applied on data streams managed by data stream lifecycle. + +[role="child_attributes"] +[[delete-global-retention-api-query-parms]] +==== {api-query-parms-title} + +`dry_run`:: +(Boolean) Signals that the request should determine the effect of the removal of the existing without updating +the global retention. The default value is `false`, which means the removal will happen. + +[[delete-global-retention-api-response-body]] +==== {api-response-body-title} + +`acknowledged`:: +(boolean) +True, if the global retention has been removed. False, if it fails or if it was a dry run. + +`dry_run`:: +(boolean) +True, if this was a dry run, false otherwise. + +`affected_data_streams`:: +(array of objects) +Contains information about the data streams affected by the change. ++ +.Properties of objects in `affected_data_streams` +[%collapsible%open] +==== +`name`:: +(string) +Name of the data stream. +`previous_effective_retention`:: +(string) +The retention that was effective before the change of this request. `infinite` if there was no retention applicable. +`new_effective_retention`:: +(string) +The retention that is or would be effective after this request. `infinite` if there is no retention applicable. +==== + +[[data-streams-delete-global-retention-example]] +==== {api-examples-title} + +//// + +[source,console] +-------------------------------------------------- +PUT _data_stream/_global_retention +{ + "default_retention": "7d", + "max_retention": "90d" +} + +PUT /_index_template/template +{ + "index_patterns": ["my-data-stream*"], + "template": { + "lifecycle": {} + }, + "data_stream": { } +} + +PUT /_data_stream/my-data-stream +---- +// TESTSETUP +//// + +//// +[source,console] +---- +DELETE /_data_stream/my-data-stream* +DELETE /_index_template/template +DELETE /_data_stream/_global_retention +---- +// TEARDOWN +//// + +Let's update the global retention: +[source,console] +-------------------------------------------------- +DELETE _data_stream/_global_retention +-------------------------------------------------- + +The response will look like the following: + +[source,console-result] +-------------------------------------------------- +{ + "acknowledged": true, + "dry_run": false, + "affected_data_streams": [ + { + "name": "my-data-stream", + "previous_effective_retention": "7d", + "new_effective_retention": "infinite" + } + ] +} +-------------------------------------------------- diff --git a/docs/reference/data-streams/lifecycle/apis/get-global-retention.asciidoc b/docs/reference/data-streams/lifecycle/apis/get-global-retention.asciidoc new file mode 100644 index 0000000000000..03e485f3e7eb9 --- /dev/null +++ b/docs/reference/data-streams/lifecycle/apis/get-global-retention.asciidoc @@ -0,0 +1,90 @@ +[[data-streams-get-global-retention]] +=== Get the global retention of data streams +++++ +Get Data Stream Global Retention +++++ + +preview::[] + +Gets the global retention that applies on every data stream managed by <>. + +[[get-global-retention-api-prereqs]] +==== {api-prereq-title} + +** If the {es} {security-features} are enabled, you must have the `monitor_data_stream_global_retention` or +`manage_data_stream_global_retention` <> to use this API. + +[[data-streams-get-global-retention-request]] +==== {api-request-title} + +`GET _data_stream/_global_retention` + +[[data-streams-get-global-retention-desc]] +==== {api-description-title} + +Gets the global retention configuration that is applied on data streams managed by data stream lifecycle. + +[role="child_attributes"] +[[get-global-retention-api-query-parms]] +==== {api-query-parms-title} + +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=local] + +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout] + +[[get-global-retention-api-response-body]] +==== {api-response-body-title} + +`default_retention`:: +(Optional, string) +The default retention that will apply to any data stream managed by data stream lifecycle that does not have a retention +defined on the data stream level. + +`max_retention`:: +(Optional, string) +The max retention that will apply to all data streams managed by data stream lifecycle. The max retention will override the +retention of a data stream whose retention exceeds the max retention. + + +[[data-streams-get-global-retention-example]] +==== {api-examples-title} + +//// + +[source,console] +-------------------------------------------------- +PUT _data_stream/_global_retention +{ + "default_retention": "7d", + "max_retention": "90d" +} +-------------------------------------------------- +// TESTSETUP + +[source,console] +-------------------------------------------------- +DELETE _data_stream/_global_retention +-------------------------------------------------- +// TEARDOWN + +//// + +Let's retrieve the global retention: + +[source,console] +-------------------------------------------------- +GET _data_stream/_global_retention +-------------------------------------------------- + +The response will look like the following: + +[source,console-result] +-------------------------------------------------- +{ + "default_retention": "7d", <1> + "max_retention": "90d" <2> +} +-------------------------------------------------- +<1> 7 days retention will be applied to any data stream that does not have retention set in its lifecycle. +<2> 90 days retention will be applied to all data streams that have retention that exceeds the 90 days, this +applies to data streams that have infinite retention too. \ No newline at end of file diff --git a/docs/reference/data-streams/lifecycle/apis/put-global-retention.asciidoc b/docs/reference/data-streams/lifecycle/apis/put-global-retention.asciidoc new file mode 100644 index 0000000000000..c9bc804c13408 --- /dev/null +++ b/docs/reference/data-streams/lifecycle/apis/put-global-retention.asciidoc @@ -0,0 +1,131 @@ +[[data-streams-put-global-retention]] +=== Update the global retention of data streams +++++ +Update Data Stream Global Retention +++++ + +preview::[] + +Updates the global retention configuration that applies on every data stream managed by <>. + +[[put-global-retention-api-prereqs]] +==== {api-prereq-title} + +** If the {es} {security-features} are enabled, you must have the `manage_data_stream_global_retention` <> to use this API. + +[[data-streams-put-global-retention-request]] +==== {api-request-title} + +`PUT _data_stream/_global_retention` + +[[data-streams-put-global-retention-desc]] +==== {api-description-title} + +Updates the global retention configuration that is applied on data streams managed by data stream lifecycle. + +[role="child_attributes"] +[[put-global-retention-api-query-parms]] +==== {api-query-parms-title} + +`dry_run`:: +(Boolean) Signals that the request should determine the effect of the provided configuration without updating the +global retention settings. The default value is `false`, which means the configuration provided will be applied. + +[[put-global-retention-api-request-body]] +==== {api-request-body-title} + +`default_retention`:: +(Optional, string) +The default retention that will apply to any data stream managed by data stream lifecycle that does not have a retention +defined on the data stream level. + +`max_retention`:: +(Optional, string) +The max retention that will apply to all data streams managed by data stream lifecycle. The max retention will override the +retention of a data stream which retention exceeds the max retention. + +[[put-global-retention-api-response-body]] +==== {api-response-body-title} + +`acknowledged`:: +(boolean) +True, if the global retention has been updated to the provided values. False, if it fails or if it was a dry run. + +`dry_run`:: +(boolean) +True, if this was a dry run, false otherwise. + +`affected_data_streams`:: +(array of objects) +Contains information about the data streams affected by the change. ++ +.Properties of objects in `affected_data_streams` +[%collapsible%open] +==== +`name`:: +(string) +Name of the data stream. +`previous_effective_retention`:: +(string) +The retention that was effective before the change of this request. `infinite` if there was no retention applicable. +`new_effective_retention`:: +(string) +The retention that is or would be effective after this request. `infinite` if there is no retention applicable. +==== + +[[data-streams-put-global-retention-example]] +==== {api-examples-title} + +//// +[source,console] +---- +PUT /_index_template/template +{ + "index_patterns": ["my-data-stream*"], + "template": { + "lifecycle": {} + }, + "data_stream": { } +} + +PUT /_data_stream/my-data-stream +---- +// TESTSETUP +//// + +//// +[source,console] +---- +DELETE /_data_stream/my-data-stream* +DELETE /_index_template/template +DELETE /_data_stream/_global_retention +---- +// TEARDOWN +//// + +Let's update the global retention: +[source,console] +-------------------------------------------------- +PUT _data_stream/_global_retention +{ + "default_retention": "7d", + "max_retention": "90d" +} +-------------------------------------------------- + +The response will look like the following: + +[source,console-result] +-------------------------------------------------- +{ + "acknowledged": true, + "dry_run": false, + "affected_data_streams": [ + { + "name": "my-data-stream", + "previous_effective_retention": "infinite", + "new_effective_retention": "7d" + } + ] +} +-------------------------------------------------- diff --git a/docs/reference/data-streams/lifecycle/index.asciidoc b/docs/reference/data-streams/lifecycle/index.asciidoc index bf861df7c80d4..dff3dae22f8ef 100644 --- a/docs/reference/data-streams/lifecycle/index.asciidoc +++ b/docs/reference/data-streams/lifecycle/index.asciidoc @@ -16,7 +16,8 @@ To achieve that, it supports: * Automatic <>, which chunks your incoming data in smaller pieces to facilitate better performance and backwards incompatible mapping changes. * Configurable retention, which allows you to configure the time period for which your data is guaranteed to be stored. -{es} is allowed at a later time to delete data older than this time period. +{es} is allowed at a later time to delete data older than this time period. Retention can be configured on the data stream level +or on a global level. Read more about the different options in this <>. A data stream lifecycle also supports downsampling the data stream backing indices. See <> for @@ -42,9 +43,10 @@ data that is most likely to keep being queried. 4. If <> is configured it will execute all the configured downsampling rounds. 5. Applies retention to the remaining backing indices. This means deleting the backing indices whose -`generation_time` is longer than the configured retention period. The `generation_time` is only applicable to rolled over backing -indices and it is either the time since the backing index got rolled over, or the time optionally configured in the -<> setting. +`generation_time` is longer than the effective retention period (read more about the +<>). The `generation_time` is only applicable to rolled +over backing indices and it is either the time since the backing index got rolled over, or the time optionally configured +in the <> setting. IMPORTANT: We use the `generation_time` instead of the creation time because this ensures that all data in the backing index have passed the retention period. As a result, the retention period is not the exact time data gets deleted, but @@ -77,4 +79,6 @@ include::tutorial-manage-new-data-stream.asciidoc[] include::tutorial-manage-existing-data-stream.asciidoc[] +include::tutorial-manage-data-stream-retention.asciidoc[] + include::tutorial-migrate-data-stream-from-ilm-to-dsl.asciidoc[] diff --git a/docs/reference/data-streams/lifecycle/tutorial-manage-data-stream-retention.asciidoc b/docs/reference/data-streams/lifecycle/tutorial-manage-data-stream-retention.asciidoc new file mode 100644 index 0000000000000..7b84cd238ce49 --- /dev/null +++ b/docs/reference/data-streams/lifecycle/tutorial-manage-data-stream-retention.asciidoc @@ -0,0 +1,183 @@ +[role="xpack"] +[[tutorial-manage-data-stream-retention]] +=== Tutorial: Data stream retention + +preview::[] + +In this tutorial, we are going to go over the data stream lifecycle retention, define it, go over how it can be configured and how +it can be applied. Keep in mind, the following options apply only to data streams that are managed by the data stream lifecycle. + +. <> +. <> +. <> +. <> + +You can verify if a data steam is managed by the data stream lifecycle via the <>: + +//// +[source,console] +---- +PUT /_index_template/template +{ + "index_patterns": ["my-data-stream*"], + "template": { + "lifecycle": {} + }, + "data_stream": { } +} + +PUT /_data_stream/my-data-stream +---- +// TESTSETUP +//// + +//// +[source,console] +---- +DELETE /_data_stream/my-data-stream* +DELETE /_index_template/template +DELETE /_data_stream/_global_retention +---- +// TEARDOWN +//// + +[source,console] +-------------------------------------------------- +GET _data_stream/my-data-stream/_lifecycle +-------------------------------------------------- + +The result should look like this: + +[source,console-result] +-------------------------------------------------- +{ + "data_streams": [ + { + "name": "my-data-stream", <1> + "lifecycle": { + "enabled": true <2> + } + } + ] +} +-------------------------------------------------- +// TESTRESPONSE[skip:the result is for illustrating purposes only] +<1> The name of your data stream. +<2> Ensure that the lifecycle is enabled, meaning this should be `true`. + +[discrete] +[[what-is-retention]] +==== What is data stream retention? + +We define retention as the least amount of time the data of a data stream are going to be kept in {es}. After this time period +has passed, {es} is allowed to remove these data to free up space and/or manage costs. + +NOTE: Retention does not define the period that the data will be removed, but the minimum time period they will be kept. + +We define 4 different types of retention: + +* The data stream retention, or `data_retention`, which is the retention configured on the data stream level. It can be +set via an <> for future data streams or via the <> for an existing data stream. When the data stream retention is not set, it implies that the data +need to be kept forever. +* The global default retention, or `default_retention`, which is a retention configured on a cluster level and will be +applied to all data streams managed by data stream lifecycle that do not have `data_retention` configured. Effectively, +it ensures that there will be no data streams keeping their data forever. This can be set via the +<>. +* The global max retention, or `max_retention`, which is a retention configured on a cluster level and will be applied to +all data streams managed by data stream lifecycle. Effectively, it ensures that there will be no data streams whose retention +will exceed this time period. This can be set via the <>. +* The effective retention, or `effective_retention`, which is the retention applied at a data stream on a given moment. +Effective retention cannot be set, it is derived by taking into account all the configured retention listed above and is +calculated as it is described <>. + +[discrete] +[[retention-configuration]] +==== How to configure retention? + +- By setting the `data_retention` on the data stream level. This retention can be configured in two ways: ++ +-- For new data streams, it can be defined in the index template that would be applied during the data stream's creation. +You can use the <>, for example: ++ +[source,console] +-------------------------------------------------- +PUT _index_template/template +{ + "index_patterns": ["my-data-stream*"], + "data_stream": { }, + "priority": 500, + "template": { + "lifecycle": { + "data_retention": "7d" + } + }, + "_meta": { + "description": "Template with data stream lifecycle" + } +} +-------------------------------------------------- +-- For an existing data stream, it can be set via the <>. ++ +[source,console] +---- +PUT _data_stream/my-data-stream/_lifecycle +{ + "data_retention": "30d" <1> +} +---- +// TEST[continued] +<1> The retention period of this data stream is set to 30 days. + +- By setting the global retention via the `default_retention` and `max_retention` that are set on a cluster level. You +can set them via the <>. For example: ++ +[source,console] +-------------------------------------------------- +PUT _data_stream/_global_retention +{ + "default_retention": "7d", + "max_retention": "90d" +} +-------------------------------------------------- +// TEST[continued] + +[discrete] +[[effective-retention-calculation]] +==== How is the effective retention calculated? +The effective is calculated in the following way: + +- The `effective_retention` is the `default_retention`, when `default_retention` is defined and the data stream does not +have `data_retention`. +- The `effective_retention` is the `data_retention`, when `data_retention` is defined and if `max_retention` is defined, +it is less than the `max_retention`. +- The `effective_retention` is the `max_retention`, when `max_retention` is defined, and the data stream has either no +`data_retention` or its `data_retention` is greater than the `max_retention`. + +The above is demonstrated in the examples below: + +|=== +|`default_retention` |`max_retention` |`data_retention` |`effective_retention` |Retention determined by + +|Not set |Not set |Not set |Infinite |N/A +|Not relevant |12 months |**30 days** |30 days |`data_retention` +|Not relevant |Not set |**30 days** |30 days |`data_retention` +|**30 days** |12 months |Not set |30 days |`default_retention` +|**30 days** |30 days |Not set |30 days |`default_retention` +|Not relevant |**30 days** |12 months |30 days |`max_retention` +|Not set |**30 days** |Not set |30 days |`max_retention` +|=== + +[discrete] +[[effective-retention-application]] +==== How is the effective retention applied? + +Retention is applied to the remaining backing indices of a data stream as the last step of +<>. Data stream lifecycle will retrieve the backing indices +whose `generation_time` is longer than the effective retention period and delete them. The `generation_time` is only +applicable to rolled over backing indices and it is either the time since the backing index got rolled over, or the time +optionally configured in the <> setting. + +IMPORTANT: We use the `generation_time` instead of the creation time because this ensures that all data in the backing +index have passed the retention period. As a result, the retention period is not the exact time data get deleted, but +the minimum time data will be stored. \ No newline at end of file diff --git a/docs/reference/rest-api/security/get-builtin-privileges.asciidoc b/docs/reference/rest-api/security/get-builtin-privileges.asciidoc index 576a30866dbdf..63f906d29b4d6 100644 --- a/docs/reference/rest-api/security/get-builtin-privileges.asciidoc +++ b/docs/reference/rest-api/security/get-builtin-privileges.asciidoc @@ -75,6 +75,7 @@ A successful call returns an object with "cluster" and "index" fields. "manage_behavioral_analytics", "manage_ccr", "manage_data_frame_transforms", + "manage_data_stream_global_retention", "manage_enrich", "manage_ilm", "manage_index_templates", @@ -99,6 +100,7 @@ A successful call returns an object with "cluster" and "index" fields. "manage_watcher", "monitor", "monitor_data_frame_transforms", + "monitor_data_stream_global_retention", "monitor_enrich", "monitor_inference", "monitor_ml", diff --git a/docs/reference/security/authorization/privileges.asciidoc b/docs/reference/security/authorization/privileges.asciidoc index d2885f63b8c26..5ff713a4b58fc 100644 --- a/docs/reference/security/authorization/privileges.asciidoc +++ b/docs/reference/security/authorization/privileges.asciidoc @@ -171,6 +171,9 @@ deprecated[7.5] Use `manage_transform` instead. `manage_enrich`:: All operations related to managing and executing enrich policies. +`manage_data_stream_global_retention`:: +All operations related to managing the data stream global retention settings. + `manage_watcher`:: All watcher operations, such as putting watches, executing, activate or acknowledging. + @@ -206,6 +209,9 @@ All read-only operations related to the <>. `monitor_transform`:: All read-only operations related to {transforms}. +`monitor_data_stream_global_retention`:: +Allows the retrieval of the data stream global retention settings. + `monitor_watcher`:: All read-only watcher operations, such as getting a watch and watcher stats. diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LazyRolloverDataStreamIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LazyRolloverDataStreamIT.java index 2ff148b82ae92..5389ad3a00b4e 100644 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LazyRolloverDataStreamIT.java +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LazyRolloverDataStreamIT.java @@ -47,7 +47,7 @@ public class LazyRolloverDataStreamIT extends ESRestTestCase { .setting("xpack.security.transport.ssl.enabled", "false") .setting("xpack.security.http.ssl.enabled", "false") .user("test_admin", PASSWORD, "superuser", false) - .user("test_simple_user", PASSWORD, "not_privileged", false) + .user("test_simple_user", PASSWORD, "under_privilged", false) .rolesFile(Resource.fromClasspath("roles.yml")) .build(); @@ -69,7 +69,7 @@ protected Settings restClientSettings() { } private Settings simpleUserRestClientSettings() { - // Note: This user is assigned the role "not_privileged". That role is defined in roles.yml. + // Note: This user is assigned the role "under_privilged". That role is defined in roles.yml. String token = basicAuthHeaderValue("test_simple_user", new SecureString(PASSWORD.toCharArray())); return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build(); } diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionIT.java new file mode 100644 index 0000000000000..557e70ba65e9b --- /dev/null +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionIT.java @@ -0,0 +1,147 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.datastreams.lifecycle; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.datastreams.DisabledSecurityDataStreamTestCase; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +public class DataStreamGlobalRetentionIT extends DisabledSecurityDataStreamTestCase { + + @Before + public void setup() throws IOException { + updateClusterSettings( + Settings.builder() + .put("data_streams.lifecycle.poll_interval", "1s") + .put("cluster.lifecycle.default.rollover", "min_docs=1,max_docs=1") + .build() + ); + // Create a template with the default lifecycle + Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1"); + putComposableIndexTemplateRequest.setJsonEntity(""" + { + "index_patterns": ["my-data-stream*"], + "data_stream": {}, + "template": { + "lifecycle": {} + } + } + """); + assertOK(client().performRequest(putComposableIndexTemplateRequest)); + + // Create a data streams with one doc + Request createDocRequest = new Request("POST", "/my-data-stream/_doc?refresh=true"); + createDocRequest.setJsonEntity("{ \"@timestamp\": \"2022-12-12\"}"); + assertOK(client().performRequest(createDocRequest)); + } + + @After + public void cleanUp() throws IOException { + adminClient().performRequest(new Request("DELETE", "_data_stream/*")); + } + + @SuppressWarnings("unchecked") + public void testDefaultRetention() throws Exception { + { + // Set global retention + Request request = new Request("PUT", "_data_stream/_global_retention"); + request.setJsonEntity(""" + { + "default_retention": "10s" + }"""); + assertAcknowledged(client().performRequest(request)); + } + + // Verify that the effective retention matches the default retention + { + Request request = new Request("GET", "/_data_stream/my-data-stream"); + Response response = client().performRequest(request); + List dataStreams = (List) entityAsMap(response).get("data_streams"); + assertThat(dataStreams.size(), is(1)); + Map dataStream = (Map) dataStreams.get(0); + assertThat(dataStream.get("name"), is("my-data-stream")); + Map lifecycle = (Map) dataStream.get("lifecycle"); + assertThat(lifecycle.get("effective_retention"), is("10s")); + assertThat(lifecycle.get("retention_determined_by"), is("default_global_retention")); + assertThat(lifecycle.get("data_retention"), nullValue()); + } + + // Verify that the first generation index was removed + assertBusy(() -> { + Response response = client().performRequest(new Request("GET", "/_data_stream/my-data-stream")); + Map dataStream = ((List>) entityAsMap(response).get("data_streams")).get(0); + assertThat(dataStream.get("name"), is("my-data-stream")); + List backingIndices = (List) dataStream.get("indices"); + assertThat(backingIndices.size(), is(1)); + // 2 backing indices created + 1 for the deleted index + assertThat(dataStream.get("generation"), is(3)); + }, 20, TimeUnit.SECONDS); + } + + @SuppressWarnings("unchecked") + public void testMaxRetention() throws Exception { + { + // Set global retention + Request request = new Request("PUT", "_data_stream/_global_retention"); + request.setJsonEntity(""" + { + "max_retention": "10s" + }"""); + assertAcknowledged(client().performRequest(request)); + } + boolean withDataStreamLevelRetention = randomBoolean(); + if (withDataStreamLevelRetention) { + Request request = new Request("PUT", "_data_stream/my-data-stream/_lifecycle"); + request.setJsonEntity(""" + { + "data_retention": "30d" + }"""); + assertAcknowledged(client().performRequest(request)); + } + + // Verify that the effective retention matches the max retention + { + Request request = new Request("GET", "/_data_stream/my-data-stream"); + Response response = client().performRequest(request); + List dataStreams = (List) entityAsMap(response).get("data_streams"); + assertThat(dataStreams.size(), is(1)); + Map dataStream = (Map) dataStreams.get(0); + assertThat(dataStream.get("name"), is("my-data-stream")); + Map lifecycle = (Map) dataStream.get("lifecycle"); + assertThat(lifecycle.get("effective_retention"), is("10s")); + assertThat(lifecycle.get("retention_determined_by"), is("max_global_retention")); + if (withDataStreamLevelRetention) { + assertThat(lifecycle.get("data_retention"), is("30d")); + } else { + assertThat(lifecycle.get("data_retention"), nullValue()); + } + } + + // Verify that the first generation index was removed + assertBusy(() -> { + Response response = client().performRequest(new Request("GET", "/_data_stream/my-data-stream")); + Map dataStream = ((List>) entityAsMap(response).get("data_streams")).get(0); + assertThat(dataStream.get("name"), is("my-data-stream")); + List backingIndices = (List) dataStream.get("indices"); + assertThat(backingIndices.size(), is(1)); + // 2 backing indices created + 1 for the deleted index + assertThat(dataStream.get("generation"), is(3)); + }, 20, TimeUnit.SECONDS); + } +} diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionPermissionsRestIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionPermissionsRestIT.java new file mode 100644 index 0000000000000..e2e82b343fc5f --- /dev/null +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionPermissionsRestIT.java @@ -0,0 +1,213 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.datastreams.lifecycle; + +import org.apache.http.HttpHost; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.settings.SecureString; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.cluster.FeatureFlag; +import org.elasticsearch.test.cluster.local.distribution.DistributionType; +import org.elasticsearch.test.cluster.util.resource.Resource; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.junit.ClassRule; + +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class DataStreamGlobalRetentionPermissionsRestIT extends ESRestTestCase { + + private static final String PASSWORD = "secret-test-password"; + + @ClassRule + public static ElasticsearchCluster cluster = ElasticsearchCluster.local() + .distribution(DistributionType.DEFAULT) + .feature(FeatureFlag.FAILURE_STORE_ENABLED) + .setting("xpack.watcher.enabled", "false") + .setting("xpack.ml.enabled", "false") + .setting("xpack.security.enabled", "true") + .setting("xpack.security.transport.ssl.enabled", "false") + .setting("xpack.security.http.ssl.enabled", "false") + .user("test_admin", PASSWORD, "superuser", false) + .user("test_manage_global_retention", PASSWORD, "manage_data_stream_global_retention", false) + .user("test_monitor_global_retention", PASSWORD, "monitor_data_stream_global_retention", false) + .user("test_monitor", PASSWORD, "manage_data_stream_lifecycle", false) + .user("test_no_privilege", PASSWORD, "no_privilege", false) + .rolesFile(Resource.fromClasspath("roles.yml")) + .build(); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } + + @Override + protected Settings restClientSettings() { + // If this test is running in a test framework that handles its own authorization, we don't want to overwrite it. + if (super.restClientSettings().keySet().contains(ThreadContext.PREFIX + ".Authorization")) { + return super.restClientSettings(); + } else { + // Note: This user is assigned the role "manage_data_stream_lifecycle". That role is defined in roles.yml. + String token = basicAuthHeaderValue("test_data_stream_lifecycle", new SecureString(PASSWORD.toCharArray())); + return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build(); + } + } + + @Override + protected Settings restAdminSettings() { + // If this test is running in a test framework that handles its own authorization, we don't want to overwrite it. + if (super.restClientSettings().keySet().contains(ThreadContext.PREFIX + ".Authorization")) { + return super.restClientSettings(); + } else { + // Note: We use the admin user because the other one is too unprivileged, so it breaks the initialization of the test + String token = basicAuthHeaderValue("test_admin", new SecureString(PASSWORD.toCharArray())); + return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build(); + } + } + + private Settings restManageGlobalRetentionClientSettings() { + String token = basicAuthHeaderValue("test_manage_global_retention", new SecureString(PASSWORD.toCharArray())); + return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build(); + } + + private Settings restMonitorGlobalRetentionClientSettings() { + String token = basicAuthHeaderValue("test_monitor_global_retention", new SecureString(PASSWORD.toCharArray())); + return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build(); + } + + private Settings restOnlyManageLifecycleClientSettings() { + String token = basicAuthHeaderValue("test_monitor", new SecureString(PASSWORD.toCharArray())); + return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build(); + } + + private Settings restNoPrivilegeClientSettings() { + String token = basicAuthHeaderValue("test_no_privilege", new SecureString(PASSWORD.toCharArray())); + return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build(); + } + + public void testManageGlobalRetentionPrivileges() throws Exception { + try (var client = buildClient(restManageGlobalRetentionClientSettings(), getClusterHosts().toArray(new HttpHost[0]))) { + Request request = new Request("PUT", "_data_stream/_global_retention"); + request.setJsonEntity(""" + { + "default_retention": "1d", + "max_retention": "7d" + }"""); + assertAcknowledged(client.performRequest(request)); + Map response = entityAsMap(client.performRequest(new Request("GET", "/_data_stream/_global_retention"))); + assertThat(response.get("default_retention"), equalTo("1d")); + assertThat(response.get("max_retention"), equalTo("7d")); + assertAcknowledged(client.performRequest(new Request("DELETE", "/_data_stream/_global_retention"))); + } + } + + public void testMonitorGlobalRetentionPrivileges() throws Exception { + { + Request request = new Request("PUT", "_data_stream/_global_retention"); + request.setJsonEntity(""" + { + "default_retention": "1d", + "max_retention": "7d" + }"""); + assertAcknowledged(adminClient().performRequest(request)); + } + try (var client = buildClient(restMonitorGlobalRetentionClientSettings(), getClusterHosts().toArray(new HttpHost[0]))) { + Request request = new Request("PUT", "_data_stream/_global_retention"); + request.setJsonEntity(""" + { + "default_retention": "1d", + "max_retention": "7d" + }"""); + ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(request)); + assertThat(responseException.getResponse().getStatusLine().getStatusCode(), is(403)); + assertThat( + responseException.getMessage(), + containsString( + "action [cluster:admin/data_stream/global_retention/put] is unauthorized for user [test_monitor_global_retention]" + ) + ); + responseException = expectThrows( + ResponseException.class, + () -> client.performRequest(new Request("DELETE", "/_data_stream/_global_retention")) + ); + assertThat(responseException.getResponse().getStatusLine().getStatusCode(), is(403)); + assertThat( + responseException.getMessage(), + containsString( + "action [cluster:admin/data_stream/global_retention/delete] is unauthorized for user [test_monitor_global_retention]" + ) + ); + Map response = entityAsMap(client.performRequest(new Request("GET", "/_data_stream/_global_retention"))); + assertThat(response.get("default_retention"), equalTo("1d")); + assertThat(response.get("max_retention"), equalTo("7d")); + } + } + + public void testManageLifecyclePrivileges() throws Exception { + try (var client = buildClient(restOnlyManageLifecycleClientSettings(), getClusterHosts().toArray(new HttpHost[0]))) { + Request request = new Request("PUT", "_data_stream/_global_retention"); + request.setJsonEntity(""" + { + "default_retention": "1d", + "max_retention": "7d" + }"""); + ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(request)); + assertThat(responseException.getResponse().getStatusLine().getStatusCode(), is(403)); + assertThat( + responseException.getMessage(), + containsString("action [cluster:admin/data_stream/global_retention/put] is unauthorized for user [test_monitor]") + ); + // This use has the monitor privilege which includes the monitor_data_stream_global_retention + Response response = client.performRequest(new Request("GET", "/_data_stream/_global_retention")); + assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); + } + } + + public void testNoPrivileges() throws Exception { + try (var client = buildClient(restNoPrivilegeClientSettings(), getClusterHosts().toArray(new HttpHost[0]))) { + Request request = new Request("PUT", "_data_stream/_global_retention"); + request.setJsonEntity(""" + { + "default_retention": "1d", + "max_retention": "7d" + }"""); + ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(request)); + assertThat(responseException.getResponse().getStatusLine().getStatusCode(), is(403)); + assertThat( + responseException.getMessage(), + containsString("action [cluster:admin/data_stream/global_retention/put] is unauthorized for user [test_no_privilege]") + ); + responseException = expectThrows( + ResponseException.class, + () -> client.performRequest(new Request("DELETE", "/_data_stream/_global_retention")) + ); + assertThat(responseException.getResponse().getStatusLine().getStatusCode(), is(403)); + assertThat( + responseException.getMessage(), + containsString("action [cluster:admin/data_stream/global_retention/delete] is unauthorized for user [test_no_privilege]") + ); + responseException = expectThrows( + ResponseException.class, + () -> client.performRequest(new Request("GET", "/_data_stream/_global_retention")) + ); + assertThat(responseException.getResponse().getStatusLine().getStatusCode(), is(403)); + assertThat( + responseException.getMessage(), + containsString("action [cluster:monitor/data_stream/global_retention/get] is unauthorized for user [test_no_privilege]") + ); + } + } +} diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecyclePermissionsRestIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecyclePermissionsRestIT.java index b83479909307e..c78ae0577f586 100644 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecyclePermissionsRestIT.java +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecyclePermissionsRestIT.java @@ -47,7 +47,7 @@ public class DataStreamLifecyclePermissionsRestIT extends ESRestTestCase { .setting("xpack.security.http.ssl.enabled", "false") .user("test_admin", PASSWORD, "superuser", false) .user("test_data_stream_lifecycle", PASSWORD, "manage_data_stream_lifecycle", false) - .user("test_non_privileged", PASSWORD, "not_privileged", false) + .user("test_non_privileged", PASSWORD, "under_privilged", false) .rolesFile(Resource.fromClasspath("roles.yml")) .build(); @@ -88,13 +88,13 @@ protected Settings restAdminSettings() { } private Settings restPrivilegedClientSettings() { - // Note: This user is assigned the role "not_privileged". That role is defined in roles.yml. + // Note: This user is assigned the role "under_privilged". That role is defined in roles.yml. String token = basicAuthHeaderValue("test_data_stream_lifecycle", new SecureString(PASSWORD.toCharArray())); return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build(); } private Settings restUnprivilegedClientSettings() { - // Note: This user is assigned the role "not_privileged". That role is defined in roles.yml. + // Note: This user is assigned the role "under_privilged". That role is defined in roles.yml. String token = basicAuthHeaderValue("test_non_privileged", new SecureString(PASSWORD.toCharArray())); return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build(); } @@ -106,7 +106,7 @@ public void testManageDataStreamLifecycle() throws Exception { * This test checks that a user with the "manage_data_stream_lifecycle" index privilege on "data-stream-lifecycle-*" data * streams can delete and put a lifecycle on the "data-stream-lifecycle-test" data stream, while a user with who does not have * that privilege (but does have all the other same "data-stream-lifecycle-*" privileges) cannot delete or put a lifecycle on - * that datastream. + * that data stream. */ String dataStreamName = "data-stream-lifecycle-test"; // Needs to match the pattern of the names in roles.yml createDataStreamAsAdmin(dataStreamName); diff --git a/modules/data-streams/src/javaRestTest/resources/roles.yml b/modules/data-streams/src/javaRestTest/resources/roles.yml index 4c1350e4b058d..63e506dff8d39 100644 --- a/modules/data-streams/src/javaRestTest/resources/roles.yml +++ b/modules/data-streams/src/javaRestTest/resources/roles.yml @@ -7,7 +7,7 @@ manage_data_stream_lifecycle: - read - write - manage_data_stream_lifecycle -not_privileged: +under_privilged: cluster: - monitor indices: @@ -16,3 +16,10 @@ not_privileged: - read - write - view_index_metadata +manage_data_stream_global_retention: + cluster: + - manage_data_stream_global_retention +monitor_data_stream_global_retention: + cluster: + - monitor_data_stream_global_retention +no_privilege: \ No newline at end of file diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java index 6e17964aa179a..721630d29b4c9 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java @@ -11,6 +11,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction; import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService; +import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention; import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher; import org.elasticsearch.features.FeatureSpecification; import org.elasticsearch.features.NodeFeature; @@ -33,9 +34,10 @@ public Map getHistoricalFeatures() { @Override public Set getFeatures() { return Set.of( - DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12 + DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12 LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER, // Added in 8.13 - DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE + DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE, + DataStreamGlobalRetention.GLOBAL_RETENTION // Added in 8.14 ); } } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java index 4cebba155518b..53e0bc287d3ec 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java @@ -19,6 +19,9 @@ import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; import org.elasticsearch.action.datastreams.PromoteDataStreamAction; import org.elasticsearch.client.internal.OriginSettingClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -38,10 +41,14 @@ import org.elasticsearch.datastreams.action.PromoteDataStreamTransportAction; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; +import org.elasticsearch.datastreams.lifecycle.UpdateDataStreamGlobalRetentionService; +import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamGlobalRetentionAction; import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamLifecycleAction; import org.elasticsearch.datastreams.lifecycle.action.ExplainDataStreamLifecycleAction; +import org.elasticsearch.datastreams.lifecycle.action.GetDataStreamGlobalRetentionAction; import org.elasticsearch.datastreams.lifecycle.action.GetDataStreamLifecycleAction; import org.elasticsearch.datastreams.lifecycle.action.GetDataStreamLifecycleStatsAction; +import org.elasticsearch.datastreams.lifecycle.action.PutDataStreamGlobalRetentionAction; import org.elasticsearch.datastreams.lifecycle.action.PutDataStreamLifecycleAction; import org.elasticsearch.datastreams.lifecycle.action.TransportDeleteDataStreamLifecycleAction; import org.elasticsearch.datastreams.lifecycle.action.TransportExplainDataStreamLifecycleAction; @@ -51,9 +58,12 @@ import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthIndicatorService; import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher; import org.elasticsearch.datastreams.lifecycle.rest.RestDataStreamLifecycleStatsAction; +import org.elasticsearch.datastreams.lifecycle.rest.RestDeleteDataStreamGlobalRetentionAction; import org.elasticsearch.datastreams.lifecycle.rest.RestDeleteDataStreamLifecycleAction; import org.elasticsearch.datastreams.lifecycle.rest.RestExplainDataStreamLifecycleAction; +import org.elasticsearch.datastreams.lifecycle.rest.RestGetDataStreamGlobalRetentionAction; import org.elasticsearch.datastreams.lifecycle.rest.RestGetDataStreamLifecycleAction; +import org.elasticsearch.datastreams.lifecycle.rest.RestPutDataStreamGlobalRetentionAction; import org.elasticsearch.datastreams.lifecycle.rest.RestPutDataStreamLifecycleAction; import org.elasticsearch.datastreams.rest.RestCreateDataStreamAction; import org.elasticsearch.datastreams.rest.RestDataStreamsStatsAction; @@ -132,6 +142,7 @@ public static TimeValue getLookAheadTime(Settings settings) { private final SetOnce dataLifecycleInitialisationService = new SetOnce<>(); private final SetOnce dataStreamLifecycleErrorsPublisher = new SetOnce<>(); private final SetOnce dataStreamLifecycleHealthIndicatorService = new SetOnce<>(); + private final SetOnce dataStreamGlobalRetentionService = new SetOnce<>(); private final Settings settings; public DataStreamsPlugin(Settings settings) { @@ -205,10 +216,12 @@ public Collection createComponents(PluginServices services) { ); dataLifecycleInitialisationService.get().init(); dataStreamLifecycleHealthIndicatorService.set(new DataStreamLifecycleHealthIndicatorService()); + dataStreamGlobalRetentionService.set(new UpdateDataStreamGlobalRetentionService(services.clusterService())); components.add(errorStoreInitialisationService.get()); components.add(dataLifecycleInitialisationService.get()); components.add(dataStreamLifecycleErrorsPublisher.get()); + components.add(dataStreamGlobalRetentionService.get()); return components; } @@ -227,6 +240,24 @@ public Collection createComponents(PluginServices services) { actions.add(new ActionHandler<>(DeleteDataStreamLifecycleAction.INSTANCE, TransportDeleteDataStreamLifecycleAction.class)); actions.add(new ActionHandler<>(ExplainDataStreamLifecycleAction.INSTANCE, TransportExplainDataStreamLifecycleAction.class)); actions.add(new ActionHandler<>(GetDataStreamLifecycleStatsAction.INSTANCE, TransportGetDataStreamLifecycleStatsAction.class)); + actions.add( + new ActionHandler<>( + PutDataStreamGlobalRetentionAction.INSTANCE, + PutDataStreamGlobalRetentionAction.TransportPutDataStreamGlobalRetentionAction.class + ) + ); + actions.add( + new ActionHandler<>( + GetDataStreamGlobalRetentionAction.INSTANCE, + GetDataStreamGlobalRetentionAction.TransportGetDataStreamGlobalSettingsAction.class + ) + ); + actions.add( + new ActionHandler<>( + DeleteDataStreamGlobalRetentionAction.INSTANCE, + DeleteDataStreamGlobalRetentionAction.TransportDeleteDataStreamGlobalRetentionAction.class + ) + ); return actions; } @@ -259,9 +290,20 @@ public List getRestHandlers( handlers.add(new RestDeleteDataStreamLifecycleAction()); handlers.add(new RestExplainDataStreamLifecycleAction()); handlers.add(new RestDataStreamLifecycleStatsAction()); + handlers.add(new RestPutDataStreamGlobalRetentionAction()); + handlers.add(new RestGetDataStreamGlobalRetentionAction()); + handlers.add(new RestDeleteDataStreamGlobalRetentionAction()); return handlers; } + @Override + public List getNamedWriteables() { + return List.of( + new NamedWriteableRegistry.Entry(ClusterState.Custom.class, DataStreamGlobalRetention.TYPE, DataStreamGlobalRetention::read), + new NamedWriteableRegistry.Entry(NamedDiff.class, DataStreamGlobalRetention.TYPE, DataStreamGlobalRetention::readDiffFrom) + ); + } + @Override public Collection getAdditionalIndexSettingProviders(IndexSettingProvider.Parameters parameters) { return List.of(new DataStreamIndexSettingsProvider(parameters.mapperServiceFactory())); diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/UpdateDataStreamGlobalRetentionService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/UpdateDataStreamGlobalRetentionService.java new file mode 100644 index 0000000000000..953c651f821c3 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/UpdateDataStreamGlobalRetentionService.java @@ -0,0 +1,175 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.datastreams.lifecycle; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateAckListener; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterServiceTaskQueue; +import org.elasticsearch.common.Priority; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamGlobalRetentionAction; +import org.elasticsearch.datastreams.lifecycle.action.PutDataStreamGlobalRetentionAction; +import org.elasticsearch.datastreams.lifecycle.action.UpdateDataStreamGlobalRetentionResponse; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +/** + * This service manages the global retention configuration, it provides an API to set or remove global retention + * from the cluster state. + */ +public class UpdateDataStreamGlobalRetentionService { + + private static final Logger logger = LogManager.getLogger(UpdateDataStreamGlobalRetentionService.class); + + private final MasterServiceTaskQueue taskQueue; + + public UpdateDataStreamGlobalRetentionService(ClusterService clusterService) { + ClusterStateTaskExecutor executor = new SimpleBatchedAckListenerTaskExecutor<>() { + + @Override + public Tuple executeTask( + UpsertGlobalDataStreamMetadataTask task, + ClusterState clusterState + ) { + return new Tuple<>(updateGlobalRetention(clusterState, task.globalRetention()), task); + } + }; + this.taskQueue = clusterService.createTaskQueue("data-stream-global-retention", Priority.HIGH, executor); + + } + + public void updateGlobalRetention( + PutDataStreamGlobalRetentionAction.Request request, + List affectedDataStreams, + final ActionListener listener + ) { + taskQueue.submitTask( + "update-data-stream-global-retention", + new UpsertGlobalDataStreamMetadataTask( + request.getGlobalRetention(), + affectedDataStreams, + listener, + request.masterNodeTimeout() + ), + request.masterNodeTimeout() + ); + } + + public void removeGlobalRetention( + DeleteDataStreamGlobalRetentionAction.Request request, + List affectedDataStreams, + final ActionListener listener + ) { + taskQueue.submitTask( + "remove-data-stream-global-retention", + new UpsertGlobalDataStreamMetadataTask(null, affectedDataStreams, listener, request.masterNodeTimeout()), + request.masterNodeTimeout() + ); + } + + public List determineAffectedDataStreams( + @Nullable DataStreamGlobalRetention newGlobalRetention, + ClusterState clusterState + ) { + var previousGlobalRetention = DataStreamGlobalRetention.getFromClusterState(clusterState); + if (Objects.equals(newGlobalRetention, previousGlobalRetention)) { + return List.of(); + } + List affectedDataStreams = new ArrayList<>(); + for (DataStream dataStream : clusterState.metadata().dataStreams().values()) { + if (dataStream.getLifecycle() != null) { + TimeValue previousEffectiveRetention = dataStream.getLifecycle().getEffectiveDataRetention(previousGlobalRetention); + TimeValue newEffectiveRetention = dataStream.getLifecycle().getEffectiveDataRetention(newGlobalRetention); + if (Objects.equals(previousEffectiveRetention, newEffectiveRetention) == false) { + affectedDataStreams.add( + new UpdateDataStreamGlobalRetentionResponse.AffectedDataStream( + dataStream.getName(), + newEffectiveRetention, + previousEffectiveRetention + ) + ); + } + } + } + affectedDataStreams.sort(Comparator.comparing(UpdateDataStreamGlobalRetentionResponse.AffectedDataStream::dataStreamName)); + return affectedDataStreams; + } + + // Visible for testing + ClusterState updateGlobalRetention(ClusterState clusterState, @Nullable DataStreamGlobalRetention retentionFromRequest) { + final var initialRetention = DataStreamGlobalRetention.getFromClusterState(clusterState); + // Avoid storing empty retention in the cluster state + final var newRetention = DataStreamGlobalRetention.EMPTY.equals(retentionFromRequest) ? null : retentionFromRequest; + if (Objects.equals(newRetention, initialRetention)) { + return clusterState; + } + if (newRetention == null) { + return clusterState.copyAndUpdate(b -> b.removeCustom(DataStreamGlobalRetention.TYPE)); + } + return clusterState.copyAndUpdate(b -> b.putCustom(DataStreamGlobalRetention.TYPE, newRetention)); + } + + /** + * A base class for the task updating the global retention in the cluster state. + */ + record UpsertGlobalDataStreamMetadataTask( + @Nullable DataStreamGlobalRetention globalRetention, + List affectedDataStreams, + ActionListener listener, + TimeValue masterTimeout + ) implements ClusterStateTaskListener, ClusterStateAckListener { + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } + + @Override + public void onAllNodesAcked() { + listener.onResponse(new UpdateDataStreamGlobalRetentionResponse(true, affectedDataStreams)); + } + + @Override + public void onAckFailure(Exception e) { + logger.debug("Failed to update global retention [{}] with error [{}]", globalRetention, e.getMessage()); + listener.onResponse(UpdateDataStreamGlobalRetentionResponse.FAILED); + } + + @Override + public void onAckTimeout() { + logger.debug("Failed to update global retention [{}] because timeout was reached", globalRetention); + listener.onResponse(UpdateDataStreamGlobalRetentionResponse.FAILED); + } + + @Override + public TimeValue ackTimeout() { + return masterTimeout; + } + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/DeleteDataStreamGlobalRetentionAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/DeleteDataStreamGlobalRetentionAction.java new file mode 100644 index 0000000000000..a6060923bd396 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/DeleteDataStreamGlobalRetentionAction.java @@ -0,0 +1,153 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.datastreams.lifecycle.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.datastreams.lifecycle.UpdateDataStreamGlobalRetentionService; +import org.elasticsearch.features.FeatureService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/** + * Deletes the global retention for data streams (if it's not a dry run) and it returns the affected data streams. + */ +public class DeleteDataStreamGlobalRetentionAction { + + public static final ActionType INSTANCE = new ActionType<>( + "cluster:admin/data_stream/global_retention/delete" + ); + + private DeleteDataStreamGlobalRetentionAction() {/* no instances */} + + public static final class Request extends MasterNodeRequest { + private boolean dryRun = false; + + public Request(StreamInput in) throws IOException { + super(in); + dryRun = in.readBoolean(); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(dryRun); + } + + public Request() {} + + public boolean dryRun() { + return dryRun; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeleteDataStreamGlobalRetentionAction.Request request = (DeleteDataStreamGlobalRetentionAction.Request) o; + return dryRun == request.dryRun; + } + + @Override + public int hashCode() { + return Objects.hash(dryRun); + } + + public void dryRun(boolean dryRun) { + this.dryRun = dryRun; + } + } + + public static class TransportDeleteDataStreamGlobalRetentionAction extends TransportMasterNodeAction< + Request, + UpdateDataStreamGlobalRetentionResponse> { + + private final UpdateDataStreamGlobalRetentionService globalRetentionService; + private final FeatureService featureService; + + @Inject + public TransportDeleteDataStreamGlobalRetentionAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + UpdateDataStreamGlobalRetentionService globalRetentionService, + FeatureService featureService + ) { + super( + INSTANCE.name(), + transportService, + clusterService, + threadPool, + actionFilters, + Request::new, + indexNameExpressionResolver, + UpdateDataStreamGlobalRetentionResponse::new, + threadPool.executor(ThreadPool.Names.MANAGEMENT) + ); + this.globalRetentionService = globalRetentionService; + this.featureService = featureService; + } + + @Override + protected void masterOperation( + Task task, + Request request, + ClusterState state, + ActionListener listener + ) throws Exception { + if (featureService.clusterHasFeature(state, DataStreamGlobalRetention.GLOBAL_RETENTION) == false) { + listener.onFailure( + new ResourceNotFoundException( + "Data stream global retention feature not found, please ensure all nodes have the feature " + + DataStreamGlobalRetention.GLOBAL_RETENTION.id() + ) + ); + return; + } + List affectedDataStreams = globalRetentionService + .determineAffectedDataStreams(null, state); + if (request.dryRun()) { + listener.onResponse(new UpdateDataStreamGlobalRetentionResponse(false, true, affectedDataStreams)); + } else { + globalRetentionService.removeGlobalRetention(request, affectedDataStreams, listener); + } + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/GetDataStreamGlobalRetentionAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/GetDataStreamGlobalRetentionAction.java new file mode 100644 index 0000000000000..b694e12767854 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/GetDataStreamGlobalRetentionAction.java @@ -0,0 +1,168 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.datastreams.lifecycle.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.features.FeatureService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +/** + * Retrieves the global retention for data streams. + */ +public class GetDataStreamGlobalRetentionAction { + + public static final ActionType INSTANCE = new ActionType<>("cluster:monitor/data_stream/global_retention/get"); + + private GetDataStreamGlobalRetentionAction() {/* no instances */} + + public static final class Request extends MasterNodeReadRequest { + + public Request() {} + + public Request(StreamInput in) throws IOException { + super(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public boolean equals(Object obj) { + return super.equals(obj); + } + + @Override + public int hashCode() { + return super.hashCode(); + } + } + + public static class Response extends ActionResponse implements ToXContentObject { + + private final DataStreamGlobalRetention globalRetention; + + public Response(DataStreamGlobalRetention globalRetention) { + this.globalRetention = globalRetention; + } + + public Response(StreamInput in) throws IOException { + super(in); + globalRetention = DataStreamGlobalRetention.read(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + globalRetention.writeTo(out); + } + + @Override + public String toString() { + return "Response{" + "globalRetention=" + globalRetention + '}'; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + globalRetention.toXContentFragment(builder, params); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response that = (Response) o; + return Objects.equals(globalRetention, that.globalRetention); + } + + @Override + public int hashCode() { + return Objects.hash(globalRetention); + } + } + + public static class TransportGetDataStreamGlobalSettingsAction extends TransportMasterNodeReadAction { + + private final FeatureService featureService; + + @Inject + public TransportGetDataStreamGlobalSettingsAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + FeatureService featureService + ) { + super( + INSTANCE.name(), + transportService, + clusterService, + threadPool, + actionFilters, + Request::new, + indexNameExpressionResolver, + Response::new, + threadPool.executor(ThreadPool.Names.MANAGEMENT) + ); + this.featureService = featureService; + } + + @Override + protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) throws Exception { + if (featureService.clusterHasFeature(state, DataStreamGlobalRetention.GLOBAL_RETENTION) == false) { + listener.onFailure( + new ResourceNotFoundException( + "Data stream global retention feature not found, please ensure all nodes have the feature " + + DataStreamGlobalRetention.GLOBAL_RETENTION.id() + ) + ); + return; + } + DataStreamGlobalRetention globalRetention = DataStreamGlobalRetention.getFromClusterState(state); + listener.onResponse(new Response(globalRetention == null ? DataStreamGlobalRetention.EMPTY : globalRetention)); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/PutDataStreamGlobalRetentionAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/PutDataStreamGlobalRetentionAction.java new file mode 100644 index 0000000000000..2aa5b4b4d3acd --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/PutDataStreamGlobalRetentionAction.java @@ -0,0 +1,202 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.datastreams.lifecycle.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.ValidateActions; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.datastreams.lifecycle.UpdateDataStreamGlobalRetentionService; +import org.elasticsearch.features.FeatureService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/** + * Sets the global retention for data streams (if it's not a dry run) and it returns the affected data streams. + */ +public class PutDataStreamGlobalRetentionAction { + + public static final ActionType INSTANCE = new ActionType<>( + "cluster:admin/data_stream/global_retention/put" + ); + + private PutDataStreamGlobalRetentionAction() {/* no instances */} + + public static final class Request extends MasterNodeRequest { + + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>( + "put_data_stream_global_retention_request", + args -> new PutDataStreamGlobalRetentionAction.Request((TimeValue) args[0], (TimeValue) args[1]) + ); + + static { + PARSER.declareField( + ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> TimeValue.parseTimeValue(p.textOrNull(), DataStreamGlobalRetention.DEFAULT_RETENTION_FIELD.getPreferredName()), + DataStreamGlobalRetention.DEFAULT_RETENTION_FIELD, + ObjectParser.ValueType.STRING_OR_NULL + ); + PARSER.declareField( + ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> TimeValue.parseTimeValue(p.textOrNull(), DataStreamGlobalRetention.MAX_RETENTION_FIELD.getPreferredName()), + DataStreamGlobalRetention.MAX_RETENTION_FIELD, + ObjectParser.ValueType.STRING_OR_NULL + ); + } + + private final DataStreamGlobalRetention globalRetention; + private boolean dryRun = false; + + public static PutDataStreamGlobalRetentionAction.Request parseRequest(XContentParser parser) { + return PARSER.apply(parser, null); + } + + public Request(StreamInput in) throws IOException { + super(in); + globalRetention = DataStreamGlobalRetention.read(in); + dryRun = in.readBoolean(); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (globalRetention.equals(DataStreamGlobalRetention.EMPTY)) { + return ValidateActions.addValidationError( + "At least one of 'default_retention' or 'max_retention' should be defined." + + " If you want to remove the configuration please use the DELETE method", + validationException + ); + } + return validationException; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + globalRetention.writeTo(out); + out.writeBoolean(dryRun); + } + + public Request(@Nullable TimeValue defaultRetention, @Nullable TimeValue maxRetention) { + this.globalRetention = new DataStreamGlobalRetention(defaultRetention, maxRetention); + } + + public DataStreamGlobalRetention getGlobalRetention() { + return globalRetention; + } + + public boolean dryRun() { + return dryRun; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PutDataStreamGlobalRetentionAction.Request request = (PutDataStreamGlobalRetentionAction.Request) o; + return Objects.equals(globalRetention, request.globalRetention) && dryRun == request.dryRun; + } + + @Override + public int hashCode() { + return Objects.hash(globalRetention, dryRun); + } + + public void dryRun(boolean dryRun) { + this.dryRun = dryRun; + } + } + + public static class TransportPutDataStreamGlobalRetentionAction extends TransportMasterNodeAction< + Request, + UpdateDataStreamGlobalRetentionResponse> { + + private final UpdateDataStreamGlobalRetentionService globalRetentionService; + private final FeatureService featureService; + + @Inject + public TransportPutDataStreamGlobalRetentionAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + UpdateDataStreamGlobalRetentionService globalRetentionService, + FeatureService featureService + ) { + super( + INSTANCE.name(), + transportService, + clusterService, + threadPool, + actionFilters, + Request::new, + indexNameExpressionResolver, + UpdateDataStreamGlobalRetentionResponse::new, + threadPool.executor(ThreadPool.Names.MANAGEMENT) + ); + this.globalRetentionService = globalRetentionService; + this.featureService = featureService; + } + + @Override + protected void masterOperation( + Task task, + Request request, + ClusterState state, + ActionListener listener + ) throws Exception { + if (featureService.clusterHasFeature(state, DataStreamGlobalRetention.GLOBAL_RETENTION) == false) { + listener.onFailure( + new ResourceNotFoundException( + "Data stream global retention feature not found, please ensure all nodes have the feature " + + DataStreamGlobalRetention.GLOBAL_RETENTION.id() + ) + ); + return; + } + List affectedDataStreams = globalRetentionService + .determineAffectedDataStreams(request.globalRetention, state); + if (request.dryRun()) { + listener.onResponse(new UpdateDataStreamGlobalRetentionResponse(false, true, affectedDataStreams)); + } else { + globalRetentionService.updateGlobalRetention(request, affectedDataStreams, listener); + } + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/UpdateDataStreamGlobalRetentionResponse.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/UpdateDataStreamGlobalRetentionResponse.java new file mode 100644 index 0000000000000..d0ab707b91f20 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/UpdateDataStreamGlobalRetentionResponse.java @@ -0,0 +1,122 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.datastreams.lifecycle.action; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; +import org.elasticsearch.common.xcontent.ChunkedToXContentObject; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +/** + * This response is used by {@link PutDataStreamGlobalRetentionAction} and {@link DeleteDataStreamGlobalRetentionAction} to + * communicate to the user the result of a global retention update and the affected data streams. + */ +public final class UpdateDataStreamGlobalRetentionResponse extends ActionResponse implements ChunkedToXContentObject { + + public static final UpdateDataStreamGlobalRetentionResponse FAILED = new UpdateDataStreamGlobalRetentionResponse( + false, + false, + List.of() + ); + + private final boolean acknowledged; + private final boolean dryRun; + private final List affectedDataStreams; + + public UpdateDataStreamGlobalRetentionResponse(StreamInput in) throws IOException { + super(in); + acknowledged = in.readBoolean(); + dryRun = in.readBoolean(); + affectedDataStreams = in.readCollectionAsImmutableList(AffectedDataStream::read); + } + + public UpdateDataStreamGlobalRetentionResponse(boolean acknowledged, List affectedDataStreams) { + this(acknowledged, false, affectedDataStreams); + } + + public UpdateDataStreamGlobalRetentionResponse(boolean acknowledged, boolean dryRun, List affectedDataStreams) { + this.acknowledged = acknowledged; + this.dryRun = dryRun; + this.affectedDataStreams = affectedDataStreams; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(acknowledged); + out.writeBoolean(dryRun); + out.writeCollection(affectedDataStreams); + } + + @Override + public Iterator toXContentChunked(ToXContent.Params params) { + return Iterators.concat(ChunkedToXContentHelper.startObject(), Iterators.single(((builder, params1) -> { + builder.field("acknowledged", acknowledged); + builder.field("dry_run", dryRun); + return builder; + })), + ChunkedToXContentHelper.startArray("affected_data_streams"), + Iterators.map(affectedDataStreams.iterator(), affectedDataStream -> affectedDataStream::toXContent), + ChunkedToXContentHelper.endArray(), + ChunkedToXContentHelper.endObject() + ); + } + + public record AffectedDataStream(String dataStreamName, TimeValue newEffectiveRetention, TimeValue previousEffectiveRetention) + implements + Writeable, + ToXContentObject { + + public static AffectedDataStream read(StreamInput in) throws IOException { + return new AffectedDataStream(in.readString(), in.readOptionalTimeValue(), in.readOptionalTimeValue()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(dataStreamName); + out.writeOptionalTimeValue(newEffectiveRetention); + out.writeOptionalTimeValue(previousEffectiveRetention); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("name", dataStreamName); + builder.field("new_effective_retention", newEffectiveRetention == null ? "infinite" : newEffectiveRetention.getStringRep()); + builder.field( + "previous_effective_retention", + previousEffectiveRetention == null ? "infinite" : previousEffectiveRetention.getStringRep() + ); + builder.endObject(); + return builder; + } + } + + public boolean isAcknowledged() { + return acknowledged; + } + + public boolean isDryRun() { + return dryRun; + } + + public List getAffectedDataStreams() { + return affectedDataStreams; + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestDeleteDataStreamGlobalRetentionAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestDeleteDataStreamGlobalRetentionAction.java new file mode 100644 index 0000000000000..1ac12c918605f --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestDeleteDataStreamGlobalRetentionAction.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.datastreams.lifecycle.rest; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamGlobalRetentionAction; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestChunkedToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.DELETE; + +/** + * Removes the data stream global retention configuration + */ +@ServerlessScope(Scope.PUBLIC) +public class RestDeleteDataStreamGlobalRetentionAction extends BaseRestHandler { + + @Override + public String getName() { + return "delete_data_stream_global_retention_action"; + } + + @Override + public List routes() { + return List.of(new Route(DELETE, "/_data_stream/_global_retention")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + DeleteDataStreamGlobalRetentionAction.Request request = new DeleteDataStreamGlobalRetentionAction.Request(); + request.dryRun(restRequest.paramAsBoolean("dry_run", false)); + return channel -> client.execute( + DeleteDataStreamGlobalRetentionAction.INSTANCE, + request, + new RestChunkedToXContentListener<>(channel) + ); + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestGetDataStreamGlobalRetentionAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestGetDataStreamGlobalRetentionAction.java new file mode 100644 index 0000000000000..cbe403af35f72 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestGetDataStreamGlobalRetentionAction.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.datastreams.lifecycle.rest; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.datastreams.lifecycle.action.GetDataStreamGlobalRetentionAction; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.GET; + +/** + * Retrieves the data stream global retention configuration. + */ +@ServerlessScope(Scope.PUBLIC) +public class RestGetDataStreamGlobalRetentionAction extends BaseRestHandler { + + @Override + public String getName() { + return "get_data_stream_global_retention_action"; + } + + @Override + public List routes() { + return List.of(new Route(GET, "/_data_stream/_global_retention")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + GetDataStreamGlobalRetentionAction.Request request = new GetDataStreamGlobalRetentionAction.Request(); + request.local(restRequest.paramAsBoolean("local", request.local())); + request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout())); + + return channel -> client.execute(GetDataStreamGlobalRetentionAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestPutDataStreamGlobalRetentionAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestPutDataStreamGlobalRetentionAction.java new file mode 100644 index 0000000000000..5331c4df16db0 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestPutDataStreamGlobalRetentionAction.java @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.datastreams.lifecycle.rest; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.datastreams.lifecycle.action.PutDataStreamGlobalRetentionAction; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestChunkedToXContentListener; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.PUT; + +/** + * Updates the default_retention and the max_retention of the data stream global retention configuration. It + * does not accept an empty payload. + */ +@ServerlessScope(Scope.PUBLIC) +public class RestPutDataStreamGlobalRetentionAction extends BaseRestHandler { + + @Override + public String getName() { + return "put_data_stream_global_retention_action"; + } + + @Override + public List routes() { + return List.of(new Route(PUT, "/_data_stream/_global_retention")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + try (XContentParser parser = restRequest.contentParser()) { + PutDataStreamGlobalRetentionAction.Request request = PutDataStreamGlobalRetentionAction.Request.parseRequest(parser); + request.dryRun(restRequest.paramAsBoolean("dry_run", false)); + return channel -> client.execute( + PutDataStreamGlobalRetentionAction.INSTANCE, + request, + new RestChunkedToXContentListener<>(channel) + ); + } + } +} diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/UpdateDataStreamGlobalRetentionServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/UpdateDataStreamGlobalRetentionServiceTests.java new file mode 100644 index 0000000000000..65b8473e4df91 --- /dev/null +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/UpdateDataStreamGlobalRetentionServiceTests.java @@ -0,0 +1,200 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.datastreams.lifecycle; + +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention; +import org.elasticsearch.cluster.metadata.DataStreamLifecycle; +import org.elasticsearch.cluster.metadata.DataStreamTestHelper; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.Index; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; + +public class UpdateDataStreamGlobalRetentionServiceTests extends ESTestCase { + private static TestThreadPool threadPool; + private ClusterService clusterService; + private UpdateDataStreamGlobalRetentionService service; + + @BeforeClass + public static void setupThreadPool() { + threadPool = new TestThreadPool(getTestClass().getName()); + } + + @Before + public void setupServices() { + clusterService = ClusterServiceUtils.createClusterService(threadPool); + service = new UpdateDataStreamGlobalRetentionService(clusterService); + } + + @After + public void closeClusterService() { + clusterService.close(); + } + + @AfterClass + public static void tearDownThreadPool() { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; + } + + public void testUpdateClusterState() { + // Removing from a cluster state without global retention + { + assertThat(service.updateGlobalRetention(ClusterState.EMPTY_STATE, null), equalTo(ClusterState.EMPTY_STATE)); + assertThat( + service.updateGlobalRetention(ClusterState.EMPTY_STATE, DataStreamGlobalRetention.EMPTY), + equalTo(ClusterState.EMPTY_STATE) + ); + } + + // Removing from a cluster state with global retention + { + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .putCustom(DataStreamGlobalRetention.TYPE, randomNonEmptyGlobalRetention()) + .build(); + DataStreamGlobalRetention updatedRetention = DataStreamGlobalRetention.getFromClusterState( + service.updateGlobalRetention(clusterState, null) + ); + assertThat(updatedRetention, nullValue()); + updatedRetention = DataStreamGlobalRetention.getFromClusterState( + service.updateGlobalRetention(clusterState, DataStreamGlobalRetention.EMPTY) + ); + assertThat(updatedRetention, nullValue()); + } + + // Updating retention + { + var initialRetention = randomNonEmptyGlobalRetention(); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .putCustom(DataStreamGlobalRetention.TYPE, initialRetention) + .build(); + var expectedRetention = randomValueOtherThan( + initialRetention, + UpdateDataStreamGlobalRetentionServiceTests::randomNonEmptyGlobalRetention + ); + var updatedRetention = DataStreamGlobalRetention.getFromClusterState( + service.updateGlobalRetention(clusterState, expectedRetention) + ); + assertThat(updatedRetention, equalTo(expectedRetention)); + } + } + + public void testDetermineAffectedDataStreams() { + Metadata.Builder builder = Metadata.builder(); + DataStream dataStreamWithoutLifecycle = DataStreamTestHelper.newInstance( + "ds-no-lifecycle", + List.of(new Index(randomAlphaOfLength(10), randomAlphaOfLength(10))), + 1, + null, + false, + null, + List.of() + ); + builder.put(dataStreamWithoutLifecycle); + String dataStreamNoRetention = "ds-no-retention"; + DataStream dataStreamWithLifecycleNoRetention = DataStreamTestHelper.newInstance( + dataStreamNoRetention, + List.of(new Index(randomAlphaOfLength(10), randomAlphaOfLength(10))), + 1, + null, + false, + DataStreamLifecycle.DEFAULT, + List.of() + ); + + builder.put(dataStreamWithLifecycleNoRetention); + DataStream dataStreamWithLifecycleShortRetention = DataStreamTestHelper.newInstance( + "ds-no-short-retention", + List.of(new Index(randomAlphaOfLength(10), randomAlphaOfLength(10))), + 1, + null, + false, + DataStreamLifecycle.newBuilder().dataRetention(TimeValue.timeValueDays(7)).build(), + List.of() + ); + builder.put(dataStreamWithLifecycleShortRetention); + String dataStreamLongRetention = "ds-long-retention"; + DataStream dataStreamWithLifecycleLongRetention = DataStreamTestHelper.newInstance( + dataStreamLongRetention, + List.of(new Index(randomAlphaOfLength(10), randomAlphaOfLength(10))), + 1, + null, + false, + DataStreamLifecycle.newBuilder().dataRetention(TimeValue.timeValueDays(365)).build(), + List.of() + ); + builder.put(dataStreamWithLifecycleLongRetention); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build(); + // No global retention + { + var affectedDataStreams = service.determineAffectedDataStreams(null, clusterState); + assertThat(affectedDataStreams.isEmpty(), is(true)); + } + // No difference in global retention + { + var globalRetention = randomNonEmptyGlobalRetention(); + var clusterStateWithRetention = ClusterState.builder(clusterState) + .putCustom(DataStreamGlobalRetention.TYPE, globalRetention) + .build(); + var affectedDataStreams = service.determineAffectedDataStreams(globalRetention, clusterStateWithRetention); + assertThat(affectedDataStreams.isEmpty(), is(true)); + } + // Default retention in effect + { + var globalRetention = new DataStreamGlobalRetention(TimeValue.timeValueDays(randomIntBetween(1, 10)), null); + var affectedDataStreams = service.determineAffectedDataStreams(globalRetention, clusterState); + assertThat(affectedDataStreams.size(), is(1)); + var dataStream = affectedDataStreams.get(0); + assertThat(dataStream.dataStreamName(), equalTo(dataStreamNoRetention)); + assertThat(dataStream.previousEffectiveRetention(), nullValue()); + assertThat(dataStream.newEffectiveRetention(), equalTo(globalRetention.getDefaultRetention())); + } + // Max retention in effect + { + var globalRetention = new DataStreamGlobalRetention(null, TimeValue.timeValueDays(randomIntBetween(10, 90))); + var affectedDataStreams = service.determineAffectedDataStreams(globalRetention, clusterState); + assertThat(affectedDataStreams.size(), is(2)); + var dataStream = affectedDataStreams.get(0); + assertThat(dataStream.dataStreamName(), equalTo(dataStreamLongRetention)); + assertThat(dataStream.previousEffectiveRetention(), notNullValue()); + assertThat(dataStream.newEffectiveRetention(), equalTo(globalRetention.getMaxRetention())); + dataStream = affectedDataStreams.get(1); + assertThat(dataStream.dataStreamName(), equalTo(dataStreamNoRetention)); + assertThat(dataStream.previousEffectiveRetention(), nullValue()); + assertThat(dataStream.newEffectiveRetention(), equalTo(globalRetention.getMaxRetention())); + } + } + + private static DataStreamGlobalRetention randomNonEmptyGlobalRetention() { + boolean withDefault = randomBoolean(); + return new DataStreamGlobalRetention( + withDefault ? TimeValue.timeValueDays(randomIntBetween(1, 1000)) : null, + withDefault == false || randomBoolean() ? TimeValue.timeValueDays(randomIntBetween(1000, 2000)) : null + ); + } +} diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/10_explain_lifecycle.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/10_explain_lifecycle.yml index b52c860a812ee..ca579aea4b7ef 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/10_explain_lifecycle.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/10_explain_lifecycle.yml @@ -1,8 +1,8 @@ --- "Explain backing index lifecycle": - skip: - version: " - 8.10.99" - reason: "Data stream lifecycle was released as tech preview in 8.11" + cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"] + reason: "Data stream lifecycle with effective retention was released in 8.14" features: allowed_warnings - do: allowed_warnings: @@ -36,7 +36,9 @@ indices.explain_data_lifecycle: index: $backing_index - match: { indices.$backing_index.managed_by_lifecycle: true } - - match: { indices.$backing_index.lifecycle.data_retention: '30d' } + - match: { indices.$backing_index.lifecycle.data_retention: "30d" } + - match: { indices.$backing_index.lifecycle.effective_retention: "30d"} + - match: { indices.$backing_index.lifecycle.retention_determined_by: "data_stream_configuration"} - match: { indices.$backing_index.lifecycle.enabled: true } - is_false: indices.$backing_index.lifecycle.rollover @@ -46,7 +48,9 @@ index: $backing_index include_defaults: true - match: { indices.$backing_index.managed_by_lifecycle: true } - - match: { indices.$backing_index.lifecycle.data_retention: '30d' } + - match: { indices.$backing_index.lifecycle.data_retention: "30d" } + - match: { indices.$backing_index.lifecycle.effective_retention: "30d"} + - match: { indices.$backing_index.lifecycle.retention_determined_by: "data_stream_configuration"} - is_true: indices.$backing_index.lifecycle.rollover diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/190_create_data_stream_with_lifecycle.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/190_create_data_stream_with_lifecycle.yml index 0e4bbd795c18a..e2268ee9118ac 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/190_create_data_stream_with_lifecycle.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/190_create_data_stream_with_lifecycle.yml @@ -1,8 +1,8 @@ --- "Create data stream with lifecycle": - skip: - version: " - 8.10.99" - reason: "Data stream lifecycle was GA in 8.11" + version: " - 8.13.99" + reason: "Data stream lifecycle with effective retention was released in 8.14" features: allowed_warnings - do: allowed_warnings: @@ -35,5 +35,7 @@ - match: { data_streams.0.template: 'template-with-lifecycle' } - match: { data_streams.0.hidden: false } - match: { data_streams.0.lifecycle.data_retention: '30d' } + - match: { data_streams.0.lifecycle.effective_retention: '30d'} + - match: { data_streams.0.lifecycle.retention_determined_by: 'data_stream_configuration'} - match: { data_streams.0.lifecycle.enabled: true } - is_true: data_streams.0.lifecycle.rollover diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/20_basic.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/20_basic.yml index 1ea39087211dd..3f0a91db2d7f2 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/20_basic.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/20_basic.yml @@ -1,8 +1,8 @@ setup: - skip: features: allowed_warnings - version: " - 8.10.99" - reason: "Data stream lifecycles only supported in 8.11+" + cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"] + reason: "Data stream lifecycles with global retention are only supported in 8.14+" - do: allowed_warnings: - "index template [my-lifecycle] has index patterns [data-stream-with-lifecycle] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-lifecycle] will take precedence during new index creation" @@ -47,6 +47,8 @@ setup: - length: { data_streams: 1} - match: { data_streams.0.name: data-stream-with-lifecycle } - match: { data_streams.0.lifecycle.data_retention: '10d' } + - match: { data_streams.0.lifecycle.effective_retention: '10d' } + - match: { data_streams.0.lifecycle.retention_determined_by: 'data_stream_configuration' } - match: { data_streams.0.lifecycle.enabled: true} --- @@ -61,6 +63,7 @@ setup: - length: { data_streams: 1} - match: { data_streams.0.name: simple-data-stream1 } - match: { data_streams.0.lifecycle.enabled: true} + - is_false: data_streams.0.lifecycle.effective_retention --- "Put data stream lifecycle": @@ -91,14 +94,16 @@ setup: name: "*" - length: { data_streams: 2 } - match: { data_streams.0.name: data-stream-with-lifecycle } - - match: { data_streams.0.lifecycle.data_retention: '30d' } + - match: { data_streams.0.lifecycle.data_retention: "30d" } + - is_false: data_streams.0.lifecycle.effective_retention - match: { data_streams.0.lifecycle.enabled: false} - match: { data_streams.0.lifecycle.downsampling.0.after: '10d'} - match: { data_streams.0.lifecycle.downsampling.0.fixed_interval: '1h'} - match: { data_streams.0.lifecycle.downsampling.1.after: '100d'} - match: { data_streams.0.lifecycle.downsampling.1.fixed_interval: '10h'} - match: { data_streams.1.name: simple-data-stream1 } - - match: { data_streams.1.lifecycle.data_retention: '30d' } + - match: { data_streams.1.lifecycle.data_retention: "30d" } + - is_false: data_streams.0.lifecycle.effective_retention - match: { data_streams.1.lifecycle.enabled: false} - match: { data_streams.1.lifecycle.downsampling.0.after: '10d'} - match: { data_streams.1.lifecycle.downsampling.0.fixed_interval: '1h'} @@ -123,7 +128,9 @@ setup: - match: { data_streams.0.lifecycle.data_retention: '30d' } - match: { data_streams.0.lifecycle.enabled: true} - match: { data_streams.1.name: simple-data-stream1 } - - match: { data_streams.1.lifecycle.data_retention: '30d' } + - match: { data_streams.1.lifecycle.data_retention: "30d" } + - match: { data_streams.1.lifecycle.effective_retention: "30d"} + - match: { data_streams.1.lifecycle.retention_determined_by: "data_stream_configuration"} - match: { data_streams.1.lifecycle.enabled: true} @@ -136,7 +143,9 @@ setup: include_defaults: true - length: { data_streams: 1} - match: { data_streams.0.name: data-stream-with-lifecycle } - - match: { data_streams.0.lifecycle.data_retention: '10d' } + - match: { data_streams.0.lifecycle.data_retention: "10d" } + - match: { data_streams.0.lifecycle.effective_retention: "10d"} + - match: { data_streams.0.lifecycle.retention_determined_by: "data_stream_configuration"} - is_true: data_streams.0.lifecycle.rollover --- @@ -153,7 +162,9 @@ setup: name: "simple-data-stream1" - length: { data_streams: 1 } - match: { data_streams.0.name: simple-data-stream1 } - - match: { data_streams.0.lifecycle.data_retention: '30d' } + - match: { data_streams.0.lifecycle.data_retention: "30d" } + - match: { data_streams.0.lifecycle.effective_retention: "30d"} + - match: { data_streams.0.lifecycle.retention_determined_by: "data_stream_configuration"} - match: { data_streams.0.lifecycle.enabled: true } - do: diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/30_not_found.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/30_not_found.yml index e0646ba27751e..0687c00cac8d2 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/30_not_found.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/30_not_found.yml @@ -1,7 +1,7 @@ setup: - skip: features: allowed_warnings - version: " - 8.10.99" + cluster_features: ["datastream_lifecycle"] reason: "Data stream lifecycle was GA in 8.11" - do: allowed_warnings: @@ -23,13 +23,18 @@ setup: --- "Get data stream lifecycle": + - skip: + cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"] + reason: "Data stream lifecycle with effective retention was released in 8.14" - do: indices.get_data_lifecycle: name: "*" - length: { data_streams: 1} - match: { data_streams.0.name: my-data-stream-1 } - - match: { data_streams.0.lifecycle.data_retention: '10d' } + - match: { data_streams.0.lifecycle.data_retention: "10d" } + - match: { data_streams.0.lifecycle.effective_retention: "10d"} + - match: { data_streams.0.lifecycle.retention_determined_by: "data_stream_configuration"} - match: { data_streams.0.lifecycle.enabled: true} --- @@ -43,7 +48,9 @@ setup: --- "Put data stream lifecycle does not succeed when at lease one data stream does not exist": - + - skip: + cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"] + reason: "Data stream lifecycle with effective retention was released in 8.14" - do: catch: missing indices.put_data_lifecycle: @@ -57,12 +64,16 @@ setup: name: "*" - length: { data_streams: 1 } - match: { data_streams.0.name: my-data-stream-1 } - - match: { data_streams.0.lifecycle.data_retention: '10d' } + - match: { data_streams.0.lifecycle.data_retention: "10d" } + - match: { data_streams.0.lifecycle.effective_retention: "10d"} + - match: { data_streams.0.lifecycle.retention_determined_by: "data_stream_configuration"} - match: { data_streams.0.lifecycle.enabled: true } --- "Delete data stream lifecycle does not succeed when at lease one data stream does not exist": - + - skip: + cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"] + reason: "Data stream lifecycle with effective retention was released in 8.14" - do: catch: missing indices.delete_data_lifecycle: @@ -74,5 +85,7 @@ setup: name: "*" - length: { data_streams: 1 } - match: { data_streams.0.name: my-data-stream-1 } - - match: { data_streams.0.lifecycle.data_retention: '10d' } + - match: { data_streams.0.lifecycle.data_retention: "10d" } + - match: { data_streams.0.lifecycle.effective_retention: "10d"} + - match: { data_streams.0.lifecycle.retention_determined_by: "data_stream_configuration"} - match: { data_streams.0.lifecycle.enabled: true } diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/40_global_retention.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/40_global_retention.yml new file mode 100644 index 0000000000000..c4c1c4b928fcd --- /dev/null +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/40_global_retention.yml @@ -0,0 +1,139 @@ +setup: + - skip: + features: allowed_warnings + cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"] + reason: "Global retention was added in 8.14" + - do: + allowed_warnings: + - "index template [my-lifecycle] has index patterns [my-data-stream-1] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-lifecycle] will take precedence during new index creation" + indices.put_index_template: + name: my-lifecycle + body: + index_patterns: [my-data-stream-*] + template: + settings: + index.number_of_replicas: 0 + lifecycle: {} + data_stream: {} + +--- +"CRUD global retention": + - do: + indices.create_data_stream: + name: my-data-stream-1 + - do: + cluster.health: + index: my-data-stream-1 + wait_for_status: green + - do: + data_streams.put_global_retention: + body: + default_retention: "7d" + max_retention: "90d" + - is_true: acknowledged + - is_false: dry_run + - match: {affected_data_streams.0.name: "my-data-stream-1"} + - match: {affected_data_streams.0.previous_effective_retention: "infinite"} + - match: {affected_data_streams.0.new_effective_retention: "7d"} + + - do: + data_streams.get_global_retention: { } + - match: { default_retention: "7d" } + - match: { max_retention: "90d" } + + - do: + data_streams.delete_global_retention: { } + - is_true: acknowledged + - is_false: dry_run + - match: { affected_data_streams.0.name: "my-data-stream-1" } + - match: { affected_data_streams.0.previous_effective_retention: "7d" } + - match: { affected_data_streams.0.new_effective_retention: "infinite" } + + - do: + data_streams.get_global_retention: { } + - is_false: default_retention + - is_false: max_retention + + - do: + indices.delete_data_stream: + name: my-data-stream-1 +--- +"Dry run global retention": + - do: + indices.create_data_stream: + name: my-data-stream-2 + - do: + indices.put_data_lifecycle: + name: "my-data-stream-2" + body: > + { + "data_retention": "90d" + } + - is_true: acknowledged + + - do: + data_streams.put_global_retention: + dry_run: true + body: + default_retention: "7d" + max_retention: "30d" + - is_false: acknowledged + - is_true: dry_run + - match: {affected_data_streams.0.name: "my-data-stream-2"} + - match: {affected_data_streams.0.previous_effective_retention: "90d"} + - match: {affected_data_streams.0.new_effective_retention: "30d"} + + - do: + indices.get_data_stream: + name: "my-data-stream-2" + include_defaults: true + - match: { data_streams.0.name: my-data-stream-2 } + - match: { data_streams.0.lifecycle.effective_retention: '90d' } + - match: { data_streams.0.lifecycle.retention_determined_by: 'data_stream_configuration' } + - do: + indices.delete_data_stream: + name: my-data-stream-2 +--- +"Default global retention is retrieved by data stream and index templates": + - do: + indices.create_data_stream: + name: my-data-stream-3 + + - do: + data_streams.put_global_retention: + body: + default_retention: "7d" + max_retention: "90d" + - is_true: acknowledged + - is_false: dry_run + - match: {affected_data_streams.0.name: "my-data-stream-3"} + - match: {affected_data_streams.0.previous_effective_retention: "infinite"} + - match: {affected_data_streams.0.new_effective_retention: "7d"} + + - do: + data_streams.get_global_retention: { } + - match: { default_retention: "7d" } + - match: { max_retention: "90d" } + + - do: + indices.get_data_stream: + name: "my-data-stream-3" + - match: { data_streams.0.name: my-data-stream-3 } + - match: { data_streams.0.lifecycle.effective_retention: '7d' } + - match: { data_streams.0.lifecycle.retention_determined_by: 'default_global_retention' } + - match: { data_streams.0.lifecycle.enabled: true } + + - do: + indices.get_index_template: + name: my-lifecycle + + - match: { index_templates.0.name: my-lifecycle } + - match: { index_templates.0.index_template.template.lifecycle.enabled: true } + - match: { index_templates.0.index_template.template.lifecycle.effective_retention: "7d" } + - match: { index_templates.0.index_template.template.lifecycle.retention_determined_by: "default_global_retention" } + + - do: + data_streams.delete_global_retention: { } + - do: + indices.delete_data_stream: + name: my-data-stream-3 diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/data_streams.delete_global_retention.json b/rest-api-spec/src/main/resources/rest-api-spec/api/data_streams.delete_global_retention.json new file mode 100644 index 0000000000000..1eb4621a7b055 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/data_streams.delete_global_retention.json @@ -0,0 +1,35 @@ +{ + "data_streams.delete_global_retention":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams-delete-global-retention.html", + "description":"Deletes the global retention configuration that applies to all data streams managed by the data stream lifecycle." + }, + "stability":"experimental", + "visibility":"public", + "headers":{ + "accept": [ "application/json"], + "content_type": ["application/json"] + }, + "url":{ + "paths":[ + { + "path":"/_data_stream/_global_retention", + "methods":[ + "DELETE" + ] + } + ] + }, + "params":{ + "dry_run":{ + "type":"boolean", + "description":"Determines whether the global retention provided should be applied or only the impact should be determined.", + "default":false + }, + "master_timeout":{ + "type":"time", + "description":"Specify timeout for connection to master." + } + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/data_streams.get_global_retention.json b/rest-api-spec/src/main/resources/rest-api-spec/api/data_streams.get_global_retention.json new file mode 100644 index 0000000000000..9084db36d7d90 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/data_streams.get_global_retention.json @@ -0,0 +1,29 @@ +{ + "data_streams.get_global_retention":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams-get-global-retention.html", + "description":"Returns global retention configuration that applies to all data streams managed by the data stream lifecycle." + }, + "stability":"experimental", + "visibility":"public", + "headers":{ + "accept": [ "application/json"] + }, + "url":{ + "paths":[ + { + "path":"/_data_stream/_global_retention", + "methods":[ + "GET" + ] + } + ] + }, + "params":{ + "local":{ + "type":"boolean", + "description":"Return the global retention retrieved from the node that received the request." + } + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/data_streams.put_global_retention.json b/rest-api-spec/src/main/resources/rest-api-spec/api/data_streams.put_global_retention.json new file mode 100644 index 0000000000000..9f369f4c7616d --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/data_streams.put_global_retention.json @@ -0,0 +1,39 @@ +{ + "data_streams.put_global_retention":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams-put-global-retention.html", + "description":"Updates the global retention configuration that applies to all data streams managed by the data stream lifecycle." + }, + "stability":"experimental", + "visibility":"public", + "headers":{ + "accept": [ "application/json"], + "content_type": ["application/json"] + }, + "url":{ + "paths":[ + { + "path":"/_data_stream/_global_retention", + "methods":[ + "PUT" + ] + } + ] + }, + "params":{ + "dry_run":{ + "type":"boolean", + "description":"Determines whether the global retention provided should be applied or only the impact should be determined.", + "default":false + }, + "master_timeout":{ + "type":"time", + "description":"Specify timeout for connection to master" + } + }, + "body":{ + "description":"The global retention configuration including optional values for default and max retention.", + "required":true + } + } +} diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.component_template/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.component_template/10_basic.yml index 0308a68dae2cd..dcf883e5f4f14 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.component_template/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.component_template/10_basic.yml @@ -117,8 +117,8 @@ --- "Add data stream lifecycle": - requires: - cluster_features: ["datastream_lifecycle"] - reason: "Data stream lifecycle was available from 8.11" + cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"] + reason: "Data stream lifecycle with global retention was available from 8.14" - do: cluster.put_component_template: @@ -141,12 +141,14 @@ - match: {component_templates.0.component_template.version: 1} - match: {component_templates.0.component_template.template.lifecycle.enabled: true} - match: {component_templates.0.component_template.template.lifecycle.data_retention: "10d"} + - match: {component_templates.0.component_template.template.lifecycle.effective_retention: "10d"} + - match: {component_templates.0.component_template.template.lifecycle.retention_determined_by: "data_stream_configuration"} --- "Get data stream lifecycle with default rollover": - requires: - cluster_features: ["datastream_lifecycle"] - reason: "Data stream lifecycle was available from 8.11" + cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"] + reason: "Data stream lifecycle with effective retention was available from 8.14" - do: cluster.put_component_template: @@ -170,4 +172,6 @@ - match: {component_templates.0.component_template.version: 1} - match: {component_templates.0.component_template.template.lifecycle.enabled: true} - match: {component_templates.0.component_template.template.lifecycle.data_retention: "10d"} + - match: {component_templates.0.component_template.template.lifecycle.effective_retention: "10d"} + - match: {component_templates.0.component_template.template.lifecycle.retention_determined_by: "data_stream_configuration"} - is_true: component_templates.0.component_template.template.lifecycle.rollover diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_index_template/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_index_template/10_basic.yml index fcf4a75af2227..75a80f94c62bf 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_index_template/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_index_template/10_basic.yml @@ -1,6 +1,6 @@ setup: - skip: - version: " - 7.7.99" + cluster_features: [ "templates_v2" ] reason: "index template v2 API unavailable before 7.8" features: allowed_warnings @@ -22,9 +22,6 @@ setup: --- "Get index template": - - skip: - version: " - 7.99.99" - reason: "index template v2 API has not been backported" - do: indices.get_index_template: @@ -37,10 +34,6 @@ setup: --- "Get all index templates": - - skip: - version: " - 7.7.99" - reason: "index template v2 API unavailable before 7.8" - features: allowed_warnings - do: allowed_warnings: @@ -61,10 +54,6 @@ setup: --- "Pattern matching in index templates": - - skip: - version: " - 7.7.99" - reason: "index template v2 API unavailable before 7.8" - features: allowed_warnings - do: allowed_warnings: @@ -93,9 +82,6 @@ setup: --- "Get index template with local flag": - - skip: - version: " - 7.7.99" - reason: "index template v2 API unavailable before 7.8" - do: indices.get_index_template: @@ -107,8 +93,8 @@ setup: --- "Add data stream lifecycle": - skip: - version: " - 8.10.99" - reason: "Data stream lifecycle in index templates was updated after 8.10" + cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"] + reason: "Data stream lifecycle with effective retention was released in 8.14" features: allowed_warnings - do: @@ -138,12 +124,14 @@ setup: - match: {index_templates.0.index_template.template.mappings: {properties: {field: {type: keyword}}}} - match: {index_templates.0.index_template.template.lifecycle.enabled: true} - match: {index_templates.0.index_template.template.lifecycle.data_retention: "30d"} + - match: {index_templates.0.index_template.template.lifecycle.effective_retention: "30d"} + - match: {index_templates.0.index_template.template.lifecycle.retention_determined_by: "data_stream_configuration"} --- "Get data stream lifecycle with default rollover": - skip: - version: " - 8.10.99" - reason: "Data stream lifecycle in index templates was updated after 8.10" + cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"] + reason: "Data stream lifecycle with effective retention was released in 8.14" features: allowed_warnings - do: @@ -166,12 +154,14 @@ setup: - match: {index_templates.0.index_template.index_patterns: ["data-stream-with-lifecycle-*"]} - match: {index_templates.0.index_template.template.lifecycle.enabled: true} - match: {index_templates.0.index_template.template.lifecycle.data_retention: "30d"} + - match: {index_templates.0.index_template.template.lifecycle.effective_retention: "30d"} + - match: {index_templates.0.index_template.template.lifecycle.retention_determined_by: "data_stream_configuration"} - is_true: index_templates.0.index_template.template.lifecycle.rollover --- "Reject data stream lifecycle without data stream configuration": - skip: - version: " - 8.10.99" + cluster_features: ["datastream_lifecycle"] reason: "Data stream lifecycle in index templates was updated after 8.10" - do: catch: bad_request diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.simulate_index_template/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.simulate_index_template/10_basic.yml index 7256c1736ebd9..4950f9bbf2dc5 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.simulate_index_template/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.simulate_index_template/10_basic.yml @@ -227,8 +227,8 @@ --- "Simulate index template with lifecycle and include defaults": - skip: - version: " - 8.10.99" - reason: "Lifecycle is only available in 8.11+" + version: " - 8.13.99" + reason: "Data stream lifecycle with effective retention was released in 8.14" features: ["default_shards"] - do: @@ -248,5 +248,7 @@ - match: {template.lifecycle.enabled: true} - match: {template.lifecycle.data_retention: "7d"} + - match: {template.lifecycle.effective_retention: "7d"} + - match: {template.lifecycle.retention_determined_by: "data_stream_configuration"} - is_true: template.lifecycle.rollover - match: {overlapping: []} diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.simulate_template/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.simulate_template/10_basic.yml index 887a2bc2ce705..2e27a694f4705 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.simulate_template/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.simulate_template/10_basic.yml @@ -202,8 +202,8 @@ --- "Simulate template with lifecycle and include defaults": - skip: - version: " - 8.10.99" - reason: "Lifecycle is only available in 8.11+" + version: " - 8.13.99" + reason: "Data stream lifecycle with effective retention was released in 8.14" features: ["default_shards"] - do: @@ -223,4 +223,6 @@ - match: {template.lifecycle.enabled: true} - match: {template.lifecycle.data_retention: "7d"} + - match: {template.lifecycle.effective_retention: "7d"} + - match: {template.lifecycle.retention_determined_by: "data_stream_configuration"} - is_true: template.lifecycle.rollover diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/template/get/GetComponentTemplateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/template/get/GetComponentTemplateAction.java index 626feeed161f4..d352f1be5e65a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/template/get/GetComponentTemplateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/template/get/GetComponentTemplateAction.java @@ -137,10 +137,8 @@ public Response(StreamInput in) throws IOException { } } - public Response(Map componentTemplates) { - this.componentTemplates = componentTemplates; - this.rolloverConfiguration = null; - this.globalRetention = null; + public Response(Map componentTemplates, @Nullable DataStreamGlobalRetention globalRetention) { + this(componentTemplates, null, globalRetention); } public Response( diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/template/get/GetComposableIndexTemplateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/template/get/GetComposableIndexTemplateAction.java index 515b8f9fd5c1a..5c06494df7021 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/template/get/GetComposableIndexTemplateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/template/get/GetComposableIndexTemplateAction.java @@ -140,10 +140,8 @@ public Response(StreamInput in) throws IOException { } } - public Response(Map indexTemplates) { - this.indexTemplates = indexTemplates; - this.rolloverConfiguration = null; - this.globalRetention = null; + public Response(Map indexTemplates, @Nullable DataStreamGlobalRetention globalRetention) { + this(indexTemplates, null, globalRetention); } public Response( diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/template/get/TransportGetComponentTemplateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/template/get/TransportGetComponentTemplateAction.java index d081570b2a365..d238209fa88f0 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/template/get/TransportGetComponentTemplateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/template/get/TransportGetComponentTemplateAction.java @@ -102,7 +102,7 @@ protected void masterOperation( ) ); } else { - listener.onResponse(new GetComponentTemplateAction.Response(results)); + listener.onResponse(new GetComponentTemplateAction.Response(results, DataStreamGlobalRetention.getFromClusterState(state))); } } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/template/get/TransportGetComposableIndexTemplateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/template/get/TransportGetComposableIndexTemplateAction.java index 99360d2eb7bf8..35a91e70fd787 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/template/get/TransportGetComposableIndexTemplateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/template/get/TransportGetComposableIndexTemplateAction.java @@ -100,7 +100,9 @@ protected void masterOperation( ) ); } else { - listener.onResponse(new GetComposableIndexTemplateAction.Response(results)); + listener.onResponse( + new GetComposableIndexTemplateAction.Response(results, DataStreamGlobalRetention.getFromClusterState(state)) + ); } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetention.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetention.java index f3b88ba6083c3..3b58acfedbc15 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetention.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetention.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.features.NodeFeature; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; @@ -35,10 +36,13 @@ public final class DataStreamGlobalRetention extends AbstractNamedDiffable= MIN_RETENTION_VALUE.getMillis(); + } + public static DataStreamGlobalRetention read(StreamInput in) throws IOException { return new DataStreamGlobalRetention(in.readOptionalTimeValue(), in.readOptionalTimeValue()); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestTemplatesAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestTemplatesAction.java index 1c52022d59f51..929de981ce146 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestTemplatesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestTemplatesAction.java @@ -25,7 +25,6 @@ import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestResponseListener; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -78,7 +77,7 @@ protected RestChannelConsumer doCatRequest(final RestRequest request, NodeClient getComposableTemplatesRequest, getComposableTemplatesStep.delegateResponse((l, e) -> { if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) { - l.onResponse(new GetComposableIndexTemplateAction.Response(Collections.emptyMap())); + l.onResponse(new GetComposableIndexTemplateAction.Response(Map.of(), null)); } else { l.onFailure(e); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/template/get/GetComposableIndexTemplateResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/template/get/GetComposableIndexTemplateResponseTests.java index fd13d1cdc98b0..aa9989257aa39 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/template/get/GetComposableIndexTemplateResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/template/get/GetComposableIndexTemplateResponseTests.java @@ -10,10 +10,11 @@ import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.ComposableIndexTemplateTests; +import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention; +import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionTests; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; -import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -25,14 +26,15 @@ protected Writeable.Reader instanceRe @Override protected GetComposableIndexTemplateAction.Response createTestInstance() { + DataStreamGlobalRetention globalRetention = randomBoolean() ? null : DataStreamGlobalRetentionTests.randomGlobalRetention(); if (randomBoolean()) { - return new GetComposableIndexTemplateAction.Response(Collections.emptyMap()); + return new GetComposableIndexTemplateAction.Response(Map.of(), globalRetention); } Map templates = new HashMap<>(); for (int i = 0; i < randomIntBetween(1, 4); i++) { templates.put(randomAlphaOfLength(4), ComposableIndexTemplateTests.randomInstance()); } - return new GetComposableIndexTemplateAction.Response(templates); + return new GetComposableIndexTemplateAction.Response(templates, globalRetention); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionTests.java index e65b4d41bbe02..f482ab4307860 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionTests.java @@ -94,5 +94,7 @@ public void testValidation() { TimeValue.timeValueDays(randomIntBetween(1, 1000)) ) ); + expectThrows(IllegalArgumentException.class, () -> new DataStreamGlobalRetention(TimeValue.ZERO, null)); + expectThrows(IllegalArgumentException.class, () -> new DataStreamGlobalRetention(null, TimeValue.ZERO)); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 9db7d1047e249..56d36d8fb18b0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -1128,10 +1128,10 @@ public void testGetIndicesPastRetention() { long now = System.currentTimeMillis(); List creationAndRolloverTimes = List.of( - DataStreamMetadata.dataStreamMetadata(now - 5000, now - 4000), - DataStreamMetadata.dataStreamMetadata(now - 4000, now - 3000), - DataStreamMetadata.dataStreamMetadata(now - 3000, now - 2000), - DataStreamMetadata.dataStreamMetadata(now - 2000, now - 1000), + DataStreamMetadata.dataStreamMetadata(now - 5000_000, now - 4000_000), + DataStreamMetadata.dataStreamMetadata(now - 4000_000, now - 3000_000), + DataStreamMetadata.dataStreamMetadata(now - 3000_000, now - 2000_000), + DataStreamMetadata.dataStreamMetadata(now - 2000_000, now - 1000_000), DataStreamMetadata.dataStreamMetadata(now, null) ); @@ -1153,8 +1153,8 @@ public void testGetIndicesPastRetention() { { // no retention configured but we have default retention DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention( - TimeValue.timeValueMillis(2500), - randomBoolean() ? TimeValue.timeValueMillis(randomIntBetween(2500, 5000)) : null + TimeValue.timeValueSeconds(2500), + randomBoolean() ? TimeValue.timeValueSeconds(randomIntBetween(2500, 5000)) : null ); Metadata.Builder builder = Metadata.builder(); DataStream dataStream = createDataStream( @@ -1174,7 +1174,7 @@ public void testGetIndicesPastRetention() { { // no retention configured but we have max retention - DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(null, TimeValue.timeValueMillis(2500)); + DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(null, TimeValue.timeValueSeconds(2500)); Metadata.Builder builder = Metadata.builder(); DataStream dataStream = createDataStream( builder, @@ -1198,7 +1198,7 @@ public void testGetIndicesPastRetention() { dataStreamName, creationAndRolloverTimes, settings(IndexVersion.current()), - DataStreamLifecycle.newBuilder().dataRetention(2500).build() + DataStreamLifecycle.newBuilder().dataRetention(TimeValue.timeValueSeconds(2500)).build() ); Metadata metadata = builder.build(); @@ -1237,7 +1237,7 @@ public void testGetIndicesPastRetention() { dataStreamName, creationAndRolloverTimes, settings(IndexVersion.current()), - DataStreamLifecycle.newBuilder().dataRetention(6000).build() + DataStreamLifecycle.newBuilder().dataRetention(TimeValue.timeValueSeconds(6000)).build() ); Metadata metadata = builder.build(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java index 501bd307b74ca..f87123d48ccea 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java @@ -358,6 +358,14 @@ public class ClusterPrivilegeResolver { "cluster:admin/xpack/connector/secret/put" ) ); + public static final NamedClusterPrivilege MONITOR_GLOBAL_RETENTION = new ActionClusterPrivilege( + "monitor_data_stream_global_retention", + Set.of("cluster:monitor/data_stream/global_retention/*") + ); + public static final NamedClusterPrivilege MANAGE_GLOBAL_RETENTION = new ActionClusterPrivilege( + "manage_data_stream_global_retention", + Set.of("cluster:admin/data_stream/global_retention/*", "cluster:monitor/data_stream/global_retention/*") + ); private static final Map VALUES = sortByAccessLevel( Stream.of( @@ -417,7 +425,9 @@ public class ClusterPrivilegeResolver { CROSS_CLUSTER_SEARCH, CROSS_CLUSTER_REPLICATION, READ_CONNECTOR_SECRETS, - WRITE_CONNECTOR_SECRETS + WRITE_CONNECTOR_SECRETS, + MONITOR_GLOBAL_RETENTION, + MANAGE_GLOBAL_RETENTION ).filter(Objects::nonNull).toList() ); diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index e65db8632062d..1009499d91b41 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -22,6 +22,8 @@ public class Constants { "cluster:admin/component_template/delete", "cluster:admin/component_template/get", "cluster:admin/component_template/put", + "cluster:admin/data_stream/global_retention/delete", + "cluster:admin/data_stream/global_retention/put", "cluster:admin/deprecation/cache/reset", "cluster:admin/fleet/secrets/delete", "cluster:admin/fleet/secrets/get", @@ -326,6 +328,7 @@ public class Constants { "cluster:monitor/ccr/follow_info", "cluster:monitor/ccr/follow_stats", "cluster:monitor/ccr/stats", + "cluster:monitor/data_stream/global_retention/get", "cluster:monitor/data_stream/lifecycle/stats", "cluster:monitor/eql/async/status", "cluster:monitor/fetch/health/info", diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/privileges/11_builtin.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/privileges/11_builtin.yml index 7e65691ea17dd..bb784f52884f6 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/privileges/11_builtin.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/privileges/11_builtin.yml @@ -15,5 +15,5 @@ setup: # This is fragile - it needs to be updated every time we add a new cluster/index privilege # I would much prefer we could just check that specific entries are in the array, but we don't have # an assertion for that - - length: { "cluster" : 57 } + - length: { "cluster" : 59 } - length: { "index" : 22 }