Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/130847.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 130847
summary: "Pipelines: Add `created_date` and `modified_date`"
area: Ingest Node
type: enhancement
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
setup:
- requires:
test_runner_features: capabilities
capabilities:
- method: PUT
path: /_ingest/pipeline/{id}
capabilities: [ pipeline_tracking_info ]
reason: "Pipelines have tracking info: modified_date and created_date"

---
"Test creating and getting pipeline returns created_date and modified_date":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"processors": []
}
- match: { acknowledged: true }

- do:
ingest.get_pipeline:
human: true
id: "my_pipeline"
- gte: { my_pipeline.created_date_millis: 0 }
- gte: { my_pipeline.modified_date_millis: 0 }
- match: { my_pipeline.created_date: "/^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z$/" }
- match: { my_pipeline.modified_date: "/^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z$/" }

---
"Test PUT setting created_date":
- do:
catch: bad_request
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"processors": [],
"created_date": "2025-07-04T12:50:48.415Z"
}
- match: { status: 400 }
- match: { error.reason: "Provided a pipeline property which is managed by the system: created_date." }

---
"Test PUT setting created_date_millis":
- do:
catch: bad_request
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"processors": [],
"created_date_millis": 0
}
- match: { status: 400 }
- match: { error.reason: "Provided a pipeline property which is managed by the system: created_date_millis." }

---
"Test PUT setting modified_date_millis":
- do:
catch: bad_request
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"processors": [],
"modified_date_millis": 0
}
- match: { status: 400 }
- match: { error.reason: "Provided a pipeline property which is managed by the system: modified_date_millis." }

---
"Test PUT setting modified_date":
- do:
catch: bad_request
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"processors": [],
"modified_date": "2025-07-04T12:50:48.415Z"
}
- match: { status: 400 }
- match: { error.reason: "Provided a pipeline property which is managed by the system: modified_date." }
Original file line number Diff line number Diff line change
Expand Up @@ -748,3 +748,42 @@ setup:
- match: { docs.1.doc._index: "index-2" }
- match: { docs.1.doc._source.foo: "rab" }
- match: { docs.1.doc.executed_pipelines: ["my-pipeline"] }

