Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.oteldata.otlp;

import io.opentelemetry.proto.common.v1.KeyValue;

import org.elasticsearch.core.Nullable;

import java.util.List;

/**
* Elastic-specific OTLP attributes that configure per-document indexing metadata.
*/
public final class DocumentMetadata {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure there’s much value in putting those functions in a standalone class, since they aren't useful outside the OTLPLogsTransportAction context. I actually prefer local extractDocumentId.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The span doc builder also supports the document_id, which is why I think the shared helper makes sense.


public static final String ATTRIBUTE_PREFIX = "elasticsearch";
public static final String DOCUMENT_ID_ATTRIBUTE = ATTRIBUTE_PREFIX + ".document_id";
public static final String INGEST_PIPELINE_ATTRIBUTE = ATTRIBUTE_PREFIX + ".ingest_pipeline";

private DocumentMetadata() {}

/**
* Extracts the Elasticsearch document id metadata attribute, if present.
*/
public static @Nullable String documentId(List<KeyValue> attributes) {
return attributeValue(DOCUMENT_ID_ATTRIBUTE, attributes);
}

/**
* Extracts the Elasticsearch ingest pipeline metadata attribute, if present.
*/
public static @Nullable String ingestPipeline(List<KeyValue> attributes) {
return attributeValue(INGEST_PIPELINE_ATTRIBUTE, attributes);
}

/**
* Returns whether an attribute configures per-document indexing metadata.
*/
public static boolean isDocumentMetadataAttribute(String attributeKey) {
return attributeKey.equals(DOCUMENT_ID_ATTRIBUTE) || attributeKey.equals(INGEST_PIPELINE_ATTRIBUTE);
}

private static @Nullable String attributeValue(String key, List<KeyValue> attributes) {
for (int i = 0, size = attributes.size(); i < size; i++) {
KeyValue attribute = attributes.get(i);
if (key.equals(attribute.getKey())) {
return attribute.getValue().getStringValue();
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -92,10 +93,17 @@ protected ProcessingContext prepareBulkRequest(OTLPActionRequest request, BulkRe
index,
logRecord
);
IndexRequest indexRequest = new IndexRequest(index.index());
String documentId = DocumentMetadata.documentId(logRecord.getAttributesList());
if (Strings.hasLength(documentId)) {
indexRequest.id(documentId);
}
String ingestPipeline = DocumentMetadata.ingestPipeline(logRecord.getAttributesList());
if (Strings.hasLength(ingestPipeline)) {
indexRequest.setPipeline(ingestPipeline);
}
bulkRequestBuilder.add(
new IndexRequest(index.index()).opType(DocWriteRequest.OpType.CREATE)
.setRequireDataStream(true)
.source(xContentBuilder)
indexRequest.opType(DocWriteRequest.OpType.CREATE).setRequireDataStream(true).source(xContentBuilder)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
import io.opentelemetry.proto.common.v1.InstrumentationScope;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.resource.v1.Resource;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.proto.trace.v1.ScopeSpans;
Expand All @@ -25,6 +24,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -54,7 +54,6 @@ public class OTLPTracesTransportAction extends AbstractOTLPTransportAction {
public static final ActionType<OTLPActionResponse> TYPE = new ActionType<>(NAME);

public static final String TYPE_TRACES = "traces";
private static final String DOCUMENT_ID_ATTRIBUTE = "elasticsearch.document_id";

@Inject
public OTLPTracesTransportAction(TransportService transportService, ActionFilters actionFilters, ThreadPool threadPool, Client client) {
Expand Down Expand Up @@ -97,8 +96,8 @@ protected ProcessingContext prepareBulkRequest(OTLPActionRequest request, BulkRe
span
);
IndexRequest indexRequest = new IndexRequest(index.index());
String documentId = extractDocumentId(span.getAttributesList());
if (documentId.isEmpty() == false) {
String documentId = DocumentMetadata.documentId(span.getAttributesList());
if (Strings.hasLength(documentId)) {
indexRequest.id(documentId);
}
bulkRequestBuilder.add(
Expand Down Expand Up @@ -127,8 +126,8 @@ protected ProcessingContext prepareBulkRequest(OTLPActionRequest request, BulkRe
event
);
IndexRequest eventRequest = new IndexRequest(eventIndex.index());
String eventDocumentId = extractDocumentId(event.getAttributesList());
if (eventDocumentId.isEmpty() == false) {
String eventDocumentId = DocumentMetadata.documentId(event.getAttributesList());
if (Strings.hasLength(eventDocumentId)) {
eventRequest.id(eventDocumentId);
}
bulkRequestBuilder.add(
Expand All @@ -150,14 +149,4 @@ MessageLite responseWithRejectedDataPoints(int rejectedDataPoints, String messag
.build();
return ExportTraceServiceResponse.newBuilder().setPartialSuccess(partialSuccess).build();
}

private static String extractDocumentId(List<KeyValue> attributes) {
for (int i = 0, size = attributes.size(); i < size; i++) {
KeyValue attribute = attributes.get(i);
if (DOCUMENT_ID_ATTRIBUTE.equals(attribute.getKey())) {
return attribute.getValue().getStringValue();
}
}
return "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.oteldata.otlp.DocumentMetadata;
import org.elasticsearch.xpack.oteldata.otlp.datapoint.TargetIndex;
import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;

Expand All @@ -33,7 +34,6 @@ public abstract class OTelDocumentBuilder {

private static final String DATA_STREAM_TYPE = "data_stream.type";
private static final String ELASTIC_MAPPING_MODE = "elastic.mapping.mode";
private static final String ELASTICSEARCH_DOCUMENT_ID = "elasticsearch.document_id";
private static final HexFormat HEX = HexFormat.of();
private final BufferedByteStringAccessor byteStringAccessor;

Expand Down Expand Up @@ -118,9 +118,9 @@ protected void buildAttributes(XContentBuilder builder, List<KeyValue> attribute
public static boolean isIgnoredAttribute(String attributeKey) {
return TargetIndex.isTargetIndexAttribute(attributeKey)
|| MappingHints.isMappingHintsAttribute(attributeKey)
|| DocumentMetadata.isDocumentMetadataAttribute(attributeKey)
|| DATA_STREAM_TYPE.equals(attributeKey)
|| ELASTIC_MAPPING_MODE.equals(attributeKey)
|| ELASTICSEARCH_DOCUMENT_ID.equals(attributeKey);
|| ELASTIC_MAPPING_MODE.equals(attributeKey);
}

protected void buildAnyValue(XContentBuilder builder, AnyValue value) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,23 @@
package org.elasticsearch.xpack.oteldata.otlp;

import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.logs.v1.SeverityNumber;

import com.google.protobuf.InvalidProtocolBufferException;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;

public class OTLPLogsTransportActionTests extends AbstractOTLPTransportActionTests {
Expand Down Expand Up @@ -63,4 +69,57 @@ protected String parseErrorMessage(byte[] responseBytes) throws InvalidProtocolB
protected String dataStreamType() {
return "logs";
}

public void testPrepareBulkRequestUsesDocumentIdAttribute() throws Exception {
IndexRequest indexRequest = prepareIndexRequestWithAttributes(
List.of(OtlpUtils.keyValue(DocumentMetadata.DOCUMENT_ID_ATTRIBUTE, "log-doc-id"))
);

assertThat(indexRequest.id(), equalTo("log-doc-id"));
@SuppressWarnings("unchecked")
Map<String, Object> attributes = (Map<String, Object>) indexRequest.sourceAsMap().get("attributes");
assertThat(attributes.get(DocumentMetadata.DOCUMENT_ID_ATTRIBUTE), nullValue());
}

public void testPrepareBulkRequestLeavesDocumentIdUnsetWhenAttributeEmpty() throws Exception {
IndexRequest indexRequest = prepareIndexRequestWithAttributes(
List.of(OtlpUtils.keyValue(DocumentMetadata.DOCUMENT_ID_ATTRIBUTE, ""))
);

assertThat(indexRequest.id(), nullValue());
}

public void testPrepareBulkRequestUsesIngestPipelineAttribute() throws Exception {
IndexRequest indexRequest = prepareIndexRequestWithAttributes(
List.of(OtlpUtils.keyValue(DocumentMetadata.INGEST_PIPELINE_ATTRIBUTE, "logs-pipeline"))
);

assertThat(indexRequest.getPipeline(), equalTo("logs-pipeline"));
@SuppressWarnings("unchecked")
Map<String, Object> attributes = (Map<String, Object>) indexRequest.sourceAsMap().get("attributes");
assertThat(attributes.get(DocumentMetadata.INGEST_PIPELINE_ATTRIBUTE), nullValue());
}

public void testPrepareBulkRequestLeavesPipelineUnsetWhenAttributeEmpty() throws Exception {
IndexRequest indexRequest = prepareIndexRequestWithAttributes(
List.of(OtlpUtils.keyValue(DocumentMetadata.INGEST_PIPELINE_ATTRIBUTE, ""))
);

assertThat(indexRequest.getPipeline(), nullValue());
}

private IndexRequest prepareIndexRequestWithAttributes(List<KeyValue> attributes) throws Exception {
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
createAction().prepareBulkRequest(
new OTLPActionRequest(
new BytesArray(
OtlpLogUtils.createLogsRequest(
List.of(OtlpLogUtils.createLogRecord("Hello world", SeverityNumber.SEVERITY_NUMBER_INFO, "INFO", attributes))
).toByteArray()
)
),
bulkRequestBuilder
);
return (IndexRequest) bulkRequestBuilder.request().requests().get(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.oteldata.otlp.DocumentMetadata;
import org.elasticsearch.xpack.oteldata.otlp.datapoint.TargetIndex;
import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;

Expand Down Expand Up @@ -354,13 +355,15 @@ public void testControlAttributesAreFiltered() throws IOException {
.addAttributes(keyValue("keep", "value"))
.addAttributes(keyValue("data_stream.type", "logs"))
.addAttributes(keyValue("elasticsearch.document_id", "doc-id"))
.addAttributes(keyValue(DocumentMetadata.INGEST_PIPELINE_ATTRIBUTE, "logs-pipeline"))
.build();

ObjectPath doc = buildDocument(resource, scope, logRecord);

assertThat(doc.evaluate("attributes.keep"), equalTo("value"));
assertThat(doc.evaluate("attributes.data_stream\\.type"), nullValue());
assertThat(doc.evaluate("attributes.elasticsearch\\.document_id"), nullValue());
assertThat(doc.evaluate("attributes.elasticsearch\\.ingest_pipeline"), nullValue());
assertThat(doc.evaluate("scope.attributes.scope\\.keep"), equalTo("value"));
assertThat(doc.evaluate("resource.attributes.service\\.name"), equalTo("test-service"));
assertThat(doc.evaluate("resource.attributes.elastic\\.mapping\\.mode"), nullValue());
Expand All @@ -374,6 +377,7 @@ public void testFilteredControlAttributesCreateEmptyAttributesObject() throws IO
.setBody(AnyValue.newBuilder().setStringValue("msg").build())
.addAttributes(keyValue("data_stream.type", "logs"))
.addAttributes(keyValue("elasticsearch.document_id", "doc-id"))
.addAttributes(keyValue(DocumentMetadata.INGEST_PIPELINE_ATTRIBUTE, "logs-pipeline"))
.build();

ObjectPath doc = buildDocument(resource, scope, logRecord);
Expand Down
Loading