5 changes: 5 additions & 0 deletions docs/changelog/121617.yaml
@@ -0,0 +1,5 @@
pr: 121617
summary: Add pipeline to clean docs during data stream reindex
area: Data streams
type: bug
issues: []
1 change: 1 addition & 0 deletions x-pack/plugin/migrate/build.gradle
@@ -19,6 +19,7 @@ dependencies {
testImplementation project(xpackModule('ccr'))
testImplementation project(':modules:data-streams')
testImplementation project(path: ':modules:reindex')
testImplementation project(path: ':modules:ingest-common')
}

addQaCheckDependencies(project)
ReindexDatastreamIndexTransportActionIT.java
@@ -24,12 +24,17 @@
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.DeletePipelineTransportAction;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
@@ -38,12 +43,15 @@
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.ingest.common.IngestCommonPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.migrate.MigratePlugin;
import org.elasticsearch.xpack.migrate.ReindexDataStreamPipeline;
import org.junit.After;

import java.io.IOException;
import java.time.Instant;
@@ -56,27 +64,143 @@
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;

public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
@After
private void cleanupCluster() throws Exception {
clusterAdmin().execute(
DeletePipelineTransportAction.TYPE,
new DeletePipelineRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, ReindexDataStreamPipeline.PIPELINE_NAME)
);
super.cleanUpCluster();
}

private static final String MAPPING = """
{
"_doc":{
"dynamic":"strict",
"properties":{
"foo1":{
"type":"text"
}
"foo1": {"type":"text"},
"@timestamp": {"type":"date"}
}
}
}
""";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(
MigratePlugin.class,
ReindexPlugin.class,
MockTransportService.TestPlugin.class,
DataStreamsPlugin.class,
IngestCommonPlugin.class
);
}

private static String DATA_STREAM_MAPPING = """
{
"dynamic": true,
"_data_stream_timestamp": {
"enabled": true
},
"properties": {
"@timestamp": {"type":"date"}
}
}
""";

public void testTimestamp0AddedIfMissing() {
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
indicesAdmin().create(new CreateIndexRequest(sourceIndex)).actionGet();

// add doc without timestamp
addDoc(sourceIndex, "{\"foo\":\"baz\"}");

// add timestamp to source mapping
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();

// call reindex
var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
.actionGet()
.getDestIndex();

assertResponse(prepareSearch(destIndex), response -> {
Map<String, Object> sourceAsMap = response.getHits().getAt(0).getSourceAsMap();
assertEquals(Integer.valueOf(0), sourceAsMap.get(DEFAULT_TIMESTAMP_FIELD));
});
}

public void testTimestampNotAddedIfExists() {
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
indicesAdmin().create(new CreateIndexRequest(sourceIndex)).actionGet();

// add doc with timestamp
String time = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time);
addDoc(sourceIndex, doc);

// add timestamp to source mapping
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();

// call reindex
var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
.actionGet()
.getDestIndex();

assertResponse(prepareSearch(destIndex), response -> {
Map<String, Object> sourceAsMap = response.getHits().getAt(0).getSourceAsMap();
assertEquals(time, sourceAsMap.get(DEFAULT_TIMESTAMP_FIELD));
});
}

public void testCustomReindexPipeline() {
String customPipeline = """
{
"processors": [
{
"set": {
"field": "cheese",
"value": "gorgonzola"
}
}
]
}
""";

PutPipelineRequest putRequest = new PutPipelineRequest(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
ReindexDataStreamPipeline.PIPELINE_NAME,
new BytesArray(customPipeline),
XContentType.JSON
);

clusterAdmin().execute(PutPipelineTransportAction.TYPE, putRequest).actionGet();

var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
indicesAdmin().create(new CreateIndexRequest(sourceIndex)).actionGet();

// add doc with timestamp
String time = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time);
addDoc(sourceIndex, doc);

// add timestamp to source mapping
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();

String destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
.actionGet()
.getDestIndex();

assertResponse(prepareSearch(destIndex), response -> {
Map<String, Object> sourceAsMap = response.getHits().getAt(0).getSourceAsMap();
assertEquals("gorgonzola", sourceAsMap.get("cheese"));
assertEquals(time, sourceAsMap.get(DEFAULT_TIMESTAMP_FIELD));
});
}

public void testDestIndexDeletedIfExists() throws Exception {
@@ -200,7 +324,7 @@ public void testSettingsAddedBeforeReindex() throws Exception {
assertEquals(refreshInterval, settingsResponse.getSetting(destIndex, IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey()));
}

public void testMappingsAddedToDestIndex() {
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(MAPPING)).actionGet();

@@ -479,12 +603,9 @@ private static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}

void addDoc(String index, String doc) {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new IndexRequest(index).opType(DocWriteRequest.OpType.CREATE).source(doc, XContentType.JSON));
client().bulk(bulkRequest).actionGet();
}

}
ReindexDataStreamPipeline.java
@@ -0,0 +1,113 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.migrate;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.ingest.IngestMetadata;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.io.UncheckedIOException;

/**
* Manages the definitions and lifecycle of the ingest pipeline used by the reindex data stream operation.
*/
public class ReindexDataStreamPipeline {

/**
* The last version of the distribution that updated the pipeline definition.
*/
static final int LAST_UPDATED_VERSION = Version.V_8_18_0.id;

public static final String PIPELINE_NAME = "reindex-data-stream";

/**
* Checks if the current version of the pipeline definition is installed in the cluster
* @param clusterState The cluster state to check
* @return true if a pipeline exists that is compatible with this version, false otherwise
*/
public static boolean exists(ClusterState clusterState) {
final IngestMetadata ingestMetadata = clusterState.getMetadata().custom(IngestMetadata.TYPE);
// we ensure that we both have the pipeline and its version represents the current (or later) version
if (ingestMetadata != null) {
final PipelineConfiguration pipeline = ingestMetadata.getPipelines().get(PIPELINE_NAME);
if (pipeline != null) {
Object version = pipeline.getConfig().get("version");
// do not replace if pipeline was created by user and has no version
if (version == null) {
return true;
}
return version instanceof Number number && number.intValue() >= LAST_UPDATED_VERSION;
}
}
return false;
}

/**
* Creates a pipeline with the current version's pipeline definition
* @param client Client used to execute put pipeline
* @param listener Callback used after pipeline has been created
*/
public static void create(Client client, ActionListener<AcknowledgedResponse> listener) {
final BytesReference pipeline = BytesReference.bytes(currentPipelineDefinition());
client.execute(
PutPipelineTransportAction.TYPE,
new PutPipelineRequest(
Review comment (Member): We probably ought to set the parent task id here so that it's cancellable (although I'm not 100% sure it's worth it).

MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
PIPELINE_NAME,
pipeline,
XContentType.JSON
),
listener
);
}
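Following up on the review comment above, here is a minimal sketch of what passing a parent task id into the put-pipeline request could look like. It is an illustration only, not part of this PR: the overload and its parentTaskId parameter are assumptions, and the caller would have to thread its own task id through.

// Hypothetical overload, not in this PR: accepts a parent task id so the
// put-pipeline request is cancelled together with the task that spawned it.
// Requires: import org.elasticsearch.tasks.TaskId;
public static void create(Client client, TaskId parentTaskId, ActionListener<AcknowledgedResponse> listener) {
    final BytesReference pipeline = BytesReference.bytes(currentPipelineDefinition());
    final PutPipelineRequest request = new PutPipelineRequest(
        MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
        MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
        PIPELINE_NAME,
        pipeline,
        XContentType.JSON
    );
    // setParentTask is inherited from TransportRequest; it links this request's
    // lifecycle to the parent so cancelling the parent cancels this call too.
    request.setParentTask(parentTaskId);
    client.execute(PutPipelineTransportAction.TYPE, request, listener);
}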

private static XContentBuilder currentPipelineDefinition() {
try {
XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent());
builder.startObject();
{
builder.field(
"description",
"This pipeline sanitizes documents that are being reindexed into a data stream using the reindex data stream API. "
+ "It is an internal pipeline and should not be modified."
);
builder.field("version", LAST_UPDATED_VERSION);
builder.startArray("processors");
{
builder.startObject();
{
builder.startObject("set");
Review thread on the set processor:
Member: Is this going to set every @timestamp to 0? Shouldn't this be a script processor that checks if it exists first?
Member: Oh never mind! This is what override is for.
Member: It would be interesting to compare which performs better: a set with override: false or a set with an if condition checking for a null @timestamp.
Member: It's hard to imagine that a script would be faster, but it would be interesting. We could always change that later though.
Member: And if the script is faster, that would probably shame @joegallo into action, making the set processor faster.

{
builder.field("field", "@timestamp");
builder.field("value", 0);
builder.field("override", false);
}
builder.endObject();
}
builder.endObject();
}
builder.endArray();
}
builder.endObject();
return builder;
} catch (final IOException e) {
throw new UncheckedIOException("Failed to create pipeline for reindex data stream document sanitization", e);
}
}
}
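For the override-versus-script question raised in the review thread above, this is a hedged sketch of the script-processor alternative the reviewers describe, written as a pipeline body embedded in a Java text block the way the tests embed pipelines. It is meant to be functionally equivalent to the shipped set processor with override: false; only a benchmark would settle which is faster.

// Hypothetical alternative, not in this PR: a script processor guarded by an
// if condition, so @timestamp is only written when the field is absent.
String scriptBasedPipeline = """
    {
      "processors": [
        {
          "script": {
            "if": "ctx['@timestamp'] == null",
            "source": "ctx['@timestamp'] = 0"
          }
        }
      ]
    }
    """;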
ReindexDataStreamIndexTransportAction.java
@@ -51,6 +51,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
import org.elasticsearch.xpack.migrate.ReindexDataStreamPipeline;

import java.util.Locale;
import java.util.Map;
@@ -153,7 +154,8 @@ protected void doExecute(
return;
}
final boolean wasClosed = isClosed(sourceIndex);
SubscribableListener.<AcknowledgedResponse>newForked(l -> setBlockWrites(sourceIndexName, l, taskId))
SubscribableListener.newForked(this::prepareReindexOperation)
.<AcknowledgedResponse>andThen(l -> setBlockWrites(sourceIndexName, l, taskId))
.<OpenIndexResponse>andThen(l -> openIndexIfClosed(sourceIndexName, wasClosed, l, taskId))
.<BroadcastResponse>andThen(l -> refresh(sourceIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l, taskId))
@@ -197,6 +199,14 @@ private static boolean isClosed(IndexMetadata indexMetadata) {
return indexMetadata.getState().equals(IndexMetadata.State.CLOSE);
}

private void prepareReindexOperation(ActionListener<AcknowledgedResponse> listener) {
if (ReindexDataStreamPipeline.exists(clusterService.state())) {
listener.onResponse(null);
} else {
ReindexDataStreamPipeline.create(client, listener);
}
}

private void setBlockWrites(String sourceIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
logger.debug("Setting write block on source index [{}]", sourceIndexName);
addBlockToIndex(WRITE, sourceIndexName, new ActionListener<>() {
@@ -269,6 +279,7 @@ void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkByScrollResponse>
logger.debug("Reindex to destination index [{}] from source index [{}]", destIndexName, sourceIndexName);
var reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndexName);
reindexRequest.setDestPipeline(ReindexDataStreamPipeline.PIPELINE_NAME);
reindexRequest.getSearchRequest().allowPartialSearchResults(false);
reindexRequest.getSearchRequest().source().fetchSource(true);
reindexRequest.setDestIndex(destIndexName);