Merged
Changes from 13 commits
20 changes: 20 additions & 0 deletions docs/changelog/93329.yaml
@@ -0,0 +1,20 @@
pr: 93329
summary: Handle a default/request pipeline and a final pipeline with minimal additional
overhead
area: Ingest Node
type: bug
issues:
- 92843
- 81244
- 93118
highlight:
title: Speed up ingest processing with multiple pipelines
body: |-
Processing documents with both a request/default and a final
pipeline is significantly faster.

Rather than marshalling a document from and to json once per
pipeline, a document is now marshalled from json before any
pipelines execute and then back to json after all pipelines have
executed.
notable: true
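
To make the highlight above concrete, here is a minimal sketch of the before/after marshalling shape. This is not the actual Elasticsearch code; MarshalOnceSketch, runPerPipeline/runMarshalOnce, and the toMap()/toBytes() helpers are hypothetical stand-ins for the real JSON (de)serialization.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

class MarshalOnceSketch {
    // Before: each pipeline parsed the JSON source into a map and serialized
    // it back, paying the marshalling cost once per pipeline.
    static byte[] runPerPipeline(byte[] json, List<UnaryOperator<Map<String, Object>>> pipelines) {
        for (UnaryOperator<Map<String, Object>> pipeline : pipelines) {
            Map<String, Object> doc = toMap(json); // marshal in
            doc = pipeline.apply(doc);
            json = toBytes(doc);                   // marshal out
        }
        return json;
    }

    // After: marshal in once, run every pipeline against the same in-memory
    // map, and marshal out once after all pipelines have executed.
    static byte[] runMarshalOnce(byte[] json, List<UnaryOperator<Map<String, Object>>> pipelines) {
        Map<String, Object> doc = toMap(json);     // marshal in, once
        for (UnaryOperator<Map<String, Object>> pipeline : pipelines) {
            doc = pipeline.apply(doc);
        }
        return toBytes(doc);                       // marshal out, once
    }

    // Hypothetical placeholders for real JSON (de)serialization.
    static Map<String, Object> toMap(byte[] json) {
        return new HashMap<>(Map.of("_source", new String(json)));
    }

    static byte[] toBytes(Map<String, Object> doc) {
        return doc.toString().getBytes();
    }
}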
@@ -313,7 +313,7 @@ teardown:
"foo": "bar"
}
}
- match: { error.root_cause.0.reason: "Failed to generate the source document for ingest pipeline [my_pipeline]" }
- match: { error.root_cause.0.reason: "Failed to generate the source document for ingest pipeline [my_pipeline] for document [test/1]" }

---
"Test metadata":
@@ -829,6 +829,12 @@ private static Set<String> getAllFields(Map<String, Object> input, String prefix
* @param handler handles the result or failure
*/
public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
// shortcut if the pipeline is empty
if (pipeline.getProcessors().isEmpty()) {
handler.accept(this, null);
return;
Member:
Is this going to potentially confuse people who are looking at ingest metrics? I would think it would be a pretty rare case -- how much does this optimization save us? Is it worth the potential future "why are my metrics wrong?" support tickets?

Contributor Author:
What would be wrong about the metrics?

@joegallo (Contributor Author), Jan 27, 2023:
Also note that this is the same logic as before, it's just in a slightly different place. (See c2fbb08 for the deets.)

Member:
Hmm, I don't know what would be wrong about the metrics -- I thought I had traced where this would impact them last week, but now I have no idea what I was seeing.

Contributor Author:
No worries, it's certainly a valid question to ask, and we are indeed in a maze of twisty passages all alike. 😄
}

if (executedPipelines.add(pipeline.getId())) {
Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId());
pipeline.execute(this, (result, e) -> {
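To make the shortcut above concrete, a hypothetical caller-side sketch -- not code from this PR, and the Pipeline and CompoundProcessor constructor shapes are assumptions about the code base at the time of this change. An empty pipeline completes immediately with the unchanged document, so nothing inside Pipeline.execute(), including its per-pipeline metrics, ever runs -- the same behavior the old innerExecute shortcut had.

import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;

class EmptyPipelineShortcutSketch {
    // Assumes the empty-pipeline shortcut in IngestDocument.executePipeline shown above.
    static void demo(IngestDocument ingestDocument) {
        // A pipeline with no processors at all.
        Pipeline empty = new Pipeline("empty-pipeline", null, null, null, new CompoundProcessor());
        ingestDocument.executePipeline(empty, (resultDoc, exception) -> {
            assert exception == null;
            assert resultDoc == ingestDocument; // the handler sees the same, unchanged document
        });
    }
}
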
189 changes: 101 additions & 88 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -44,8 +44,10 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.env.Environment;
@@ -713,15 +715,37 @@ protected void doRun() {
continue;
}

executePipelines(
i,
pipelines.iterator(),
hasFinalPipeline,
indexRequest,
onDropped,
onFailure,
refs.acquireListener()
);
// start the stopwatch and acquire a ref to indicate that we're working on this document
final long startTimeInNanos = System.nanoTime();
totalMetrics.preIngest();
final int slot = i;
final Releasable ref = refs.acquire();
// the document listener gives us three-way logic: a document can fail processing (1), or it can
// be successfully processed. a successfully processed document can be kept (2) or dropped (3).
final ActionListener<Boolean> documentListener = ActionListener.runAfter(new ActionListener<>() {
@Override
public void onResponse(Boolean kept) {
assert kept != null;
if (kept == false) {
onDropped.accept(slot);
}
}

@Override
public void onFailure(Exception e) {
totalMetrics.ingestFailed();
onFailure.accept(slot, e);
}
}, () -> {
// regardless of success or failure, we always stop the ingest "stopwatch" and release the ref to indicate
// that we're finished with this document
final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
totalMetrics.postIngest(ingestTimeInNanos);
ref.close();
});

IngestDocument ingestDocument = newIngestDocument(indexRequest);
executePipelines(pipelines.iterator(), hasFinalPipeline, indexRequest, ingestDocument, documentListener);

i++;
}
@@ -731,30 +755,25 @@
}

private void executePipelines(
final int slot,
final Iterator<String> it,
final boolean hasFinalPipeline,
final IndexRequest indexRequest,
final IntConsumer onDropped,
final BiConsumer<Integer, Exception> onFailure,
final ActionListener<Void> onFinished
final IngestDocument ingestDocument,
final ActionListener<Boolean> listener
) {
assert it.hasNext();
final String pipelineId = it.next();
try {
PipelineHolder holder = pipelines.get(pipelineId);
final PipelineHolder holder = pipelines.get(pipelineId);
if (holder == null) {
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
}
Pipeline pipeline = holder.pipeline;
String originalIndex = indexRequest.indices()[0];
long startTimeInNanos = System.nanoTime();
totalMetrics.preIngest();
innerExecute(slot, indexRequest, pipeline, onDropped, e -> {
long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
totalMetrics.postIngest(ingestTimeInNanos);
final Pipeline pipeline = holder.pipeline;
final String originalIndex = indexRequest.indices()[0];
executePipeline(ingestDocument, pipeline, (keep, e) -> {
assert keep != null;

if (e != null) {
totalMetrics.ingestFailed();
logger.debug(
() -> format(
"failed to execute pipeline [%s] for document [%s/%s]",
@@ -764,10 +783,38 @@ private void executePipelines(
),
e
);
onFailure.accept(slot, e);
// document failed! no further processing of this doc
onFinished.onResponse(null);
return;
listener.onFailure(e);
return; // document failed!
}

if (keep == false) {
listener.onResponse(false);
return; // document dropped!
}

// update the index request so that we can execute additional pipelines (if any), etc
updateIndexRequestMetadata(indexRequest, ingestDocument.getMetadata());
try {
// check for self-references if necessary (i.e. if a script processor has run), and clear the bit
if (ingestDocument.doNoSelfReferencesCheck()) {
CollectionUtils.ensureNoSelfReferences(ingestDocument.getSource(), null);
ingestDocument.doNoSelfReferencesCheck(false);
}
} catch (IllegalArgumentException ex) {
// An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing.
// In that case, we catch and wrap the exception, so we can include more details
listener.onFailure(
new IllegalArgumentException(
format(
"Failed to generate the source document for ingest pipeline [%s] for document [%s/%s]",
pipelineId,
indexRequest.index(),
indexRequest.id()
),
ex
)
);
return; // document failed!
}

Iterator<String> newIt = it;
@@ -776,14 +823,8 @@

if (Objects.equals(originalIndex, newIndex) == false) {
if (hasFinalPipeline && it.hasNext() == false) {
totalMetrics.ingestFailed();
onFailure.accept(
slot,
new IllegalStateException("final pipeline [" + pipelineId + "] can't change the target index")
);
// document failed! no further processing of this doc
onFinished.onResponse(null);
return;
listener.onFailure(new IllegalStateException("final pipeline [" + pipelineId + "] can't change the target index"));
return; // document failed!
} else {
indexRequest.isPipelineResolved(false);
resolvePipelines(null, indexRequest, state.metadata());
Expand All @@ -797,21 +838,40 @@ private void executePipelines(
}

if (newIt.hasNext()) {
executePipelines(slot, newIt, newHasFinalPipeline, indexRequest, onDropped, onFailure, onFinished);
executePipelines(newIt, newHasFinalPipeline, indexRequest, ingestDocument, listener);
} else {
onFinished.onResponse(null);
// update the index request's source and (potentially) cache the timestamp for TSDB
updateIndexRequestSource(indexRequest, ingestDocument);
cacheRawTimestamp(indexRequest, ingestDocument);
Member:
👍 nice work
listener.onResponse(true); // document succeeded!
}
});
} catch (Exception e) {
logger.debug(
() -> format("failed to execute pipeline [%s] for document [%s/%s]", pipelineId, indexRequest.index(), indexRequest.id()),
e
);
onFailure.accept(slot, e);
onFinished.onResponse(null);
listener.onFailure(e); // document failed!
}
}

private void executePipeline(
final IngestDocument ingestDocument,
final Pipeline pipeline,
final BiConsumer<Boolean, Exception> handler
) {
// adapt our {@code BiConsumer<Boolean, Exception>} handler shape to the
// {@code BiConsumer<IngestDocument, Exception>} handler shape used internally
// by ingest pipelines and processors
ingestDocument.executePipeline(pipeline, (result, e) -> {
if (e != null) {
handler.accept(true, e);
} else {
handler.accept(result != null, null);
}
});
}

public IngestStats stats() {
IngestStats.Builder statsBuilder = new IngestStats.Builder();
statsBuilder.addTotalMetrics(totalMetrics);
@@ -863,56 +923,6 @@ static String getProcessorName(Processor processor) {
return sb.toString();
}

private void innerExecute(
final int slot,
final IndexRequest indexRequest,
final Pipeline pipeline,
final IntConsumer itemDroppedHandler,
final Consumer<Exception> handler
) {
if (pipeline.getProcessors().isEmpty()) {
handler.accept(null);
return;
}

IngestDocument ingestDocument = newIngestDocument(indexRequest);
ingestDocument.executePipeline(pipeline, (result, e) -> {
if (e != null) {
handler.accept(e);
} else if (result == null) {
itemDroppedHandler.accept(slot);
handler.accept(null);
} else {
updateIndexRequestMetadata(indexRequest, ingestDocument.getMetadata());
try {
updateIndexRequestSource(indexRequest, ingestDocument);
} catch (IllegalArgumentException ex) {
// An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing.
// In that case, we catch and wrap the exception, so we can include which pipeline failed.
handler.accept(
new IllegalArgumentException(
"Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]",
ex
)
);
return;
} catch (Exception ex) {
// If anything goes wrong here, we want to know, and cannot proceed with normal execution. For example,
// *rarely*, a ConcurrentModificationException could be thrown if a pipeline leaks a reference to a shared mutable
// collection, and another indexing thread modifies the shared reference while we're trying to ensure it has
// no self references.
handler.accept(
new RuntimeException("Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]", ex)
);
return;
}
cacheRawTimestamp(indexRequest, ingestDocument);

handler.accept(null);
}
});
}

/**
* Builds a new ingest document from the passed-in index request.
*/
@@ -960,6 +970,9 @@ private static void updateIndexRequestMetadata(final IndexRequest request, final
*/
private static void updateIndexRequestSource(final IndexRequest request, final IngestDocument document) {
boolean ensureNoSelfReferences = document.doNoSelfReferencesCheck();
// we already check for self references elsewhere (and clear the bit), so this should always be false,
// keeping the check and assert as a guard against extraordinarily surprising circumstances
assert ensureNoSelfReferences == false;
request.source(document.getSource(), request.getContentType(), ensureNoSelfReferences);
}

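For readers unfamiliar with ActionListener.runAfter, here is a plain-Java sketch (illustrative names, not from this PR) of the guarantee the document listener above relies on: the wrapped "always" action runs exactly once after either outcome, which is what lets the stopwatch and ref-count bookkeeping live in one place.

import java.util.function.Consumer;

class ThreeWayOutcomeSketch {
    // Mirrors the three-way logic above: a document can fail (1), or be
    // processed successfully and kept (2), or processed and dropped (3);
    // the "always" action runs no matter which branch was taken.
    static void onDocumentDone(
        Boolean kept,
        Exception failure,
        Runnable onDropped,
        Consumer<Exception> onFailure,
        Runnable always
    ) {
        try {
            if (failure != null) {
                onFailure.accept(failure);        // (1) document failed processing
            } else if (Boolean.FALSE.equals(kept)) {
                onDropped.run();                  // (3) processed successfully, but dropped
            }
            // (2) otherwise: processed successfully and kept; nothing extra to do
        } finally {
            always.run();                         // stop the ingest "stopwatch", release the ref
        }
    }
}
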
@@ -1484,8 +1484,7 @@ public String execute() {
assertThat(ingestStats.getPipelineStats().size(), equalTo(3));

// total
// see https://github.com/elastic/elasticsearch/issues/92843 -- this should be 1, but it's actually 2
// assertStats(ingestStats.getTotalStats(), 1, 0, 0);
assertStats(ingestStats.getTotalStats(), 1, 0, 0);
// pipeline
assertPipelineStats(ingestStats.getPipelineStats(), "_id1", 1, 0, 0);
assertPipelineStats(ingestStats.getPipelineStats(), "_id2", 1, 0, 0);
@@ -1865,8 +1864,7 @@ public void testSetsRawTimestamp() {
assertThat(indexRequest5.getRawTimestamp(), nullValue());
assertThat(indexRequest6.getRawTimestamp(), equalTo(10));
assertThat(indexRequest7.getRawTimestamp(), equalTo(100));
// see https://github.com/elastic/elasticsearch/issues/93118 -- this should be 100, but it's actually 10
// assertThat(indexRequest8.getRawTimestamp(), equalTo(100));
assertThat(indexRequest8.getRawTimestamp(), equalTo(100));
}

public void testResolveRequiredOrDefaultPipelineDefaultPipeline() {