Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import org.junit.ClassRule;

import java.io.IOException;
import java.util.Map;

import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.equalTo;

public abstract class AbstractOTLPIndexingRestIT extends ESRestTestCase {
Expand Down Expand Up @@ -131,4 +133,19 @@ protected ObjectPath search(String target) throws IOException {
assertOK(response);
return ObjectPath.createFromResponse(response);
}

protected ObjectPath search(String target, String body) throws IOException {
Request request = new Request("GET", target + "/_search");
request.setJsonEntity(body);
var response = client().performRequest(request);
assertOK(response);
return ObjectPath.createFromResponse(response);
}

protected static ObjectPath getIndexMappingPath(String target) throws IOException {
Map<String, Object> mappings = ObjectPath.createFromResponse(client().performRequest(new Request("GET", target + "/_mapping")))
.evaluate("");
assertThat(mappings, aMapWithSize(1));
return new ObjectPath(new ObjectPath(mappings.values().iterator().next()).evaluate("mappings"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

import java.io.IOException;

import static io.opentelemetry.api.common.AttributeKey.doubleKey;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.api.logs.Severity.INFO;
import static io.opentelemetry.api.logs.Severity.WARN;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

public class OTLPLogsIndexingRestIT extends AbstractOTLPIndexingRestIT {

Expand Down Expand Up @@ -91,6 +94,40 @@ public void testLogWithAttributes() throws Exception {
assertThat(source.evaluate("resource.attributes.service\\.name"), equalTo("elasticsearch"));
}

public void testGeoLocationAttributesAreMerged() throws Exception {
logger.logRecordBuilder()
.setBody("geo log")
.setSeverity(INFO)
.setAttribute(doubleKey("client.geo.location.lon"), 143.2104)
.setAttribute(doubleKey("client.geo.location.lat"), -33.494)
.setAttribute(doubleKey("server.geo.location.lon"), 1.1)
.emit();
indexLogs();

ObjectPath search = search("logs-generic.otel-default", """
{
"fields": ["attributes.client.geo.location"]
}
""");
assertThat(search.evaluate("hits.total.value"), equalTo(1));
var source = new ObjectPath(search.evaluate("hits.hits.0._source"));
assertThat(search.evaluate("hits.hits.0.fields.attributes\\.client\\.geo\\.location.0.type"), equalTo("Point"));
assertThat(
search.<Number>evaluate("hits.hits.0.fields.attributes\\.client\\.geo\\.location.0.coordinates.0").doubleValue(),
closeTo(143.2104, 0.001)
);
assertThat(
search.<Number>evaluate("hits.hits.0.fields.attributes\\.client\\.geo\\.location.0.coordinates.1").doubleValue(),
closeTo(-33.494, 0.001)
);
assertThat(
getIndexMappingPath("logs-generic.otel-default").evaluate("properties.attributes.properties.client\\.geo\\.location.type"),
equalTo("geo_point")
);
assertThat(source.evaluate("attributes.server\\.geo\\.location\\.lon"), equalTo(1.1));
assertThat(source.evaluate("attributes.server\\.geo\\.location\\.lat"), nullValue());
}

public void testDataStreamRouting() throws Exception {
logger.logRecordBuilder()
.setBody("routed log")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isA;

Expand Down Expand Up @@ -78,12 +79,18 @@ public void testSpanWithAttributes() throws Exception {
String spanId = span.getSpanContext().getSpanId();
span.setAttribute("http.method", "GET");
span.setAttribute("http.status_code", 404L);
span.setAttribute("client.geo.location.lon", 143.2104);
span.setAttribute("client.geo.location.lat", -33.494);
span.setStatus(StatusCode.ERROR, "not found");
span.end();

indexTraces();

ObjectPath search = search("traces-generic.otel-default");
ObjectPath search = search("traces-generic.otel-default", """
{
"fields": ["attributes.client.geo.location"]
}
""");
assertThat(search.evaluate("hits.total.value"), equalTo(1));
var source = new ObjectPath(search.evaluate("hits.hits.0._source"));
assertThat(source.evaluate("name"), equalTo("GET /orders"));
Expand All @@ -95,6 +102,19 @@ public void testSpanWithAttributes() throws Exception {
assertThat(source.evaluate("status.message"), equalTo("not found"));
assertThat(source.evaluate("attributes.http\\.method"), equalTo("GET"));
assertThat(source.evaluate("attributes.http\\.status_code"), equalTo(404));
assertThat(search.evaluate("hits.hits.0.fields.attributes\\.client\\.geo\\.location.0.type"), equalTo("Point"));
assertThat(
search.<Number>evaluate("hits.hits.0.fields.attributes\\.client\\.geo\\.location.0.coordinates.0").doubleValue(),
closeTo(143.2104, 0.001)
);
assertThat(
search.<Number>evaluate("hits.hits.0.fields.attributes\\.client\\.geo\\.location.0.coordinates.1").doubleValue(),
closeTo(-33.494, 0.001)
);
assertThat(
getIndexMappingPath("traces-generic.otel-default").evaluate("properties.attributes.properties.client\\.geo\\.location.type"),
equalTo("geo_point")
);
assertThat(source.evaluate("resource.attributes.service\\.name"), equalTo("elasticsearch"));
assertThat(source.evaluate("scope.name"), equalTo(getClass().getSimpleName()));
}
Expand All @@ -103,21 +123,47 @@ public void testSpanEventsAreIndexedAsLogs() throws Exception {
var span = tracer.spanBuilder("span-with-event").startSpan();
String traceId = span.getSpanContext().getTraceId();
String spanId = span.getSpanContext().getSpanId();
span.addEvent("exception", Attributes.of(stringKey("event.attr.foo"), "event.attr.bar"));
span.addEvent(
"exception",
Attributes.builder()
.put(stringKey("event.attr.foo"), "event.attr.bar")
.put("client.geo.location.lon", 143.2104)
.put("client.geo.location.lat", -33.494)
.put("server.geo.location.lon", 1.1)
.build()
);
span.end();

indexTraces();

ObjectPath tracesSearch = search("traces-generic.otel-default");
assertThat(tracesSearch.evaluate("hits.total.value"), equalTo(1));

ObjectPath logsSearch = search("logs-generic.otel-default");
ObjectPath logsSearch = search("logs-generic.otel-default", """
{
"fields": ["attributes.client.geo.location"]
}
""");
assertThat(logsSearch.evaluate("hits.total.value"), equalTo(1));
var source = new ObjectPath(logsSearch.evaluate("hits.hits.0._source"));
assertThat(source.evaluate("event_name"), equalTo("exception"));
assertThat(source.evaluate("trace_id"), equalTo(traceId));
assertThat(source.evaluate("span_id"), equalTo(spanId));
assertThat(source.evaluate("attributes.event\\.attr\\.foo"), equalTo("event.attr.bar"));
assertThat(logsSearch.evaluate("hits.hits.0.fields.attributes\\.client\\.geo\\.location.0.type"), equalTo("Point"));
assertThat(
logsSearch.<Number>evaluate("hits.hits.0.fields.attributes\\.client\\.geo\\.location.0.coordinates.0").doubleValue(),
closeTo(143.2104, 0.001)
);
assertThat(
logsSearch.<Number>evaluate("hits.hits.0.fields.attributes\\.client\\.geo\\.location.0.coordinates.1").doubleValue(),
closeTo(-33.494, 0.001)
);
assertThat(
getIndexMappingPath("logs-generic.otel-default").evaluate("properties.attributes.properties.client\\.geo\\.location.type"),
equalTo("geo_point")
);
assertThat(source.evaluate("attributes.server\\.geo\\.location\\.lon"), equalTo(1.1));
assertThat(source.evaluate("attributes.event\\.name"), equalTo("exception"));
assertThat(source.evaluate("data_stream.type"), equalTo("logs"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ public void buildLogDocument(
}
addSpanId(builder, logRecord.getSpanId().toByteArray());
addTraceId(builder, logRecord.getTraceId().toByteArray());
buildResource(resource, resourceSchemaUrl, builder);
buildResourceWithGeoPoints(resource, resourceSchemaUrl, builder);
buildDataStream(builder, targetIndex);
buildScope(builder, scope, scopeSchemaUrl);
buildAttributes(builder, logRecord.getAttributesList(), logRecord.getDroppedAttributesCount());
buildScopeWithGeoPoints(builder, scope, scopeSchemaUrl);
buildAttributesWithGeoPoints(builder, logRecord.getAttributesList(), logRecord.getDroppedAttributesCount());
buildBody(builder, logRecord);
builder.endObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ public BytesRef buildMetricDocument(
if (dataPointGroup.getStartTimestampUnixNano() != 0) {
builder.field("start_timestamp", TimeUnit.NANOSECONDS.toMillis(dataPointGroup.getStartTimestampUnixNano()));
}
// Metrics intentionally skip merging paired *.geo.location.lat/.lon attributes into a geo_point: geo_point is not a
// supported dimension type for time series metrics, and geo attributes on metrics are expected to be very rare, so the
// per-attribute scan is not worth its cost on the metrics hot path.
Comment thread
felixbarny marked this conversation as resolved.
Outdated
buildResource(dataPointGroup.resource(), dataPointGroup.resourceSchemaUrl(), builder);
buildDataStream(builder, dataPointGroup.targetIndex());
buildScope(builder, dataPointGroup.scope(), dataPointGroup.scopeSchemaUrl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@

import java.io.IOException;
import java.util.HexFormat;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -42,18 +44,36 @@ public OTelDocumentBuilder(BufferedByteStringAccessor byteStringAccessor) {
}

protected void buildResource(Resource resource, ByteString schemaUrl, XContentBuilder builder) throws IOException {
buildResource(resource, schemaUrl, builder, false);
}

protected void buildResourceWithGeoPoints(Resource resource, ByteString schemaUrl, XContentBuilder builder) throws IOException {
buildResource(resource, schemaUrl, builder, true);
}

private void buildResource(Resource resource, ByteString schemaUrl, XContentBuilder builder, boolean mergeGeoLocationAttributes)
throws IOException {
builder.startObject("resource");
addFieldIfNotEmpty(builder, "schema_url", schemaUrl);
buildAttributes(builder, resource.getAttributesList(), resource.getDroppedAttributesCount());
buildAttributes(builder, resource.getAttributesList(), resource.getDroppedAttributesCount(), mergeGeoLocationAttributes);
builder.endObject();
}

protected void buildScope(XContentBuilder builder, InstrumentationScope scope, ByteString schemaUrl) throws IOException {
buildScope(builder, scope, schemaUrl, false);
}

protected void buildScopeWithGeoPoints(XContentBuilder builder, InstrumentationScope scope, ByteString schemaUrl) throws IOException {
buildScope(builder, scope, schemaUrl, true);
}

private void buildScope(XContentBuilder builder, InstrumentationScope scope, ByteString schemaUrl, boolean mergeGeoLocationAttributes)
throws IOException {
builder.startObject("scope");
addFieldIfNotEmpty(builder, "schema_url", schemaUrl);
addFieldIfNotEmpty(builder, "name", scope.getNameBytes());
addFieldIfNotEmpty(builder, "version", scope.getVersionBytes());
buildAttributes(builder, scope.getAttributesList(), scope.getDroppedAttributesCount());
buildAttributes(builder, scope.getAttributesList(), scope.getDroppedAttributesCount(), mergeGeoLocationAttributes);
builder.endObject();
}

Expand Down Expand Up @@ -91,19 +111,49 @@ protected void buildDataStream(XContentBuilder builder, TargetIndex targetIndex)
builder.endObject();
}

/**
* Builds attributes as received from OTLP, except for Elastic control attributes that are filtered out.
* Geo attributes are not merged; use {@link #buildAttributesWithGeoPoints} when paired geo attributes
* should be converted into {@code geo_point} values.
*/
protected void buildAttributes(XContentBuilder builder, List<KeyValue> attributes, int droppedAttributesCount) throws IOException {
buildAttributes(builder, attributes, droppedAttributesCount, false);
}

/**
* Builds attributes while merging paired double {@code *.geo.location.lon} and {@code *.geo.location.lat}
* values into {@code *.geo.location} arrays in {@code [lon, lat]} order.
*/
protected void buildAttributesWithGeoPoints(XContentBuilder builder, List<KeyValue> attributes, int droppedAttributesCount)
throws IOException {
buildAttributes(builder, attributes, droppedAttributesCount, true);
}

private void buildAttributes(
XContentBuilder builder,
List<KeyValue> attributes,
int droppedAttributesCount,
boolean mergeGeoLocationAttributes
) throws IOException {
if (droppedAttributesCount > 0) {
builder.field("dropped_attributes_count", droppedAttributesCount);
}
builder.startObject("attributes");
GeoLocationAttributes geoLocationAttributes = mergeGeoLocationAttributes ? new GeoLocationAttributes() : null;
for (int i = 0, size = attributes.size(); i < size; i++) {
KeyValue attribute = attributes.get(i);
String key = attribute.getKey();
if (isIgnoredAttribute(key) == false) {
if (geoLocationAttributes != null && geoLocationAttributes.addIfGeoLocationAttribute(attribute)) {
continue;
}
builder.field(key);
buildAnyValue(builder, attribute.getValue());
}
}
if (geoLocationAttributes != null) {
geoLocationAttributes.write(builder);
}
builder.endObject();
}

Expand Down Expand Up @@ -166,4 +216,79 @@ private static void addHexIdIfNotEmpty(XContentBuilder builder, String fieldName
builder.field(fieldName, HEX.formatHex(id));
}
}

private static class GeoLocationAttributes {
private static final String GEO_LOCATION = "geo.location";
private static final String GEO_LOCATION_LAT = "geo.location.lat";
private static final String GEO_LOCATION_LON = "geo.location.lon";
private static final String NAMESPACED_GEO_LOCATION_LAT = "." + GEO_LOCATION_LAT;
private static final String NAMESPACED_GEO_LOCATION_LON = "." + GEO_LOCATION_LON;

private Map<String, GeoLocation> locationsByPrefix;

boolean addIfGeoLocationAttribute(KeyValue attribute) {
AnyValue value = attribute.getValue();
if (value.getValueCase() != AnyValue.ValueCase.DOUBLE_VALUE) {
return false;
}
String key = attribute.getKey();
String prefix;
boolean isLon;
if (GEO_LOCATION_LON.equals(key)) {
prefix = "";
isLon = true;
} else if (GEO_LOCATION_LAT.equals(key)) {
prefix = "";
isLon = false;
} else if (key.endsWith(NAMESPACED_GEO_LOCATION_LON)) {
prefix = key.substring(0, key.length() - NAMESPACED_GEO_LOCATION_LON.length());
isLon = true;
} else if (key.endsWith(NAMESPACED_GEO_LOCATION_LAT)) {
prefix = key.substring(0, key.length() - NAMESPACED_GEO_LOCATION_LAT.length());
isLon = false;
} else {
return false;
}
if (locationsByPrefix == null) {
locationsByPrefix = new LinkedHashMap<>();
}
GeoLocation location = locationsByPrefix.computeIfAbsent(prefix, ignored -> new GeoLocation());
if (isLon) {
location.lon = value.getDoubleValue();
location.lonKey = key;
} else {
location.lat = value.getDoubleValue();
location.latKey = key;
}
return true;
}

void write(XContentBuilder builder) throws IOException {
if (locationsByPrefix == null) {
return;
}
for (Map.Entry<String, GeoLocation> entry : locationsByPrefix.entrySet()) {
String prefix = entry.getKey();
GeoLocation location = entry.getValue();
if (location.lonKey != null && location.latKey != null) {
builder.field(prefix.isEmpty() ? GEO_LOCATION : prefix + "." + GEO_LOCATION);
builder.startArray();
builder.value(location.lon);
builder.value(location.lat);
builder.endArray();
} else if (location.lonKey != null) {
builder.field(location.lonKey, location.lon);
} else {
builder.field(location.latKey, location.lat);
}
}
}
}

private static class GeoLocation {
private double lon;
private double lat;
private String lonKey;
private String latKey;
}
}
Loading
Loading