diff --git a/docs/changelog/93329.yaml b/docs/changelog/93329.yaml
new file mode 100644
index 0000000000000..4585edad9b6c1
--- /dev/null
+++ b/docs/changelog/93329.yaml
@@ -0,0 +1,20 @@
+pr: 93329
+summary: Handle a default/request pipeline and a final pipeline with minimal additional
+  overhead
+area: Ingest Node
+type: bug
+issues:
+ - 92843
+ - 81244
+ - 93118
+highlight:
+  title: Speed up ingest processing with multiple pipelines
+  body: |-
+    Processing documents with both a request/default and a final
+    pipeline is significantly faster.
+
+    Rather than marshalling a document from and to json once per
+    pipeline, a document is now marshalled from json before any
+    pipelines execute and then back to json after all pipelines have
+    executed.
+notable: true
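The changelog entry describes the heart of the change: the expensive part of running multiple pipelines is marshalling the document between the `IndexRequest`'s JSON source and the in-memory map that processors mutate, and that round trip previously happened once per pipeline. A minimal sketch of the new flow — with the caveat that the real code below is callback-based rather than a loop, and `pipelinesToRun` is a hypothetical stand-in for the request/default-then-final pipeline iteration:

```java
IngestDocument ingestDocument = newIngestDocument(indexRequest); // JSON -> map, once
for (Pipeline pipeline : pipelinesToRun) {                       // request/default pipeline, then final pipeline
    pipeline.execute(ingestDocument);                            // mutates only the in-memory map
}
updateIndexRequestSource(indexRequest, ingestDocument);          // map -> JSON, once
```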
diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/190_script_processor.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/190_script_processor.yml
index 614b329c699fe..d833e0111f83b 100644
--- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/190_script_processor.yml
+++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/190_script_processor.yml
@@ -313,7 +313,7 @@ teardown:
             "foo": "bar"
           }
         }
-  - match: { error.root_cause.0.reason: "Failed to generate the source document for ingest pipeline [my_pipeline]" }
+  - match: { error.root_cause.0.reason: "Failed to generate the source document for ingest pipeline [my_pipeline] for document [test/1]" }
 
 ---
 "Test metadata":
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java
index 60084b384c642..98ce3933f9d3c 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java
@@ -53,6 +53,7 @@
 import java.util.function.Supplier;
 
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.endsWith;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.hasToString;
@@ -89,7 +90,12 @@ public void testFinalPipelineCantChangeDestination() {
             IllegalStateException.class,
             () -> client().prepareIndex("index").setId("1").setSource(Map.of("field", "value")).get()
         );
-        assertThat(e, hasToString(containsString("final pipeline [final_pipeline] can't change the target index")));
+        assertThat(
+            e,
+            hasToString(
+                endsWith("final pipeline [final_pipeline] can't change the target index (from [index] to [target]) for document [1]")
+            )
+        );
     }
 
     public void testFinalPipelineOfOldDestinationIsNotInvoked() {
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
index 76c42b7fc8a41..f471926087ae5 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
@@ -829,6 +829,12 @@ private static Set<String> getAllFields(Map<String, Object> input, String prefix
      * @param handler handles the result or failure
      */
     public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
+        // shortcut if the pipeline is empty
+        if (pipeline.getProcessors().isEmpty()) {
+            handler.accept(this, null);
+            return;
+        }
+
         if (executedPipelines.add(pipeline.getId())) {
             Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId());
             pipeline.execute(this, (result, e) -> {
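The new shortcut in `IngestDocument.executePipeline` replaces an equivalent empty-pipeline check that previously lived in `IngestService.innerExecute` (removed further down in this diff): now that an `IngestDocument` is built before any pipeline runs, the no-op case has to be handled here instead. For reference, a sketch of the callback contract, reconstructed from how the rest of this diff consumes it:

```java
// The handler's two arguments encode three outcomes:
ingestDocument.executePipeline(pipeline, (result, e) -> {
    if (e != null) {
        // the pipeline failed while processing this document
    } else if (result == null) {
        // a processor (e.g. the drop processor) dropped the document
    } else {
        // the document was processed successfully and kept
    }
});
```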
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
index cb913abae81fb..12018ce906515 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -44,8 +44,10 @@
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.env.Environment;
@@ -713,15 +715,37 @@ protected void doRun() {
                         continue;
                     }
 
-                    executePipelines(
-                        i,
-                        pipelines.iterator(),
-                        hasFinalPipeline,
-                        indexRequest,
-                        onDropped,
-                        onFailure,
-                        refs.acquireListener()
-                    );
+                    // start the stopwatch and acquire a ref to indicate that we're working on this document
+                    final long startTimeInNanos = System.nanoTime();
+                    totalMetrics.preIngest();
+                    final int slot = i;
+                    final Releasable ref = refs.acquire();
+                    // the document listener gives us three-way logic: a document can fail processing (1), or it can
+                    // be successfully processed. a successfully processed document can be kept (2) or dropped (3).
+                    final ActionListener<Boolean> documentListener = ActionListener.runAfter(new ActionListener<>() {
+                        @Override
+                        public void onResponse(Boolean kept) {
+                            assert kept != null;
+                            if (kept == false) {
+                                onDropped.accept(slot);
+                            }
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            totalMetrics.ingestFailed();
+                            onFailure.accept(slot, e);
+                        }
+                    }, () -> {
+                        // regardless of success or failure, we always stop the ingest "stopwatch" and release the ref to indicate
+                        // that we're finished with this document
+                        final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
+                        totalMetrics.postIngest(ingestTimeInNanos);
+                        ref.close();
+                    });
+
+                    IngestDocument ingestDocument = newIngestDocument(indexRequest);
+                    executePipelines(pipelines.iterator(), hasFinalPipeline, indexRequest, ingestDocument, documentListener);
 
                     i++;
                 }
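`ActionListener.runAfter(listener, runnable)` wraps a listener so the runnable executes after either `onResponse` or `onFailure` fires, which is what guarantees the metrics "stopwatch" stops and the ref is released exactly once per document. A condensed sketch of the three-way logic, using the names from the hunk above:

```java
documentListener.onResponse(true);  // (2) processed and kept: nothing extra to do
documentListener.onResponse(false); // (3) processed but dropped: onDropped.accept(slot)
documentListener.onFailure(e);      // (1) failed: ingestFailed() and onFailure.accept(slot, e)
// in all three cases, the runAfter runnable then calls postIngest(...) and ref.close()
```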
@@ -731,30 +755,25 @@
     }
 
     private void executePipelines(
-        final int slot,
-        final Iterator<String> it,
+        final Iterator<String> pipelineIds,
         final boolean hasFinalPipeline,
         final IndexRequest indexRequest,
-        final IntConsumer onDropped,
-        final BiConsumer<Integer, Exception> onFailure,
-        final ActionListener<Void> onFinished
+        final IngestDocument ingestDocument,
+        final ActionListener<Boolean> listener
     ) {
-        assert it.hasNext();
-        final String pipelineId = it.next();
+        assert pipelineIds.hasNext();
+        final String pipelineId = pipelineIds.next();
         try {
-            PipelineHolder holder = pipelines.get(pipelineId);
+            final PipelineHolder holder = pipelines.get(pipelineId);
             if (holder == null) {
                 throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
             }
-            Pipeline pipeline = holder.pipeline;
-            String originalIndex = indexRequest.indices()[0];
-            long startTimeInNanos = System.nanoTime();
-            totalMetrics.preIngest();
-            innerExecute(slot, indexRequest, pipeline, onDropped, e -> {
-                long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
-                totalMetrics.postIngest(ingestTimeInNanos);
+            final Pipeline pipeline = holder.pipeline;
+            final String originalIndex = indexRequest.indices()[0];
+            executePipeline(ingestDocument, pipeline, (keep, e) -> {
+                assert keep != null;
+
                 if (e != null) {
-                    totalMetrics.ingestFailed();
                     logger.debug(
                         () -> format(
                             "failed to execute pipeline [%s] for document [%s/%s]",
@@ -764,42 +783,77 @@ private void executePipelines(
                         ),
                         e
                     );
-                    onFailure.accept(slot, e);
-                    // document failed! no further processing of this doc
-                    onFinished.onResponse(null);
-                    return;
+                    listener.onFailure(e);
+                    return; // document failed!
+                }
+
+                if (keep == false) {
+                    listener.onResponse(false);
+                    return; // document dropped!
                 }
 
-                Iterator<String> newIt = it;
+                // update the index request so that we can execute additional pipelines (if any), etc
+                updateIndexRequestMetadata(indexRequest, ingestDocument.getMetadata());
+                try {
+                    // check for self-references if necessary, (i.e. if a script processor has run), and clear the bit
+                    if (ingestDocument.doNoSelfReferencesCheck()) {
+                        CollectionUtils.ensureNoSelfReferences(ingestDocument.getSource(), null);
+                        ingestDocument.doNoSelfReferencesCheck(false);
+                    }
+                } catch (IllegalArgumentException ex) {
+                    // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing.
+                    // In that case, we catch and wrap the exception, so we can include more details
+                    listener.onFailure(
+                        new IllegalArgumentException(
+                            format(
+                                "Failed to generate the source document for ingest pipeline [%s] for document [%s/%s]",
+                                pipelineId,
+                                indexRequest.index(),
+                                indexRequest.id()
+                            ),
+                            ex
+                        )
+                    );
+                    return; // document failed!
+                }
+
+                Iterator<String> newPipelineIds = pipelineIds;
                 boolean newHasFinalPipeline = hasFinalPipeline;
-                String newIndex = indexRequest.indices()[0];
+                final String newIndex = indexRequest.indices()[0];
 
                 if (Objects.equals(originalIndex, newIndex) == false) {
-                    if (hasFinalPipeline && it.hasNext() == false) {
-                        totalMetrics.ingestFailed();
-                        onFailure.accept(
-                            slot,
-                            new IllegalStateException("final pipeline [" + pipelineId + "] can't change the target index")
+                    if (hasFinalPipeline && pipelineIds.hasNext() == false) {
+                        listener.onFailure(
+                            new IllegalStateException(
+                                format(
+                                    "final pipeline [%s] can't change the target index (from [%s] to [%s]) for document [%s]",
+                                    pipelineId,
+                                    originalIndex,
+                                    newIndex,
+                                    indexRequest.id()
+                                )
+                            )
                         );
-                        // document failed! no further processing of this doc
-                        onFinished.onResponse(null);
-                        return;
+                        return; // document failed!
                     } else {
                         indexRequest.isPipelineResolved(false);
                         resolvePipelines(null, indexRequest, state.metadata());
                         if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) {
-                            newIt = Collections.singleton(indexRequest.getFinalPipeline()).iterator();
+                            newPipelineIds = Collections.singleton(indexRequest.getFinalPipeline()).iterator();
                             newHasFinalPipeline = true;
                         } else {
-                            newIt = Collections.emptyIterator();
+                            newPipelineIds = Collections.emptyIterator();
                         }
                     }
                 }
 
-                if (newIt.hasNext()) {
-                    executePipelines(slot, newIt, newHasFinalPipeline, indexRequest, onDropped, onFailure, onFinished);
+                if (newPipelineIds.hasNext()) {
+                    executePipelines(newPipelineIds, newHasFinalPipeline, indexRequest, ingestDocument, listener);
                 } else {
-                    onFinished.onResponse(null);
+                    // update the index request's source and (potentially) cache the timestamp for TSDB
+                    updateIndexRequestSource(indexRequest, ingestDocument);
+                    cacheRawTimestamp(indexRequest, ingestDocument);
+                    listener.onResponse(true); // document succeeded!
                 }
             });
         } catch (Exception e) {
@@ -807,11 +861,27 @@
                 () -> format("failed to execute pipeline [%s] for document [%s/%s]", pipelineId, indexRequest.index(), indexRequest.id()),
                 e
             );
-            onFailure.accept(slot, e);
-            onFinished.onResponse(null);
+            listener.onFailure(e); // document failed!
         }
     }
 
+    private void executePipeline(
+        final IngestDocument ingestDocument,
+        final Pipeline pipeline,
+        final BiConsumer<Boolean, Exception> handler
+    ) {
+        // adapt our {@code BiConsumer<Boolean, Exception>} handler shape to the
+        // {@code BiConsumer<IngestDocument, Exception>} handler shape used internally
+        // by ingest pipelines and processors
+        ingestDocument.executePipeline(pipeline, (result, e) -> {
+            if (e != null) {
+                handler.accept(true, e);
+            } else {
+                handler.accept(result != null, null);
+            }
+        });
+    }
+
     public IngestStats stats() {
         IngestStats.Builder statsBuilder = new IngestStats.Builder();
         statsBuilder.addTotalMetrics(totalMetrics);
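The `executePipeline` adapter converts the `(IngestDocument result, Exception e)` shape that pipelines report into the `(Boolean keep, Exception e)` shape that `executePipelines` consumes, which is what lets the pipeline chain share a single listener. The mapping, spelled out as read from the adapter above:

```java
// result != null, e == null  ->  handler.accept(true, null);  // processed and kept
// result == null, e == null  ->  handler.accept(false, null); // dropped
// any result,     e != null  ->  handler.accept(true, e);     // failed
```

Passing `keep = true` alongside a non-null exception is deliberate: a failed document is not a dropped document, and the caller checks `e` before ever looking at `keep`.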
@@ -863,56 +933,6 @@ static String getProcessorName(Processor processor) {
         return sb.toString();
     }
 
-    private void innerExecute(
-        final int slot,
-        final IndexRequest indexRequest,
-        final Pipeline pipeline,
-        final IntConsumer itemDroppedHandler,
-        final Consumer<Exception> handler
-    ) {
-        if (pipeline.getProcessors().isEmpty()) {
-            handler.accept(null);
-            return;
-        }
-
-        IngestDocument ingestDocument = newIngestDocument(indexRequest);
-        ingestDocument.executePipeline(pipeline, (result, e) -> {
-            if (e != null) {
-                handler.accept(e);
-            } else if (result == null) {
-                itemDroppedHandler.accept(slot);
-                handler.accept(null);
-            } else {
-                updateIndexRequestMetadata(indexRequest, ingestDocument.getMetadata());
-                try {
-                    updateIndexRequestSource(indexRequest, ingestDocument);
-                } catch (IllegalArgumentException ex) {
-                    // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing.
-                    // In that case, we catch and wrap the exception, so we can include which pipeline failed.
-                    handler.accept(
-                        new IllegalArgumentException(
-                            "Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]",
-                            ex
-                        )
-                    );
-                    return;
-                } catch (Exception ex) {
-                    // If anything goes wrong here, we want to know, and cannot proceed with normal execution. For example,
-                    // *rarely*, a ConcurrentModificationException could be thrown if a pipeline leaks a reference to a shared mutable
-                    // collection, and another indexing thread modifies the shared reference while we're trying to ensure it has
-                    // no self references.
-                    handler.accept(
-                        new RuntimeException("Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]", ex)
-                    );
-                    return;
-                }
-                cacheRawTimestamp(indexRequest, ingestDocument);
-
-                handler.accept(null);
-            }
-        });
-    }
-
     /**
      * Builds a new ingest document from the passed-in index request.
      */
@@ -960,6 +980,9 @@ private static void updateIndexRequestMetadata(final IndexRequest request, final
      */
     private static void updateIndexRequestSource(final IndexRequest request, final IngestDocument document) {
         boolean ensureNoSelfReferences = document.doNoSelfReferencesCheck();
+        // we already check for self references elsewhere (and clear the bit), so this should always be false,
+        // keeping the check and assert as a guard against extraordinarily surprising circumstances
+        assert ensureNoSelfReferences == false;
         request.source(document.getSource(), request.getContentType(), ensureNoSelfReferences);
     }
 
diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
index 9e041ca1d6210..ffafe12ce0dd7 100644
--- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
@@ -1484,8 +1484,7 @@ public String execute() {
         assertThat(ingestStats.getPipelineStats().size(), equalTo(3));
 
         // total
-        // see https://github.com/elastic/elasticsearch/issues/92843 -- this should be 1, but it's actually 2
-        // assertStats(ingestStats.getTotalStats(), 1, 0, 0);
+        assertStats(ingestStats.getTotalStats(), 1, 0, 0);
         // pipeline
         assertPipelineStats(ingestStats.getPipelineStats(), "_id1", 1, 0, 0);
         assertPipelineStats(ingestStats.getPipelineStats(), "_id2", 1, 0, 0);
@@ -1865,8 +1864,7 @@ public void testSetsRawTimestamp() {
         assertThat(indexRequest5.getRawTimestamp(), nullValue());
         assertThat(indexRequest6.getRawTimestamp(), equalTo(10));
         assertThat(indexRequest7.getRawTimestamp(), equalTo(100));
-        // see https://github.com/elastic/elasticsearch/issues/93118 -- this should be 100, but it's actually 10
-        // assertThat(indexRequest8.getRawTimestamp(), equalTo(100));
+        assertThat(indexRequest8.getRawTimestamp(), equalTo(100));
     }
 
     public void testResolveRequiredOrDefaultPipelineDefaultPipeline() {
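A closing note on the self-reference machinery threaded through these hunks: `CollectionUtils.ensureNoSelfReferences` throws an `IllegalArgumentException` if the source map (transitively) contains itself, something a script processor can cause and that JSON serialization cannot represent. A hypothetical script along these lines would trigger it:

```java
// Hypothetical illustration, not from the PR: a script processor that makes
// the source map contain itself.
Map<String, Object> source = ingestDocument.getSource();
source.put("self", source); // serializing this to JSON would recurse forever
```

With this change the check runs at most once per pipeline, and only when a script processor has set the flag; a failure is wrapped with the pipeline and document ids (as asserted by the updated `190_script_processor.yml` test above), and the flag is cleared afterwards, which is why `updateIndexRequestSource` can now assert that it never needs to re-check.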