Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,21 @@

package org.elasticsearch.xpack.oteldata.otlp;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.logs.Logger;
import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporter;
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessor;

import org.elasticsearch.test.rest.ObjectPath;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;

import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.api.logs.Severity.INFO;
import static io.opentelemetry.api.logs.Severity.WARN;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -54,6 +59,65 @@ public void tearDown() throws Exception {
}
}

public void testBatchLogIndexing() throws Exception {
int numLogs = 100;
for (int i = 0; i < numLogs; i++) {
logger.logRecordBuilder().setBody("log message " + i).setSeverity(INFO).emit();
}
indexLogs();

ObjectPath search = search("logs-generic.otel-default");
assertThat(search.evaluate("hits.total.value"), equalTo(numLogs));
}

public void testLogWithAttributes() throws Exception {
logger.logRecordBuilder()
.setBody("request handled")
.setSeverity(WARN)
.setSeverityText("WARN")
.setAttribute(stringKey("http.method"), "GET")
.setAttribute(AttributeKey.longKey("http.status_code"), 404L)
.emit();
indexLogs();

ObjectPath search = search("logs-generic.otel-default");
assertThat(search.evaluate("hits.total.value"), equalTo(1));
var source = new ObjectPath(search.evaluate("hits.hits.0._source"));
assertThat(source.evaluate("body.text"), equalTo("request handled"));
assertThat(source.evaluate("severity_text"), equalTo("WARN"));
assertThat(source.evaluate("severity_number"), equalTo(WARN.getSeverityNumber()));
assertThat(source.evaluate("attributes.http\\.method"), equalTo("GET"));
assertThat(source.evaluate("attributes.http\\.status_code"), equalTo(404));
assertThat(source.evaluate("resource.attributes.service\\.name"), equalTo("elasticsearch"));
}

public void testDataStreamRouting() throws Exception {
logger.logRecordBuilder()
.setBody("routed log")
.setSeverity(INFO)
.setAttribute(stringKey("data_stream.dataset"), "myapp")
.setAttribute(stringKey("data_stream.namespace"), "production")
.emit();
indexLogs();

ObjectPath search = search("logs-myapp.otel-production");
assertThat(search.evaluate("hits.total.value"), equalTo(1));
var source = new ObjectPath(search.evaluate("hits.hits.0._source"));
assertThat(source.evaluate("data_stream.type"), equalTo("logs"));
assertThat(source.evaluate("data_stream.dataset"), equalTo("myapp.otel"));
assertThat(source.evaluate("data_stream.namespace"), equalTo("production"));
}

public void testScopeIsIndexed() throws Exception {
logger.logRecordBuilder().setBody("scoped log").setSeverity(INFO).emit();
indexLogs();

ObjectPath search = search("logs-generic.otel-default");
assertThat(search.evaluate("hits.total.value"), equalTo(1));
var source = new ObjectPath(search.evaluate("hits.hits.0._source"));
assertThat(source.evaluate("scope.name"), equalTo(getClass().getSimpleName()));
}

