diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/DocumentMetadata.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/DocumentMetadata.java new file mode 100644 index 0000000000000..2e8b7159acb9b --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/DocumentMetadata.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp; + +import io.opentelemetry.proto.common.v1.KeyValue; + +import org.elasticsearch.core.Nullable; + +import java.util.List; + +/** + * Elastic-specific OTLP attributes that configure per-document indexing metadata. + */ +public final class DocumentMetadata { + + public static final String ATTRIBUTE_PREFIX = "elasticsearch"; + public static final String DOCUMENT_ID_ATTRIBUTE = ATTRIBUTE_PREFIX + ".document_id"; + public static final String INGEST_PIPELINE_ATTRIBUTE = ATTRIBUTE_PREFIX + ".ingest_pipeline"; + + private DocumentMetadata() {} + + /** + * Extracts the Elasticsearch document id metadata attribute, if present. + */ + public static @Nullable String documentId(List attributes) { + return attributeValue(DOCUMENT_ID_ATTRIBUTE, attributes); + } + + /** + * Extracts the Elasticsearch ingest pipeline metadata attribute, if present. + */ + public static @Nullable String ingestPipeline(List attributes) { + return attributeValue(INGEST_PIPELINE_ATTRIBUTE, attributes); + } + + /** + * Returns whether an attribute configures per-document indexing metadata. + */ + public static boolean isDocumentMetadataAttribute(String attributeKey) { + return attributeKey.equals(DOCUMENT_ID_ATTRIBUTE) || attributeKey.equals(INGEST_PIPELINE_ATTRIBUTE); + } + + private static @Nullable String attributeValue(String key, List attributes) { + for (int i = 0, size = attributes.size(); i < size; i++) { + KeyValue attribute = attributes.get(i); + if (key.equals(attribute.getKey())) { + return attribute.getValue().getStringValue(); + } + } + return null; + } +} diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPLogsTransportAction.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPLogsTransportAction.java index 5b8052de28570..3f428f5d8d7d2 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPLogsTransportAction.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPLogsTransportAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.threadpool.ThreadPool; @@ -92,10 +93,17 @@ protected ProcessingContext prepareBulkRequest(OTLPActionRequest request, BulkRe index, logRecord ); + IndexRequest indexRequest = new IndexRequest(index.index()); + String documentId = DocumentMetadata.documentId(logRecord.getAttributesList()); + if (Strings.hasLength(documentId)) { + indexRequest.id(documentId); + } + String ingestPipeline = DocumentMetadata.ingestPipeline(logRecord.getAttributesList()); + if (Strings.hasLength(ingestPipeline)) { + indexRequest.setPipeline(ingestPipeline); + } bulkRequestBuilder.add( - new IndexRequest(index.index()).opType(DocWriteRequest.OpType.CREATE) - .setRequireDataStream(true) - .source(xContentBuilder) + indexRequest.opType(DocWriteRequest.OpType.CREATE).setRequireDataStream(true).source(xContentBuilder) ); } } diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPTracesTransportAction.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPTracesTransportAction.java index a56480b5a959b..b3acdc204c39d 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPTracesTransportAction.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPTracesTransportAction.java @@ -11,7 +11,6 @@ import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; import io.opentelemetry.proto.common.v1.InstrumentationScope; -import io.opentelemetry.proto.common.v1.KeyValue; import io.opentelemetry.proto.resource.v1.Resource; import io.opentelemetry.proto.trace.v1.ResourceSpans; import io.opentelemetry.proto.trace.v1.ScopeSpans; @@ -25,6 +24,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.threadpool.ThreadPool; @@ -54,7 +54,6 @@ public class OTLPTracesTransportAction extends AbstractOTLPTransportAction { public static final ActionType TYPE = new ActionType<>(NAME); public static final String TYPE_TRACES = "traces"; - private static final String DOCUMENT_ID_ATTRIBUTE = "elasticsearch.document_id"; @Inject public OTLPTracesTransportAction(TransportService transportService, ActionFilters actionFilters, ThreadPool threadPool, Client client) { @@ -97,8 +96,8 @@ protected ProcessingContext prepareBulkRequest(OTLPActionRequest request, BulkRe span ); IndexRequest indexRequest = new IndexRequest(index.index()); - String documentId = extractDocumentId(span.getAttributesList()); - if (documentId.isEmpty() == false) { + String documentId = DocumentMetadata.documentId(span.getAttributesList()); + if (Strings.hasLength(documentId)) { indexRequest.id(documentId); } bulkRequestBuilder.add( @@ -127,8 +126,8 @@ protected ProcessingContext prepareBulkRequest(OTLPActionRequest request, BulkRe event ); IndexRequest eventRequest = new IndexRequest(eventIndex.index()); - String eventDocumentId = extractDocumentId(event.getAttributesList()); - if (eventDocumentId.isEmpty() == false) { + String eventDocumentId = DocumentMetadata.documentId(event.getAttributesList()); + if (Strings.hasLength(eventDocumentId)) { eventRequest.id(eventDocumentId); } bulkRequestBuilder.add( @@ -150,14 +149,4 @@ MessageLite responseWithRejectedDataPoints(int rejectedDataPoints, String messag .build(); return ExportTraceServiceResponse.newBuilder().setPartialSuccess(partialSuccess).build(); } - - private static String extractDocumentId(List attributes) { - for (int i = 0, size = attributes.size(); i < size; i++) { - KeyValue attribute = attributes.get(i); - if (DOCUMENT_ID_ATTRIBUTE.equals(attribute.getKey())) { - return attribute.getValue().getStringValue(); - } - } - return ""; - } } diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/OTelDocumentBuilder.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/OTelDocumentBuilder.java index 86d47bbc102e8..01d087c65736a 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/OTelDocumentBuilder.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/OTelDocumentBuilder.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.hash.MessageDigests; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xpack.oteldata.otlp.DocumentMetadata; import org.elasticsearch.xpack.oteldata.otlp.datapoint.TargetIndex; import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; @@ -33,7 +34,6 @@ public abstract class OTelDocumentBuilder { private static final String DATA_STREAM_TYPE = "data_stream.type"; private static final String ELASTIC_MAPPING_MODE = "elastic.mapping.mode"; - private static final String ELASTICSEARCH_DOCUMENT_ID = "elasticsearch.document_id"; private static final HexFormat HEX = HexFormat.of(); private final BufferedByteStringAccessor byteStringAccessor; @@ -118,9 +118,9 @@ protected void buildAttributes(XContentBuilder builder, List attribute public static boolean isIgnoredAttribute(String attributeKey) { return TargetIndex.isTargetIndexAttribute(attributeKey) || MappingHints.isMappingHintsAttribute(attributeKey) + || DocumentMetadata.isDocumentMetadataAttribute(attributeKey) || DATA_STREAM_TYPE.equals(attributeKey) - || ELASTIC_MAPPING_MODE.equals(attributeKey) - || ELASTICSEARCH_DOCUMENT_ID.equals(attributeKey); + || ELASTIC_MAPPING_MODE.equals(attributeKey); } protected void buildAnyValue(XContentBuilder builder, AnyValue value) throws IOException { diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OTLPLogsTransportActionTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OTLPLogsTransportActionTests.java index e66c183411299..9907145c2fb9d 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OTLPLogsTransportActionTests.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OTLPLogsTransportActionTests.java @@ -8,17 +8,23 @@ package org.elasticsearch.xpack.oteldata.otlp; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; +import io.opentelemetry.proto.common.v1.KeyValue; import io.opentelemetry.proto.logs.v1.SeverityNumber; import com.google.protobuf.InvalidProtocolBufferException; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.util.List; +import java.util.Map; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; public class OTLPLogsTransportActionTests extends AbstractOTLPTransportActionTests { @@ -63,4 +69,57 @@ protected String parseErrorMessage(byte[] responseBytes) throws InvalidProtocolB protected String dataStreamType() { return "logs"; } + + public void testPrepareBulkRequestUsesDocumentIdAttribute() throws Exception { + IndexRequest indexRequest = prepareIndexRequestWithAttributes( + List.of(OtlpUtils.keyValue(DocumentMetadata.DOCUMENT_ID_ATTRIBUTE, "log-doc-id")) + ); + + assertThat(indexRequest.id(), equalTo("log-doc-id")); + @SuppressWarnings("unchecked") + Map attributes = (Map) indexRequest.sourceAsMap().get("attributes"); + assertThat(attributes.get(DocumentMetadata.DOCUMENT_ID_ATTRIBUTE), nullValue()); + } + + public void testPrepareBulkRequestLeavesDocumentIdUnsetWhenAttributeEmpty() throws Exception { + IndexRequest indexRequest = prepareIndexRequestWithAttributes( + List.of(OtlpUtils.keyValue(DocumentMetadata.DOCUMENT_ID_ATTRIBUTE, "")) + ); + + assertThat(indexRequest.id(), nullValue()); + } + + public void testPrepareBulkRequestUsesIngestPipelineAttribute() throws Exception { + IndexRequest indexRequest = prepareIndexRequestWithAttributes( + List.of(OtlpUtils.keyValue(DocumentMetadata.INGEST_PIPELINE_ATTRIBUTE, "logs-pipeline")) + ); + + assertThat(indexRequest.getPipeline(), equalTo("logs-pipeline")); + @SuppressWarnings("unchecked") + Map attributes = (Map) indexRequest.sourceAsMap().get("attributes"); + assertThat(attributes.get(DocumentMetadata.INGEST_PIPELINE_ATTRIBUTE), nullValue()); + } + + public void testPrepareBulkRequestLeavesPipelineUnsetWhenAttributeEmpty() throws Exception { + IndexRequest indexRequest = prepareIndexRequestWithAttributes( + List.of(OtlpUtils.keyValue(DocumentMetadata.INGEST_PIPELINE_ATTRIBUTE, "")) + ); + + assertThat(indexRequest.getPipeline(), nullValue()); + } + + private IndexRequest prepareIndexRequestWithAttributes(List attributes) throws Exception { + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); + createAction().prepareBulkRequest( + new OTLPActionRequest( + new BytesArray( + OtlpLogUtils.createLogsRequest( + List.of(OtlpLogUtils.createLogRecord("Hello world", SeverityNumber.SEVERITY_NUMBER_INFO, "INFO", attributes)) + ).toByteArray() + ) + ), + bulkRequestBuilder + ); + return (IndexRequest) bulkRequestBuilder.request().requests().get(0); + } } diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/LogDocumentBuilderTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/LogDocumentBuilderTests.java index 784835251e8b4..f374a0a35e8f2 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/LogDocumentBuilderTests.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/LogDocumentBuilderTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.oteldata.otlp.DocumentMetadata; import org.elasticsearch.xpack.oteldata.otlp.datapoint.TargetIndex; import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; @@ -354,6 +355,7 @@ public void testControlAttributesAreFiltered() throws IOException { .addAttributes(keyValue("keep", "value")) .addAttributes(keyValue("data_stream.type", "logs")) .addAttributes(keyValue("elasticsearch.document_id", "doc-id")) + .addAttributes(keyValue(DocumentMetadata.INGEST_PIPELINE_ATTRIBUTE, "logs-pipeline")) .build(); ObjectPath doc = buildDocument(resource, scope, logRecord); @@ -361,6 +363,7 @@ public void testControlAttributesAreFiltered() throws IOException { assertThat(doc.evaluate("attributes.keep"), equalTo("value")); assertThat(doc.evaluate("attributes.data_stream\\.type"), nullValue()); assertThat(doc.evaluate("attributes.elasticsearch\\.document_id"), nullValue()); + assertThat(doc.evaluate("attributes.elasticsearch\\.ingest_pipeline"), nullValue()); assertThat(doc.evaluate("scope.attributes.scope\\.keep"), equalTo("value")); assertThat(doc.evaluate("resource.attributes.service\\.name"), equalTo("test-service")); assertThat(doc.evaluate("resource.attributes.elastic\\.mapping\\.mode"), nullValue()); @@ -374,6 +377,7 @@ public void testFilteredControlAttributesCreateEmptyAttributesObject() throws IO .setBody(AnyValue.newBuilder().setStringValue("msg").build()) .addAttributes(keyValue("data_stream.type", "logs")) .addAttributes(keyValue("elasticsearch.document_id", "doc-id")) + .addAttributes(keyValue(DocumentMetadata.INGEST_PIPELINE_ATTRIBUTE, "logs-pipeline")) .build(); ObjectPath doc = buildDocument(resource, scope, logRecord);