From 848ccd15465cd2d32d834f5f674286d3b9103207 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Wed, 25 Jan 2023 12:53:08 -0500 Subject: [PATCH 01/15] Make these variables final --- .../main/java/org/elasticsearch/ingest/IngestService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index cb913abae81fb..31134eb603d42 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -742,12 +742,12 @@ private void executePipelines( assert it.hasNext(); final String pipelineId = it.next(); try { - PipelineHolder holder = pipelines.get(pipelineId); + final PipelineHolder holder = pipelines.get(pipelineId); if (holder == null) { throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); } - Pipeline pipeline = holder.pipeline; - String originalIndex = indexRequest.indices()[0]; + final Pipeline pipeline = holder.pipeline; + final String originalIndex = indexRequest.indices()[0]; long startTimeInNanos = System.nanoTime(); totalMetrics.preIngest(); innerExecute(slot, indexRequest, pipeline, onDropped, e -> { From 073c2ab2994704b308b397427cde8638ac167038 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 24 Jan 2023 16:36:16 -0500 Subject: [PATCH 02/15] Drop early returns --- .../main/java/org/elasticsearch/ingest/IngestService.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 31134eb603d42..016dd76e1b2f6 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -886,6 +886,8 @@ private void innerExecute( updateIndexRequestMetadata(indexRequest, ingestDocument.getMetadata()); try { updateIndexRequestSource(indexRequest, ingestDocument); + cacheRawTimestamp(indexRequest, ingestDocument); + handler.accept(null); } catch (IllegalArgumentException ex) { // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing. // In that case, we catch and wrap the exception, so we can include which pipeline failed. @@ -895,7 +897,6 @@ private void innerExecute( ex ) ); - return; } catch (Exception ex) { // If anything goes wrong here, we want to know, and cannot proceed with normal execution. For example, // *rarely*, a ConcurrentModificationException could be thrown if a pipeline leaks a reference to a shared mutable @@ -904,11 +905,7 @@ private void innerExecute( handler.accept( new RuntimeException("Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]", ex) ); - return; } - cacheRawTimestamp(indexRequest, ingestDocument); - - handler.accept(null); } }); } From 72ff31d793296e88e74217d61aa124a5d85b9e68 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Mon, 23 Jan 2023 15:42:48 -0500 Subject: [PATCH 03/15] Use a document listener for all this --- .../elasticsearch/ingest/IngestService.java | 88 ++++++++++--------- 1 file changed, 47 insertions(+), 41 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 016dd76e1b2f6..33cdb194fde96 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -46,6 +46,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.env.Environment; @@ -713,15 +714,29 @@ protected void doRun() { continue; } - executePipelines( - i, - pipelines.iterator(), - hasFinalPipeline, - indexRequest, - onDropped, - onFailure, - refs.acquireListener() - ); + // acquire a ref to indicate that we're working on this document + final int slot = i; + final Releasable ref = refs.acquire(); + // the document listener gives us three-way logic: a document can fail processing (1), or it can + // be successfully processed. a successfully processed document can be kept (2) or dropped (3). + final ActionListener documentListener = ActionListener.runAfter(new ActionListener<>() { + @Override + public void onResponse(Boolean kept) { + assert kept != null; + if (kept == false) { + onDropped.accept(slot); + } + } + + @Override + public void onFailure(Exception e) { + onFailure.accept(slot, e); + } + }, () -> { + // regardless of success or failure, we always release the ref to indicate that we're finished + ref.close(); + }); + executePipelines(pipelines.iterator(), hasFinalPipeline, indexRequest, documentListener); i++; } @@ -731,13 +746,10 @@ protected void doRun() { } private void executePipelines( - final int slot, final Iterator it, final boolean hasFinalPipeline, final IndexRequest indexRequest, - final IntConsumer onDropped, - final BiConsumer onFailure, - final ActionListener onFinished + final ActionListener listener ) { assert it.hasNext(); final String pipelineId = it.next(); @@ -750,7 +762,9 @@ private void executePipelines( final String originalIndex = indexRequest.indices()[0]; long startTimeInNanos = System.nanoTime(); totalMetrics.preIngest(); - innerExecute(slot, indexRequest, pipeline, onDropped, e -> { + innerExecute(indexRequest, pipeline, (keep, e) -> { + assert keep != null; + long ingestTimeInNanos = System.nanoTime() - startTimeInNanos; totalMetrics.postIngest(ingestTimeInNanos); if (e != null) { @@ -764,10 +778,13 @@ private void executePipelines( ), e ); - onFailure.accept(slot, e); - // document failed! no further processing of this doc - onFinished.onResponse(null); - return; + listener.onFailure(e); + return; // document failed! + } + + if (keep == false) { + listener.onResponse(false); + return; // document dropped! } Iterator newIt = it; @@ -777,13 +794,8 @@ private void executePipelines( if (Objects.equals(originalIndex, newIndex) == false) { if (hasFinalPipeline && it.hasNext() == false) { totalMetrics.ingestFailed(); - onFailure.accept( - slot, - new IllegalStateException("final pipeline [" + pipelineId + "] can't change the target index") - ); - // document failed! no further processing of this doc - onFinished.onResponse(null); - return; + listener.onFailure(new IllegalStateException("final pipeline [" + pipelineId + "] can't change the target index")); + return; // document failed! } else { indexRequest.isPipelineResolved(false); resolvePipelines(null, indexRequest, state.metadata()); @@ -797,9 +809,9 @@ private void executePipelines( } if (newIt.hasNext()) { - executePipelines(slot, newIt, newHasFinalPipeline, indexRequest, onDropped, onFailure, onFinished); + executePipelines(newIt, newHasFinalPipeline, indexRequest, listener); } else { - onFinished.onResponse(null); + listener.onResponse(true); // document succeeded! } }); } catch (Exception e) { @@ -807,8 +819,7 @@ private void executePipelines( () -> format("failed to execute pipeline [%s] for document [%s/%s]", pipelineId, indexRequest.index(), indexRequest.id()), e ); - onFailure.accept(slot, e); - onFinished.onResponse(null); + listener.onFailure(e); // document failed! } } @@ -863,35 +874,29 @@ static String getProcessorName(Processor processor) { return sb.toString(); } - private void innerExecute( - final int slot, - final IndexRequest indexRequest, - final Pipeline pipeline, - final IntConsumer itemDroppedHandler, - final Consumer handler - ) { + private void innerExecute(final IndexRequest indexRequest, final Pipeline pipeline, final BiConsumer handler) { if (pipeline.getProcessors().isEmpty()) { - handler.accept(null); + handler.accept(true, null); return; } IngestDocument ingestDocument = newIngestDocument(indexRequest); ingestDocument.executePipeline(pipeline, (result, e) -> { if (e != null) { - handler.accept(e); + handler.accept(true, e); } else if (result == null) { - itemDroppedHandler.accept(slot); - handler.accept(null); + handler.accept(false, null); } else { updateIndexRequestMetadata(indexRequest, ingestDocument.getMetadata()); try { updateIndexRequestSource(indexRequest, ingestDocument); cacheRawTimestamp(indexRequest, ingestDocument); - handler.accept(null); + handler.accept(true, null); } catch (IllegalArgumentException ex) { // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing. // In that case, we catch and wrap the exception, so we can include which pipeline failed. handler.accept( + true, new IllegalArgumentException( "Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]", ex @@ -903,6 +908,7 @@ private void innerExecute( // collection, and another indexing thread modifies the shared reference while we're trying to ensure it has // no self references. handler.accept( + true, new RuntimeException("Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]", ex) ); } From 822d900d421abc53867c32f482fb6f0b37c70038 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Wed, 25 Jan 2023 12:54:52 -0500 Subject: [PATCH 04/15] Move top-level ingest stats to the top-level --- .../org/elasticsearch/ingest/IngestService.java | 16 ++++++++-------- .../elasticsearch/ingest/IngestServiceTests.java | 3 +-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 33cdb194fde96..8d5ae91c9d2ff 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -714,7 +714,9 @@ protected void doRun() { continue; } - // acquire a ref to indicate that we're working on this document + // start the stopwatch and acquire a ref to indicate that we're working on this document + final long startTimeInNanos = System.nanoTime(); + totalMetrics.preIngest(); final int slot = i; final Releasable ref = refs.acquire(); // the document listener gives us three-way logic: a document can fail processing (1), or it can @@ -730,10 +732,14 @@ public void onResponse(Boolean kept) { @Override public void onFailure(Exception e) { + totalMetrics.ingestFailed(); onFailure.accept(slot, e); } }, () -> { - // regardless of success or failure, we always release the ref to indicate that we're finished + // regardless of success or failure, we always stop the ingest "stopwatch" and release the ref to indicate + // that we're finished with this document + final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos; + totalMetrics.postIngest(ingestTimeInNanos); ref.close(); }); executePipelines(pipelines.iterator(), hasFinalPipeline, indexRequest, documentListener); @@ -760,15 +766,10 @@ private void executePipelines( } final Pipeline pipeline = holder.pipeline; final String originalIndex = indexRequest.indices()[0]; - long startTimeInNanos = System.nanoTime(); - totalMetrics.preIngest(); innerExecute(indexRequest, pipeline, (keep, e) -> { assert keep != null; - long ingestTimeInNanos = System.nanoTime() - startTimeInNanos; - totalMetrics.postIngest(ingestTimeInNanos); if (e != null) { - totalMetrics.ingestFailed(); logger.debug( () -> format( "failed to execute pipeline [%s] for document [%s/%s]", @@ -793,7 +794,6 @@ private void executePipelines( if (Objects.equals(originalIndex, newIndex) == false) { if (hasFinalPipeline && it.hasNext() == false) { - totalMetrics.ingestFailed(); listener.onFailure(new IllegalStateException("final pipeline [" + pipelineId + "] can't change the target index")); return; // document failed! } else { diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 9e041ca1d6210..a9fcb086b6c12 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -1484,8 +1484,7 @@ public String execute() { assertThat(ingestStats.getPipelineStats().size(), equalTo(3)); // total - // see https://github.com/elastic/elasticsearch/issues/92843 -- this should be 1, but it's actually 2 - // assertStats(ingestStats.getTotalStats(), 1, 0, 0); + assertStats(ingestStats.getTotalStats(), 1, 0, 0); // pipeline assertPipelineStats(ingestStats.getPipelineStats(), "_id1", 1, 0, 0); assertPipelineStats(ingestStats.getPipelineStats(), "_id2", 1, 0, 0); From 3ced88febba48970cba6b81a8b91c3aa59baddc3 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Mon, 23 Jan 2023 17:02:35 -0500 Subject: [PATCH 05/15] Move parse/generate out of innerExecute --- .../elasticsearch/ingest/IngestService.java | 67 ++++++++++--------- .../ingest/IngestServiceTests.java | 3 +- 2 files changed, 36 insertions(+), 34 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 8d5ae91c9d2ff..cfe20096acb52 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -44,6 +44,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Releasable; @@ -742,7 +743,9 @@ public void onFailure(Exception e) { totalMetrics.postIngest(ingestTimeInNanos); ref.close(); }); - executePipelines(pipelines.iterator(), hasFinalPipeline, indexRequest, documentListener); + + IngestDocument ingestDocument = newIngestDocument(indexRequest); + executePipelines(pipelines.iterator(), hasFinalPipeline, indexRequest, ingestDocument, documentListener); i++; } @@ -755,6 +758,7 @@ private void executePipelines( final Iterator it, final boolean hasFinalPipeline, final IndexRequest indexRequest, + final IngestDocument ingestDocument, final ActionListener listener ) { assert it.hasNext(); @@ -766,7 +770,7 @@ private void executePipelines( } final Pipeline pipeline = holder.pipeline; final String originalIndex = indexRequest.indices()[0]; - innerExecute(indexRequest, pipeline, (keep, e) -> { + innerExecute(ingestDocument, pipeline, (keep, e) -> { assert keep != null; if (e != null) { @@ -788,6 +792,26 @@ private void executePipelines( return; // document dropped! } + // update the index request so that we can execute additional pipelines (if any), etc + updateIndexRequestMetadata(indexRequest, ingestDocument.getMetadata()); + try { + // check for self-references if necessary, (i.e. if a script processor has run), and clear the bit + if (ingestDocument.doNoSelfReferencesCheck()) { + CollectionUtils.ensureNoSelfReferences(ingestDocument.getSource(), null); + ingestDocument.doNoSelfReferencesCheck(false); + } + } catch (IllegalArgumentException ex) { + // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing. + // In that case, we catch and wrap the exception, so we can include more details + listener.onFailure( + new IllegalArgumentException( + format("Failed to generate the source document for ingest pipeline [%s]", pipelineId), + ex + ) + ); + return; // document failed! + } + Iterator newIt = it; boolean newHasFinalPipeline = hasFinalPipeline; String newIndex = indexRequest.indices()[0]; @@ -809,8 +833,11 @@ private void executePipelines( } if (newIt.hasNext()) { - executePipelines(newIt, newHasFinalPipeline, indexRequest, listener); + executePipelines(newIt, newHasFinalPipeline, indexRequest, ingestDocument, listener); } else { + // update the index request's source and (potentially) cache the timestamp for TSDB + updateIndexRequestSource(indexRequest, ingestDocument); + cacheRawTimestamp(indexRequest, ingestDocument); listener.onResponse(true); // document succeeded! } }); @@ -874,44 +901,17 @@ static String getProcessorName(Processor processor) { return sb.toString(); } - private void innerExecute(final IndexRequest indexRequest, final Pipeline pipeline, final BiConsumer handler) { + private void innerExecute(final IngestDocument ingestDocument, final Pipeline pipeline, final BiConsumer handler) { if (pipeline.getProcessors().isEmpty()) { handler.accept(true, null); return; } - IngestDocument ingestDocument = newIngestDocument(indexRequest); ingestDocument.executePipeline(pipeline, (result, e) -> { if (e != null) { handler.accept(true, e); - } else if (result == null) { - handler.accept(false, null); } else { - updateIndexRequestMetadata(indexRequest, ingestDocument.getMetadata()); - try { - updateIndexRequestSource(indexRequest, ingestDocument); - cacheRawTimestamp(indexRequest, ingestDocument); - handler.accept(true, null); - } catch (IllegalArgumentException ex) { - // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing. - // In that case, we catch and wrap the exception, so we can include which pipeline failed. - handler.accept( - true, - new IllegalArgumentException( - "Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]", - ex - ) - ); - } catch (Exception ex) { - // If anything goes wrong here, we want to know, and cannot proceed with normal execution. For example, - // *rarely*, a ConcurrentModificationException could be thrown if a pipeline leaks a reference to a shared mutable - // collection, and another indexing thread modifies the shared reference while we're trying to ensure it has - // no self references. - handler.accept( - true, - new RuntimeException("Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]", ex) - ); - } + handler.accept(result != null, null); } }); } @@ -963,6 +963,9 @@ private static void updateIndexRequestMetadata(final IndexRequest request, final */ private static void updateIndexRequestSource(final IndexRequest request, final IngestDocument document) { boolean ensureNoSelfReferences = document.doNoSelfReferencesCheck(); + // we already check for self references elsewhere (and clear the bit), so this should always be false, + // keeping the check and assert as a guard against extraordinarily surprising circumstances + assert ensureNoSelfReferences == false; request.source(document.getSource(), request.getContentType(), ensureNoSelfReferences); } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index a9fcb086b6c12..ffafe12ce0dd7 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -1864,8 +1864,7 @@ public void testSetsRawTimestamp() { assertThat(indexRequest5.getRawTimestamp(), nullValue()); assertThat(indexRequest6.getRawTimestamp(), equalTo(10)); assertThat(indexRequest7.getRawTimestamp(), equalTo(100)); - // see https://github.com/elastic/elasticsearch/issues/93118 -- this should be 100, but it's actually 10 - // assertThat(indexRequest8.getRawTimestamp(), equalTo(100)); + assertThat(indexRequest8.getRawTimestamp(), equalTo(100)); } public void testResolveRequiredOrDefaultPipelineDefaultPipeline() { From 887ee71268d1bbe63c30db413ad5393e266cd04f Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Fri, 27 Jan 2023 12:00:12 -0500 Subject: [PATCH 06/15] Add more detail to this error message --- .../rest-api-spec/test/ingest/190_script_processor.yml | 2 +- .../main/java/org/elasticsearch/ingest/IngestService.java | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/190_script_processor.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/190_script_processor.yml index 614b329c699fe..d833e0111f83b 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/190_script_processor.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/190_script_processor.yml @@ -313,7 +313,7 @@ teardown: "foo": "bar" } } - - match: { error.root_cause.0.reason: "Failed to generate the source document for ingest pipeline [my_pipeline]" } + - match: { error.root_cause.0.reason: "Failed to generate the source document for ingest pipeline [my_pipeline] for document [test/1]" } --- "Test metadata": diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index cfe20096acb52..1698c38ddc155 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -805,7 +805,12 @@ private void executePipelines( // In that case, we catch and wrap the exception, so we can include more details listener.onFailure( new IllegalArgumentException( - format("Failed to generate the source document for ingest pipeline [%s]", pipelineId), + format( + "Failed to generate the source document for ingest pipeline [%s] for document [%s/%s]", + pipelineId, + indexRequest.index(), + indexRequest.id() + ), ex ) ); From c2fbb088801e293ec38c6460e0c6f0214a480570 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Wed, 25 Jan 2023 11:39:52 -0500 Subject: [PATCH 07/15] Move this logic It used to make sense for this to live in the ingest service, because we avoided allocating an ingest document if the pipeline was empty. Now we already have the document regardless, so this can just live in IngestDocument anyway. In any case, this would be a rare and unusual thing to have happen at all. I don't want to drop the logic completely, but I'm also not worried about the performance implications of where it lives. --- .../main/java/org/elasticsearch/ingest/IngestDocument.java | 6 ++++++ .../main/java/org/elasticsearch/ingest/IngestService.java | 5 ----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 76c42b7fc8a41..f471926087ae5 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -829,6 +829,12 @@ private static Set getAllFields(Map input, String prefix * @param handler handles the result or failure */ public void executePipeline(Pipeline pipeline, BiConsumer handler) { + // shortcut if the pipeline is empty + if (pipeline.getProcessors().isEmpty()) { + handler.accept(this, null); + return; + } + if (executedPipelines.add(pipeline.getId())) { Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId()); pipeline.execute(this, (result, e) -> { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 1698c38ddc155..1ea023026d505 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -907,11 +907,6 @@ static String getProcessorName(Processor processor) { } private void innerExecute(final IngestDocument ingestDocument, final Pipeline pipeline, final BiConsumer handler) { - if (pipeline.getProcessors().isEmpty()) { - handler.accept(true, null); - return; - } - ingestDocument.executePipeline(pipeline, (result, e) -> { if (e != null) { handler.accept(true, e); From 02f6160c723d230ac78bc6939a4174627fa74842 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Wed, 25 Jan 2023 11:51:11 -0500 Subject: [PATCH 08/15] Rename and move innerExecute --- .../elasticsearch/ingest/IngestService.java | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 1ea023026d505..0c299f706be9a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -770,7 +770,7 @@ private void executePipelines( } final Pipeline pipeline = holder.pipeline; final String originalIndex = indexRequest.indices()[0]; - innerExecute(ingestDocument, pipeline, (keep, e) -> { + executePipeline(ingestDocument, pipeline, (keep, e) -> { assert keep != null; if (e != null) { @@ -855,6 +855,23 @@ private void executePipelines( } } + private void executePipeline( + final IngestDocument ingestDocument, + final Pipeline pipeline, + final BiConsumer handler + ) { + // adapt our {@code BiConsumer} handler shape to the + // {@code BiConsumer} handler shape used internally + // by ingest pipelines and processors + ingestDocument.executePipeline(pipeline, (result, e) -> { + if (e != null) { + handler.accept(true, e); + } else { + handler.accept(result != null, null); + } + }); + } + public IngestStats stats() { IngestStats.Builder statsBuilder = new IngestStats.Builder(); statsBuilder.addTotalMetrics(totalMetrics); @@ -906,16 +923,6 @@ static String getProcessorName(Processor processor) { return sb.toString(); } - private void innerExecute(final IngestDocument ingestDocument, final Pipeline pipeline, final BiConsumer handler) { - ingestDocument.executePipeline(pipeline, (result, e) -> { - if (e != null) { - handler.accept(true, e); - } else { - handler.accept(result != null, null); - } - }); - } - /** * Builds a new ingest document from the passed-in index request. */ From ae2e3ff0553519970a28673666c7d32e42906dbe Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Fri, 27 Jan 2023 13:04:58 -0500 Subject: [PATCH 09/15] Update docs/changelog/93329.yaml --- docs/changelog/93329.yaml | 57 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 docs/changelog/93329.yaml diff --git a/docs/changelog/93329.yaml b/docs/changelog/93329.yaml new file mode 100644 index 0000000000000..eecf1936872fa --- /dev/null +++ b/docs/changelog/93329.yaml @@ -0,0 +1,57 @@ +pr: 93329 +summary: Handle a default/request pipeline and a final pipeline with minimal additional + overhead +area: Ingest Node +type: refactoring +issues: + - 92843 + - 81244 + - 93118 +highlight: + title: Handle a default/request pipeline and a final pipeline with minimal additional + overhead + body: |- + Closes #81244 Closes #92843 Closes #93118 + + Tightens up the document handling aspects of `executePipelines` and its + callees. `innerExecute` becomes trivial, and nearly drops out (renamed + to `executePipeline` where it remains just to adapt handler shapes). + + At a high level, the execution goes from: + + ``` + - pipeline 1 (default/request pipeline): + - parse json + - execute processors + - generate json + - pipeline 2 (final pipeline): + - parse json + - execute processors + - generate json + ``` + + to + + ``` + - parse json + - pipeline 1 (default/request pipeline): + - execute processors + - pipeline 2 (final pipeline): + - execute processors + - generate json + ``` + + The difference in the flame graph is pretty clear. Before: + + Screen Shot 2023-01-25 at 3 29 39 PM + + After: + + Screen Shot 2023-01-27 at 12 59 31 PM + + And the performance is much better, as one would expect, with the total + time spent in any ingest code for the nightly security benchmark + dropping from 4994128 to 3568490 millis -- a decrease of 29%. + notable: true From 5b9d4a9ed6995a3b84d8f2e810f0dec8ece811f0 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Fri, 27 Jan 2023 14:47:55 -0500 Subject: [PATCH 10/15] Update docs/changelog/93329.yaml --- docs/changelog/93329.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/changelog/93329.yaml b/docs/changelog/93329.yaml index eecf1936872fa..37bbbbbc0d97b 100644 --- a/docs/changelog/93329.yaml +++ b/docs/changelog/93329.yaml @@ -2,7 +2,7 @@ pr: 93329 summary: Handle a default/request pipeline and a final pipeline with minimal additional overhead area: Ingest Node -type: refactoring +type: "bug, refactoring" issues: - 92843 - 81244 From b9439134a94ba00cca4c55d3a66fdef7a27c7563 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Fri, 27 Jan 2023 14:48:49 -0500 Subject: [PATCH 11/15] Update docs/changelog/93329.yaml --- docs/changelog/93329.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/changelog/93329.yaml b/docs/changelog/93329.yaml index 37bbbbbc0d97b..362790fd6a2c8 100644 --- a/docs/changelog/93329.yaml +++ b/docs/changelog/93329.yaml @@ -2,7 +2,7 @@ pr: 93329 summary: Handle a default/request pipeline and a final pipeline with minimal additional overhead area: Ingest Node -type: "bug, refactoring" +type: bug issues: - 92843 - 81244 From 8bd83b8ad84ec067b1787ad1cd88ae8100cf88d6 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Fri, 27 Jan 2023 14:54:45 -0500 Subject: [PATCH 12/15] Reword the release highlights --- docs/changelog/93329.yaml | 51 ++++++--------------------------------- 1 file changed, 7 insertions(+), 44 deletions(-) diff --git a/docs/changelog/93329.yaml b/docs/changelog/93329.yaml index 362790fd6a2c8..4585edad9b6c1 100644 --- a/docs/changelog/93329.yaml +++ b/docs/changelog/93329.yaml @@ -8,50 +8,13 @@ issues: - 81244 - 93118 highlight: - title: Handle a default/request pipeline and a final pipeline with minimal additional - overhead + title: Speed up ingest processing with multiple pipelines body: |- - Closes #81244 Closes #92843 Closes #93118 + Processing documents with both a request/default and a final + pipeline is significantly faster. - Tightens up the document handling aspects of `executePipelines` and its - callees. `innerExecute` becomes trivial, and nearly drops out (renamed - to `executePipeline` where it remains just to adapt handler shapes). - - At a high level, the execution goes from: - - ``` - - pipeline 1 (default/request pipeline): - - parse json - - execute processors - - generate json - - pipeline 2 (final pipeline): - - parse json - - execute processors - - generate json - ``` - - to - - ``` - - parse json - - pipeline 1 (default/request pipeline): - - execute processors - - pipeline 2 (final pipeline): - - execute processors - - generate json - ``` - - The difference in the flame graph is pretty clear. Before: - - Screen Shot 2023-01-25 at 3 29 39 PM - - After: - - Screen Shot 2023-01-27 at 12 59 31 PM - - And the performance is much better, as one would expect, with the total - time spent in any ingest code for the nightly security benchmark - dropping from 4994128 to 3568490 millis -- a decrease of 29%. + Rather than marshalling a document from and to json once per + pipeline, a document is now marshalled from json before any + pipelines execute and then back to json after all pipelines have + executed. notable: true From be9d35ef26b548d27b3c02149d7875ad38ed1fb2 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Fri, 27 Jan 2023 15:31:51 -0500 Subject: [PATCH 13/15] This can be final --- .../src/main/java/org/elasticsearch/ingest/IngestService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 0c299f706be9a..31e724889abca 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -819,7 +819,7 @@ private void executePipelines( Iterator newIt = it; boolean newHasFinalPipeline = hasFinalPipeline; - String newIndex = indexRequest.indices()[0]; + final String newIndex = indexRequest.indices()[0]; if (Objects.equals(originalIndex, newIndex) == false) { if (hasFinalPipeline && it.hasNext() == false) { From 8b60a8b31b9b79d09b7316a71c2246fdc8c87943 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Fri, 27 Jan 2023 15:32:01 -0500 Subject: [PATCH 14/15] Rename these iterators for clarity --- .../elasticsearch/ingest/IngestService.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 31e724889abca..265a965cdd10d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -755,14 +755,14 @@ public void onFailure(Exception e) { } private void executePipelines( - final Iterator it, + final Iterator pipelineIds, final boolean hasFinalPipeline, final IndexRequest indexRequest, final IngestDocument ingestDocument, final ActionListener listener ) { - assert it.hasNext(); - final String pipelineId = it.next(); + assert pipelineIds.hasNext(); + final String pipelineId = pipelineIds.next(); try { final PipelineHolder holder = pipelines.get(pipelineId); if (holder == null) { @@ -817,28 +817,28 @@ private void executePipelines( return; // document failed! } - Iterator newIt = it; + Iterator newPipelineIds = pipelineIds; boolean newHasFinalPipeline = hasFinalPipeline; final String newIndex = indexRequest.indices()[0]; if (Objects.equals(originalIndex, newIndex) == false) { - if (hasFinalPipeline && it.hasNext() == false) { + if (hasFinalPipeline && pipelineIds.hasNext() == false) { listener.onFailure(new IllegalStateException("final pipeline [" + pipelineId + "] can't change the target index")); return; // document failed! } else { indexRequest.isPipelineResolved(false); resolvePipelines(null, indexRequest, state.metadata()); if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) { - newIt = Collections.singleton(indexRequest.getFinalPipeline()).iterator(); + newPipelineIds = Collections.singleton(indexRequest.getFinalPipeline()).iterator(); newHasFinalPipeline = true; } else { - newIt = Collections.emptyIterator(); + newPipelineIds = Collections.emptyIterator(); } } } - if (newIt.hasNext()) { - executePipelines(newIt, newHasFinalPipeline, indexRequest, ingestDocument, listener); + if (newPipelineIds.hasNext()) { + executePipelines(newPipelineIds, newHasFinalPipeline, indexRequest, ingestDocument, listener); } else { // update the index request's source and (potentially) cache the timestamp for TSDB updateIndexRequestSource(indexRequest, ingestDocument); From 87c64f1427f791c5ef79dce4f5d6de7033e36bf0 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Fri, 27 Jan 2023 17:34:03 -0500 Subject: [PATCH 15/15] Add more detail to this error message --- .../org/elasticsearch/index/FinalPipelineIT.java | 8 +++++++- .../java/org/elasticsearch/ingest/IngestService.java | 12 +++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java index 60084b384c642..98ce3933f9d3c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java @@ -53,6 +53,7 @@ import java.util.function.Supplier; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasToString; @@ -89,7 +90,12 @@ public void testFinalPipelineCantChangeDestination() { IllegalStateException.class, () -> client().prepareIndex("index").setId("1").setSource(Map.of("field", "value")).get() ); - assertThat(e, hasToString(containsString("final pipeline [final_pipeline] can't change the target index"))); + assertThat( + e, + hasToString( + endsWith("final pipeline [final_pipeline] can't change the target index (from [index] to [target]) for document [1]") + ) + ); } public void testFinalPipelineOfOldDestinationIsNotInvoked() { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 265a965cdd10d..12018ce906515 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -823,7 +823,17 @@ private void executePipelines( if (Objects.equals(originalIndex, newIndex) == false) { if (hasFinalPipeline && pipelineIds.hasNext() == false) { - listener.onFailure(new IllegalStateException("final pipeline [" + pipelineId + "] can't change the target index")); + listener.onFailure( + new IllegalStateException( + format( + "final pipeline [%s] can't change the target index (from [%s] to [%s]) for document [%s]", + pipelineId, + originalIndex, + newIndex, + indexRequest.id() + ) + ) + ); return; // document failed! } else { indexRequest.isPipelineResolved(false);