Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,17 @@ public void testOversizedRequestReturns413() throws Exception {
}

protected static String createApiKey(String indexPattern) throws IOException {
return createApiKey(new String[] { indexPattern });
}

protected static String createApiKey(String... indexPatterns) throws IOException {
StringBuilder indexPatternsJson = new StringBuilder();
for (int i = 0; i < indexPatterns.length; i++) {
if (i > 0) {
indexPatternsJson.append(", ");
}
indexPatternsJson.append('"').append(indexPatterns[i]).append('"');
}
Request createApiKeyRequest = new Request("POST", "/_security/api_key");
createApiKeyRequest.setJsonEntity("""
{
Expand All @@ -103,14 +114,14 @@ protected static String createApiKey(String indexPattern) throws IOException {
"writer": {
"index": [
{
"names": ["$INDEX_PATTERN"],
"names": [$INDEX_PATTERNS],
"privileges": ["create_doc", "auto_configure"]
}
]
}
}
}
""".replace("$INDEX_PATTERN", indexPattern));
""".replace("$INDEX_PATTERNS", indexPatternsJson.toString()));
ObjectPath createApiKeyResponse = ObjectPath.createFromResponse(client().performRequest(createApiKeyRequest));
return createApiKeyResponse.evaluate("encoded");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,24 @@

package org.elasticsearch.xpack.oteldata.otlp;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;

import org.elasticsearch.test.rest.ObjectPath;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;

import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isA;

public class OTLPTracesIndexingRestIT extends AbstractOTLPIndexingRestIT {

Expand All @@ -36,7 +42,7 @@ public void setUp() throws Exception {
super.setUp();
OtlpHttpSpanExporter exporter = OtlpHttpSpanExporter.builder()
.setEndpoint(getClusterHosts().getFirst().toURI() + otlpEndpointPath())
.addHeader("Authorization", "ApiKey " + createApiKey("traces-*"))
.addHeader("Authorization", "ApiKey " + createApiKey("traces-*", "logs-*"))
.build();
tracerProvider = SdkTracerProvider.builder()
.setResource(TEST_RESOURCE)
Expand All @@ -54,10 +60,101 @@ public void tearDown() throws Exception {
}
}

public void testBatchSpanIndexing() throws Exception {
int numSpans = 25;
for (int i = 0; i < numSpans; i++) {
tracer.spanBuilder("span-" + i).startSpan().end();
}

indexTraces();

ObjectPath search = search("traces-generic.otel-default");
assertThat(search.evaluate("hits.total.value"), equalTo(numSpans));
}

public void testSpanWithAttributes() throws Exception {
var span = tracer.spanBuilder("GET /orders").setSpanKind(SpanKind.SERVER).startSpan();
String traceId = span.getSpanContext().getTraceId();
String spanId = span.getSpanContext().getSpanId();
span.setAttribute("http.method", "GET");
span.setAttribute("http.status_code", 404L);
span.setStatus(StatusCode.ERROR, "not found");
span.end();

indexTraces();

ObjectPath search = search("traces-generic.otel-default");
assertThat(search.evaluate("hits.total.value"), equalTo(1));
var source = new ObjectPath(search.evaluate("hits.hits.0._source"));
assertThat(source.evaluate("name"), equalTo("GET /orders"));
assertThat(source.evaluate("kind"), equalTo("Server"));
assertThat(source.evaluate("trace_id"), equalTo(traceId));
assertThat(source.evaluate("span_id"), equalTo(spanId));
assertThat(source.evaluate("duration"), isA(Number.class));
assertThat(source.evaluate("status.code"), equalTo("Error"));
assertThat(source.evaluate("status.message"), equalTo("not found"));
assertThat(source.evaluate("attributes.http\\.method"), equalTo("GET"));
assertThat(source.evaluate("attributes.http\\.status_code"), equalTo(404));
assertThat(source.evaluate("resource.attributes.service\\.name"), equalTo("elasticsearch"));
assertThat(source.evaluate("scope.name"), equalTo(getClass().getSimpleName()));
}

public void testSpanEventsAreIndexedAsLogs() throws Exception {
var span = tracer.spanBuilder("span-with-event").startSpan();
String traceId = span.getSpanContext().getTraceId();
String spanId = span.getSpanContext().getSpanId();
span.addEvent("exception", Attributes.of(stringKey("event.attr.foo"), "event.attr.bar"));
span.end();

indexTraces();

ObjectPath tracesSearch = search("traces-generic.otel-default");
assertThat(tracesSearch.evaluate("hits.total.value"), equalTo(1));

ObjectPath logsSearch = search("logs-generic.otel-default");
assertThat(logsSearch.evaluate("hits.total.value"), equalTo(1));
var source = new ObjectPath(logsSearch.evaluate("hits.hits.0._source"));
assertThat(source.evaluate("event_name"), equalTo("exception"));
assertThat(source.evaluate("trace_id"), equalTo(traceId));
assertThat(source.evaluate("span_id"), equalTo(spanId));
assertThat(source.evaluate("attributes.event\\.attr\\.foo"), equalTo("event.attr.bar"));
assertThat(source.evaluate("attributes.event\\.name"), equalTo("exception"));
assertThat(source.evaluate("data_stream.type"), equalTo("logs"));
}

public void testSpanEventsUseDocumentIdAttribute() throws Exception {
var span = tracer.spanBuilder("span-with-event-id").startSpan();
span.addEvent("exception", Attributes.of(stringKey("elasticsearch.document_id"), "span-event-doc-id"));
span.end();

indexTraces();

ObjectPath logsSearch = search("logs-generic.otel-default");
assertThat(logsSearch.evaluate("hits.total.value"), equalTo(1));
assertThat(logsSearch.evaluate("hits.hits.0._id"), equalTo("span-event-doc-id"));
}

public void testDataStreamRouting() throws Exception {
var span = tracer.spanBuilder("routed span").startSpan();
span.setAttribute("data_stream.dataset", "checkout");
span.setAttribute("data_stream.namespace", "production");
span.end();

indexTraces();

ObjectPath search = search("traces-checkout.otel-production");
assertThat(search.evaluate("hits.total.value"), equalTo(1));
var source = new ObjectPath(search.evaluate("hits.hits.0._source"));
assertThat(source.evaluate("data_stream.type"), equalTo("traces"));
assertThat(source.evaluate("data_stream.dataset"), equalTo("checkout.otel"));
assertThat(source.evaluate("data_stream.namespace"), equalTo("production"));
}

private void indexTraces() throws IOException {
var result = tracerProvider.forceFlush().join(TEST_REQUEST_TIMEOUT.millis(), MILLISECONDS);
assertThat(result.isSuccess(), equalTo(true));
refresh("traces-*.otel-*");
refresh("logs-*.otel-*");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
import io.opentelemetry.proto.common.v1.InstrumentationScope;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.resource.v1.Resource;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.proto.trace.v1.ScopeSpans;
Expand All @@ -32,6 +33,7 @@
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xpack.oteldata.otlp.datapoint.TargetIndex;
import org.elasticsearch.xpack.oteldata.otlp.docbuilder.SpanDocumentBuilder;
import org.elasticsearch.xpack.oteldata.otlp.docbuilder.SpanEventDocumentBuilder;
import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;

import java.io.IOException;
Expand All @@ -52,6 +54,7 @@ public class OTLPTracesTransportAction extends AbstractOTLPTransportAction {
public static final ActionType<OTLPActionResponse> TYPE = new ActionType<>(NAME);

public static final String TYPE_TRACES = "traces";
private static final String DOCUMENT_ID_ATTRIBUTE = "elasticsearch.document_id";

@Inject
public OTLPTracesTransportAction(TransportService transportService, ActionFilters actionFilters, ThreadPool threadPool, Client client) {
Expand All @@ -63,6 +66,7 @@ protected ProcessingContext prepareBulkRequest(OTLPActionRequest request, BulkRe
BufferedByteStringAccessor byteStringAccessor = new BufferedByteStringAccessor();
var tracesServiceRequest = ExportTraceServiceRequest.parseFrom(request.getRequest().streamInput());
SpanDocumentBuilder spanDocumentBuilder = new SpanDocumentBuilder(byteStringAccessor);
SpanEventDocumentBuilder spanEventDocumentBuilder = new SpanEventDocumentBuilder(byteStringAccessor);
List<ResourceSpans> resourceSpansList = tracesServiceRequest.getResourceSpansList();
for (int i = 0, resourceSpansListSize = resourceSpansList.size(); i < resourceSpansListSize; i++) {
ResourceSpans resourceSpans = resourceSpansList.get(i);
Expand Down Expand Up @@ -92,12 +96,46 @@ protected ProcessingContext prepareBulkRequest(OTLPActionRequest request, BulkRe
index,
span
);
IndexRequest indexRequest = new IndexRequest(index.index());
String documentId = extractDocumentId(span.getAttributesList());
if (documentId.isEmpty() == false) {
indexRequest.id(documentId);
}
bulkRequestBuilder.add(
new IndexRequest(index.index()).opType(DocWriteRequest.OpType.CREATE)
.setRequireDataStream(true)
.source(xContentBuilder)
indexRequest.opType(DocWriteRequest.OpType.CREATE).setRequireDataStream(true).source(xContentBuilder)
);
}
List<Span.Event> eventsList = span.getEventsList();
for (int l = 0, eventsListSize = eventsList.size(); l < eventsListSize; l++) {
Span.Event event = eventsList.get(l);
TargetIndex eventIndex = TargetIndex.evaluate(
OTLPLogsTransportAction.TYPE_LOGS,
event.getAttributesList(),
receiverName,
scope.getAttributesList(),
resource.getAttributesList()
);
try (XContentBuilder xContentBuilder = XContentFactory.cborBuilder(new BytesStreamOutput())) {
spanEventDocumentBuilder.buildSpanEventDocument(
xContentBuilder,
resource,
resourceSpans.getSchemaUrlBytes(),
scope,
scopeSpans.getSchemaUrlBytes(),
eventIndex,
span,
event
);
IndexRequest eventRequest = new IndexRequest(eventIndex.index());
String eventDocumentId = extractDocumentId(event.getAttributesList());
if (eventDocumentId.isEmpty() == false) {
eventRequest.id(eventDocumentId);
}
bulkRequestBuilder.add(
eventRequest.opType(DocWriteRequest.OpType.CREATE).setRequireDataStream(true).source(xContentBuilder)
);
}
}
}
}
}
Expand All @@ -112,4 +150,14 @@ MessageLite responseWithRejectedDataPoints(int rejectedDataPoints, String messag
.build();
return ExportTraceServiceResponse.newBuilder().setPartialSuccess(partialSuccess).build();
}

private static String extractDocumentId(List<KeyValue> attributes) {
for (int i = 0, size = attributes.size(); i < size; i++) {
KeyValue attribute = attributes.get(i);
if (DOCUMENT_ID_ATTRIBUTE.equals(attribute.getKey())) {
return attribute.getValue().getStringValue();
}
}
return "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import com.google.protobuf.ByteString;

import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.oteldata.otlp.datapoint.TargetIndex;
import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;
Expand Down Expand Up @@ -56,6 +57,12 @@ protected void addFieldIfNotEmpty(XContentBuilder builder, String name, ByteStri
}
}

protected void addHexFieldIfNotEmpty(XContentBuilder builder, String name, ByteString value) throws IOException {
if (value != null && value.isEmpty() == false) {
builder.field(name, MessageDigests.toHexString(value.toByteArray()));
}
}

protected void buildDataStream(XContentBuilder builder, TargetIndex targetIndex) throws IOException {
if (targetIndex.isDataStream() == false) {
return;
Expand Down Expand Up @@ -110,7 +117,18 @@ protected void buildAnyValue(XContentBuilder builder, AnyValue value) throws IOE
}
builder.endArray();
}
default -> throw new IllegalArgumentException("Unsupported attribute value type: " + value.getValueCase());
case KVLIST_VALUE -> {
builder.startObject();
List<KeyValue> kvList = value.getKvlistValue().getValuesList();
for (int i = 0, kvListSize = kvList.size(); i < kvListSize; i++) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: why not

for (KeyValue kv: value.getKvlistValue().getValuesList()) {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It avoids having to allocate an iterator. But the JIT will most likely replace with a stack allocation anyway. Both works. The more practical reason is that it follows the precedent of ARRAY_VALUE

KeyValue kv = kvList.get(i);
builder.field(kv.getKey());
buildAnyValue(builder, kv.getValue());
}
builder.endObject();
}
case BYTES_VALUE -> builder.value(value.getBytesValue().toByteArray());
case VALUE_NOT_SET -> builder.nullValue();
}
}
}
Loading
Loading