diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java index 1229d62dce047..a704b350dba4b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java @@ -225,6 +225,7 @@ public class InternalUsers { .build() }, null, null, + new String[] {}, MetadataUtils.DEFAULT_RESERVED_METADATA, Map.of() diff --git a/x-pack/plugin/core/template-resources/src/main/resources/reindex-data-stream-pipeline.json b/x-pack/plugin/core/template-resources/src/main/resources/reindex-data-stream-pipeline.json new file mode 100644 index 0000000000000..e8c3352131700 --- /dev/null +++ b/x-pack/plugin/core/template-resources/src/main/resources/reindex-data-stream-pipeline.json @@ -0,0 +1,16 @@ +{ + "description": "This pipeline sanitizes documents that are being reindexed into a data stream using the reindex data stream API. It is an internal pipeline and should not be modified.", + "processors": [ + { + "set": { + "field": "@timestamp", + "value": 0, + "override": false + } + } + ], + "_meta": { + "managed": true + }, + "version": ${xpack.migrate.reindex.pipeline.version} +} diff --git a/x-pack/plugin/migrate/build.gradle b/x-pack/plugin/migrate/build.gradle index 283362a637e78..f179a311e0fea 100644 --- a/x-pack/plugin/migrate/build.gradle +++ b/x-pack/plugin/migrate/build.gradle @@ -19,6 +19,7 @@ dependencies { testImplementation project(xpackModule('ccr')) testImplementation project(':modules:data-streams') testImplementation project(path: ':modules:reindex') + testImplementation project(path: ':modules:ingest-common') } addQaCheckDependencies(project) diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java index e22c26bd6b8bb..1ffeaed0bd1d3 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java @@ -24,12 +24,17 @@ import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.ingest.DeletePipelineRequest; +import org.elasticsearch.action.ingest.DeletePipelineTransportAction; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.ingest.PutPipelineTransportAction; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.MetadataIndexStateService; import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatter; @@ -38,12 +43,15 @@ import org.elasticsearch.datastreams.DataStreamsPlugin; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.ingest.common.IngestCommonPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.migrate.MigratePlugin; +import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry; +import org.junit.After; import java.io.IOException; import java.time.Instant; @@ -56,19 +64,27 @@ import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase { + @After + private void cleanupCluster() throws Exception { + clusterAdmin().execute( + DeletePipelineTransportAction.TYPE, + new DeletePipelineRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME) + ); + super.cleanUpCluster(); + } private static final String MAPPING = """ { "_doc":{ "dynamic":"strict", "properties":{ - "foo1":{ - "type":"text" - } + "foo1": {"type":"text"}, + "@timestamp": {"type":"date"} } } } @@ -76,7 +92,116 @@ public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return List.of(MigratePlugin.class, ReindexPlugin.class, MockTransportService.TestPlugin.class, DataStreamsPlugin.class); + return List.of( + MigratePlugin.class, + ReindexPlugin.class, + MockTransportService.TestPlugin.class, + DataStreamsPlugin.class, + IngestCommonPlugin.class + ); + } + + private static String DATA_STREAM_MAPPING = """ + { + "dynamic": true, + "_data_stream_timestamp": { + "enabled": true + }, + "properties": { + "@timestamp": {"type":"date"} + } + } + """; + + public void testTimestamp0AddedIfMissing() { + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + indicesAdmin().create(new CreateIndexRequest(sourceIndex)).actionGet(); + + // add doc without timestamp + addDoc(sourceIndex, "{\"foo\":\"baz\"}"); + + // add timestamp to source mapping + indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get(); + + // call reindex + var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)) + .actionGet() + .getDestIndex(); + + assertResponse(prepareSearch(destIndex), response -> { + Map sourceAsMap = response.getHits().getAt(0).getSourceAsMap(); + assertEquals(Integer.valueOf(0), sourceAsMap.get(DEFAULT_TIMESTAMP_FIELD)); + }); + } + + public void testTimestampNotAddedIfExists() { + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + indicesAdmin().create(new CreateIndexRequest(sourceIndex)).actionGet(); + + // add doc with timestamp + String time = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis()); + var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time); + addDoc(sourceIndex, doc); + + // add timestamp to source mapping + indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get(); + + // call reindex + var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)) + .actionGet() + .getDestIndex(); + + assertResponse(prepareSearch(destIndex), response -> { + Map sourceAsMap = response.getHits().getAt(0).getSourceAsMap(); + assertEquals(time, sourceAsMap.get(DEFAULT_TIMESTAMP_FIELD)); + }); + } + + public void testCustomReindexPipeline() { + String customPipeline = """ + { + "processors": [ + { + "set": { + "field": "cheese", + "value": "gorgonzola" + } + } + ], + "version": 1000 + } + """; + + PutPipelineRequest putRequest = new PutPipelineRequest( + TEST_REQUEST_TIMEOUT, + TEST_REQUEST_TIMEOUT, + MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME, + new BytesArray(customPipeline), + XContentType.JSON + ); + + clusterAdmin().execute(PutPipelineTransportAction.TYPE, putRequest).actionGet(); + + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + indicesAdmin().create(new CreateIndexRequest(sourceIndex)).actionGet(); + + // add doc with timestamp + String time = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis()); + var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time); + addDoc(sourceIndex, doc); + + // add timestamp to source mapping + indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get(); + + String destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)) + .actionGet() + .getDestIndex(); + + assertResponse(prepareSearch(destIndex), response -> { + Map sourceAsMap = response.getHits().getAt(0).getSourceAsMap(); + assertEquals("gorgonzola", sourceAsMap.get("cheese")); + assertEquals(time, sourceAsMap.get(DEFAULT_TIMESTAMP_FIELD)); + }); } public void testDestIndexDeletedIfExists() throws Exception { @@ -200,7 +325,7 @@ public void testSettingsAddedBeforeReindex() throws Exception { assertEquals(refreshInterval, settingsResponse.getSetting(destIndex, IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey())); } - public void testMappingsAddedToDestIndex() throws Exception { + public void testMappingsAddedToDestIndex() { var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(MAPPING)).actionGet(); @@ -479,12 +604,9 @@ private static String formatInstant(Instant instant) { return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant); } - private static String getIndexUUID(String index) { - return indicesAdmin().getIndex(new GetIndexRequest(TEST_REQUEST_TIMEOUT).indices(index)) - .actionGet() - .getSettings() - .get(index) - .get(IndexMetadata.SETTING_INDEX_UUID); + void addDoc(String index, String doc) { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest(index).opType(DocWriteRequest.OpType.CREATE).source(doc, XContentType.JSON)); + client().bulk(bulkRequest).actionGet(); } - } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java index f5f8beba26d8f..7811e84ac9f53 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java @@ -55,6 +55,7 @@ import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTaskParams; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.function.Predicate; import java.util.function.Supplier; @@ -64,6 +65,18 @@ import static org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor.MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING; public class MigratePlugin extends Plugin implements ActionPlugin, PersistentTaskPlugin { + @Override + public Collection createComponents(PluginServices services) { + var registry = new MigrateTemplateRegistry( + services.environment().settings(), + services.clusterService(), + services.threadPool(), + services.client(), + services.xContentRegistry() + ); + registry.initialize(); + return List.of(registry); + } @Override public List getRestHandlers( diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigrateTemplateRegistry.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigrateTemplateRegistry.java new file mode 100644 index 0000000000000..2a9dc97e16352 --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigrateTemplateRegistry.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.migrate; + +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.template.IndexTemplateRegistry; +import org.elasticsearch.xpack.core.template.IngestPipelineConfig; +import org.elasticsearch.xpack.core.template.JsonIngestPipelineConfig; + +import java.util.List; + +public class MigrateTemplateRegistry extends IndexTemplateRegistry { + + // This number must be incremented when we make changes to built-in pipeline. + // If a specific user pipeline is needed instead, its version should be set to a value higher than the REGISTRY_VERSION. + static final int REGISTRY_VERSION = 1; + public static final String REINDEX_DATA_STREAM_PIPELINE_NAME = "reindex-data-stream-pipeline"; + private static final String TEMPLATE_VERSION_VARIABLE = "xpack.migrate.reindex.pipeline.version"; + + public MigrateTemplateRegistry( + Settings nodeSettings, + ClusterService clusterService, + ThreadPool threadPool, + Client client, + NamedXContentRegistry xContentRegistry + ) { + super(nodeSettings, clusterService, threadPool, client, xContentRegistry); + } + + @Override + protected List getIngestPipelines() { + return List.of( + new JsonIngestPipelineConfig( + REINDEX_DATA_STREAM_PIPELINE_NAME, + "/" + REINDEX_DATA_STREAM_PIPELINE_NAME + ".json", + REGISTRY_VERSION, + TEMPLATE_VERSION_VARIABLE + ) + ); + } + + @Override + protected String getOrigin() { + return ClientHelper.STACK_ORIGIN; + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java index 456c61451f7f2..792ec4ec2b6f9 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java @@ -53,6 +53,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate; +import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry; import java.util.Locale; import java.util.Map; @@ -271,6 +272,7 @@ void reindex(String sourceIndexName, String destIndexName, ActionListener