diff --git a/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/AbstractOTLPIndexingRestIT.java b/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/AbstractOTLPIndexingRestIT.java index c034ad2c3f6cf..67421b9ab9ce5 100644 --- a/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/AbstractOTLPIndexingRestIT.java +++ b/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/AbstractOTLPIndexingRestIT.java @@ -95,6 +95,17 @@ public void testOversizedRequestReturns413() throws Exception { } protected static String createApiKey(String indexPattern) throws IOException { + return createApiKey(new String[] { indexPattern }); + } + + protected static String createApiKey(String... indexPatterns) throws IOException { + StringBuilder indexPatternsJson = new StringBuilder(); + for (int i = 0; i < indexPatterns.length; i++) { + if (i > 0) { + indexPatternsJson.append(", "); + } + indexPatternsJson.append('"').append(indexPatterns[i]).append('"'); + } Request createApiKeyRequest = new Request("POST", "/_security/api_key"); createApiKeyRequest.setJsonEntity(""" { @@ -103,14 +114,14 @@ protected static String createApiKey(String indexPattern) throws IOException { "writer": { "index": [ { - "names": ["$INDEX_PATTERN"], + "names": [$INDEX_PATTERNS], "privileges": ["create_doc", "auto_configure"] } ] } } } - """.replace("$INDEX_PATTERN", indexPattern)); + """.replace("$INDEX_PATTERNS", indexPatternsJson.toString())); ObjectPath createApiKeyResponse = ObjectPath.createFromResponse(client().performRequest(createApiKeyRequest)); return createApiKeyResponse.evaluate("encoded"); } diff --git a/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/OTLPTracesIndexingRestIT.java b/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/OTLPTracesIndexingRestIT.java index cf78ae10c388a..014743cb8a544 100644 --- a/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/OTLPTracesIndexingRestIT.java +++ b/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/OTLPTracesIndexingRestIT.java @@ -7,18 +7,24 @@ package org.elasticsearch.xpack.oteldata.otlp; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter; import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import org.elasticsearch.test.rest.ObjectPath; import org.junit.After; import org.junit.Before; import java.io.IOException; +import static io.opentelemetry.api.common.AttributeKey.stringKey; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.isA; public class OTLPTracesIndexingRestIT extends AbstractOTLPIndexingRestIT { @@ -36,7 +42,7 @@ public void setUp() throws Exception { super.setUp(); OtlpHttpSpanExporter exporter = OtlpHttpSpanExporter.builder() .setEndpoint(getClusterHosts().getFirst().toURI() + otlpEndpointPath()) - .addHeader("Authorization", "ApiKey " + createApiKey("traces-*")) + .addHeader("Authorization", "ApiKey " + createApiKey("traces-*", "logs-*")) .build(); tracerProvider = SdkTracerProvider.builder() .setResource(TEST_RESOURCE) @@ -54,10 +60,101 @@ public void tearDown() throws Exception { } } + public void testBatchSpanIndexing() throws Exception { + int numSpans = 25; + for (int i = 0; i < numSpans; i++) { + tracer.spanBuilder("span-" + i).startSpan().end(); + } + + indexTraces(); + + ObjectPath search = search("traces-generic.otel-default"); + assertThat(search.evaluate("hits.total.value"), equalTo(numSpans)); + } + + public void testSpanWithAttributes() throws Exception { + var span = tracer.spanBuilder("GET /orders").setSpanKind(SpanKind.SERVER).startSpan(); + String traceId = span.getSpanContext().getTraceId(); + String spanId = span.getSpanContext().getSpanId(); + span.setAttribute("http.method", "GET"); + span.setAttribute("http.status_code", 404L); + span.setStatus(StatusCode.ERROR, "not found"); + span.end(); + + indexTraces(); + + ObjectPath search = search("traces-generic.otel-default"); + assertThat(search.evaluate("hits.total.value"), equalTo(1)); + var source = new ObjectPath(search.evaluate("hits.hits.0._source")); + assertThat(source.evaluate("name"), equalTo("GET /orders")); + assertThat(source.evaluate("kind"), equalTo("Server")); + assertThat(source.evaluate("trace_id"), equalTo(traceId)); + assertThat(source.evaluate("span_id"), equalTo(spanId)); + assertThat(source.evaluate("duration"), isA(Number.class)); + assertThat(source.evaluate("status.code"), equalTo("Error")); + assertThat(source.evaluate("status.message"), equalTo("not found")); + assertThat(source.evaluate("attributes.http\\.method"), equalTo("GET")); + assertThat(source.evaluate("attributes.http\\.status_code"), equalTo(404)); + assertThat(source.evaluate("resource.attributes.service\\.name"), equalTo("elasticsearch")); + assertThat(source.evaluate("scope.name"), equalTo(getClass().getSimpleName())); + } + + public void testSpanEventsAreIndexedAsLogs() throws Exception { + var span = tracer.spanBuilder("span-with-event").startSpan(); + String traceId = span.getSpanContext().getTraceId(); + String spanId = span.getSpanContext().getSpanId(); + span.addEvent("exception", Attributes.of(stringKey("event.attr.foo"), "event.attr.bar")); + span.end(); + + indexTraces(); + + ObjectPath tracesSearch = search("traces-generic.otel-default"); + assertThat(tracesSearch.evaluate("hits.total.value"), equalTo(1)); + + ObjectPath logsSearch = search("logs-generic.otel-default"); + assertThat(logsSearch.evaluate("hits.total.value"), equalTo(1)); + var source = new ObjectPath(logsSearch.evaluate("hits.hits.0._source")); + assertThat(source.evaluate("event_name"), equalTo("exception")); + assertThat(source.evaluate("trace_id"), equalTo(traceId)); + assertThat(source.evaluate("span_id"), equalTo(spanId)); + assertThat(source.evaluate("attributes.event\\.attr\\.foo"), equalTo("event.attr.bar")); + assertThat(source.evaluate("attributes.event\\.name"), equalTo("exception")); + assertThat(source.evaluate("data_stream.type"), equalTo("logs")); + } + + public void testSpanEventsUseDocumentIdAttribute() throws Exception { + var span = tracer.spanBuilder("span-with-event-id").startSpan(); + span.addEvent("exception", Attributes.of(stringKey("elasticsearch.document_id"), "span-event-doc-id")); + span.end(); + + indexTraces(); + + ObjectPath logsSearch = search("logs-generic.otel-default"); + assertThat(logsSearch.evaluate("hits.total.value"), equalTo(1)); + assertThat(logsSearch.evaluate("hits.hits.0._id"), equalTo("span-event-doc-id")); + } + + public void testDataStreamRouting() throws Exception { + var span = tracer.spanBuilder("routed span").startSpan(); + span.setAttribute("data_stream.dataset", "checkout"); + span.setAttribute("data_stream.namespace", "production"); + span.end(); + + indexTraces(); + + ObjectPath search = search("traces-checkout.otel-production"); + assertThat(search.evaluate("hits.total.value"), equalTo(1)); + var source = new ObjectPath(search.evaluate("hits.hits.0._source")); + assertThat(source.evaluate("data_stream.type"), equalTo("traces")); + assertThat(source.evaluate("data_stream.dataset"), equalTo("checkout.otel")); + assertThat(source.evaluate("data_stream.namespace"), equalTo("production")); + } + private void indexTraces() throws IOException { var result = tracerProvider.forceFlush().join(TEST_REQUEST_TIMEOUT.millis(), MILLISECONDS); assertThat(result.isSuccess(), equalTo(true)); refresh("traces-*.otel-*"); + refresh("logs-*.otel-*"); } } diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPTracesTransportAction.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPTracesTransportAction.java index 1bef09ebfa0c0..f554e2342aeba 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPTracesTransportAction.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPTracesTransportAction.java @@ -11,6 +11,7 @@ import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; import io.opentelemetry.proto.common.v1.InstrumentationScope; +import io.opentelemetry.proto.common.v1.KeyValue; import io.opentelemetry.proto.resource.v1.Resource; import io.opentelemetry.proto.trace.v1.ResourceSpans; import io.opentelemetry.proto.trace.v1.ScopeSpans; @@ -32,6 +33,7 @@ import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xpack.oteldata.otlp.datapoint.TargetIndex; import org.elasticsearch.xpack.oteldata.otlp.docbuilder.SpanDocumentBuilder; +import org.elasticsearch.xpack.oteldata.otlp.docbuilder.SpanEventDocumentBuilder; import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; import java.io.IOException; @@ -52,6 +54,7 @@ public class OTLPTracesTransportAction extends AbstractOTLPTransportAction { public static final ActionType TYPE = new ActionType<>(NAME); public static final String TYPE_TRACES = "traces"; + private static final String DOCUMENT_ID_ATTRIBUTE = "elasticsearch.document_id"; @Inject public OTLPTracesTransportAction(TransportService transportService, ActionFilters actionFilters, ThreadPool threadPool, Client client) { @@ -63,6 +66,7 @@ protected ProcessingContext prepareBulkRequest(OTLPActionRequest request, BulkRe BufferedByteStringAccessor byteStringAccessor = new BufferedByteStringAccessor(); var tracesServiceRequest = ExportTraceServiceRequest.parseFrom(request.getRequest().streamInput()); SpanDocumentBuilder spanDocumentBuilder = new SpanDocumentBuilder(byteStringAccessor); + SpanEventDocumentBuilder spanEventDocumentBuilder = new SpanEventDocumentBuilder(byteStringAccessor); List resourceSpansList = tracesServiceRequest.getResourceSpansList(); for (int i = 0, resourceSpansListSize = resourceSpansList.size(); i < resourceSpansListSize; i++) { ResourceSpans resourceSpans = resourceSpansList.get(i); @@ -92,12 +96,46 @@ protected ProcessingContext prepareBulkRequest(OTLPActionRequest request, BulkRe index, span ); + IndexRequest indexRequest = new IndexRequest(index.index()); + String documentId = extractDocumentId(span.getAttributesList()); + if (documentId.isEmpty() == false) { + indexRequest.id(documentId); + } bulkRequestBuilder.add( - new IndexRequest(index.index()).opType(DocWriteRequest.OpType.CREATE) - .setRequireDataStream(true) - .source(xContentBuilder) + indexRequest.opType(DocWriteRequest.OpType.CREATE).setRequireDataStream(true).source(xContentBuilder) ); } + List eventsList = span.getEventsList(); + for (int l = 0, eventsListSize = eventsList.size(); l < eventsListSize; l++) { + Span.Event event = eventsList.get(l); + TargetIndex eventIndex = TargetIndex.evaluate( + OTLPLogsTransportAction.TYPE_LOGS, + event.getAttributesList(), + receiverName, + scope.getAttributesList(), + resource.getAttributesList() + ); + try (XContentBuilder xContentBuilder = XContentFactory.cborBuilder(new BytesStreamOutput())) { + spanEventDocumentBuilder.buildSpanEventDocument( + xContentBuilder, + resource, + resourceSpans.getSchemaUrlBytes(), + scope, + scopeSpans.getSchemaUrlBytes(), + eventIndex, + span, + event + ); + IndexRequest eventRequest = new IndexRequest(eventIndex.index()); + String eventDocumentId = extractDocumentId(event.getAttributesList()); + if (eventDocumentId.isEmpty() == false) { + eventRequest.id(eventDocumentId); + } + bulkRequestBuilder.add( + eventRequest.opType(DocWriteRequest.OpType.CREATE).setRequireDataStream(true).source(xContentBuilder) + ); + } + } } } } @@ -112,4 +150,14 @@ MessageLite responseWithRejectedDataPoints(int rejectedDataPoints, String messag .build(); return ExportTraceServiceResponse.newBuilder().setPartialSuccess(partialSuccess).build(); } + + private static String extractDocumentId(List attributes) { + for (int i = 0, size = attributes.size(); i < size; i++) { + KeyValue attribute = attributes.get(i); + if (DOCUMENT_ID_ATTRIBUTE.equals(attribute.getKey())) { + return attribute.getValue().getStringValue(); + } + } + return ""; + } } diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/OTelDocumentBuilder.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/OTelDocumentBuilder.java index 46e17b9e577f7..a1d1748fe70de 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/OTelDocumentBuilder.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/OTelDocumentBuilder.java @@ -14,6 +14,7 @@ import com.google.protobuf.ByteString; +import org.elasticsearch.common.hash.MessageDigests; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.oteldata.otlp.datapoint.TargetIndex; import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; @@ -56,6 +57,12 @@ protected void addFieldIfNotEmpty(XContentBuilder builder, String name, ByteStri } } + protected void addHexFieldIfNotEmpty(XContentBuilder builder, String name, ByteString value) throws IOException { + if (value != null && value.isEmpty() == false) { + builder.field(name, MessageDigests.toHexString(value.toByteArray())); + } + } + protected void buildDataStream(XContentBuilder builder, TargetIndex targetIndex) throws IOException { if (targetIndex.isDataStream() == false) { return; @@ -110,7 +117,18 @@ protected void buildAnyValue(XContentBuilder builder, AnyValue value) throws IOE } builder.endArray(); } - default -> throw new IllegalArgumentException("Unsupported attribute value type: " + value.getValueCase()); + case KVLIST_VALUE -> { + builder.startObject(); + List kvList = value.getKvlistValue().getValuesList(); + for (int i = 0, kvListSize = kvList.size(); i < kvListSize; i++) { + KeyValue kv = kvList.get(i); + builder.field(kv.getKey()); + buildAnyValue(builder, kv.getValue()); + } + builder.endObject(); + } + case BYTES_VALUE -> builder.value(value.getBytesValue().toByteArray()); + case VALUE_NOT_SET -> builder.nullValue(); } } } diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/SpanDocumentBuilder.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/SpanDocumentBuilder.java index 9bcda18d9535d..a82b73fc08b79 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/SpanDocumentBuilder.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/SpanDocumentBuilder.java @@ -10,6 +10,8 @@ import io.opentelemetry.proto.common.v1.InstrumentationScope; import io.opentelemetry.proto.resource.v1.Resource; import io.opentelemetry.proto.trace.v1.Span; +import io.opentelemetry.proto.trace.v1.Status; +import io.opentelemetry.proto.trace.v1.Status.StatusCode; import com.google.protobuf.ByteString; @@ -18,6 +20,8 @@ import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; /** * This class constructs an Elasticsearch document representation of an OTel span. @@ -38,6 +42,87 @@ public void buildSpanDocument( Span span ) throws IOException { builder.startObject(); + long timestamp = span.getStartTimeUnixNano(); + if (timestamp == 0) { + timestamp = span.getEndTimeUnixNano(); + } + // OTLP semantically requires span timestamps, but proto3 can still carry zeroes. + // If both are zero, keep the malformed span indexable and let @timestamp remain epoch. + builder.field("@timestamp", TimeUnit.NANOSECONDS.toMillis(timestamp)); + addHexFieldIfNotEmpty(builder, "trace_id", span.getTraceId()); + addHexFieldIfNotEmpty(builder, "span_id", span.getSpanId()); + addHexFieldIfNotEmpty(builder, "parent_span_id", span.getParentSpanId()); + addFieldIfNotEmpty(builder, "trace_state", span.getTraceStateBytes()); + addFieldIfNotEmpty(builder, "name", span.getNameBytes()); + if (span.getKind() != Span.SpanKind.SPAN_KIND_UNSPECIFIED) { + builder.field("kind", normalizeSpanKind(span.getKind())); + } + if (span.getStartTimeUnixNano() != 0 && span.getEndTimeUnixNano() != 0) { + builder.field("duration", span.getEndTimeUnixNano() - span.getStartTimeUnixNano()); + } + if (span.getDroppedEventsCount() > 0) { + builder.field("dropped_events_count", span.getDroppedEventsCount()); + } + if (span.getDroppedLinksCount() > 0) { + builder.field("dropped_links_count", span.getDroppedLinksCount()); + } + buildResource(resource, resourceSchemaUrl, builder); + buildDataStream(builder, targetIndex); + buildScope(builder, scope, scopeSchemaUrl); + buildAttributes(builder, span.getAttributesList(), span.getDroppedAttributesCount()); + buildLinks(builder, span.getLinksList()); + buildStatus(builder, span); builder.endObject(); } + + private void buildLinks(XContentBuilder builder, List links) throws IOException { + if (links.isEmpty()) { + return; + } + builder.startArray("links"); + for (int i = 0, size = links.size(); i < size; i++) { + Span.Link link = links.get(i); + builder.startObject(); + addHexFieldIfNotEmpty(builder, "trace_id", link.getTraceId()); + addHexFieldIfNotEmpty(builder, "span_id", link.getSpanId()); + addFieldIfNotEmpty(builder, "trace_state", link.getTraceStateBytes()); + buildAttributes(builder, link.getAttributesList(), link.getDroppedAttributesCount()); + builder.endObject(); + } + builder.endArray(); + } + + private void buildStatus(XContentBuilder builder, Span span) throws IOException { + Status status = span.getStatus(); + boolean hasCode = status.getCode() != StatusCode.STATUS_CODE_UNSET; + if (hasCode == false) { + // OTel status messages are only meaningful with an error code, so skip message-only unset statuses. + return; + } + builder.startObject("status"); + builder.field("code", normalizeStatusCode(status.getCode())); + addFieldIfNotEmpty(builder, "message", status.getMessageBytes()); + builder.endObject(); + } + + private static String normalizeSpanKind(Span.SpanKind kind) { + return switch (kind) { + case SPAN_KIND_UNSPECIFIED -> "Unspecified"; + case SPAN_KIND_INTERNAL -> "Internal"; + case SPAN_KIND_SERVER -> "Server"; + case SPAN_KIND_CLIENT -> "Client"; + case SPAN_KIND_PRODUCER -> "Producer"; + case SPAN_KIND_CONSUMER -> "Consumer"; + case UNRECOGNIZED -> kind.name(); + }; + } + + private static String normalizeStatusCode(StatusCode statusCode) { + return switch (statusCode) { + case STATUS_CODE_UNSET -> "Unset"; + case STATUS_CODE_OK -> "Ok"; + case STATUS_CODE_ERROR -> "Error"; + case UNRECOGNIZED -> statusCode.name(); + }; + } } diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/SpanEventDocumentBuilder.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/SpanEventDocumentBuilder.java new file mode 100644 index 0000000000000..f64c641221fa1 --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/SpanEventDocumentBuilder.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.docbuilder; + +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.InstrumentationScope; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.resource.v1.Resource; +import io.opentelemetry.proto.trace.v1.Span; + +import com.google.protobuf.ByteString; + +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xpack.oteldata.otlp.datapoint.TargetIndex; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * This class constructs an Elasticsearch document representation of an OTel span event. + */ +public class SpanEventDocumentBuilder extends OTelDocumentBuilder { + + private static final String EVENT_NAME_ATTRIBUTE = "event.name"; + + public SpanEventDocumentBuilder(BufferedByteStringAccessor byteStringAccessor) { + super(byteStringAccessor); + } + + public void buildSpanEventDocument( + XContentBuilder builder, + Resource resource, + ByteString resourceSchemaUrl, + InstrumentationScope scope, + ByteString scopeSchemaUrl, + TargetIndex targetIndex, + Span span, + Span.Event event + ) throws IOException { + builder.startObject(); + builder.field("@timestamp", TimeUnit.NANOSECONDS.toMillis(event.getTimeUnixNano())); + buildDataStream(builder, targetIndex); + addHexFieldIfNotEmpty(builder, "trace_id", span.getTraceId()); + addHexFieldIfNotEmpty(builder, "span_id", span.getSpanId()); + addFieldIfNotEmpty(builder, "event_name", event.getNameBytes()); + buildAttributes(builder, attributesWithEventName(event), event.getDroppedAttributesCount()); + buildResource(resource, resourceSchemaUrl, builder); + buildScope(builder, scope, scopeSchemaUrl); + builder.endObject(); + } + + private static List attributesWithEventName(Span.Event event) { + if (event.getNameBytes().isEmpty()) { + return event.getAttributesList(); + } + List attributes = new ArrayList<>(event.getAttributesCount() + 1); + List existingAttributes = event.getAttributesList(); + for (int i = 0, size = existingAttributes.size(); i < size; i++) { + KeyValue attribute = existingAttributes.get(i); + if (EVENT_NAME_ATTRIBUTE.equals(attribute.getKey()) == false) { + attributes.add(attribute); + } + } + attributes.add( + KeyValue.newBuilder() + .setKey(EVENT_NAME_ATTRIBUTE) + .setValue(AnyValue.newBuilder().setStringValue(event.getName()).build()) + .build() + ); + return attributes; + } +} diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OTLPTracesTransportActionTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OTLPTracesTransportActionTests.java index fdd5fe94f8248..27ba8a91332f5 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OTLPTracesTransportActionTests.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OTLPTracesTransportActionTests.java @@ -11,13 +11,19 @@ import com.google.protobuf.InvalidProtocolBufferException; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.util.List; +import java.util.Map; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; public class OTLPTracesTransportActionTests extends AbstractOTLPTransportActionTests { @@ -58,4 +64,101 @@ protected String parseErrorMessage(byte[] responseBytes) throws InvalidProtocolB protected String dataStreamType() { return "traces"; } + + public void testPrepareBulkRequestUsesDocumentIdAttribute() throws Exception { + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); + createAction().prepareBulkRequest( + new OTLPActionRequest( + new BytesArray( + OtlpTraceUtils.createTracesRequest( + List.of( + OtlpTraceUtils.createSpan("test-span", List.of(OtlpUtils.keyValue("elasticsearch.document_id", "trace-doc-id"))) + ) + ).toByteArray() + ) + ), + bulkRequestBuilder + ); + + IndexRequest indexRequest = (IndexRequest) bulkRequestBuilder.request().requests().get(0); + assertThat(indexRequest.id(), equalTo("trace-doc-id")); + } + + public void testPrepareBulkRequestLeavesDocumentIdUnsetWhenAttributeMissing() throws Exception { + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); + createAction().prepareBulkRequest(createRequestWithData(), bulkRequestBuilder); + + IndexRequest indexRequest = (IndexRequest) bulkRequestBuilder.request().requests().get(0); + assertThat(indexRequest.id(), nullValue()); + } + + public void testPrepareBulkRequestAddsSpanEventDocuments() throws Exception { + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); + createAction().prepareBulkRequest( + new OTLPActionRequest( + new BytesArray( + OtlpTraceUtils.createTracesRequest( + List.of( + OtlpTraceUtils.createSpan( + "test-span", + List.of(), + List.of( + OtlpTraceUtils.createEvent( + "exception", + 2_000_000_000L, + List.of(OtlpUtils.keyValue("event.attr.foo", "event.attr.bar")) + ) + ) + ) + ) + ).toByteArray() + ) + ), + bulkRequestBuilder + ); + + var bulkRequest = bulkRequestBuilder.request(); + assertThat(bulkRequest.requests(), hasSize(2)); + + IndexRequest spanRequest = (IndexRequest) bulkRequest.requests().get(0); + IndexRequest eventRequest = (IndexRequest) bulkRequest.requests().get(1); + + assertThat(spanRequest.index(), equalTo("traces-generic.otel-default")); + assertThat(eventRequest.index(), equalTo("logs-generic.otel-default")); + assertThat(eventRequest.sourceAsMap().get("event_name"), equalTo("exception")); + @SuppressWarnings("unchecked") + Map attributes = (Map) eventRequest.sourceAsMap().get("attributes"); + assertThat(attributes.get("event.attr.foo"), equalTo("event.attr.bar")); + assertThat(attributes.get("event.name"), equalTo("exception")); + } + + public void testPrepareBulkRequestUsesSpanEventDocumentIdAttribute() throws Exception { + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); + createAction().prepareBulkRequest( + new OTLPActionRequest( + new BytesArray( + OtlpTraceUtils.createTracesRequest( + List.of( + OtlpTraceUtils.createSpan( + "test-span", + List.of(), + List.of( + OtlpTraceUtils.createEvent( + "exception", + 2_000_000_000L, + List.of(OtlpUtils.keyValue("elasticsearch.document_id", "span-event-doc-id")) + ) + ) + ) + ) + ).toByteArray() + ) + ), + bulkRequestBuilder + ); + + var bulkRequest = bulkRequestBuilder.request(); + IndexRequest eventRequest = (IndexRequest) bulkRequest.requests().get(1); + assertThat(eventRequest.id(), equalTo("span-event-doc-id")); + } } diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpTraceUtils.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpTraceUtils.java index e92b9fed44d66..0ff3258e42161 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpTraceUtils.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpTraceUtils.java @@ -40,6 +40,10 @@ public static Span createSpan(String name) { } public static Span createSpan(String name, List attributes) { + return createSpan(name, attributes, List.of()); + } + + public static Span createSpan(String name, List attributes, List events) { return Span.newBuilder() .setName(name) .setTraceId(EMPTY_TRACE_ID) @@ -49,9 +53,14 @@ public static Span createSpan(String name, List attributes) { .setKind(Span.SpanKind.SPAN_KIND_INTERNAL) .setStatus(Status.newBuilder().setCode(Status.StatusCode.STATUS_CODE_OK).build()) .addAllAttributes(attributes) + .addAllEvents(events) .build(); } + public static Span.Event createEvent(String name, long timestamp, List attributes) { + return Span.Event.newBuilder().setName(name).setTimeUnixNano(timestamp).addAllAttributes(attributes).build(); + } + public static ResourceSpans createResourceSpans(List resourceAttributes, List scopeSpans) { return ResourceSpans.newBuilder().setResource(createResource(resourceAttributes)).addAllScopeSpans(scopeSpans).build(); } diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/SpanDocumentBuilderTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/SpanDocumentBuilderTests.java new file mode 100644 index 0000000000000..b2e730a18359e --- /dev/null +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/SpanDocumentBuilderTests.java @@ -0,0 +1,182 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.docbuilder; + +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.InstrumentationScope; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.common.v1.KeyValueList; +import io.opentelemetry.proto.resource.v1.Resource; +import io.opentelemetry.proto.trace.v1.Span; +import io.opentelemetry.proto.trace.v1.Status; + +import com.google.protobuf.ByteString; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.hash.MessageDigests; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.ObjectPath; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.oteldata.otlp.datapoint.TargetIndex; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; + +import java.io.IOException; +import java.util.Base64; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +public class SpanDocumentBuilderTests extends ESTestCase { + + private static final ByteString RESOURCE_SCHEMA_URL = ByteString.copyFromUtf8("https://opentelemetry.io/schemas/1.0.0"); + private static final ByteString SCOPE_SCHEMA_URL = ByteString.copyFromUtf8("https://opentelemetry.io/schemas/1.0.0"); + + private final SpanDocumentBuilder documentBuilder = new SpanDocumentBuilder(new BufferedByteStringAccessor()); + + public void testBuildSpanDocument() throws IOException { + ByteString traceId = randomHexByteString(16); + ByteString spanId = randomHexByteString(8); + ByteString parentSpanId = randomHexByteString(8); + byte[] linkPayload = randomByteArrayOfLength(8); + + Resource resource = Resource.newBuilder() + .setDroppedAttributesCount(1) + .addAttributes(keyValue("service.name", "checkout-service")) + .addAttributes(keyValue("host.name", "test-host")) + .build(); + InstrumentationScope scope = InstrumentationScope.newBuilder() + .setName("checkout-tracer") + .setVersion("1.0.0") + .setDroppedAttributesCount(2) + .addAttributes(keyValue("scope_attr", "value")) + .build(); + Span span = Span.newBuilder() + .setTraceId(traceId) + .setSpanId(spanId) + .setParentSpanId(parentSpanId) + .setTraceState("vendor=value") + .setName("GET /orders") + .setKind(Span.SpanKind.SPAN_KIND_SERVER) + .setStartTimeUnixNano(2_000_000_000L) + .setEndTimeUnixNano(2_500_000_000L) + .setDroppedAttributesCount(3) + .setDroppedEventsCount(4) + .setDroppedLinksCount(5) + .addAttributes(keyValue("http.method", "GET")) + .addAttributes(keyValue("http.status_code", 404L)) + .addLinks( + Span.Link.newBuilder() + .setTraceId(randomHexByteString(16)) + .setSpanId(randomHexByteString(8)) + .setTraceState("linked=true") + .setDroppedAttributesCount(1) + .addAttributes( + keyValue( + "labels", + KeyValueList.newBuilder().addValues(keyValue("environment", "prod")).addValues(keyValue("zone", "a")).build() + ) + ) + .addAttributes( + KeyValue.newBuilder() + .setKey("payload") + .setValue(AnyValue.newBuilder().setBytesValue(ByteString.copyFrom(linkPayload)).build()) + .build() + ) + .build() + ) + .setStatus(Status.newBuilder().setCode(Status.StatusCode.STATUS_CODE_ERROR).setMessage("not found").build()) + .build(); + + ObjectPath doc = buildDocument(resource, scope, span); + + assertThat(doc.evaluate("@timestamp").longValue(), equalTo(TimeUnit.NANOSECONDS.toMillis(2_000_000_000L))); + assertThat(doc.evaluate("trace_id"), equalTo(MessageDigests.toHexString(traceId.toByteArray()))); + assertThat(doc.evaluate("span_id"), equalTo(MessageDigests.toHexString(spanId.toByteArray()))); + assertThat(doc.evaluate("parent_span_id"), equalTo(MessageDigests.toHexString(parentSpanId.toByteArray()))); + assertThat(doc.evaluate("trace_state"), equalTo("vendor=value")); + assertThat(doc.evaluate("name"), equalTo("GET /orders")); + assertThat(doc.evaluate("kind"), equalTo("Server")); + assertThat(doc.evaluate("duration").longValue(), equalTo(500_000_000L)); + assertThat(doc.evaluate("dropped_events_count"), equalTo(4)); + assertThat(doc.evaluate("dropped_links_count"), equalTo(5)); + assertThat(doc.evaluate("resource.schema_url"), equalTo("https://opentelemetry.io/schemas/1.0.0")); + assertThat(doc.evaluate("resource.dropped_attributes_count"), equalTo(1)); + assertThat(doc.evaluate("resource.attributes.service\\.name"), equalTo("checkout-service")); + assertThat(doc.evaluate("scope.name"), equalTo("checkout-tracer")); + assertThat(doc.evaluate("scope.version"), equalTo("1.0.0")); + assertThat(doc.evaluate("scope.schema_url"), equalTo("https://opentelemetry.io/schemas/1.0.0")); + assertThat(doc.evaluate("scope.dropped_attributes_count"), equalTo(2)); + assertThat(doc.evaluate("scope.attributes.scope_attr"), equalTo("value")); + assertThat(doc.evaluate("data_stream.type"), equalTo("traces")); + assertThat(doc.evaluate("data_stream.dataset"), equalTo("generic.otel")); + assertThat(doc.evaluate("data_stream.namespace"), equalTo("default")); + assertThat(doc.evaluate("dropped_attributes_count"), equalTo(3)); + assertThat(doc.evaluate("attributes.http\\.method"), equalTo("GET")); + assertThat(doc.evaluate("attributes.http\\.status_code"), equalTo(404)); + assertThat(doc.evaluate("links.0.trace_state"), equalTo("linked=true")); + assertThat(doc.evaluate("links.0.dropped_attributes_count"), equalTo(1)); + assertThat(doc.evaluate("links.0.attributes.labels"), equalTo(Map.of("environment", "prod", "zone", "a"))); + assertThat(doc.evaluate("links.0.attributes.payload"), equalTo(Base64.getEncoder().encodeToString(linkPayload))); + assertThat(doc.evaluate("status.code"), equalTo("Error")); + assertThat(doc.evaluate("status.message"), equalTo("not found")); + } + + public void testTimestampFallsBackToEndTime() throws IOException { + Span span = Span.newBuilder().setEndTimeUnixNano(5_000_000_000L).build(); + + ObjectPath doc = buildDocument(Resource.getDefaultInstance(), InstrumentationScope.getDefaultInstance(), span); + + assertThat(doc.evaluate("@timestamp").longValue(), equalTo(TimeUnit.NANOSECONDS.toMillis(5_000_000_000L))); + assertThat(doc.evaluate("duration"), nullValue()); + assertThat(doc.evaluate("status"), nullValue()); + assertThat(doc.evaluate("links"), nullValue()); + } + + public void testStatusMessageWithoutCodeIsIgnored() throws IOException { + Span span = Span.newBuilder() + .setStartTimeUnixNano(2_000_000_000L) + .setStatus(Status.newBuilder().setMessage("status message without code").build()) + .build(); + + ObjectPath doc = buildDocument(Resource.getDefaultInstance(), InstrumentationScope.getDefaultInstance(), span); + + assertThat(doc.evaluate("status"), nullValue()); + } + + public void testSpanWithoutTimestampsUsesEpoch() throws IOException { + Span span = Span.newBuilder().setName("span-without-timestamp").build(); + + ObjectPath doc = buildDocument(Resource.getDefaultInstance(), InstrumentationScope.getDefaultInstance(), span); + + assertThat(doc.evaluate("@timestamp").longValue(), equalTo(0L)); + assertThat(doc.evaluate("duration"), nullValue()); + } + + private ObjectPath buildDocument(Resource resource, InstrumentationScope scope, Span span) throws IOException { + TargetIndex targetIndex = TargetIndex.evaluate( + "traces", + span.getAttributesList(), + null, + scope.getAttributesList(), + resource.getAttributesList() + ); + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + documentBuilder.buildSpanDocument(builder, resource, RESOURCE_SCHEMA_URL, scope, SCOPE_SCHEMA_URL, targetIndex, span); + return ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); + } + + private static ByteString randomHexByteString(int length) { + return ByteString.copyFrom(randomByteArrayOfLength(length)); + } +} diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/SpanEventDocumentBuilderTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/SpanEventDocumentBuilderTests.java new file mode 100644 index 0000000000000..3e4d27a2be2cd --- /dev/null +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/SpanEventDocumentBuilderTests.java @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.docbuilder; + +import io.opentelemetry.proto.common.v1.InstrumentationScope; +import io.opentelemetry.proto.resource.v1.Resource; +import io.opentelemetry.proto.trace.v1.Span; + +import com.google.protobuf.ByteString; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.hash.MessageDigests; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.ObjectPath; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.oteldata.otlp.OtlpUtils; +import org.elasticsearch.xpack.oteldata.otlp.datapoint.TargetIndex; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.equalTo; + +public class SpanEventDocumentBuilderTests extends ESTestCase { + + private static final ByteString RESOURCE_SCHEMA_URL = ByteString.copyFromUtf8("https://opentelemetry.io/schemas/1.0.0"); + private static final ByteString SCOPE_SCHEMA_URL = ByteString.copyFromUtf8("https://opentelemetry.io/schemas/1.0.0"); + + private final SpanEventDocumentBuilder documentBuilder = new SpanEventDocumentBuilder(new BufferedByteStringAccessor()); + + public void testBuildSpanEventDocument() throws IOException { + ByteString traceId = ByteString.copyFrom(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 }); + ByteString spanId = ByteString.copyFrom(new byte[] { 16, 17, 18, 19, 20, 21, 22, 23 }); + Resource resource = Resource.newBuilder() + .setDroppedAttributesCount(1) + .addAttributes(OtlpUtils.keyValue("service.name", "checkout-service")) + .build(); + InstrumentationScope scope = InstrumentationScope.newBuilder() + .setName("checkout-tracer") + .setVersion("1.0.0") + .addAttributes(OtlpUtils.keyValue("scope_attr", "value")) + .build(); + Span span = Span.newBuilder().setTraceId(traceId).setSpanId(spanId).build(); + Span.Event event = Span.Event.newBuilder() + .setName("exception") + .setTimeUnixNano(2_000_000_000L) + .setDroppedAttributesCount(3) + .addAttributes(OtlpUtils.keyValue("event.attr.foo", "event.attr.bar")) + .addAttributes(OtlpUtils.keyValue("event.name", "ignored-original-name")) + .build(); + + ObjectPath doc = buildDocument(resource, scope, span, event); + + assertThat(doc.evaluate("@timestamp").longValue(), equalTo(TimeUnit.NANOSECONDS.toMillis(2_000_000_000L))); + assertThat(doc.evaluate("trace_id"), equalTo(MessageDigests.toHexString(traceId.toByteArray()))); + assertThat(doc.evaluate("span_id"), equalTo(MessageDigests.toHexString(spanId.toByteArray()))); + assertThat(doc.evaluate("event_name"), equalTo("exception")); + assertThat(doc.evaluate("dropped_attributes_count"), equalTo(3)); + assertThat(doc.evaluate("data_stream.type"), equalTo("logs")); + assertThat(doc.evaluate("data_stream.dataset"), equalTo("generic.otel")); + assertThat(doc.evaluate("data_stream.namespace"), equalTo("default")); + assertThat(doc.evaluate("attributes.event\\.attr\\.foo"), equalTo("event.attr.bar")); + assertThat(doc.evaluate("attributes.event\\.name"), equalTo("exception")); + assertThat(doc.evaluate("resource.schema_url"), equalTo("https://opentelemetry.io/schemas/1.0.0")); + assertThat(doc.evaluate("resource.attributes.service\\.name"), equalTo("checkout-service")); + assertThat(doc.evaluate("resource.dropped_attributes_count"), equalTo(1)); + assertThat(doc.evaluate("scope.schema_url"), equalTo("https://opentelemetry.io/schemas/1.0.0")); + assertThat(doc.evaluate("scope.name"), equalTo("checkout-tracer")); + assertThat(doc.evaluate("scope.version"), equalTo("1.0.0")); + assertThat(doc.evaluate("scope.attributes"), equalTo(Map.of("scope_attr", "value"))); + } + + private ObjectPath buildDocument(Resource resource, InstrumentationScope scope, Span span, Span.Event event) throws IOException { + TargetIndex targetIndex = TargetIndex.evaluate( + "logs", + event.getAttributesList(), + null, + scope.getAttributesList(), + resource.getAttributesList() + ); + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + documentBuilder.buildSpanEventDocument(builder, resource, RESOURCE_SCHEMA_URL, scope, SCOPE_SCHEMA_URL, targetIndex, span, event); + return ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); + } +}