Skip to content

Conversation

@parkertimmins
Copy link
Contributor

It is possible for documents in a data stream to not have a @timestamp field. For example, if an existing index is added to a data stream. But, this will cause the reindex operation to fail as the destination index will already contained the _data_stream_timestamp value from the source mappings, causing each document to be checked for an @timestamp field.

This change adds a pipeline, tentatively called reindex-data-stream, which adds a @timestamp field with a value of 0 to destination docs if a timestamp is missing. If a user creates a pipeline with this name, and without a version field, the user's pipeline will be used instead of the built in pipeline.

@parkertimmins parkertimmins added :Data Management/Data streams Data streams and their lifecycles auto-backport Automatically create backport pull requests when merged v9.0.0 v8.18.0 v8.18.1 v9.0.1 labels Feb 4, 2025
@parkertimmins parkertimmins marked this pull request as ready for review February 4, 2025 15:06
@elasticsearchmachine elasticsearchmachine added the Team:Data Management Meta label for data/management team label Feb 4, 2025
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-data-management (Team:Data Management)

@elasticsearchmachine
Copy link
Collaborator

Hi @parkertimmins, I've created a changelog YAML for you.

@masseyke masseyke requested review from dakrone and masseyke February 4, 2025 15:15
final BytesReference pipeline = BytesReference.bytes(currentPipelineDefinition());
client.execute(
PutPipelineTransportAction.TYPE,
new PutPipelineRequest(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably ought to set the parent task id here so that it's cancellable (although I'm not 100% sure it's worth it).

{
builder.startObject();
{
builder.startObject("set");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this going to set every @timestamp to 0? Shouldn't this be a script processor that checks if it exists first?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh never mind! This is what override is for.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be interesting to compare which performs better: a set with override: false or a set with an if condition checking for a null @timestamp

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard to imagine that a script would be faster, but it would be interesting. We could always change that later though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And if the script is faster, that would probably shame @joegallo into action, making the set processor faster.

@masseyke
Copy link
Member

masseyke commented Feb 4, 2025

Looks pretty good, but a couple of things remain:

  • The REINDEX_DATA_STREAM_USER needs a new cluster privilege for creating the pipeline
  • Once you merge in main, DataStreamsUpgradeIT will begin failing because now we no longer expect failures when there is no @timestamp field.

Copy link
Member

@dakrone dakrone left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this generally looks good, but I'm curious why we don't use our existing infrastructure for installing pipelines?

{
builder.startObject();
{
builder.startObject("set");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be interesting to compare which performs better: a set with override: false or a set with an if condition checking for a null @timestamp

}

@Override
protected String getOrigin() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this no longer needs to be run with the user permissions, it seemed better to not require the user to have put-pipeline, and to make new user with system perms (or something like that) and only give it to this registry.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I definitely agree.

}
}
],
"version": 1000
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since version is now handled by the index template registry, the way to keep it from overwriting a custom template is to use a higher version number.

Copy link
Member

@masseyke masseyke left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@dakrone dakrone left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, I left one comment about which origin to use

public static final String APM_ORIGIN = "apm";
public static final String OTEL_ORIGIN = "otel";
public static final String REINDEX_DATA_STREAM_ORIGIN = "reindex_data_stream";
public static final String MIGRATE_ORIGIN = "migrate";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use the existing STACK_ORIGIN origin, I think there's an aversion to adding too many of these if I remember correctly.

@parkertimmins parkertimmins merged commit 29965bc into elastic:main Feb 4, 2025
17 checks passed
@parkertimmins parkertimmins deleted the reindex-data-stream-pipeline branch February 5, 2025 00:00
parkertimmins added a commit to parkertimmins/elasticsearch that referenced this pull request Feb 5, 2025
Add the pipeline "reindex-data-stream-pipeline" to the reindex request within ReindexDataStreamIndexAction. This cleans up documents as needed before inserting into the destination index. Currently, the pipeline only sets a timestamp field with a value of 0, if the document is missing a timestamp field. This is needed because existing indices which are added to a data stream may not contain a timestamp, but reindex validates that a timestamp field exists when creating data stream destination indices.
This pipeline is managed by ES, but can be overriden by users if necessary. To do this, the version field of the pipeline should be set to a value higher than the MigrateRegistry version.
@elasticsearchmachine
Copy link
Collaborator

💔 Backport failed

Status Branch Result
9.0
8.18 Commit could not be cherrypicked due to conflicts

You can use sqren/backport to manually backport by running backport --upstream elastic/elasticsearch --pr 121617

@parkertimmins
Copy link
Contributor Author

💚 All backports created successfully

Status Branch Result
8.18

Questions ?

Please refer to the Backport tool documentation

@parkertimmins
Copy link
Contributor Author

💚 All backports created successfully

Status Branch Result
8.x

Questions ?

Please refer to the Backport tool documentation

parkertimmins added a commit to parkertimmins/elasticsearch that referenced this pull request Feb 5, 2025
Add the pipeline "reindex-data-stream-pipeline" to the reindex request within ReindexDataStreamIndexAction. This cleans up documents as needed before inserting into the destination index. Currently, the pipeline only sets a timestamp field with a value of 0, if the document is missing a timestamp field. This is needed because existing indices which are added to a data stream may not contain a timestamp, but reindex validates that a timestamp field exists when creating data stream destination indices.
This pipeline is managed by ES, but can be overriden by users if necessary. To do this, the version field of the pipeline should be set to a value higher than the MigrateRegistry version.

(cherry picked from commit 29965bc)

# Conflicts:
#	x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java
parkertimmins added a commit to parkertimmins/elasticsearch that referenced this pull request Feb 5, 2025
Add the pipeline "reindex-data-stream-pipeline" to the reindex request within ReindexDataStreamIndexAction. This cleans up documents as needed before inserting into the destination index. Currently, the pipeline only sets a timestamp field with a value of 0, if the document is missing a timestamp field. This is needed because existing indices which are added to a data stream may not contain a timestamp, but reindex validates that a timestamp field exists when creating data stream destination indices.
This pipeline is managed by ES, but can be overriden by users if necessary. To do this, the version field of the pipeline should be set to a value higher than the MigrateRegistry version.

(cherry picked from commit 29965bc)

# Conflicts:
#	x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java
elasticsearchmachine pushed a commit that referenced this pull request Feb 5, 2025
)

Add the pipeline "reindex-data-stream-pipeline" to the reindex request within ReindexDataStreamIndexAction. This cleans up documents as needed before inserting into the destination index. Currently, the pipeline only sets a timestamp field with a value of 0, if the document is missing a timestamp field. This is needed because existing indices which are added to a data stream may not contain a timestamp, but reindex validates that a timestamp field exists when creating data stream destination indices.
This pipeline is managed by ES, but can be overriden by users if necessary. To do this, the version field of the pipeline should be set to a value higher than the MigrateRegistry version.
elasticsearchmachine pushed a commit that referenced this pull request Feb 5, 2025
…#121730)

* Add pipeline to clean docs during data stream reindex (#121617)

Add the pipeline "reindex-data-stream-pipeline" to the reindex request within ReindexDataStreamIndexAction. This cleans up documents as needed before inserting into the destination index. Currently, the pipeline only sets a timestamp field with a value of 0, if the document is missing a timestamp field. This is needed because existing indices which are added to a data stream may not contain a timestamp, but reindex validates that a timestamp field exists when creating data stream destination indices.
This pipeline is managed by ES, but can be overriden by users if necessary. To do this, the version field of the pipeline should be set to a value higher than the MigrateRegistry version.

(cherry picked from commit 29965bc)

# Conflicts:
#	x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java

* Remove timeouts that werent present in 8x.
elasticsearchmachine pushed a commit that referenced this pull request Feb 5, 2025
#121729)

* Add pipeline to clean docs during data stream reindex (#121617)

Add the pipeline "reindex-data-stream-pipeline" to the reindex request within ReindexDataStreamIndexAction. This cleans up documents as needed before inserting into the destination index. Currently, the pipeline only sets a timestamp field with a value of 0, if the document is missing a timestamp field. This is needed because existing indices which are added to a data stream may not contain a timestamp, but reindex validates that a timestamp field exists when creating data stream destination indices.
This pipeline is managed by ES, but can be overriden by users if necessary. To do this, the version field of the pipeline should be set to a value higher than the MigrateRegistry version.

(cherry picked from commit 29965bc)

# Conflicts:
#	x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java

* Remove timeouts that werent present in 8x.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

auto-backport Automatically create backport pull requests when merged backport pending :Data Management/Data streams Data streams and their lifecycles >non-issue Team:Data Management Meta label for data/management team v8.18.0 v8.18.1 v9.0.0 v9.1.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants