Skip to content
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter-fix-data-stream-attributes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix a bug where data stream attributes in 'none' mapping mode have invalid prefix.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [42454]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
9 changes: 6 additions & 3 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,12 @@ func (ecsDataPointsEncoder) encodeMetrics(

func addDataStreamAttributes(document *objmodel.Document, key string, idx elasticsearch.Index) {
if idx.IsDataStream() {
document.AddString(key+"data_stream.type", idx.Type)
document.AddString(key+"data_stream.dataset", idx.Dataset)
document.AddString(key+"data_stream.namespace", idx.Namespace)
if key != "" {
key += "."
}
document.AddString(key+elasticsearch.DataStreamType, idx.Type)
document.AddString(key+elasticsearch.DataStreamDataset, idx.Dataset)
document.AddString(key+elasticsearch.DataStreamNamespace, idx.Namespace)
}
}

Expand Down
56 changes: 39 additions & 17 deletions exporter/elasticsearchexporter/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"
)

var expectedSpanBody = `{"@timestamp":"2023-04-19T03:04:05.000000006Z","Attributes.service.instance.id":"23","Duration":1000000,"EndTimestamp":"2023-04-19T03:04:06.000000006Z","Events.fooEvent.eventMockBar":"bar","Events.fooEvent.eventMockFoo":"foo","Events.fooEvent.time":"2023-04-19T03:04:05.000000006Z","Kind":"SPAN_KIND_CLIENT","Link":"[{\"attribute\":{},\"spanID\":\"\",\"traceID\":\"01020304050607080807060504030200\"}]","Name":"client span","Resource.cloud.platform":"aws_elastic_beanstalk","Resource.cloud.provider":"aws","Resource.deployment.environment":"BETA","Resource.service.instance.id":"23","Resource.service.name":"some-service","Resource.service.version":"env-version-1234","Scope.lib-foo":"lib-bar","Scope.name":"io.opentelemetry.rabbitmq-2.7","Scope.version":"1.30.0-alpha","SpanId":"1920212223242526","TraceId":"01020304050607080807060504030201","TraceStatus":2,"TraceStatusDescription":"Test"}`
const (
expectedSpanBody = `{"@timestamp":"2023-04-19T03:04:05.000000006Z","Attributes.service.instance.id":"23","Duration":1000000,"EndTimestamp":"2023-04-19T03:04:06.000000006Z","Events.fooEvent.eventMockBar":"bar","Events.fooEvent.eventMockFoo":"foo","Events.fooEvent.time":"2023-04-19T03:04:05.000000006Z","Kind":"SPAN_KIND_CLIENT","Link":"[{\"attribute\":{},\"spanID\":\"\",\"traceID\":\"01020304050607080807060504030200\"}]","Name":"client span","Resource.cloud.platform":"aws_elastic_beanstalk","Resource.cloud.provider":"aws","Resource.deployment.environment":"BETA","Resource.service.instance.id":"23","Resource.service.name":"some-service","Resource.service.version":"env-version-1234","Scope.lib-foo":"lib-bar","Scope.name":"io.opentelemetry.rabbitmq-2.7","Scope.version":"1.30.0-alpha","SpanId":"1920212223242526","TraceId":"01020304050607080807060504030201","TraceStatus":2,"TraceStatusDescription":"Test"}`
expectedSpanBodyWithDataStream = `{"@timestamp":"2023-04-19T03:04:05.000000006Z","Attributes.data_stream.dataset":"two","Attributes.data_stream.namespace":"three","Attributes.data_stream.type":"one","Attributes.service.instance.id":"23","Duration":1000000,"EndTimestamp":"2023-04-19T03:04:06.000000006Z","Events.fooEvent.eventMockBar":"bar","Events.fooEvent.eventMockFoo":"foo","Events.fooEvent.time":"2023-04-19T03:04:05.000000006Z","Kind":"SPAN_KIND_CLIENT","Link":"[{\"attribute\":{},\"spanID\":\"\",\"traceID\":\"01020304050607080807060504030200\"}]","Name":"client span","Resource.cloud.platform":"aws_elastic_beanstalk","Resource.cloud.provider":"aws","Resource.deployment.environment":"BETA","Resource.service.instance.id":"23","Resource.service.name":"some-service","Resource.service.version":"env-version-1234","Scope.lib-foo":"lib-bar","Scope.name":"io.opentelemetry.rabbitmq-2.7","Scope.version":"1.30.0-alpha","SpanId":"1920212223242526","TraceId":"01020304050607080807060504030201","TraceStatus":2,"TraceStatusDescription":"Test"}`
)

var (
const (
expectedLogBody = `{"@timestamp":"2023-04-19T03:04:05.000000006Z","Attributes.log-attr1":"value1","Body":"log-body","Resource.key1":"value1","Scope.name":"","Scope.version":"","SeverityNumber":0,"TraceFlags":0}`
expectedLogBodyWithEmptyTimestamp = `{"@timestamp":"1970-01-01T00:00:00.000000000Z","Attributes.log-attr1":"value1","Body":"log-body","Resource.key1":"value1","Scope.name":"","Scope.version":"","SeverityNumber":0,"TraceFlags":0}`
)

var expectedMetricsEncoded = `{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu0","host":{"hostname":"my-host","name":"my-host","os":{"platform":"linux"}},"state":"idle","system":{"cpu":{"time":440.23}}}
const expectedMetricsEncoded = `{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu0","host":{"hostname":"my-host","name":"my-host","os":{"platform":"linux"}},"state":"idle","system":{"cpu":{"time":440.23}}}
{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu0","host":{"hostname":"my-host","name":"my-host","os":{"platform":"linux"}},"state":"interrupt","system":{"cpu":{"time":0.0}}}
{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu0","host":{"hostname":"my-host","name":"my-host","os":{"platform":"linux"}},"state":"nice","system":{"cpu":{"time":0.14}}}
{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu0","host":{"hostname":"my-host","name":"my-host","os":{"platform":"linux"}},"state":"softirq","system":{"cpu":{"time":0.77}}}
Expand All @@ -54,19 +57,39 @@ var expectedMetricsEncoded = `{"@timestamp":"2024-06-12T10:20:16.419290690Z","cp
{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu1","host":{"hostname":"my-host","name":"my-host","os":{"platform":"linux"}},"state":"wait","system":{"cpu":{"time":0.95}}}`

func TestEncodeSpan(t *testing.T) {
encoder, _ := newEncoder(MappingNone)
td := mockResourceSpans()
var buf bytes.Buffer
err := encoder.encodeSpan(
encodingContext{
resource: td.ResourceSpans().At(0).Resource(),
scope: td.ResourceSpans().At(0).ScopeSpans().At(0).Scope(),
},
td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0),
elasticsearch.Index{}, &buf,
)
assert.NoError(t, err)
assert.Equal(t, expectedSpanBody, buf.String())
t.Run("non data stream", func(t *testing.T) {
encoder, _ := newEncoder(MappingNone)
td := mockResourceSpans()
var buf bytes.Buffer
err := encoder.encodeSpan(
encodingContext{
resource: td.ResourceSpans().At(0).Resource(),
scope: td.ResourceSpans().At(0).ScopeSpans().At(0).Scope(),
},
td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0),
elasticsearch.Index{}, &buf,
)
assert.NoError(t, err)
assert.Equal(t, expectedSpanBody, buf.String())
})

// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/42454.
t.Run("data stream", func(t *testing.T) {
encoder, _ := newEncoder(MappingNone)
td := mockResourceSpans()
var buf bytes.Buffer
err := encoder.encodeSpan(
encodingContext{
resource: td.ResourceSpans().At(0).Resource(),
scope: td.ResourceSpans().At(0).ScopeSpans().At(0).Scope(),
},
td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0),
elasticsearch.NewDataStreamIndex("one", "two", "three"),
&buf,
)
assert.NoError(t, err)
assert.Equal(t, expectedSpanBodyWithDataStream, buf.String())
})
}

func TestEncodeLog(t *testing.T) {
Expand Down Expand Up @@ -397,7 +420,6 @@ func TestEncodeLogECSModeDuplication(t *testing.T) {
require.NoError(t, err)

want := `{"@timestamp":"2024-03-12T20:00:41.123456789Z","agent":{"name":"otlp"},"container":{"image":{"tag":["v3.4.0"]}},"event":{"action":"user-password-change"},"host":{"hostname":"localhost","name":"localhost","os":{"full":"Mac OS Mojave","name":"Mac OS X","platform":"darwin","type":"macos","version":"10.14.1"}},"service":{"name":"foo.bar","version":"1.1.0"}}`
require.NoError(t, err)

resourceContainerImageTags := resource.Attributes().PutEmptySlice(string(semconv.ContainerImageTagsKey))
err = resourceContainerImageTags.FromRaw([]any{"v3.4.0"})
Expand Down
Loading