private void indexLogs() throws IOException {
var result = loggerProvider.forceFlush().join(TEST_REQUEST_TIMEOUT.millis(), MILLISECONDS);
assertThat(result.isSuccess(), equalTo(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@

package org.elasticsearch.xpack.oteldata.otlp.docbuilder;

import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.InstrumentationScope;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.logs.v1.LogRecord;
import io.opentelemetry.proto.logs.v1.SeverityNumber;
import io.opentelemetry.proto.resource.v1.Resource;

import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -38,6 +41,87 @@ public void buildLogDocument(
LogRecord logRecord
) throws IOException {
builder.startObject();
long observedTimestamp = logRecord.getObservedTimeUnixNano();
long docTimestamp = logRecord.getTimeUnixNano();
if (docTimestamp == 0) {
docTimestamp = observedTimestamp;
}
addEpochMillisNanosField(builder, "@timestamp", docTimestamp);
addEpochMillisNanosField(builder, "observed_timestamp", observedTimestamp);
if (logRecord.getSeverityNumber() != SeverityNumber.SEVERITY_NUMBER_UNSPECIFIED) {
builder.field("severity_number", logRecord.getSeverityNumber().getNumber());
}
addFieldIfNotEmpty(builder, "severity_text", logRecord.getSeverityTextBytes());
ByteString eventName = logRecord.getEventNameBytes();
if (eventName.isEmpty() == false) {
addFieldIfNotEmpty(builder, "event_name", eventName);
} else {
for (KeyValue attribute : logRecord.getAttributesList()) {
if ("event.name".equals(attribute.getKey())) {
addFieldIfNotEmpty(builder, "event_name", attribute.getValue().getStringValueBytes());
break;
}
}
}
addSpanId(builder, logRecord.getSpanId().toByteArray());
addTraceId(builder, logRecord.getTraceId().toByteArray());
buildResource(resource, resourceSchemaUrl, builder);
buildDataStream(builder, targetIndex);
buildScope(builder, scope, scopeSchemaUrl);
buildAttributes(builder, logRecord.getAttributesList(), logRecord.getDroppedAttributesCount());
buildBody(builder, logRecord);
builder.endObject();
}

private void buildBody(XContentBuilder builder, LogRecord logRecord) throws IOException {
AnyValue body = logRecord.getBody();
AnyValue.ValueCase valueCase = body.getValueCase();
if (valueCase == AnyValue.ValueCase.VALUE_NOT_SET) {
return;
}
if (valueCase == AnyValue.ValueCase.ARRAY_VALUE && body.getArrayValue().getValuesCount() == 0) {
return;
}
builder.startObject("body");
switch (valueCase) {
case ARRAY_VALUE: {
boolean allMaps = true;
for (var v : body.getArrayValue().getValuesList()) {
if (v.hasKvlistValue() == false) {
allMaps = false;
break;
}
}
if (allMaps) {
buildStructuredBody(builder, body);
} else {
// The flattened field type only accepts objects or arrays of objects
// If this is an array of primitive values, for example, wrap the array in a 'value' object
builder.field("structured");
builder.startObject();
builder.field("value");
buildAnyValue(builder, body);
builder.endObject();
}
break;
}
case KVLIST_VALUE:
buildStructuredBody(builder, body);
break;
default:
buildTextBody(builder, body);
break;
}
builder.endObject();
}
Comment thread
felixbarny marked this conversation as resolved.

private void buildTextBody(XContentBuilder builder, AnyValue value) throws IOException {
builder.field("text");
buildAnyValue(builder, value);
}

private void buildStructuredBody(XContentBuilder builder, AnyValue body) throws IOException {
builder.field("structured");
buildAnyValue(builder, body);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,27 @@

import com.google.protobuf.ByteString;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.oteldata.otlp.datapoint.TargetIndex;
import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;

import java.io.IOException;
import java.util.HexFormat;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* Base class for constructing Elasticsearch document representations of OTel data (metrics, logs, traces).
* Provides shared logic for building resource, scope, attribute, and data stream fields.
*/
public abstract class OTelDocumentBuilder {

private static final String DATA_STREAM_TYPE = "data_stream.type";
private static final String ELASTIC_MAPPING_MODE = "elastic.mapping.mode";
private static final String ELASTICSEARCH_DOCUMENT_ID = "elasticsearch.document_id";
private static final HexFormat HEX = HexFormat.of();
private final BufferedByteStringAccessor byteStringAccessor;

public OTelDocumentBuilder(BufferedByteStringAccessor byteStringAccessor) {
Expand Down Expand Up @@ -63,6 +70,16 @@ protected void addHexFieldIfNotEmpty(XContentBuilder builder, String name, ByteS
}
}

protected void addEpochMillisNanosField(XContentBuilder builder, String fieldName, long unixNanos) throws IOException {
long millis = TimeUnit.NANOSECONDS.toMillis(unixNanos);
long nanosRemainder = unixNanos - TimeUnit.MILLISECONDS.toNanos(millis);
if (nanosRemainder == 0) {
builder.field(fieldName, millis);
} else {
builder.field(fieldName, Strings.format("%d.%06d", millis, nanosRemainder));
}
}

protected void buildDataStream(XContentBuilder builder, TargetIndex targetIndex) throws IOException {
if (targetIndex.isDataStream() == false) {
return;
Expand Down Expand Up @@ -93,13 +110,17 @@ protected void buildAttributes(XContentBuilder builder, List<KeyValue> attribute
/**
* Checks if the given attribute key is an ignored attribute.
* Ignored attributes are well-known Elastic-specific attributes
* that influence how the documents are indexed but are not stored themselves.
* that control routing, mapping, or document metadata but are not stored themselves.
*
* @param attributeKey the attribute key to check
* @return true if the attribute is ignored, false otherwise
*/
public static boolean isIgnoredAttribute(String attributeKey) {
return TargetIndex.isTargetIndexAttribute(attributeKey) || MappingHints.isMappingHintsAttribute(attributeKey);
return TargetIndex.isTargetIndexAttribute(attributeKey)
|| MappingHints.isMappingHintsAttribute(attributeKey)
|| DATA_STREAM_TYPE.equals(attributeKey)
|| ELASTIC_MAPPING_MODE.equals(attributeKey)
|| ELASTICSEARCH_DOCUMENT_ID.equals(attributeKey);
}

protected void buildAnyValue(XContentBuilder builder, AnyValue value) throws IOException {
Expand Down Expand Up @@ -131,4 +152,18 @@ protected void buildAnyValue(XContentBuilder builder, AnyValue value) throws IOE
case VALUE_NOT_SET -> builder.nullValue();
}
}

protected void addSpanId(XContentBuilder builder, byte[] spanId) throws IOException {
addHexIdIfNotEmpty(builder, "span_id", spanId);
}

protected void addTraceId(XContentBuilder builder, byte[] traceId) throws IOException {
addHexIdIfNotEmpty(builder, "trace_id", traceId);
}

private static void addHexIdIfNotEmpty(XContentBuilder builder, String fieldName, byte[] id) throws IOException {
if (id.length > 0) {
builder.field(fieldName, HEX.formatHex(id));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* This class constructs an Elasticsearch document representation of an OTel span.
Expand All @@ -48,15 +47,13 @@ public void buildSpanDocument(
}
// OTLP semantically requires span timestamps, but proto3 can still carry zeroes.
// If both are zero, keep the malformed span indexable and let @timestamp remain epoch.
builder.field("@timestamp", TimeUnit.NANOSECONDS.toMillis(timestamp));
addEpochMillisNanosField(builder, "@timestamp", timestamp);
addHexFieldIfNotEmpty(builder, "trace_id", span.getTraceId());
addHexFieldIfNotEmpty(builder, "span_id", span.getSpanId());
addHexFieldIfNotEmpty(builder, "parent_span_id", span.getParentSpanId());
addFieldIfNotEmpty(builder, "trace_state", span.getTraceStateBytes());
addFieldIfNotEmpty(builder, "name", span.getNameBytes());
if (span.getKind() != Span.SpanKind.SPAN_KIND_UNSPECIFIED) {
builder.field("kind", normalizeSpanKind(span.getKind()));
}
builder.field("kind", normalizeSpanKind(span.getKind()));
if (span.getStartTimeUnixNano() != 0 && span.getEndTimeUnixNano() != 0) {
builder.field("duration", span.getEndTimeUnixNano() - span.getStartTimeUnixNano());
}
Expand Down
Loading
Loading