Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 62 additions & 38 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -875,41 +875,17 @@ private void innerExecute(
return;
}

String index = indexRequest.index();
String id = indexRequest.id();
String routing = indexRequest.routing();
long version = indexRequest.version();
VersionType versionType = indexRequest.versionType();
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(index, id, version, routing, versionType, sourceAsMap);
IngestDocument ingestDocument = newIngestDocument(indexRequest);
ingestDocument.executePipeline(pipeline, (result, e) -> {
if (e != null) {
handler.accept(e);
} else if (result == null) {
itemDroppedHandler.accept(slot);
handler.accept(null);
} else {
org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata();

// it's fine to set all metadata fields all the time, as ingest document holds their starting values
// before ingestion, which might also get modified during ingestion.
indexRequest.index(metadata.getIndex());
indexRequest.id(metadata.getId());
indexRequest.routing(metadata.getRouting());
indexRequest.version(metadata.getVersion());
if (metadata.getVersionType() != null) {
indexRequest.versionType(VersionType.fromString(metadata.getVersionType()));
}
Number number;
if ((number = metadata.getIfSeqNo()) != null) {
indexRequest.setIfSeqNo(number.longValue());
}
if ((number = metadata.getIfPrimaryTerm()) != null) {
indexRequest.setIfPrimaryTerm(number.longValue());
}
updateIndexRequestMetadata(indexRequest, ingestDocument.getMetadata());
try {
boolean ensureNoSelfReferences = ingestDocument.doNoSelfReferencesCheck();
indexRequest.source(ingestDocument.getSource(), indexRequest.getContentType(), ensureNoSelfReferences);
updateIndexRequestSource(indexRequest, ingestDocument);
} catch (IllegalArgumentException ex) {
// An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing.
// In that case, we catch and wrap the exception, so we can include which pipeline failed.
Expand All @@ -930,25 +906,73 @@ private void innerExecute(
);
return;
}
Map<String, String> map;
if ((map = metadata.getDynamicTemplates()) != null) {
Map<String, String> mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates());
mergedDynamicTemplates.putAll(map);
indexRequest.setDynamicTemplates(mergedDynamicTemplates);
}
postIngest(ingestDocument, indexRequest);
cacheRawTimestamp(indexRequest, ingestDocument);

handler.accept(null);
}
});
}

private void postIngest(IngestDocument ingestDocument, IndexRequest indexRequest) {
if (indexRequest.getRawTimestamp() == null) {
/**
* Builds a new ingest document from the passed-in index request.
*/
private static IngestDocument newIngestDocument(final IndexRequest request) {
return new IngestDocument(
request.index(),
request.id(),
request.version(),
request.routing(),
request.versionType(),
request.sourceAsMap()
);
}

/**
* Updates an index request based on the metadata of an ingest document.
*/
private static void updateIndexRequestMetadata(final IndexRequest request, final org.elasticsearch.script.Metadata metadata) {
// it's fine to set all metadata fields all the time, as ingest document holds their starting values
// before ingestion, which might also get modified during ingestion.
request.index(metadata.getIndex());
request.id(metadata.getId());
request.routing(metadata.getRouting());
request.version(metadata.getVersion());
if (metadata.getVersionType() != null) {
request.versionType(VersionType.fromString(metadata.getVersionType()));
}
Number number;
if ((number = metadata.getIfSeqNo()) != null) {
request.setIfSeqNo(number.longValue());
}
if ((number = metadata.getIfPrimaryTerm()) != null) {
request.setIfPrimaryTerm(number.longValue());
}
Map<String, String> map;
if ((map = metadata.getDynamicTemplates()) != null) {
Map<String, String> mergedDynamicTemplates = new HashMap<>(request.getDynamicTemplates());
mergedDynamicTemplates.putAll(map);
request.setDynamicTemplates(mergedDynamicTemplates);
}
}

/**
* Updates an index request based on the source of an ingest document, guarding against self-references if necessary.
*/
private static void updateIndexRequestSource(final IndexRequest request, final IngestDocument document) {
boolean ensureNoSelfReferences = document.doNoSelfReferencesCheck();
request.source(document.getSource(), request.getContentType(), ensureNoSelfReferences);
}

/**
* Grab the @timestamp and store it on the index request so that TSDB can use it without needing to parse
* the source for this document.
*/
private static void cacheRawTimestamp(final IndexRequest request, final IngestDocument document) {
if (request.getRawTimestamp() == null) {
// cache the @timestamp from the ingest document's source map if there is one
Object rawTimestamp = ingestDocument.getSource().get(TimestampField.FIXED_TIMESTAMP_FIELD);
Object rawTimestamp = document.getSource().get(TimestampField.FIXED_TIMESTAMP_FIELD);
if (rawTimestamp != null) {
indexRequest.setRawTimestamp(rawTimestamp);
request.setRawTimestamp(rawTimestamp);
}
}
}
Expand Down