---
"Test simulate with pipeline with created_date":
- requires:
test_runner_features: capabilities
capabilities:
- method: PUT
path: /_ingest/pipeline/{id}
capabilities: [ pipeline_tracking_info ]
reason: "Pipelines have tracking info: modified_date and created_date"
- requires:
test_runner_features: contains
- skip:
features: headers
- do:
catch: request
headers:
Content-Type: application/json
simulate.ingest:
pipeline: "my_pipeline"
body: >
{
"docs": [
{
"_index": "index-1",
"_source": {
"foo": "bar"
}
}
],
"pipeline_substitutions": {
"my_pipeline": {
"processors": [],
"created_date": "asd"
}
}
}
- match: { status: 500 }
- contains: { error.reason: "Provided a pipeline property which is managed by the system: created_date." }
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_TOPN_TIMINGS = def(9_128_0_00);
public static final TransportVersion NODE_WEIGHTS_ADDED_TO_NODE_BALANCE_STATS = def(9_129_0_00);
public static final TransportVersion RERANK_SNIPPETS = def(9_130_0_00);
public static final TransportVersion PIPELINE_TRACKING_INFO = def(9_131_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xcontent.ToXContentObject;
Expand Down Expand Up @@ -74,7 +75,25 @@ public RestStatus status() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
for (PipelineConfiguration pipeline : pipelines) {
builder.field(pipeline.getId(), summary ? Map.of() : pipeline.getConfig());
builder.startObject(pipeline.getId());
for (final Map.Entry<String, Object> configProperty : (summary ? Map.<String, Object>of() : pipeline.getConfig()).entrySet()) {
if (Pipeline.CREATED_DATE_MILLIS.equals(configProperty.getKey())) {
builder.timestampFieldsFromUnixEpochMillis(
Pipeline.CREATED_DATE_MILLIS,
Pipeline.CREATED_DATE,
(Long) configProperty.getValue()
);
} else if (Pipeline.MODIFIED_DATE_MILLIS.equals(configProperty.getKey())) {
builder.timestampFieldsFromUnixEpochMillis(
Pipeline.MODIFIED_DATE_MILLIS,
Pipeline.MODIFIED_DATE,
(Long) configProperty.getValue()
);
} else {
builder.field(configProperty.getKey(), configProperty.getValue());
}
}
builder.endObject();
}
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ static void executeDocument(
pipeline.getMetadata(),
verbosePipelineProcessor,
pipeline.getFieldAccessPattern(),
pipeline.getDeprecated()
pipeline.getDeprecated(),
pipeline.getCreatedDateMillis().orElse(null),
pipeline.getModifiedDateMillis().orElse(null)
);
ingestDocument.executePipeline(verbosePipeline, (result, e) -> {
handler.accept(new SimulateDocumentVerboseResult(processorResultList), e);
Expand Down
98 changes: 69 additions & 29 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.TriConsumer;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
Expand Down Expand Up @@ -78,9 +77,9 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.time.Instant;
import java.time.InstantSource;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -569,16 +568,36 @@ public void validatePipelineRequest(ProjectId projectId, PutPipelineRequest requ
validatePipeline(ingestInfos, projectId, request.getId(), config);
}

public static void validateNoSystemPropertiesInPipelineConfig(final Map<String, Object> pipelineConfig) {
if (pipelineConfig.containsKey(Pipeline.CREATED_DATE_MILLIS)) {
throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system: created_date_millis.");
} else if (pipelineConfig.containsKey(Pipeline.CREATED_DATE)) {
throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system: created_date.");
} else if (pipelineConfig.containsKey(Pipeline.MODIFIED_DATE_MILLIS)) {
throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system: modified_date_millis.");
} else if (pipelineConfig.containsKey(Pipeline.MODIFIED_DATE)) {
throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system: modified_date.");
}
}

/** Check whether updating a potentially existing pipeline will be a NOP.
* Will return <code>false</code> if request contains system-properties like created or modified_date,
* these should be rejected later.*/
public static boolean isNoOpPipelineUpdate(ProjectMetadata metadata, PutPipelineRequest request) {
IngestMetadata currentIngestMetadata = metadata.custom(IngestMetadata.TYPE);
if (request.getVersion() == null
&& currentIngestMetadata != null
&& currentIngestMetadata.getPipelines().containsKey(request.getId())) {
var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId());
if (currentPipeline.getConfig().equals(pipelineConfig)) {
return true;
}

var newPipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();

Map<String, Object> currentConfigWithoutSystemProps = new HashMap<>(
currentIngestMetadata.getPipelines().get(request.getId()).getConfig()
);
currentConfigWithoutSystemProps.remove(Pipeline.CREATED_DATE_MILLIS);
currentConfigWithoutSystemProps.remove(Pipeline.MODIFIED_DATE_MILLIS);

return newPipelineConfig.equals(currentConfigWithoutSystemProps);
}

return false;
Expand Down Expand Up @@ -676,10 +695,26 @@ private static void collectProcessorMetrics(
*/
public static class PutPipelineClusterStateUpdateTask extends PipelineClusterStateUpdateTask {
private final PutPipelineRequest request;

PutPipelineClusterStateUpdateTask(ProjectId projectId, ActionListener<AcknowledgedResponse> listener, PutPipelineRequest request) {
private final InstantSource instantSource;

// constructor allowing for injection of InstantSource/time for testing
PutPipelineClusterStateUpdateTask(
final ProjectId projectId,
final ActionListener<AcknowledgedResponse> listener,
final PutPipelineRequest request,
final InstantSource instantSource
) {
super(projectId, listener);
this.request = request;
this.instantSource = instantSource;
}

PutPipelineClusterStateUpdateTask(
final ProjectId projectId,
final ActionListener<AcknowledgedResponse> listener,
final PutPipelineRequest request
) {
this(projectId, listener, request, Instant::now);
}

/**
Expand All @@ -691,10 +726,15 @@ public PutPipelineClusterStateUpdateTask(ProjectId projectId, PutPipelineRequest

@Override
public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<IndexMetadata> allIndexMetadata) {
BytesReference pipelineSource = request.getSource();
final Map<String, PipelineConfiguration> pipelines = currentIngestMetadata == null
? new HashMap<>(1)
: new HashMap<>(currentIngestMetadata.getPipelines());
final PipelineConfiguration existingPipeline = pipelines.get(request.getId());
final Map<String, Object> newPipelineConfig = XContentHelper.convertToMap(request.getSource(), true, request.getXContentType())
.v2();

if (request.getVersion() != null) {
var currentPipeline = currentIngestMetadata != null ? currentIngestMetadata.getPipelines().get(request.getId()) : null;
if (currentPipeline == null) {
if (existingPipeline == null) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
Expand All @@ -705,7 +745,7 @@ public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<I
);
}

final Integer currentVersion = currentPipeline.getVersion();
final Integer currentVersion = existingPipeline.getVersion();
if (Objects.equals(request.getVersion(), currentVersion) == false) {
throw new IllegalArgumentException(
String.format(
Expand All @@ -718,9 +758,8 @@ public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<I
);
}

var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
final Integer specifiedVersion = (Integer) pipelineConfig.get("version");
if (pipelineConfig.containsKey("version") && Objects.equals(specifiedVersion, currentVersion)) {
final Integer specifiedVersion = (Integer) newPipelineConfig.get("version");
if (newPipelineConfig.containsKey("version") && Objects.equals(specifiedVersion, currentVersion)) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
Expand All @@ -733,24 +772,24 @@ public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<I

// if no version specified in the pipeline definition, inject a version of [request.getVersion() + 1]
if (specifiedVersion == null) {
pipelineConfig.put("version", request.getVersion() == null ? 1 : request.getVersion() + 1);
try {
var builder = XContentBuilder.builder(request.getXContentType().xContent()).map(pipelineConfig);
pipelineSource = BytesReference.bytes(builder);
} catch (IOException e) {
throw new IllegalStateException(e);
}
newPipelineConfig.put("version", request.getVersion() == null ? 1 : request.getVersion() + 1);
}
}

Map<String, PipelineConfiguration> pipelines;
if (currentIngestMetadata != null) {
pipelines = new HashMap<>(currentIngestMetadata.getPipelines());
final long nowMillis = instantSource.millis();
if (existingPipeline == null) {
newPipelineConfig.put(Pipeline.CREATED_DATE_MILLIS, nowMillis);
} else {
pipelines = new HashMap<>();
Object existingCreatedAt = existingPipeline.getConfig().get(Pipeline.CREATED_DATE_MILLIS);
// only set/carry over `created_date` if existing pipeline already has it.
// would be confusing if existing pipelines were all updated to have `created_date` set to now.
if (existingCreatedAt != null) {
newPipelineConfig.put(Pipeline.CREATED_DATE_MILLIS, existingCreatedAt);
}
}
newPipelineConfig.put(Pipeline.MODIFIED_DATE_MILLIS, nowMillis);

pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), pipelineSource, request.getXContentType()));
pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), newPipelineConfig));
return new IngestMetadata(pipelines);
}
}
Expand All @@ -762,6 +801,7 @@ void validatePipeline(
String pipelineId,
Map<String, Object> pipelineConfig
) throws Exception {
validateNoSystemPropertiesInPipelineConfig(pipelineConfig);
if (ingestInfos.isEmpty()) {
throw new IllegalStateException("Ingest info is empty");
}
Expand Down
Loading
Loading