From 3f46277b56fdcd227bf8f84549cd32ee9901be5c Mon Sep 17 00:00:00 2001 From: Szymon Bialkowski Date: Tue, 8 Jul 2025 20:52:11 +0100 Subject: [PATCH 1/7] Pipelines: Add `created_date` and `modified_date` --- docs/changelog/130847.yaml | 12 +++ qa/mixed-cluster/build.gradle | 6 ++ .../rest-api-spec/test/ingest/10_basic.yml | 46 ++++++++++ .../test/simulate.ingest/10_basic.yml | 30 +++++++ .../ingest/PipelineConfigurationSyncIT.java | 77 +++++++++++++++++ .../org/elasticsearch/TransportVersions.java | 1 + .../action/ingest/ReservedPipelineAction.java | 2 +- .../ingest/SimulateExecutionService.java | 6 +- .../elasticsearch/ingest/IngestService.java | 55 ++++++++++-- .../org/elasticsearch/ingest/Pipeline.java | 76 +++++++++++++++-- .../ingest/PipelineConfiguration.java | 24 +++++- .../ingest/SimulateIngestService.java | 6 +- .../ingest/TrackingResultProcessor.java | 6 +- .../ingest/IngestServiceTests.java | 83 +++++++++++++++++++ .../ingest/PipelineProcessorTests.java | 6 +- 15 files changed, 415 insertions(+), 21 deletions(-) create mode 100644 docs/changelog/130847.yaml create mode 100644 server/src/internalClusterTest/java/org/elasticsearch/ingest/PipelineConfigurationSyncIT.java diff --git a/docs/changelog/130847.yaml b/docs/changelog/130847.yaml new file mode 100644 index 0000000000000..7aa1f671af049 --- /dev/null +++ b/docs/changelog/130847.yaml @@ -0,0 +1,12 @@ +pr: 130847 +summary: "Pipelines: Add `created_date` and `modified_date`" +area: Ingest Node +type: enhancement +issues: [108754] +highlight: + title: "Pipelines: Add `created_date` and `modified_date`" + body: |- + Pipelines now have extra properties to indicate when they were created, and updated, to improve visibility + and debugging. + These properties are managed by the system and cannot be updated by the user. + notable: true diff --git a/qa/mixed-cluster/build.gradle b/qa/mixed-cluster/build.gradle index a591d4c590b27..04d001a96cdc1 100644 --- a/qa/mixed-cluster/build.gradle +++ b/qa/mixed-cluster/build.gradle @@ -71,6 +71,12 @@ excludeList.add('aggregations/percentiles_hdr_metric/Negative values test') // sync_id is removed in 9.0 excludeList.add("cat.shards/10_basic/Help") +// new optional properties only available on latest cluster +excludeList.add("ingest/10_basic/Test creating and getting pipeline returns created_date and modified_date") +excludeList.add("ingest/10_basic/Test PUT setting created_date") +excludeList.add("ingest/10_basic/Test PUT setting modified_date") +excludeList.add("simulate.ingest/10_basic/Test simulate with pipeline with created_date") + def clusterPath = getPath() buildParams.bwcVersions.withWireCompatible { bwcVersion, baseName -> diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml index 8dfd499b7d32c..343fa3ebf6e41 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml @@ -184,3 +184,49 @@ - is_false: first_pipeline.description - is_true: second_pipeline - is_false: second_pipeline.description + + +--- +"Test creating and getting pipeline returns created_date and modified_date": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "processors": [] + } + - match: { acknowledged: true } + + - do: + ingest.get_pipeline: + id: "my_pipeline" + - match: { my_pipeline.created_date: "/\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z/" } + - match: { my_pipeline.modified_date: "/\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z/" } + +--- +"Test PUT setting created_date": + - do: + catch: bad_request + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "processors": [], + "created_date": "2025-07-04T12:50:48.415Z" + } + - match: { status: 400 } + - match: { error.reason: "Provided a pipeline property which is managed by the system, e.g. `created_date`." } + +--- +"Test PUT setting modified_date": + - do: + catch: bad_request + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "processors": [], + "modified_date": "2025-07-04T12:50:48.415Z" + } + - match: { status: 400 } + - match: { error.reason: "Provided a pipeline property which is managed by the system, e.g. `created_date`." } diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/simulate.ingest/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/simulate.ingest/10_basic.yml index 70a3b0253c78f..2b03bc7989382 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/simulate.ingest/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/simulate.ingest/10_basic.yml @@ -748,3 +748,33 @@ setup: - match: { docs.1.doc._index: "index-2" } - match: { docs.1.doc._source.foo: "rab" } - match: { docs.1.doc.executed_pipelines: ["my-pipeline"] } + +--- +"Test simulate with pipeline with created_date": + - skip: + features: headers + - do: + catch: request + headers: + Content-Type: application/json + simulate.ingest: + pipeline: "my_pipeline" + body: > + { + "docs": [ + { + "_index": "index-1", + "_source": { + "foo": "bar" + } + } + ], + "pipeline_substitutions": { + "my_pipeline": { + "processors": [], + "created_date": "asd" + } + } + } + - match: { status: 500 } + - match: { error.reason: "/(runtime_exception:\\ )?org\\.elasticsearch\\.ElasticsearchParseException:\\ Provided\\ a\\ pipeline\\ property\\ which\\ is\\ managed\\ by\\ the\\ system,\\ e\\.g\\.\\ `created_date`\\./" } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/ingest/PipelineConfigurationSyncIT.java b/server/src/internalClusterTest/java/org/elasticsearch/ingest/PipelineConfigurationSyncIT.java new file mode 100644 index 0000000000000..2d3f632a2fab0 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/ingest/PipelineConfigurationSyncIT.java @@ -0,0 +1,77 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.action.ingest.GetPipelineResponse; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.node.NodeService; +import org.elasticsearch.test.ESIntegTestCase; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +@ESIntegTestCase.ClusterScope(numDataNodes = 3) +public class PipelineConfigurationSyncIT extends ESIntegTestCase { + + public void testAllNodesGetPipelineTrackingClusterState() throws Exception { + final String pipelineId = "_id"; + GetPipelineResponse getResponse = getPipelines(pipelineId); + assertFalse(getResponse.isFound()); + + final long timeBeforePut = System.currentTimeMillis(); + putJsonPipeline( + "_id", + (builder, params) -> builder.field("description", "my_pipeline").startArray("processors").startObject().endObject().endArray() + ); + final Long timeAfterPut = System.currentTimeMillis(); + + getResponse = getPipelines(pipelineId); + assertTrue(getResponse.isFound()); + + final Pipeline node1Pipeline = internalCluster().getInstance(NodeService.class, "node_s0") + .getIngestService() + .getPipeline(Metadata.DEFAULT_PROJECT_ID, "_id"); + final Pipeline node2Pipeline = internalCluster().getInstance(NodeService.class, "node_s1") + .getIngestService() + .getPipeline(Metadata.DEFAULT_PROJECT_ID, "_id"); + final Pipeline node3Pipeline = internalCluster().getInstance(NodeService.class, "node_s2") + .getIngestService() + .getPipeline(Metadata.DEFAULT_PROJECT_ID, "_id"); + + assertNotSame(node1Pipeline, node2Pipeline); + assertNotSame(node2Pipeline, node3Pipeline); + + assertThat(node1Pipeline.getDescription(), equalTo(node2Pipeline.getDescription())); + assertThat(node2Pipeline.getDescription(), equalTo(node3Pipeline.getDescription())); + + // created_date + final long node1CreatedDate = node1Pipeline.getCreatedDate().orElseThrow(); + final long node2CreatedDate = node2Pipeline.getCreatedDate().orElseThrow(); + final long node3CreatedDate = node3Pipeline.getCreatedDate().orElseThrow(); + + assertThat(node1CreatedDate, equalTo(node2CreatedDate)); + assertThat(node2CreatedDate, equalTo(node3CreatedDate)); + + assertThat(node1CreatedDate, greaterThanOrEqualTo(timeBeforePut)); + assertThat(node1CreatedDate, lessThanOrEqualTo(timeAfterPut)); + + // modified_date + final long node1ModifiedDate = node1Pipeline.getModifiedDate().orElseThrow(); + final long node2ModifiedDate = node2Pipeline.getModifiedDate().orElseThrow(); + final long node3ModifiedDate = node3Pipeline.getModifiedDate().orElseThrow(); + + assertThat(node1ModifiedDate, equalTo(node2ModifiedDate)); + assertThat(node2ModifiedDate, equalTo(node3ModifiedDate)); + + assertThat(node1ModifiedDate, greaterThanOrEqualTo(timeBeforePut)); + assertThat(node1ModifiedDate, lessThanOrEqualTo(timeAfterPut)); + } +} diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 70727715a91c0..a21c713623979 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -338,6 +338,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_FIXED_INDEX_LIKE = def(9_119_0_00); public static final TransportVersion LOOKUP_JOIN_CCS = def(9_120_0_00); public static final TransportVersion NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO = def(9_121_0_00); + public static final TransportVersion PIPELINE_TRACKING_INFO = def(9_122_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/ingest/ReservedPipelineAction.java b/server/src/main/java/org/elasticsearch/action/ingest/ReservedPipelineAction.java index 65d634aeb498b..b07d535f17ec8 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/ReservedPipelineAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/ReservedPipelineAction.java @@ -87,7 +87,7 @@ public TransformState transform(ProjectId projectId, List so ProjectMetadata projectMetadata = clusterState.metadata().getProject(projectId); for (var request : requests) { - var nopUpdate = IngestService.isNoOpPipelineUpdate(projectMetadata, request); + var nopUpdate = IngestService.isNoOpPipelineUpdate(projectMetadata, request, () -> IngestService.readPipelineConfig(request)); if (nopUpdate) { continue; diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index 92d2a8a45197e..7b9429f2c9336 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -42,6 +42,8 @@ static void executeDocument( if (verbose) { List processorResultList = new CopyOnWriteArrayList<>(); CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList); + long createdDate = pipeline.getCreatedDate().orElse(-1); + long modifiedDate = pipeline.getModifiedDate().orElse(-1); Pipeline verbosePipeline = new Pipeline( pipeline.getId(), pipeline.getDescription(), @@ -49,7 +51,9 @@ static void executeDocument( pipeline.getMetadata(), verbosePipelineProcessor, pipeline.getFieldAccessPattern(), - pipeline.getDeprecated() + pipeline.getDeprecated(), + createdDate == -1 ? null : createdDate, + modifiedDate == -1 ? null : modifiedDate ); ingestDocument.executePipeline(verbosePipeline, (result, e) -> { handler.accept(new SimulateDocumentVerboseResult(processorResultList), e); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index b4233cb94e21b..e33f5a3175154 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -81,6 +81,8 @@ import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -102,6 +104,7 @@ import java.util.function.Function; import java.util.function.IntConsumer; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.elasticsearch.core.Strings.format; @@ -542,7 +545,9 @@ public void putPipeline( ActionListener listener, Consumer> nodeInfoListener ) throws Exception { - if (isNoOpPipelineUpdate(state.metadata().getProject(projectId), request)) { + Map newPipelineConfig = readPipelineConfig(request); + validateNoSystemPropertiesInPipelineConfig(newPipelineConfig); + if (isNoOpPipelineUpdate(state.metadata().getProject(projectId), request, () -> newPipelineConfig)) { // existing pipeline matches request pipeline -- no need to update listener.onResponse(AcknowledgedResponse.TRUE); return; @@ -569,16 +574,35 @@ public void validatePipelineRequest(ProjectId projectId, PutPipelineRequest requ validatePipeline(ingestInfos, projectId, request.getId(), config); } - public static boolean isNoOpPipelineUpdate(ProjectMetadata metadata, PutPipelineRequest request) { + public static Map readPipelineConfig(PutPipelineRequest request) { + return XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); + } + + public static void validateNoSystemPropertiesInPipelineConfig(final Map pipelineConfig) { + if (pipelineConfig.containsKey(Pipeline.CREATED_DATE_KEY) || pipelineConfig.containsKey(Pipeline.MODIFIED_DATE_KEY)) { + throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system, e.g. `created_date`."); + } + } + + public static boolean isNoOpPipelineUpdate( + ProjectMetadata metadata, + PutPipelineRequest request, + Supplier> newPipelineConfigSupplier + ) { IngestMetadata currentIngestMetadata = metadata.custom(IngestMetadata.TYPE); if (request.getVersion() == null && currentIngestMetadata != null && currentIngestMetadata.getPipelines().containsKey(request.getId())) { - var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); - var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId()); - if (currentPipeline.getConfig().equals(pipelineConfig)) { - return true; - } + + Map currentConfigWithoutSystemProps = new HashMap<>( + currentIngestMetadata.getPipelines().get(request.getId()).getConfig() + ); + currentConfigWithoutSystemProps.remove(Pipeline.CREATED_DATE_KEY); + currentConfigWithoutSystemProps.remove(Pipeline.MODIFIED_DATE_KEY); + + Map newPipelineConfig = newPipelineConfigSupplier.get(); + + return newPipelineConfig.equals(currentConfigWithoutSystemProps); } return false; @@ -750,7 +774,22 @@ public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection(); } - pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), pipelineSource, request.getXContentType())); + Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); + Map newPipelineConfig = XContentHelper.convertToMap(pipelineSource, true, request.getXContentType()).v2(); + PipelineConfiguration existingPipeline = pipelines.get(request.getId()); + if (existingPipeline == null) { + newPipelineConfig.put(Pipeline.CREATED_DATE_KEY, now.toString()); + } else { + Object existingCreatedAt = existingPipeline.getConfig().get(Pipeline.CREATED_DATE_KEY); + // only set/carry over `created_date` if existing pipeline already has it. + // would be confusing if existing pipelines were all updated to have `created_date` set to now. + if (existingCreatedAt != null) { + newPipelineConfig.put(Pipeline.CREATED_DATE_KEY, existingCreatedAt); + } + } + newPipelineConfig.put(Pipeline.MODIFIED_DATE_KEY, now.toString()); + + pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), newPipelineConfig)); return new IngestMetadata(pipelines); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index e1ade43933289..79cfa3100a758 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -15,9 +15,11 @@ import org.elasticsearch.features.NodeFeature; import org.elasticsearch.script.ScriptService; +import java.time.Instant; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.OptionalLong; import java.util.function.BiConsumer; import java.util.function.LongSupplier; import java.util.function.Predicate; @@ -34,6 +36,8 @@ public final class Pipeline { public static final String META_KEY = "_meta"; public static final String FIELD_ACCESS_PATTERN = "field_access_pattern"; public static final String DEPRECATED_KEY = "deprecated"; + public static final String CREATED_DATE_KEY = "created_date"; + public static final String MODIFIED_DATE_KEY = "modified_date"; private final String id; @Nullable @@ -48,6 +52,10 @@ public final class Pipeline { private final IngestPipelineFieldAccessPattern fieldAccessPattern; @Nullable private final Boolean deprecated; + @Nullable + private final Long createdDate; + @Nullable + private final Long modifiedDate; public Pipeline( String id, @@ -56,7 +64,7 @@ public Pipeline( @Nullable Map metadata, CompoundProcessor compoundProcessor ) { - this(id, description, version, metadata, compoundProcessor, IngestPipelineFieldAccessPattern.CLASSIC, null); + this(id, description, version, metadata, compoundProcessor, IngestPipelineFieldAccessPattern.CLASSIC, null, null, null); } public Pipeline( @@ -66,9 +74,22 @@ public Pipeline( @Nullable Map metadata, CompoundProcessor compoundProcessor, IngestPipelineFieldAccessPattern fieldAccessPattern, - @Nullable Boolean deprecated + @Nullable Boolean deprecated, + @Nullable Long createdDate, + @Nullable Long modifiedDate ) { - this(id, description, version, metadata, compoundProcessor, System::nanoTime, fieldAccessPattern, deprecated); + this( + id, + description, + version, + metadata, + compoundProcessor, + System::nanoTime, + fieldAccessPattern, + deprecated, + createdDate, + modifiedDate + ); } // package private for testing @@ -80,7 +101,9 @@ public Pipeline( CompoundProcessor compoundProcessor, LongSupplier relativeTimeProvider, IngestPipelineFieldAccessPattern fieldAccessPattern, - @Nullable Boolean deprecated + @Nullable Boolean deprecated, + @Nullable Long createdDate, + @Nullable Long modifiedDate ) { this.id = id; this.description = description; @@ -91,6 +114,8 @@ public Pipeline( this.relativeTimeProvider = relativeTimeProvider; this.fieldAccessPattern = fieldAccessPattern; this.deprecated = deprecated; + this.createdDate = createdDate; + this.modifiedDate = modifiedDate; } /** @@ -147,6 +172,8 @@ public static Pipeline create( processorFactories, projectId ); + String createdDate = ConfigurationUtils.readOptionalStringOrLongProperty(null, null, config, CREATED_DATE_KEY); + String modifiedDate = ConfigurationUtils.readOptionalStringOrLongProperty(null, null, config, MODIFIED_DATE_KEY); if (config.isEmpty() == false) { throw new ElasticsearchParseException( "pipeline [" @@ -159,7 +186,19 @@ public static Pipeline create( throw new ElasticsearchParseException("pipeline [" + id + "] cannot have an empty on_failure option defined"); } CompoundProcessor compoundProcessor = new CompoundProcessor(false, processors, onFailureProcessors); - return new Pipeline(id, description, version, metadata, compoundProcessor, accessPattern, deprecated); + Long createdDateMillis = createdDate == null ? null : Instant.parse(createdDate).toEpochMilli(); + Long modifiedDateMillis = modifiedDate == null ? null : Instant.parse(modifiedDate).toEpochMilli(); + return new Pipeline( + id, + description, + version, + metadata, + compoundProcessor, + accessPattern, + deprecated, + createdDateMillis, + modifiedDateMillis + ); } /** @@ -265,4 +304,31 @@ public Boolean getDeprecated() { public boolean isDeprecated() { return Boolean.TRUE.equals(deprecated); } + + public OptionalLong getCreatedDate() { + return createdDate == null ? OptionalLong.empty() : OptionalLong.of(createdDate); + } + + public OptionalLong getModifiedDate() { + return modifiedDate == null ? OptionalLong.empty() : OptionalLong.of(modifiedDate); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("Pipeline{"); + sb.append("id='").append(id).append('\''); + sb.append(", description='").append(description).append('\''); + sb.append(", version=").append(version); + sb.append(", metadata=").append(metadata); + sb.append(", compoundProcessor=").append(compoundProcessor); + sb.append(", metrics=").append(metrics); + sb.append(", relativeTimeProvider=").append(relativeTimeProvider); + sb.append(", fieldAccessPattern=").append(fieldAccessPattern); + sb.append(", deprecated=").append(deprecated); + sb.append(", createdDate=").append(createdDate); + sb.append(", modifiedDate=").append(modifiedDate); + sb.append('}'); + return sb.toString(); + } + } diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java index d258eeb9ed050..4e2c06fd24861 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java @@ -9,6 +9,7 @@ package org.elasticsearch.ingest; +import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.SimpleDiffable; @@ -29,6 +30,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -189,12 +191,14 @@ public String toString() { @Override public void writeTo(StreamOutput out) throws IOException { + final TransportVersion transportVersion = out.getTransportVersion(); + final Map configForTransport = configForTransport(transportVersion); out.writeString(id); - if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_17_0)) { - out.writeGenericMap(config); + if (transportVersion.onOrAfter(TransportVersions.V_8_17_0)) { + out.writeGenericMap(configForTransport); } else { XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).prettyPrint(); - builder.map(config); + builder.map(configForTransport); out.writeBytesReference(BytesReference.bytes(builder)); XContentHelper.writeTo(out, XContentType.JSON); } @@ -246,4 +250,18 @@ PipelineConfiguration maybeUpgradeProcessors(String type, IngestMetadata.Process return this; } } + + private Map configForTransport(final TransportVersion transportVersion) { + final boolean transportSupportsNewProperties = transportVersion.onOrAfter(TransportVersions.PIPELINE_TRACKING_INFO); + final boolean noNewProperties = config.containsKey(Pipeline.CREATED_DATE_KEY) == false + && config.containsKey(Pipeline.MODIFIED_DATE_KEY) == false; + + if (transportSupportsNewProperties || noNewProperties) { + return config; + } + final Map configWithoutNewSystemProperties = new HashMap<>(config); + configWithoutNewSystemProperties.remove(Pipeline.CREATED_DATE_KEY); + configWithoutNewSystemProperties.remove(Pipeline.MODIFIED_DATE_KEY); + return Collections.unmodifiableMap(configWithoutNewSystemProperties); + } } diff --git a/server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java b/server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java index 2f91e106248dc..8701d35e0fc44 100644 --- a/server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java @@ -53,9 +53,13 @@ private Map getPipelineSubstitutions( if (rawPipelineSubstitutions != null) { for (Map.Entry> entry : rawPipelineSubstitutions.entrySet()) { String pipelineId = entry.getKey(); + Map pipelineConfig = entry.getValue(); + + IngestService.validateNoSystemPropertiesInPipelineConfig(pipelineConfig); + Pipeline pipeline = Pipeline.create( pipelineId, - entry.getValue(), + pipelineConfig, ingestService.getProcessorFactories(), ingestService.getScriptService(), ingestService.getProjectResolver().getProjectId(), diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 73dae3437acc7..cff0b35c329ef 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -210,6 +210,8 @@ private void decorateAndExecutePipeline( conditionalWithResult ) ); + long createdDate = pipeline.getCreatedDate().orElse(-1); + long modifiedDate = pipeline.getModifiedDate().orElse(-1); Pipeline verbosePipeline = new Pipeline( pipeline.getId(), pipeline.getDescription(), @@ -217,7 +219,9 @@ private void decorateAndExecutePipeline( pipeline.getMetadata(), verbosePipelineProcessor, pipeline.getFieldAccessPattern(), - pipeline.getDeprecated() + pipeline.getDeprecated(), + createdDate == -1 ? null : createdDate, + modifiedDate == -1 ? null : modifiedDate ); ingestDocument.executePipeline(verbosePipeline, handler); } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 15ff956c598dc..c7024adddf817 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -81,6 +81,7 @@ import java.io.OutputStream; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; @@ -116,6 +117,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; @@ -814,6 +816,87 @@ public void testPut() { assertThat(pipeline.getProcessors().size(), equalTo(0)); } + public void testPutWithTracking() { + final Instant beforeInitialPut = Instant.now(); + final IngestService ingestService = createWithProcessors(Map.of()); + final String id = "_id"; + final ProjectId projectId = randomProjectIdOrDefault(); + Pipeline pipeline = ingestService.getPipeline(projectId, id); + assertThat(pipeline, nullValue()); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .putProjectMetadata(ProjectMetadata.builder(projectId).build()) + .build(); // Start empty + + // add a new pipeline: + PutPipelineRequest putRequest = putJsonPipelineRequest(id, "{\"processors\": []}"); + ClusterState previousClusterState = clusterState; + clusterState = executePut(projectId, putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final Instant afterInitialPut = Instant.now(); + pipeline = ingestService.getPipeline(projectId, id); + assertThat(pipeline, notNullValue()); + assertThat(pipeline.getId(), equalTo(id)); + assertThat(pipeline.getDescription(), nullValue()); + assertThat(pipeline.getProcessors().size(), equalTo(0)); + assertThat(pipeline.getCreatedDate().orElseThrow(), greaterThanOrEqualTo(beforeInitialPut.toEpochMilli())); + assertThat(pipeline.getCreatedDate().orElseThrow(), lessThanOrEqualTo(afterInitialPut.toEpochMilli())); + assertThat(pipeline.getModifiedDate().orElseThrow(), is(pipeline.getCreatedDate().orElseThrow())); + + // overwrite existing pipeline: + final Instant beforeSecondPut = Instant.now(); + putRequest = putJsonPipelineRequest(id, """ + {"processors": [], "description": "_description"}"""); + previousClusterState = clusterState; + clusterState = executePut(projectId, putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final Instant afterSecondPut = Instant.now(); + pipeline = ingestService.getPipeline(projectId, id); + assertThat(pipeline, notNullValue()); + assertThat(pipeline.getId(), equalTo(id)); + assertThat(pipeline.getDescription(), equalTo("_description")); + assertThat(pipeline.getProcessors().size(), equalTo(0)); + assertThat(pipeline.getCreatedDate().orElseThrow(), greaterThanOrEqualTo(beforeInitialPut.toEpochMilli())); + assertThat(pipeline.getCreatedDate().orElseThrow(), lessThanOrEqualTo(afterInitialPut.toEpochMilli())); + assertThat(pipeline.getModifiedDate().orElseThrow(), greaterThanOrEqualTo(beforeSecondPut.toEpochMilli())); + assertThat(pipeline.getModifiedDate().orElseThrow(), lessThanOrEqualTo(afterSecondPut.toEpochMilli())); + } + + public void testPutWithTrackingExistingPipelineWithoutCreatedAtOnlyHasModifiedAt() { + final IngestService ingestService = createWithProcessors(); + final String id = "_id"; + final ProjectId projectId = randomProjectIdOrDefault(); + Pipeline pipeline = ingestService.getPipeline(projectId, id); + assertThat(pipeline, nullValue()); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .putProjectMetadata( + ProjectMetadata.builder(projectId) + .putCustom( + IngestMetadata.TYPE, + new IngestMetadata( + Map.of(id, new PipelineConfiguration(id, Map.of("description", "existing_processor_description"))) + ) + ) + .build() + ) + .build(); // Start empty + + // update existing pipeline which doesn't have `created_date` + final Instant beforePut = Instant.now(); + PutPipelineRequest putRequest = putJsonPipelineRequest(id, "{\"processors\": []}"); + ClusterState previousClusterState = clusterState; + clusterState = executePut(projectId, putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final Instant afterPut = Instant.now(); + pipeline = ingestService.getPipeline(projectId, id); + assertThat(pipeline, notNullValue()); + assertThat(pipeline.getId(), equalTo(id)); + assertThat(pipeline.getDescription(), nullValue()); + assertThat(pipeline.getProcessors().size(), equalTo(0)); + assertTrue(pipeline.getCreatedDate().isEmpty()); + assertThat(pipeline.getModifiedDate().orElseThrow(), greaterThanOrEqualTo(beforePut.toEpochMilli())); + assertThat(pipeline.getModifiedDate().orElseThrow(), lessThanOrEqualTo(afterPut.toEpochMilli())); + } + public void testPutWithErrorResponse() throws IllegalAccessException { IngestService ingestService = createWithProcessors(); String id = "_id"; diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index c3eb779e9c190..fd84095d2fcb0 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -167,6 +167,8 @@ public void testPipelineProcessorWithPipelineChain() throws Exception { new CompoundProcessor(pipeline1Processor), relativeTimeProvider, IngestPipelineFieldAccessPattern.CLASSIC, + null, + null, null ); @@ -183,13 +185,15 @@ public void testPipelineProcessorWithPipelineChain() throws Exception { }), pipeline2Processor), List.of()), relativeTimeProvider, IngestPipelineFieldAccessPattern.CLASSIC, + null, + null, null ); relativeTimeProvider = mock(LongSupplier.class); when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(2)); Pipeline pipeline3 = new Pipeline(pipeline3Id, null, null, null, new CompoundProcessor(new TestProcessor(ingestDocument -> { throw new RuntimeException("error"); - })), relativeTimeProvider, IngestPipelineFieldAccessPattern.CLASSIC, null); + })), relativeTimeProvider, IngestPipelineFieldAccessPattern.CLASSIC, null, null, null); when(ingestService.getPipeline(pipeline1Id)).thenReturn(pipeline1); when(ingestService.getPipeline(pipeline2Id)).thenReturn(pipeline2); when(ingestService.getPipeline(pipeline3Id)).thenReturn(pipeline3); From 0a862e0bce0fde2e233eec72266ac70dde94aeb3 Mon Sep 17 00:00:00 2001 From: Szymon Bialkowski Date: Fri, 11 Jul 2025 16:57:32 +0100 Subject: [PATCH 2/7] Update docs/changelog/130847.yaml --- docs/changelog/130847.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/changelog/130847.yaml b/docs/changelog/130847.yaml index 7aa1f671af049..2dafd647b76fe 100644 --- a/docs/changelog/130847.yaml +++ b/docs/changelog/130847.yaml @@ -2,7 +2,7 @@ pr: 130847 summary: "Pipelines: Add `created_date` and `modified_date`" area: Ingest Node type: enhancement -issues: [108754] +issues: [] highlight: title: "Pipelines: Add `created_date` and `modified_date`" body: |- From 5e140751f3be04913ba062eb9c650a6ad07e12bf Mon Sep 17 00:00:00 2001 From: Szymon Bialkowski Date: Mon, 14 Jul 2025 13:43:55 +0100 Subject: [PATCH 3/7] review comments --- docs/changelog/130847.yaml | 11 ++-- qa/mixed-cluster/build.gradle | 6 --- .../rest-api-spec/test/ingest/10_basic.yml | 46 ---------------- .../rest-api-spec/test/ingest/20_tracking.yml | 53 +++++++++++++++++++ .../test/simulate.ingest/10_basic.yml | 11 +++- .../ingest/PipelineConfigurationSyncIT.java | 12 ++--- .../ingest/SimulateExecutionService.java | 4 +- .../elasticsearch/ingest/IngestService.java | 6 ++- .../org/elasticsearch/ingest/Pipeline.java | 32 +++++------ .../ingest/TrackingResultProcessor.java | 4 +- .../action/ingest/RestPutPipelineAction.java | 6 +++ .../ingest/IngestServiceTests.java | 20 +++---- 12 files changed, 112 insertions(+), 99 deletions(-) create mode 100644 rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/20_tracking.yml diff --git a/docs/changelog/130847.yaml b/docs/changelog/130847.yaml index 2dafd647b76fe..0f8c37ff0b1ec 100644 --- a/docs/changelog/130847.yaml +++ b/docs/changelog/130847.yaml @@ -2,11 +2,6 @@ pr: 130847 summary: "Pipelines: Add `created_date` and `modified_date`" area: Ingest Node type: enhancement -issues: [] -highlight: - title: "Pipelines: Add `created_date` and `modified_date`" - body: |- - Pipelines now have extra properties to indicate when they were created, and updated, to improve visibility - and debugging. - These properties are managed by the system and cannot be updated by the user. - notable: true +issues: + - 108754 + diff --git a/qa/mixed-cluster/build.gradle b/qa/mixed-cluster/build.gradle index 04d001a96cdc1..a591d4c590b27 100644 --- a/qa/mixed-cluster/build.gradle +++ b/qa/mixed-cluster/build.gradle @@ -71,12 +71,6 @@ excludeList.add('aggregations/percentiles_hdr_metric/Negative values test') // sync_id is removed in 9.0 excludeList.add("cat.shards/10_basic/Help") -// new optional properties only available on latest cluster -excludeList.add("ingest/10_basic/Test creating and getting pipeline returns created_date and modified_date") -excludeList.add("ingest/10_basic/Test PUT setting created_date") -excludeList.add("ingest/10_basic/Test PUT setting modified_date") -excludeList.add("simulate.ingest/10_basic/Test simulate with pipeline with created_date") - def clusterPath = getPath() buildParams.bwcVersions.withWireCompatible { bwcVersion, baseName -> diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml index 343fa3ebf6e41..8dfd499b7d32c 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml @@ -184,49 +184,3 @@ - is_false: first_pipeline.description - is_true: second_pipeline - is_false: second_pipeline.description - - ---- -"Test creating and getting pipeline returns created_date and modified_date": - - do: - ingest.put_pipeline: - id: "my_pipeline" - body: > - { - "processors": [] - } - - match: { acknowledged: true } - - - do: - ingest.get_pipeline: - id: "my_pipeline" - - match: { my_pipeline.created_date: "/\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z/" } - - match: { my_pipeline.modified_date: "/\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z/" } - ---- -"Test PUT setting created_date": - - do: - catch: bad_request - ingest.put_pipeline: - id: "my_pipeline" - body: > - { - "processors": [], - "created_date": "2025-07-04T12:50:48.415Z" - } - - match: { status: 400 } - - match: { error.reason: "Provided a pipeline property which is managed by the system, e.g. `created_date`." } - ---- -"Test PUT setting modified_date": - - do: - catch: bad_request - ingest.put_pipeline: - id: "my_pipeline" - body: > - { - "processors": [], - "modified_date": "2025-07-04T12:50:48.415Z" - } - - match: { status: 400 } - - match: { error.reason: "Provided a pipeline property which is managed by the system, e.g. `created_date`." } diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/20_tracking.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/20_tracking.yml new file mode 100644 index 0000000000000..2403abfedda64 --- /dev/null +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/20_tracking.yml @@ -0,0 +1,53 @@ +setup: + - requires: + test_runner_features: capabilities + capabilities: + - method: PUT + path: /_ingest/pipeline/{id} + capabilities: [ pipeline_tracking_info ] + reason: "Pipelines have tracking info: modified_date and created_date" + +--- +"Test creating and getting pipeline returns created_date and modified_date": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "processors": [] + } + - match: { acknowledged: true } + + - do: + ingest.get_pipeline: + id: "my_pipeline" + - match: { my_pipeline.created_date: "/\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z/" } + - match: { my_pipeline.modified_date: "/\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z/" } + +--- +"Test PUT setting created_date": + - do: + catch: bad_request + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "processors": [], + "created_date": "2025-07-04T12:50:48.415Z" + } + - match: { status: 400 } + - match: { error.reason: "Provided a pipeline property which is managed by the system: created_date." } + +--- +"Test PUT setting modified_date": + - do: + catch: bad_request + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "processors": [], + "modified_date": "2025-07-04T12:50:48.415Z" + } + - match: { status: 400 } + - match: { error.reason: "Provided a pipeline property which is managed by the system: modified_date." } diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/simulate.ingest/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/simulate.ingest/10_basic.yml index 2b03bc7989382..3c20088877bb3 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/simulate.ingest/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/simulate.ingest/10_basic.yml @@ -751,6 +751,15 @@ setup: --- "Test simulate with pipeline with created_date": + - requires: + test_runner_features: capabilities + capabilities: + - method: PUT + path: /_ingest/pipeline/{id} + capabilities: [ pipeline_tracking_info ] + reason: "Pipelines have tracking info: modified_date and created_date" + - requires: + test_runner_features: contains - skip: features: headers - do: @@ -777,4 +786,4 @@ setup: } } - match: { status: 500 } - - match: { error.reason: "/(runtime_exception:\\ )?org\\.elasticsearch\\.ElasticsearchParseException:\\ Provided\\ a\\ pipeline\\ property\\ which\\ is\\ managed\\ by\\ the\\ system,\\ e\\.g\\.\\ `created_date`\\./" } + - contains: { error.reason: "Provided a pipeline property which is managed by the system: created_date." } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/ingest/PipelineConfigurationSyncIT.java b/server/src/internalClusterTest/java/org/elasticsearch/ingest/PipelineConfigurationSyncIT.java index 2d3f632a2fab0..2640ea2b010ad 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/ingest/PipelineConfigurationSyncIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/ingest/PipelineConfigurationSyncIT.java @@ -53,9 +53,9 @@ public void testAllNodesGetPipelineTrackingClusterState() throws Exception { assertThat(node2Pipeline.getDescription(), equalTo(node3Pipeline.getDescription())); // created_date - final long node1CreatedDate = node1Pipeline.getCreatedDate().orElseThrow(); - final long node2CreatedDate = node2Pipeline.getCreatedDate().orElseThrow(); - final long node3CreatedDate = node3Pipeline.getCreatedDate().orElseThrow(); + final long node1CreatedDate = node1Pipeline.getCreatedDateMillis().orElseThrow(); + final long node2CreatedDate = node2Pipeline.getCreatedDateMillis().orElseThrow(); + final long node3CreatedDate = node3Pipeline.getCreatedDateMillis().orElseThrow(); assertThat(node1CreatedDate, equalTo(node2CreatedDate)); assertThat(node2CreatedDate, equalTo(node3CreatedDate)); @@ -64,9 +64,9 @@ public void testAllNodesGetPipelineTrackingClusterState() throws Exception { assertThat(node1CreatedDate, lessThanOrEqualTo(timeAfterPut)); // modified_date - final long node1ModifiedDate = node1Pipeline.getModifiedDate().orElseThrow(); - final long node2ModifiedDate = node2Pipeline.getModifiedDate().orElseThrow(); - final long node3ModifiedDate = node3Pipeline.getModifiedDate().orElseThrow(); + final long node1ModifiedDate = node1Pipeline.getModifiedDateMillis().orElseThrow(); + final long node2ModifiedDate = node2Pipeline.getModifiedDateMillis().orElseThrow(); + final long node3ModifiedDate = node3Pipeline.getModifiedDateMillis().orElseThrow(); assertThat(node1ModifiedDate, equalTo(node2ModifiedDate)); assertThat(node2ModifiedDate, equalTo(node3ModifiedDate)); diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index 7b9429f2c9336..84f4ac81f65df 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -42,8 +42,8 @@ static void executeDocument( if (verbose) { List processorResultList = new CopyOnWriteArrayList<>(); CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList); - long createdDate = pipeline.getCreatedDate().orElse(-1); - long modifiedDate = pipeline.getModifiedDate().orElse(-1); + long createdDate = pipeline.getCreatedDateMillis().orElse(-1); + long modifiedDate = pipeline.getModifiedDateMillis().orElse(-1); Pipeline verbosePipeline = new Pipeline( pipeline.getId(), pipeline.getDescription(), diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index e33f5a3175154..354fa36346345 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -579,8 +579,10 @@ public static Map readPipelineConfig(PutPipelineRequest request) } public static void validateNoSystemPropertiesInPipelineConfig(final Map pipelineConfig) { - if (pipelineConfig.containsKey(Pipeline.CREATED_DATE_KEY) || pipelineConfig.containsKey(Pipeline.MODIFIED_DATE_KEY)) { - throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system, e.g. `created_date`."); + if (pipelineConfig.containsKey(Pipeline.CREATED_DATE_KEY)) { + throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system: created_date."); + } else if (pipelineConfig.containsKey(Pipeline.MODIFIED_DATE_KEY)) { + throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system: modified_date."); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 79cfa3100a758..bdd89b1578bc3 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -53,9 +53,9 @@ public final class Pipeline { @Nullable private final Boolean deprecated; @Nullable - private final Long createdDate; + private final Long createdDateMillis; @Nullable - private final Long modifiedDate; + private final Long modifiedDateMillis; public Pipeline( String id, @@ -75,8 +75,8 @@ public Pipeline( CompoundProcessor compoundProcessor, IngestPipelineFieldAccessPattern fieldAccessPattern, @Nullable Boolean deprecated, - @Nullable Long createdDate, - @Nullable Long modifiedDate + @Nullable Long createdDateMillis, + @Nullable Long modifiedDateMillis ) { this( id, @@ -87,8 +87,8 @@ public Pipeline( System::nanoTime, fieldAccessPattern, deprecated, - createdDate, - modifiedDate + createdDateMillis, + modifiedDateMillis ); } @@ -102,8 +102,8 @@ public Pipeline( LongSupplier relativeTimeProvider, IngestPipelineFieldAccessPattern fieldAccessPattern, @Nullable Boolean deprecated, - @Nullable Long createdDate, - @Nullable Long modifiedDate + @Nullable Long createdDateMillis, + @Nullable Long modifiedDateMillis ) { this.id = id; this.description = description; @@ -114,8 +114,8 @@ public Pipeline( this.relativeTimeProvider = relativeTimeProvider; this.fieldAccessPattern = fieldAccessPattern; this.deprecated = deprecated; - this.createdDate = createdDate; - this.modifiedDate = modifiedDate; + this.createdDateMillis = createdDateMillis; + this.modifiedDateMillis = modifiedDateMillis; } /** @@ -305,12 +305,12 @@ public boolean isDeprecated() { return Boolean.TRUE.equals(deprecated); } - public OptionalLong getCreatedDate() { - return createdDate == null ? OptionalLong.empty() : OptionalLong.of(createdDate); + public OptionalLong getCreatedDateMillis() { + return createdDateMillis == null ? OptionalLong.empty() : OptionalLong.of(createdDateMillis); } - public OptionalLong getModifiedDate() { - return modifiedDate == null ? OptionalLong.empty() : OptionalLong.of(modifiedDate); + public OptionalLong getModifiedDateMillis() { + return modifiedDateMillis == null ? OptionalLong.empty() : OptionalLong.of(modifiedDateMillis); } @Override @@ -325,8 +325,8 @@ public String toString() { sb.append(", relativeTimeProvider=").append(relativeTimeProvider); sb.append(", fieldAccessPattern=").append(fieldAccessPattern); sb.append(", deprecated=").append(deprecated); - sb.append(", createdDate=").append(createdDate); - sb.append(", modifiedDate=").append(modifiedDate); + sb.append(", createdDateMillis=").append(createdDateMillis); + sb.append(", modifiedDateMillis=").append(modifiedDateMillis); sb.append('}'); return sb.toString(); } diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index cff0b35c329ef..839df487da930 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -210,8 +210,8 @@ private void decorateAndExecutePipeline( conditionalWithResult ) ); - long createdDate = pipeline.getCreatedDate().orElse(-1); - long modifiedDate = pipeline.getModifiedDate().orElse(-1); + long createdDate = pipeline.getCreatedDateMillis().orElse(-1); + long modifiedDate = pipeline.getModifiedDateMillis().orElse(-1); Pipeline verbosePipeline = new Pipeline( pipeline.getId(), pipeline.getDescription(), diff --git a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java index c6b3daa38d663..67056bc938aa8 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.List; import java.util.Locale; +import java.util.Set; import static org.elasticsearch.rest.RestRequest.Method.PUT; import static org.elasticsearch.rest.RestUtils.getAckTimeout; @@ -73,4 +74,9 @@ public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient cl ActionListener.withRef(new RestToXContentListener<>(channel), content) ); } + + @Override + public Set supportedCapabilities() { + return Set.of("pipeline_tracking_info"); + } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index c7024adddf817..2d69bdced6f48 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -838,9 +838,9 @@ public void testPutWithTracking() { assertThat(pipeline.getId(), equalTo(id)); assertThat(pipeline.getDescription(), nullValue()); assertThat(pipeline.getProcessors().size(), equalTo(0)); - assertThat(pipeline.getCreatedDate().orElseThrow(), greaterThanOrEqualTo(beforeInitialPut.toEpochMilli())); - assertThat(pipeline.getCreatedDate().orElseThrow(), lessThanOrEqualTo(afterInitialPut.toEpochMilli())); - assertThat(pipeline.getModifiedDate().orElseThrow(), is(pipeline.getCreatedDate().orElseThrow())); + assertThat(pipeline.getCreatedDateMillis().orElseThrow(), greaterThanOrEqualTo(beforeInitialPut.toEpochMilli())); + assertThat(pipeline.getCreatedDateMillis().orElseThrow(), lessThanOrEqualTo(afterInitialPut.toEpochMilli())); + assertThat(pipeline.getModifiedDateMillis().orElseThrow(), is(pipeline.getCreatedDateMillis().orElseThrow())); // overwrite existing pipeline: final Instant beforeSecondPut = Instant.now(); @@ -855,10 +855,10 @@ public void testPutWithTracking() { assertThat(pipeline.getId(), equalTo(id)); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getProcessors().size(), equalTo(0)); - assertThat(pipeline.getCreatedDate().orElseThrow(), greaterThanOrEqualTo(beforeInitialPut.toEpochMilli())); - assertThat(pipeline.getCreatedDate().orElseThrow(), lessThanOrEqualTo(afterInitialPut.toEpochMilli())); - assertThat(pipeline.getModifiedDate().orElseThrow(), greaterThanOrEqualTo(beforeSecondPut.toEpochMilli())); - assertThat(pipeline.getModifiedDate().orElseThrow(), lessThanOrEqualTo(afterSecondPut.toEpochMilli())); + assertThat(pipeline.getCreatedDateMillis().orElseThrow(), greaterThanOrEqualTo(beforeInitialPut.toEpochMilli())); + assertThat(pipeline.getCreatedDateMillis().orElseThrow(), lessThanOrEqualTo(afterInitialPut.toEpochMilli())); + assertThat(pipeline.getModifiedDateMillis().orElseThrow(), greaterThanOrEqualTo(beforeSecondPut.toEpochMilli())); + assertThat(pipeline.getModifiedDateMillis().orElseThrow(), lessThanOrEqualTo(afterSecondPut.toEpochMilli())); } public void testPutWithTrackingExistingPipelineWithoutCreatedAtOnlyHasModifiedAt() { @@ -892,9 +892,9 @@ public void testPutWithTrackingExistingPipelineWithoutCreatedAtOnlyHasModifiedAt assertThat(pipeline.getId(), equalTo(id)); assertThat(pipeline.getDescription(), nullValue()); assertThat(pipeline.getProcessors().size(), equalTo(0)); - assertTrue(pipeline.getCreatedDate().isEmpty()); - assertThat(pipeline.getModifiedDate().orElseThrow(), greaterThanOrEqualTo(beforePut.toEpochMilli())); - assertThat(pipeline.getModifiedDate().orElseThrow(), lessThanOrEqualTo(afterPut.toEpochMilli())); + assertTrue(pipeline.getCreatedDateMillis().isEmpty()); + assertThat(pipeline.getModifiedDateMillis().orElseThrow(), greaterThanOrEqualTo(beforePut.toEpochMilli())); + assertThat(pipeline.getModifiedDateMillis().orElseThrow(), lessThanOrEqualTo(afterPut.toEpochMilli())); } public void testPutWithErrorResponse() throws IllegalAccessException { From 6a025c55aab3671366d12084104f7f84aab8746a Mon Sep 17 00:00:00 2001 From: Szymon Bialkowski Date: Mon, 14 Jul 2025 16:13:12 +0100 Subject: [PATCH 4/7] Update docs/changelog/130847.yaml --- docs/changelog/130847.yaml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/changelog/130847.yaml b/docs/changelog/130847.yaml index 0f8c37ff0b1ec..7dcdf3789398e 100644 --- a/docs/changelog/130847.yaml +++ b/docs/changelog/130847.yaml @@ -2,6 +2,4 @@ pr: 130847 summary: "Pipelines: Add `created_date` and `modified_date`" area: Ingest Node type: enhancement -issues: - - 108754 - +issues: [] From 7e40783635e20760c89a0eabb60e0f464dd1e018 Mon Sep 17 00:00:00 2001 From: Szymon Bialkowski Date: Wed, 16 Jul 2025 13:56:23 +0100 Subject: [PATCH 5/7] second review --- .../rest-api-spec/test/ingest/20_tracking.yml | 4 +- .../ingest/PipelineConfigurationSyncIT.java | 77 --------------- .../action/ingest/ReservedPipelineAction.java | 2 +- .../ingest/SimulateExecutionService.java | 6 +- .../elasticsearch/ingest/IngestService.java | 93 +++++++++---------- .../org/elasticsearch/ingest/Pipeline.java | 15 ++- .../ingest/TrackingResultProcessor.java | 6 +- .../action/ingest/RestPutPipelineAction.java | 1 + .../ingest/IngestServiceTests.java | 88 ++++++++++-------- 9 files changed, 108 insertions(+), 184 deletions(-) delete mode 100644 server/src/internalClusterTest/java/org/elasticsearch/ingest/PipelineConfigurationSyncIT.java diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/20_tracking.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/20_tracking.yml index 2403abfedda64..3b9ae4ccf5783 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/20_tracking.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/20_tracking.yml @@ -21,8 +21,8 @@ setup: - do: ingest.get_pipeline: id: "my_pipeline" - - match: { my_pipeline.created_date: "/\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z/" } - - match: { my_pipeline.modified_date: "/\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z/" } + - match: { my_pipeline.created_date: "/^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z$/" } + - match: { my_pipeline.modified_date: "/^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z$/" } --- "Test PUT setting created_date": diff --git a/server/src/internalClusterTest/java/org/elasticsearch/ingest/PipelineConfigurationSyncIT.java b/server/src/internalClusterTest/java/org/elasticsearch/ingest/PipelineConfigurationSyncIT.java deleted file mode 100644 index 2640ea2b010ad..0000000000000 --- a/server/src/internalClusterTest/java/org/elasticsearch/ingest/PipelineConfigurationSyncIT.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.ingest; - -import org.elasticsearch.action.ingest.GetPipelineResponse; -import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.node.NodeService; -import org.elasticsearch.test.ESIntegTestCase; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.lessThanOrEqualTo; - -@ESIntegTestCase.ClusterScope(numDataNodes = 3) -public class PipelineConfigurationSyncIT extends ESIntegTestCase { - - public void testAllNodesGetPipelineTrackingClusterState() throws Exception { - final String pipelineId = "_id"; - GetPipelineResponse getResponse = getPipelines(pipelineId); - assertFalse(getResponse.isFound()); - - final long timeBeforePut = System.currentTimeMillis(); - putJsonPipeline( - "_id", - (builder, params) -> builder.field("description", "my_pipeline").startArray("processors").startObject().endObject().endArray() - ); - final Long timeAfterPut = System.currentTimeMillis(); - - getResponse = getPipelines(pipelineId); - assertTrue(getResponse.isFound()); - - final Pipeline node1Pipeline = internalCluster().getInstance(NodeService.class, "node_s0") - .getIngestService() - .getPipeline(Metadata.DEFAULT_PROJECT_ID, "_id"); - final Pipeline node2Pipeline = internalCluster().getInstance(NodeService.class, "node_s1") - .getIngestService() - .getPipeline(Metadata.DEFAULT_PROJECT_ID, "_id"); - final Pipeline node3Pipeline = internalCluster().getInstance(NodeService.class, "node_s2") - .getIngestService() - .getPipeline(Metadata.DEFAULT_PROJECT_ID, "_id"); - - assertNotSame(node1Pipeline, node2Pipeline); - assertNotSame(node2Pipeline, node3Pipeline); - - assertThat(node1Pipeline.getDescription(), equalTo(node2Pipeline.getDescription())); - assertThat(node2Pipeline.getDescription(), equalTo(node3Pipeline.getDescription())); - - // created_date - final long node1CreatedDate = node1Pipeline.getCreatedDateMillis().orElseThrow(); - final long node2CreatedDate = node2Pipeline.getCreatedDateMillis().orElseThrow(); - final long node3CreatedDate = node3Pipeline.getCreatedDateMillis().orElseThrow(); - - assertThat(node1CreatedDate, equalTo(node2CreatedDate)); - assertThat(node2CreatedDate, equalTo(node3CreatedDate)); - - assertThat(node1CreatedDate, greaterThanOrEqualTo(timeBeforePut)); - assertThat(node1CreatedDate, lessThanOrEqualTo(timeAfterPut)); - - // modified_date - final long node1ModifiedDate = node1Pipeline.getModifiedDateMillis().orElseThrow(); - final long node2ModifiedDate = node2Pipeline.getModifiedDateMillis().orElseThrow(); - final long node3ModifiedDate = node3Pipeline.getModifiedDateMillis().orElseThrow(); - - assertThat(node1ModifiedDate, equalTo(node2ModifiedDate)); - assertThat(node2ModifiedDate, equalTo(node3ModifiedDate)); - - assertThat(node1ModifiedDate, greaterThanOrEqualTo(timeBeforePut)); - assertThat(node1ModifiedDate, lessThanOrEqualTo(timeAfterPut)); - } -} diff --git a/server/src/main/java/org/elasticsearch/action/ingest/ReservedPipelineAction.java b/server/src/main/java/org/elasticsearch/action/ingest/ReservedPipelineAction.java index b07d535f17ec8..65d634aeb498b 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/ReservedPipelineAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/ReservedPipelineAction.java @@ -87,7 +87,7 @@ public TransformState transform(ProjectId projectId, List so ProjectMetadata projectMetadata = clusterState.metadata().getProject(projectId); for (var request : requests) { - var nopUpdate = IngestService.isNoOpPipelineUpdate(projectMetadata, request, () -> IngestService.readPipelineConfig(request)); + var nopUpdate = IngestService.isNoOpPipelineUpdate(projectMetadata, request); if (nopUpdate) { continue; diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index 84f4ac81f65df..4ef262c548e52 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -42,8 +42,6 @@ static void executeDocument( if (verbose) { List processorResultList = new CopyOnWriteArrayList<>(); CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList); - long createdDate = pipeline.getCreatedDateMillis().orElse(-1); - long modifiedDate = pipeline.getModifiedDateMillis().orElse(-1); Pipeline verbosePipeline = new Pipeline( pipeline.getId(), pipeline.getDescription(), @@ -52,8 +50,8 @@ static void executeDocument( verbosePipelineProcessor, pipeline.getFieldAccessPattern(), pipeline.getDeprecated(), - createdDate == -1 ? null : createdDate, - modifiedDate == -1 ? null : modifiedDate + pipeline.getCreatedDateMillis().orElse(null), + pipeline.getModifiedDateMillis().orElse(null) ); ingestDocument.executePipeline(verbosePipeline, (result, e) -> { handler.accept(new SimulateDocumentVerboseResult(processorResultList), e); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 354fa36346345..0f83cf137c232 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -49,7 +49,6 @@ import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; import org.elasticsearch.common.TriConsumer; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.logging.DeprecationCategory; import org.elasticsearch.common.logging.DeprecationLogger; @@ -78,10 +77,11 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xcontent.XContentBuilder; -import java.io.IOException; import java.time.Instant; +import java.time.InstantSource; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collection; @@ -104,7 +104,6 @@ import java.util.function.Function; import java.util.function.IntConsumer; import java.util.function.Predicate; -import java.util.function.Supplier; import java.util.stream.Collectors; import static org.elasticsearch.core.Strings.format; @@ -545,9 +544,7 @@ public void putPipeline( ActionListener listener, Consumer> nodeInfoListener ) throws Exception { - Map newPipelineConfig = readPipelineConfig(request); - validateNoSystemPropertiesInPipelineConfig(newPipelineConfig); - if (isNoOpPipelineUpdate(state.metadata().getProject(projectId), request, () -> newPipelineConfig)) { + if (isNoOpPipelineUpdate(state.metadata().getProject(projectId), request)) { // existing pipeline matches request pipeline -- no need to update listener.onResponse(AcknowledgedResponse.TRUE); return; @@ -558,7 +555,7 @@ public void putPipeline( taskQueue.submitTask( "put-pipeline-" + request.getId(), - new PutPipelineClusterStateUpdateTask(projectId, l, request), + new PutPipelineClusterStateUpdateTask(projectId, l, request, Instant::now), request.masterNodeTimeout() ); })); @@ -574,10 +571,6 @@ public void validatePipelineRequest(ProjectId projectId, PutPipelineRequest requ validatePipeline(ingestInfos, projectId, request.getId(), config); } - public static Map readPipelineConfig(PutPipelineRequest request) { - return XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); - } - public static void validateNoSystemPropertiesInPipelineConfig(final Map pipelineConfig) { if (pipelineConfig.containsKey(Pipeline.CREATED_DATE_KEY)) { throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system: created_date."); @@ -586,24 +579,23 @@ public static void validateNoSystemPropertiesInPipelineConfig(final Map> newPipelineConfigSupplier - ) { + /** Check whether updating a potentially existing pipeline will be a NOP. + * Will return false if request contains system-properties like `{created,modified}_date, + * these should be rejected later.`*/ + public static boolean isNoOpPipelineUpdate(ProjectMetadata metadata, PutPipelineRequest request) { IngestMetadata currentIngestMetadata = metadata.custom(IngestMetadata.TYPE); if (request.getVersion() == null && currentIngestMetadata != null && currentIngestMetadata.getPipelines().containsKey(request.getId())) { + var newPipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); + Map currentConfigWithoutSystemProps = new HashMap<>( currentIngestMetadata.getPipelines().get(request.getId()).getConfig() ); currentConfigWithoutSystemProps.remove(Pipeline.CREATED_DATE_KEY); currentConfigWithoutSystemProps.remove(Pipeline.MODIFIED_DATE_KEY); - Map newPipelineConfig = newPipelineConfigSupplier.get(); - return newPipelineConfig.equals(currentConfigWithoutSystemProps); } @@ -701,26 +693,42 @@ private static void collectProcessorMetrics( * Used in this class and externally by the {@link org.elasticsearch.action.ingest.ReservedPipelineAction} */ public static class PutPipelineClusterStateUpdateTask extends PipelineClusterStateUpdateTask { + // always output millis even if instantSource returns millis == 0 + private static final DateTimeFormatter ISO8601_WITH_MILLIS_FORMATTER = new DateTimeFormatterBuilder().appendInstant(3) + .toFormatter(Locale.ROOT); + private final PutPipelineRequest request; + private final InstantSource instantSource; - PutPipelineClusterStateUpdateTask(ProjectId projectId, ActionListener listener, PutPipelineRequest request) { + PutPipelineClusterStateUpdateTask( + final ProjectId projectId, + final ActionListener listener, + final PutPipelineRequest request, + final InstantSource instantSource + ) { super(projectId, listener); this.request = request; + this.instantSource = instantSource; } /** * Used by {@link org.elasticsearch.action.ingest.ReservedPipelineAction} */ public PutPipelineClusterStateUpdateTask(ProjectId projectId, PutPipelineRequest request) { - this(projectId, null, request); + this(projectId, null, request, Instant::now); } @Override public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection allIndexMetadata) { - BytesReference pipelineSource = request.getSource(); + final Map existingPipelines = currentIngestMetadata == null + ? new HashMap<>(1) + : new HashMap<>(currentIngestMetadata.getPipelines()); + final PipelineConfiguration existingPipeline = existingPipelines.get(request.getId()); + final Map newPipelineConfig = XContentHelper.convertToMap(request.getSource(), true, request.getXContentType()) + .v2(); + if (request.getVersion() != null) { - var currentPipeline = currentIngestMetadata != null ? currentIngestMetadata.getPipelines().get(request.getId()) : null; - if (currentPipeline == null) { + if (existingPipeline == null) { throw new IllegalArgumentException( String.format( Locale.ROOT, @@ -731,7 +739,7 @@ public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection pipelines; - if (currentIngestMetadata != null) { - pipelines = new HashMap<>(currentIngestMetadata.getPipelines()); - } else { - pipelines = new HashMap<>(); - } - - Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); - Map newPipelineConfig = XContentHelper.convertToMap(pipelineSource, true, request.getXContentType()).v2(); - PipelineConfiguration existingPipeline = pipelines.get(request.getId()); + final String iso8601WithMillisNow = ISO8601_WITH_MILLIS_FORMATTER.format( + instantSource.instant().truncatedTo(ChronoUnit.MILLIS) + ); if (existingPipeline == null) { - newPipelineConfig.put(Pipeline.CREATED_DATE_KEY, now.toString()); + newPipelineConfig.put(Pipeline.CREATED_DATE_KEY, iso8601WithMillisNow); } else { Object existingCreatedAt = existingPipeline.getConfig().get(Pipeline.CREATED_DATE_KEY); // only set/carry over `created_date` if existing pipeline already has it. @@ -789,10 +783,10 @@ public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection pipelineConfig ) throws Exception { + validateNoSystemPropertiesInPipelineConfig(pipelineConfig); if (ingestInfos.isEmpty()) { throw new IllegalStateException("Ingest info is empty"); } diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index bdd89b1578bc3..b0cbbe459313e 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -19,7 +19,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.OptionalLong; +import java.util.Optional; import java.util.function.BiConsumer; import java.util.function.LongSupplier; import java.util.function.Predicate; @@ -305,12 +305,12 @@ public boolean isDeprecated() { return Boolean.TRUE.equals(deprecated); } - public OptionalLong getCreatedDateMillis() { - return createdDateMillis == null ? OptionalLong.empty() : OptionalLong.of(createdDateMillis); + public Optional getCreatedDateMillis() { + return Optional.ofNullable(createdDateMillis); } - public OptionalLong getModifiedDateMillis() { - return modifiedDateMillis == null ? OptionalLong.empty() : OptionalLong.of(modifiedDateMillis); + public Optional getModifiedDateMillis() { + return Optional.ofNullable(modifiedDateMillis); } @Override @@ -325,10 +325,9 @@ public String toString() { sb.append(", relativeTimeProvider=").append(relativeTimeProvider); sb.append(", fieldAccessPattern=").append(fieldAccessPattern); sb.append(", deprecated=").append(deprecated); - sb.append(", createdDateMillis=").append(createdDateMillis); - sb.append(", modifiedDateMillis=").append(modifiedDateMillis); + sb.append(", createdDateMillis='").append(createdDateMillis).append('\''); + sb.append(", modifiedDateMillis='").append(modifiedDateMillis).append('\''); sb.append('}'); return sb.toString(); } - } diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 839df487da930..491f3e4828495 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -210,8 +210,6 @@ private void decorateAndExecutePipeline( conditionalWithResult ) ); - long createdDate = pipeline.getCreatedDateMillis().orElse(-1); - long modifiedDate = pipeline.getModifiedDateMillis().orElse(-1); Pipeline verbosePipeline = new Pipeline( pipeline.getId(), pipeline.getDescription(), @@ -220,8 +218,8 @@ private void decorateAndExecutePipeline( verbosePipelineProcessor, pipeline.getFieldAccessPattern(), pipeline.getDeprecated(), - createdDate == -1 ? null : createdDate, - modifiedDate == -1 ? null : modifiedDate + pipeline.getCreatedDateMillis().orElse(null), + pipeline.getModifiedDateMillis().orElse(null) ); ingestDocument.executePipeline(verbosePipeline, handler); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java index 67056bc938aa8..bb1ba0d4309ad 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java @@ -77,6 +77,7 @@ public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient cl @Override public Set supportedCapabilities() { + // pipeline_tracking info: `{created,modified}_date` system properties defined within pipeline definition. return Set.of("pipeline_tracking_info"); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 2d69bdced6f48..c4acbd54fd5c3 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -82,6 +82,7 @@ import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.time.Instant; +import java.time.InstantSource; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; @@ -117,7 +118,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; @@ -817,56 +817,45 @@ public void testPut() { } public void testPutWithTracking() { - final Instant beforeInitialPut = Instant.now(); final IngestService ingestService = createWithProcessors(Map.of()); final String id = "_id"; final ProjectId projectId = randomProjectIdOrDefault(); - Pipeline pipeline = ingestService.getPipeline(projectId, id); - assertThat(pipeline, nullValue()); ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) .putProjectMetadata(ProjectMetadata.builder(projectId).build()) .build(); // Start empty + final AtomicInteger instantSourceInvocationCounter = new AtomicInteger(); + final InstantSource instantSource = () -> Instant.ofEpochMilli(instantSourceInvocationCounter.getAndIncrement()); // add a new pipeline: PutPipelineRequest putRequest = putJsonPipelineRequest(id, "{\"processors\": []}"); ClusterState previousClusterState = clusterState; - clusterState = executePut(projectId, putRequest, clusterState); + clusterState = executePutWithInstantSource(projectId, putRequest, clusterState, instantSource); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - final Instant afterInitialPut = Instant.now(); - pipeline = ingestService.getPipeline(projectId, id); - assertThat(pipeline, notNullValue()); - assertThat(pipeline.getId(), equalTo(id)); - assertThat(pipeline.getDescription(), nullValue()); - assertThat(pipeline.getProcessors().size(), equalTo(0)); - assertThat(pipeline.getCreatedDateMillis().orElseThrow(), greaterThanOrEqualTo(beforeInitialPut.toEpochMilli())); - assertThat(pipeline.getCreatedDateMillis().orElseThrow(), lessThanOrEqualTo(afterInitialPut.toEpochMilli())); - assertThat(pipeline.getModifiedDateMillis().orElseThrow(), is(pipeline.getCreatedDateMillis().orElseThrow())); + Pipeline pipeline = ingestService.getPipeline(projectId, id); + assertThat(pipeline.getCreatedDateMillis().orElseThrow(), is(0L)); + assertThat(pipeline.getModifiedDateMillis().orElseThrow(), is(0L)); + // test PipelineConfiguration has millis part in iso8601 even if instantSource provides millis part == 0 + final Map clusterStatePipelineConfig = ((IngestMetadata) clusterState.metadata() + .getProject(projectId) + .custom(IngestMetadata.TYPE)).getPipelines().get(id).getConfig(); + assertThat(clusterStatePipelineConfig.get(Pipeline.CREATED_DATE_KEY), is("1970-01-01T00:00:00.000Z")); + assertThat(clusterStatePipelineConfig.get(Pipeline.MODIFIED_DATE_KEY), is("1970-01-01T00:00:00.000Z")); // overwrite existing pipeline: - final Instant beforeSecondPut = Instant.now(); putRequest = putJsonPipelineRequest(id, """ {"processors": [], "description": "_description"}"""); previousClusterState = clusterState; - clusterState = executePut(projectId, putRequest, clusterState); + clusterState = executePutWithInstantSource(projectId, putRequest, clusterState, instantSource); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - final Instant afterSecondPut = Instant.now(); pipeline = ingestService.getPipeline(projectId, id); - assertThat(pipeline, notNullValue()); - assertThat(pipeline.getId(), equalTo(id)); - assertThat(pipeline.getDescription(), equalTo("_description")); - assertThat(pipeline.getProcessors().size(), equalTo(0)); - assertThat(pipeline.getCreatedDateMillis().orElseThrow(), greaterThanOrEqualTo(beforeInitialPut.toEpochMilli())); - assertThat(pipeline.getCreatedDateMillis().orElseThrow(), lessThanOrEqualTo(afterInitialPut.toEpochMilli())); - assertThat(pipeline.getModifiedDateMillis().orElseThrow(), greaterThanOrEqualTo(beforeSecondPut.toEpochMilli())); - assertThat(pipeline.getModifiedDateMillis().orElseThrow(), lessThanOrEqualTo(afterSecondPut.toEpochMilli())); + assertThat(pipeline.getCreatedDateMillis().orElseThrow(), is(0L)); + assertThat(pipeline.getModifiedDateMillis().orElseThrow(), is(1L)); } public void testPutWithTrackingExistingPipelineWithoutCreatedAtOnlyHasModifiedAt() { final IngestService ingestService = createWithProcessors(); final String id = "_id"; final ProjectId projectId = randomProjectIdOrDefault(); - Pipeline pipeline = ingestService.getPipeline(projectId, id); - assertThat(pipeline, nullValue()); ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) .putProjectMetadata( ProjectMetadata.builder(projectId) @@ -879,22 +868,17 @@ public void testPutWithTrackingExistingPipelineWithoutCreatedAtOnlyHasModifiedAt .build() ) .build(); // Start empty + final AtomicInteger instantSourceInvocationCounter = new AtomicInteger(); + final InstantSource instantSource = () -> Instant.ofEpochMilli(instantSourceInvocationCounter.getAndIncrement()); // update existing pipeline which doesn't have `created_date` - final Instant beforePut = Instant.now(); PutPipelineRequest putRequest = putJsonPipelineRequest(id, "{\"processors\": []}"); ClusterState previousClusterState = clusterState; - clusterState = executePut(projectId, putRequest, clusterState); + clusterState = executePutWithInstantSource(projectId, putRequest, clusterState, instantSource); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - final Instant afterPut = Instant.now(); - pipeline = ingestService.getPipeline(projectId, id); - assertThat(pipeline, notNullValue()); - assertThat(pipeline.getId(), equalTo(id)); - assertThat(pipeline.getDescription(), nullValue()); - assertThat(pipeline.getProcessors().size(), equalTo(0)); + final Pipeline pipeline = ingestService.getPipeline(projectId, id); assertTrue(pipeline.getCreatedDateMillis().isEmpty()); - assertThat(pipeline.getModifiedDateMillis().orElseThrow(), greaterThanOrEqualTo(beforePut.toEpochMilli())); - assertThat(pipeline.getModifiedDateMillis().orElseThrow(), lessThanOrEqualTo(afterPut.toEpochMilli())); + assertThat(pipeline.getModifiedDateMillis().orElseThrow(), is(0L)); } public void testPutWithErrorResponse() throws IllegalAccessException { @@ -3534,8 +3518,21 @@ private static void executeFailingDelete(ProjectId projectId, DeletePipelineRequ } private static List oneTask(ProjectId projectId, PutPipelineRequest request) { + return oneTaskWithInstantSource(projectId, request, Instant::now); + } + + private static List oneTaskWithInstantSource( + final ProjectId projectId, + final PutPipelineRequest request, + final InstantSource instantSource + ) { return List.of( - new IngestService.PutPipelineClusterStateUpdateTask(projectId, ActionTestUtils.assertNoFailureListener(t -> {}), request) + new IngestService.PutPipelineClusterStateUpdateTask( + projectId, + ActionTestUtils.assertNoFailureListener(t -> {}), + request, + instantSource + ) ); } @@ -3544,8 +3541,21 @@ private static ClusterState executePut(PutPipelineRequest request, ClusterState } private static ClusterState executePut(ProjectId projectId, PutPipelineRequest request, ClusterState clusterState) { + return executePutWithInstantSource(projectId, request, clusterState, Instant::now); + } + + private static ClusterState executePutWithInstantSource( + final ProjectId projectId, + final PutPipelineRequest request, + final ClusterState clusterState, + final InstantSource instantSource + ) { try { - return executeAndAssertSuccessful(clusterState, IngestService.PIPELINE_TASK_EXECUTOR, oneTask(projectId, request)); + return executeAndAssertSuccessful( + clusterState, + IngestService.PIPELINE_TASK_EXECUTOR, + oneTaskWithInstantSource(projectId, request, instantSource) + ); } catch (Exception e) { throw new AssertionError(e); } From 4ff75988cad44bfceeb35ff39f2456a1428cad72 Mon Sep 17 00:00:00 2001 From: Szymon Bialkowski Date: Tue, 22 Jul 2025 10:51:03 +0100 Subject: [PATCH 6/7] petes comments --- .../elasticsearch/ingest/IngestService.java | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 0f83cf137c232..2f6e3fd19065a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -555,7 +555,7 @@ public void putPipeline( taskQueue.submitTask( "put-pipeline-" + request.getId(), - new PutPipelineClusterStateUpdateTask(projectId, l, request, Instant::now), + new PutPipelineClusterStateUpdateTask(projectId, l, request), request.masterNodeTimeout() ); })); @@ -580,8 +580,8 @@ public static void validateNoSystemPropertiesInPipelineConfig(final Mapfalse if request contains system-properties like `{created,modified}_date, - * these should be rejected later.`*/ + * Will return false if request contains system-properties like created or modified_date, + * these should be rejected later.*/ public static boolean isNoOpPipelineUpdate(ProjectMetadata metadata, PutPipelineRequest request) { IngestMetadata currentIngestMetadata = metadata.custom(IngestMetadata.TYPE); if (request.getVersion() == null @@ -700,6 +700,7 @@ public static class PutPipelineClusterStateUpdateTask extends PipelineClusterSta private final PutPipelineRequest request; private final InstantSource instantSource; + // constructor allowing for injection of InstantSource/time for testing PutPipelineClusterStateUpdateTask( final ProjectId projectId, final ActionListener listener, @@ -711,19 +712,27 @@ public static class PutPipelineClusterStateUpdateTask extends PipelineClusterSta this.instantSource = instantSource; } + PutPipelineClusterStateUpdateTask( + final ProjectId projectId, + final ActionListener listener, + final PutPipelineRequest request + ) { + this(projectId, listener, request, Instant::now); + } + /** * Used by {@link org.elasticsearch.action.ingest.ReservedPipelineAction} */ public PutPipelineClusterStateUpdateTask(ProjectId projectId, PutPipelineRequest request) { - this(projectId, null, request, Instant::now); + this(projectId, null, request); } @Override public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection allIndexMetadata) { - final Map existingPipelines = currentIngestMetadata == null + final Map pipelines = currentIngestMetadata == null ? new HashMap<>(1) : new HashMap<>(currentIngestMetadata.getPipelines()); - final PipelineConfiguration existingPipeline = existingPipelines.get(request.getId()); + final PipelineConfiguration existingPipeline = pipelines.get(request.getId()); final Map newPipelineConfig = XContentHelper.convertToMap(request.getSource(), true, request.getXContentType()) .v2(); @@ -785,8 +794,8 @@ public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection Date: Thu, 24 Jul 2025 11:20:35 +0100 Subject: [PATCH 7/7] proposal --- .../rest-api-spec/test/ingest/20_tracking.yml | 31 +++++++++++++++++++ .../action/ingest/GetPipelineResponse.java | 21 ++++++++++++- .../elasticsearch/ingest/IngestService.java | 31 ++++++++----------- .../org/elasticsearch/ingest/Pipeline.java | 15 ++++----- .../ingest/PipelineConfiguration.java | 8 ++--- .../ingest/IngestServiceTests.java | 6 ---- 6 files changed, 76 insertions(+), 36 deletions(-) diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/20_tracking.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/20_tracking.yml index 3b9ae4ccf5783..6e0312a93063a 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/20_tracking.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/20_tracking.yml @@ -20,7 +20,10 @@ setup: - do: ingest.get_pipeline: + human: true id: "my_pipeline" + - gte: { my_pipeline.created_date_millis: 0 } + - gte: { my_pipeline.modified_date_millis: 0 } - match: { my_pipeline.created_date: "/^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z$/" } - match: { my_pipeline.modified_date: "/^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z$/" } @@ -38,6 +41,34 @@ setup: - match: { status: 400 } - match: { error.reason: "Provided a pipeline property which is managed by the system: created_date." } +--- +"Test PUT setting created_date_millis": + - do: + catch: bad_request + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "processors": [], + "created_date_millis": 0 + } + - match: { status: 400 } + - match: { error.reason: "Provided a pipeline property which is managed by the system: created_date_millis." } + +--- +"Test PUT setting modified_date_millis": + - do: + catch: bad_request + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "processors": [], + "modified_date_millis": 0 + } + - match: { status: 400 } + - match: { error.reason: "Provided a pipeline property which is managed by the system: modified_date_millis." } + --- "Test PUT setting modified_date": - do: diff --git a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java b/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java index 8e919c071edf2..9ea99da3f0492 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.UpdateForV10; +import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.PipelineConfiguration; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xcontent.ToXContentObject; @@ -74,7 +75,25 @@ public RestStatus status() { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); for (PipelineConfiguration pipeline : pipelines) { - builder.field(pipeline.getId(), summary ? Map.of() : pipeline.getConfig()); + builder.startObject(pipeline.getId()); + for (final Map.Entry configProperty : (summary ? Map.of() : pipeline.getConfig()).entrySet()) { + if (Pipeline.CREATED_DATE_MILLIS.equals(configProperty.getKey())) { + builder.timestampFieldsFromUnixEpochMillis( + Pipeline.CREATED_DATE_MILLIS, + Pipeline.CREATED_DATE, + (Long) configProperty.getValue() + ); + } else if (Pipeline.MODIFIED_DATE_MILLIS.equals(configProperty.getKey())) { + builder.timestampFieldsFromUnixEpochMillis( + Pipeline.MODIFIED_DATE_MILLIS, + Pipeline.MODIFIED_DATE, + (Long) configProperty.getValue() + ); + } else { + builder.field(configProperty.getKey(), configProperty.getValue()); + } + } + builder.endObject(); } builder.endObject(); return builder; diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 2f6e3fd19065a..47ecdaf904801 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -80,9 +80,6 @@ import java.time.Instant; import java.time.InstantSource; -import java.time.format.DateTimeFormatter; -import java.time.format.DateTimeFormatterBuilder; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -572,9 +569,13 @@ public void validatePipelineRequest(ProjectId projectId, PutPipelineRequest requ } public static void validateNoSystemPropertiesInPipelineConfig(final Map pipelineConfig) { - if (pipelineConfig.containsKey(Pipeline.CREATED_DATE_KEY)) { + if (pipelineConfig.containsKey(Pipeline.CREATED_DATE_MILLIS)) { + throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system: created_date_millis."); + } else if (pipelineConfig.containsKey(Pipeline.CREATED_DATE)) { throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system: created_date."); - } else if (pipelineConfig.containsKey(Pipeline.MODIFIED_DATE_KEY)) { + } else if (pipelineConfig.containsKey(Pipeline.MODIFIED_DATE_MILLIS)) { + throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system: modified_date_millis."); + } else if (pipelineConfig.containsKey(Pipeline.MODIFIED_DATE)) { throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system: modified_date."); } } @@ -593,8 +594,8 @@ public static boolean isNoOpPipelineUpdate(ProjectMetadata metadata, PutPipeline Map currentConfigWithoutSystemProps = new HashMap<>( currentIngestMetadata.getPipelines().get(request.getId()).getConfig() ); - currentConfigWithoutSystemProps.remove(Pipeline.CREATED_DATE_KEY); - currentConfigWithoutSystemProps.remove(Pipeline.MODIFIED_DATE_KEY); + currentConfigWithoutSystemProps.remove(Pipeline.CREATED_DATE_MILLIS); + currentConfigWithoutSystemProps.remove(Pipeline.MODIFIED_DATE_MILLIS); return newPipelineConfig.equals(currentConfigWithoutSystemProps); } @@ -693,10 +694,6 @@ private static void collectProcessorMetrics( * Used in this class and externally by the {@link org.elasticsearch.action.ingest.ReservedPipelineAction} */ public static class PutPipelineClusterStateUpdateTask extends PipelineClusterStateUpdateTask { - // always output millis even if instantSource returns millis == 0 - private static final DateTimeFormatter ISO8601_WITH_MILLIS_FORMATTER = new DateTimeFormatterBuilder().appendInstant(3) - .toFormatter(Locale.ROOT); - private final PutPipelineRequest request; private final InstantSource instantSource; @@ -779,20 +776,18 @@ public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection configForTransport(final TransportVersion transportVersion) { final boolean transportSupportsNewProperties = transportVersion.onOrAfter(TransportVersions.PIPELINE_TRACKING_INFO); - final boolean noNewProperties = config.containsKey(Pipeline.CREATED_DATE_KEY) == false - && config.containsKey(Pipeline.MODIFIED_DATE_KEY) == false; + final boolean noNewProperties = config.containsKey(Pipeline.CREATED_DATE_MILLIS) == false + && config.containsKey(Pipeline.MODIFIED_DATE_MILLIS) == false; if (transportSupportsNewProperties || noNewProperties) { return config; } final Map configWithoutNewSystemProperties = new HashMap<>(config); - configWithoutNewSystemProperties.remove(Pipeline.CREATED_DATE_KEY); - configWithoutNewSystemProperties.remove(Pipeline.MODIFIED_DATE_KEY); + configWithoutNewSystemProperties.remove(Pipeline.CREATED_DATE_MILLIS); + configWithoutNewSystemProperties.remove(Pipeline.MODIFIED_DATE_MILLIS); return Collections.unmodifiableMap(configWithoutNewSystemProperties); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index c4acbd54fd5c3..829ea86678f4e 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -834,12 +834,6 @@ public void testPutWithTracking() { Pipeline pipeline = ingestService.getPipeline(projectId, id); assertThat(pipeline.getCreatedDateMillis().orElseThrow(), is(0L)); assertThat(pipeline.getModifiedDateMillis().orElseThrow(), is(0L)); - // test PipelineConfiguration has millis part in iso8601 even if instantSource provides millis part == 0 - final Map clusterStatePipelineConfig = ((IngestMetadata) clusterState.metadata() - .getProject(projectId) - .custom(IngestMetadata.TYPE)).getPipelines().get(id).getConfig(); - assertThat(clusterStatePipelineConfig.get(Pipeline.CREATED_DATE_KEY), is("1970-01-01T00:00:00.000Z")); - assertThat(clusterStatePipelineConfig.get(Pipeline.MODIFIED_DATE_KEY), is("1970-01-01T00:00:00.000Z")); // overwrite existing pipeline: putRequest = putJsonPipelineRequest(id, """