diff --git a/.chloggen/elasticsearchexporter_mutates-data-false.yaml b/.chloggen/elasticsearchexporter_mutates-data-false.yaml new file mode 100644 index 0000000000000..8a87906b7ed3f --- /dev/null +++ b/.chloggen/elasticsearchexporter_mutates-data-false.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Declare MutatesData: false" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [37234] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: When multiple exporters are used, the collector doesn't need to clone the incoming data anymore + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/data_stream_router.go b/exporter/elasticsearchexporter/data_stream_router.go index a64b15d0ad972..9e8054efc7e55 100644 --- a/exporter/elasticsearchexporter/data_stream_router.go +++ b/exporter/elasticsearchexporter/data_stream_router.go @@ -46,7 +46,7 @@ func routeWithDefaults(defaultDSType string) func( string, bool, string, -) string { +) esIndex { return func( recordAttr pcommon.Map, scopeAttr pcommon.Map, @@ -54,7 +54,7 @@ func routeWithDefaults(defaultDSType string) func( fIndex string, otel bool, scopeName string, - ) string { + ) esIndex { // Order: // 1. read data_stream.* from attributes // 2. read elasticsearch.index.* from attributes @@ -67,7 +67,7 @@ func routeWithDefaults(defaultDSType string) func( prefix, prefixExists := getFromAttributes(indexPrefix, "", resourceAttr, scopeAttr, recordAttr) suffix, suffixExists := getFromAttributes(indexSuffix, "", resourceAttr, scopeAttr, recordAttr) if prefixExists || suffixExists { - return fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) + return esIndex{Index: fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)} } } @@ -89,15 +89,30 @@ func routeWithDefaults(defaultDSType string) func( dataset = sanitizeDataStreamField(dataset, disallowedDatasetRunes, datasetSuffix) namespace = sanitizeDataStreamField(namespace, disallowedNamespaceRunes, "") + return newDataStream(defaultDSType, dataset, namespace) + } +} - recordAttr.PutStr(dataStreamDataset, dataset) - recordAttr.PutStr(dataStreamNamespace, namespace) - recordAttr.PutStr(dataStreamType, defaultDSType) +type esIndex struct { + Index string + Type string + Dataset string + Namespace string +} - return fmt.Sprintf("%s-%s-%s", defaultDSType, dataset, namespace) +func newDataStream(typ, dataset, namespace string) esIndex { + return esIndex{ + Index: fmt.Sprintf("%s-%s-%s", typ, dataset, namespace), + Type: typ, + Dataset: dataset, + Namespace: namespace, } } +func (i esIndex) isDataStream() bool { + return i.Type != "" && i.Dataset != "" && i.Namespace != "" +} + var ( // routeLogRecord returns the name of the index to send the log record to according to data stream routing related attributes. // This function may mutate record attributes. diff --git a/exporter/elasticsearchexporter/data_stream_router_test.go b/exporter/elasticsearchexporter/data_stream_router_test.go index 81450da4d7a1a..3bce5cea6d84b 100644 --- a/exporter/elasticsearchexporter/data_stream_router_test.go +++ b/exporter/elasticsearchexporter/data_stream_router_test.go @@ -4,7 +4,6 @@ package elasticsearchexporter import ( - "fmt" "testing" "github.com/stretchr/testify/assert" @@ -15,15 +14,15 @@ type routeTestCase struct { name string otel bool scopeName string - want string + want esIndex } func createRouteTests(dsType string) []routeTestCase { - renderWantRoute := func(dsType, dsDataset string, otel bool) string { + renderWantRoute := func(dsType, dsDataset string, otel bool) esIndex { if otel { - return fmt.Sprintf("%s-%s.otel-%s", dsType, dsDataset, defaultDataStreamNamespace) + dsDataset += ".otel" } - return fmt.Sprintf("%s-%s-%s", dsType, dsDataset, defaultDataStreamNamespace) + return newDataStream(dsType, dsDataset, defaultDataStreamNamespace) } return []routeTestCase{ diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 27db606c69632..b13d1336b94d8 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -162,27 +162,27 @@ func (e *elasticsearchExporter) pushLogRecord( scopeSchemaURL string, bulkIndexerSession bulkIndexerSession, ) error { - fIndex := e.index + fIndex := esIndex{Index: e.index} if e.dynamicIndex { - fIndex = routeLogRecord(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, scope.Name()) + fIndex = routeLogRecord(record.Attributes(), scope.Attributes(), resource.Attributes(), e.index, e.otel, scope.Name()) } if e.logstashFormat.Enabled { - formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now()) + formattedIndex, err := generateIndexWithLogstashFormat(fIndex.Index, &e.logstashFormat, time.Now()) if err != nil { return err } - fIndex = formattedIndex + fIndex = esIndex{Index: formattedIndex} } buf := e.bufferPool.NewPooledBuffer() - err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL, buf.Buffer) + err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL, fIndex, buf.Buffer) if err != nil { buf.Recycle() return fmt.Errorf("failed to encode log event: %w", err) } // not recycling after Add returns an error as we don't know if it's already recycled - return bulkIndexerSession.Add(ctx, fIndex, buf, nil) + return bulkIndexerSession.Add(ctx, fIndex.Index, buf, nil) } func (e *elasticsearchExporter) pushMetricsData( @@ -209,7 +209,7 @@ func (e *elasticsearchExporter) pushMetricsData( var validationErrs []error // log instead of returning these so that upstream does not retry scopeMetrics := scopeMetrics.At(j) scope := scopeMetrics.Scope() - groupedDataPointsByIndex := make(map[string]map[uint32][]dataPoint) + groupedDataPointsByIndex := make(map[esIndex]map[uint32][]dataPoint) for k := 0; k < scopeMetrics.Metrics().Len(); k++ { metric := scopeMetrics.Metrics().At(k) @@ -293,13 +293,13 @@ func (e *elasticsearchExporter) pushMetricsData( for fIndex, groupedDataPoints := range groupedDataPointsByIndex { for _, dataPoints := range groupedDataPoints { buf := e.bufferPool.NewPooledBuffer() - dynamicTemplates, err := e.model.encodeMetrics(resource, resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), dataPoints, &validationErrs, buf.Buffer) + dynamicTemplates, err := e.model.encodeMetrics(resource, resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), dataPoints, &validationErrs, fIndex, buf.Buffer) if err != nil { buf.Recycle() errs = append(errs, err) continue } - if err := session.Add(ctx, fIndex, buf, dynamicTemplates); err != nil { + if err := session.Add(ctx, fIndex.Index, buf, dynamicTemplates); err != nil { // not recycling after Add returns an error as we don't know if it's already recycled if cerr := ctx.Err(); cerr != nil { return cerr @@ -327,18 +327,18 @@ func (e *elasticsearchExporter) getMetricDataPointIndex( resource pcommon.Resource, scope pcommon.InstrumentationScope, dataPoint dataPoint, -) (string, error) { - fIndex := e.index +) (esIndex, error) { + fIndex := esIndex{Index: e.index} if e.dynamicIndex { - fIndex = routeDataPoint(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, scope.Name()) + fIndex = routeDataPoint(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), e.index, e.otel, scope.Name()) } if e.logstashFormat.Enabled { - formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now()) + formattedIndex, err := generateIndexWithLogstashFormat(fIndex.Index, &e.logstashFormat, time.Now()) if err != nil { - return "", err + return esIndex{}, err } - fIndex = formattedIndex + fIndex = esIndex{Index: formattedIndex} } return fIndex, nil } @@ -402,27 +402,27 @@ func (e *elasticsearchExporter) pushTraceRecord( scopeSchemaURL string, bulkIndexerSession bulkIndexerSession, ) error { - fIndex := e.index + fIndex := esIndex{Index: e.index} if e.dynamicIndex { - fIndex = routeSpan(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, span.Name()) + fIndex = routeSpan(span.Attributes(), scope.Attributes(), resource.Attributes(), e.index, e.otel, span.Name()) } if e.logstashFormat.Enabled { - formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now()) + formattedIndex, err := generateIndexWithLogstashFormat(fIndex.Index, &e.logstashFormat, time.Now()) if err != nil { return err } - fIndex = formattedIndex + fIndex = esIndex{Index: formattedIndex} } buf := e.bufferPool.NewPooledBuffer() - err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL, buf.Buffer) + err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL, fIndex, buf.Buffer) if err != nil { buf.Recycle() return fmt.Errorf("failed to encode trace record: %w", err) } // not recycling after Add returns an error as we don't know if it's already recycled - return bulkIndexerSession.Add(ctx, fIndex, buf, nil) + return bulkIndexerSession.Add(ctx, fIndex.Index, buf, nil) } func (e *elasticsearchExporter) pushSpanEvent( @@ -435,24 +435,24 @@ func (e *elasticsearchExporter) pushSpanEvent( scopeSchemaURL string, bulkIndexerSession bulkIndexerSession, ) error { - fIndex := e.index + fIndex := esIndex{Index: e.index} if e.dynamicIndex { - fIndex = routeSpanEvent(spanEvent.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, scope.Name()) + fIndex = routeSpanEvent(spanEvent.Attributes(), scope.Attributes(), resource.Attributes(), e.index, e.otel, scope.Name()) } if e.logstashFormat.Enabled { - formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now()) + formattedIndex, err := generateIndexWithLogstashFormat(fIndex.Index, &e.logstashFormat, time.Now()) if err != nil { return err } - fIndex = formattedIndex + fIndex = esIndex{Index: formattedIndex} } buf := e.bufferPool.NewPooledBuffer() - e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL, buf.Buffer) + e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL, fIndex, buf.Buffer) if buf.Buffer.Len() == 0 { buf.Recycle() return nil } // not recycling after Add returns an error as we don't know if it's already recycled - return bulkIndexerSession.Add(ctx, fIndex, buf, nil) + return bulkIndexerSession.Add(ctx, fIndex.Index, buf, nil) } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index dbb8ec3ed0ce8..b045ccb325d14 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -1197,19 +1197,19 @@ func TestExporterMetrics(t *testing.T) { expected := []itemRequest{ { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.foo":"histogram"}}}`), - Document: []byte(`{"@timestamp":"0.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"attributes":{},"metrics":{"metric.foo":{"counts":[1,2,3,4],"values":[0.5,1.5,2.5,3.0]}},"resource":{},"scope":{}}`), + Document: []byte(`{"@timestamp":"0.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[1,2,3,4],"values":[0.5,1.5,2.5,3.0]}},"resource":{},"scope":{}}`), }, { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.foo":"histogram"}}}`), - Document: []byte(`{"@timestamp":"3600000.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"attributes":{},"metrics":{"metric.foo":{"counts":[4,5,6,7],"values":[2.0,4.5,5.5,6.0]}},"resource":{},"scope":{}}`), + Document: []byte(`{"@timestamp":"3600000.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[4,5,6,7],"values":[2.0,4.5,5.5,6.0]}},"resource":{},"scope":{}}`), }, { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.sum":"gauge_double"}}}`), - Document: []byte(`{"@timestamp":"3600000.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"attributes":{},"metrics":{"metric.sum":1.5},"resource":{},"scope":{},"start_timestamp":"7200000.0"}`), + Document: []byte(`{"@timestamp":"3600000.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.sum":1.5},"resource":{},"scope":{},"start_timestamp":"7200000.0"}`), }, { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.summary":"summary"}}}`), - Document: []byte(`{"@timestamp":"10800000.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"attributes":{},"metrics":{"metric.summary":{"sum":1.5,"value_count":1}},"resource":{},"scope":{},"start_timestamp":"10800000.0"}`), + Document: []byte(`{"@timestamp":"10800000.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.summary":{"sum":1.5,"value_count":1}},"resource":{},"scope":{},"start_timestamp":"10800000.0"}`), }, } @@ -1278,7 +1278,7 @@ func TestExporterMetrics(t *testing.T) { expected := []itemRequest{ { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.sum":"gauge_long","metrics.summary":"summary"}}}`), - Document: []byte(`{"@timestamp":"0.0","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"attributes":{},"metrics":{"sum":0,"summary":{"sum":1.0,"value_count":10}},"resource":{},"scope":{}}`), + Document: []byte(`{"@timestamp":"0.0","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"sum":0,"summary":{"sum":1.0,"value_count":10}},"resource":{},"scope":{}}`), }, } @@ -1371,7 +1371,7 @@ func TestExporterMetrics(t *testing.T) { expected := []itemRequest{ { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.foo.bar":"gauge_long","metrics.foo":"gauge_long","metrics.foo.bar.baz":"gauge_long"}}}`), - Document: []byte(`{"@timestamp":"0.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"attributes":{},"metrics":{"foo":0,"foo.bar":0,"foo.bar.baz":0},"resource":{},"scope":{}}`), + Document: []byte(`{"@timestamp":"0.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"foo":0,"foo.bar":0,"foo.bar.baz":0},"resource":{},"scope":{}}`), }, } @@ -1868,6 +1868,7 @@ func mustSendLogRecords(t *testing.T, exporter exporter.Logs, records ...plog.Lo } func mustSendLogs(t *testing.T, exporter exporter.Logs, logs plog.Logs) { + logs.MarkReadOnly() err := exporter.ConsumeLogs(context.Background(), logs) require.NoError(t, err) } @@ -1897,6 +1898,7 @@ func mustSendMetricGaugeDataPoints(t *testing.T, exporter exporter.Metrics, data } func mustSendMetrics(t *testing.T, exporter exporter.Metrics, metrics pmetric.Metrics) { + metrics.MarkReadOnly() err := exporter.ConsumeMetrics(context.Background(), metrics) require.NoError(t, err) } @@ -1912,6 +1914,7 @@ func mustSendSpans(t *testing.T, exporter exporter.Traces, spans ...ptrace.Span) } func mustSendTraces(t *testing.T, exporter exporter.Traces, traces ptrace.Traces) { + traces.MarkReadOnly() err := exporter.ConsumeTraces(context.Background(), traces) require.NoError(t, err) } diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 887e00cc63ed2..755a4e3d241bd 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -166,7 +166,7 @@ func exporterhelperOptions( shutdown component.ShutdownFunc, ) []exporterhelper.Option { opts := []exporterhelper.Option{ - exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithStart(start), exporterhelper.WithShutdown(shutdown), exporterhelper.WithQueue(cfg.QueueSettings), diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go index 89581547127e3..cc78984acb3df 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go @@ -65,11 +65,12 @@ func benchmarkLogs(b *testing.B, batchSize int, mappingMode string) { require.NoError(b, err) require.NoError(b, exporter.Start(ctx, componenttest.NewNopHost())) + logs, _ := runnerCfg.provider.GenerateLogs() + logs.MarkReadOnly() b.ReportAllocs() b.ResetTimer() b.StopTimer() for i := 0; i < b.N; i++ { - logs, _ := runnerCfg.provider.GenerateLogs() b.StartTimer() require.NoError(b, exporter.ConsumeLogs(ctx, logs)) b.StopTimer() @@ -94,11 +95,12 @@ func benchmarkMetrics(b *testing.B, batchSize int, mappingMode string) { require.NoError(b, err) require.NoError(b, exporter.Start(ctx, componenttest.NewNopHost())) + metrics, _ := runnerCfg.provider.GenerateMetrics() + metrics.MarkReadOnly() b.ReportAllocs() b.ResetTimer() b.StopTimer() for i := 0; i < b.N; i++ { - metrics, _ := runnerCfg.provider.GenerateMetrics() b.StartTimer() require.NoError(b, exporter.ConsumeMetrics(ctx, metrics)) b.StopTimer() @@ -123,11 +125,12 @@ func benchmarkTraces(b *testing.B, batchSize int, mappingMode string) { require.NoError(b, err) require.NoError(b, exporter.Start(ctx, componenttest.NewNopHost())) + traces, _ := runnerCfg.provider.GenerateTraces() + traces.MarkReadOnly() b.ReportAllocs() b.ResetTimer() b.StopTimer() for i := 0; i < b.N; i++ { - traces, _ := runnerCfg.provider.GenerateTraces() b.StartTimer() require.NoError(b, exporter.ConsumeTraces(ctx, traces)) b.StopTimer() diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index b18c9d2f2917b..1c5bf8a71967f 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -75,12 +75,12 @@ var resourceAttrsToPreserve = map[string]bool{ var ErrInvalidTypeForBodyMapMode = errors.New("invalid log record body type for 'bodymap' mapping mode") type mappingModel interface { - encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string, *bytes.Buffer) error - encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string, *bytes.Buffer) error - encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, buf *bytes.Buffer) + encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string, esIndex, *bytes.Buffer) error + encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string, esIndex, *bytes.Buffer) error + encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx esIndex, buf *bytes.Buffer) hashDataPoint(dataPoint) uint32 encodeDocument(objmodel.Document, *bytes.Buffer) error - encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, buf *bytes.Buffer) (map[string]string, error) + encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, idx esIndex, buf *bytes.Buffer) (map[string]string, error) } // encodeModel tries to keep the event as close to the original open telemetry semantics as is. @@ -111,24 +111,24 @@ const ( attributeField = "attribute" ) -func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string, buf *bytes.Buffer) error { +func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx esIndex, buf *bytes.Buffer) error { var document objmodel.Document switch m.mode { case MappingECS: - document = m.encodeLogECSMode(resource, record, scope) + document = m.encodeLogECSMode(resource, record, scope, idx) case MappingOTel: - return serializeLog(resource, resourceSchemaURL, scope, scopeSchemaURL, record, buf) + return serializeLog(resource, resourceSchemaURL, scope, scopeSchemaURL, record, idx, buf) case MappingBodyMap: return m.encodeLogBodyMapMode(record, buf) default: - document = m.encodeLogDefaultMode(resource, record, scope) + document = m.encodeLogDefaultMode(resource, record, scope, idx) } document.Dedup() return document.Serialize(buf, m.dedot) } -func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) objmodel.Document { +func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope, idx esIndex) objmodel.Document { var document objmodel.Document docTimeStamp := record.Timestamp() @@ -142,7 +142,7 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo document.AddString("SeverityText", record.SeverityText()) document.AddInt("SeverityNumber", int64(record.SeverityNumber())) document.AddAttribute("Body", record.Body()) - m.encodeAttributes(&document, record.Attributes()) + m.encodeAttributes(&document, record.Attributes(), idx) document.AddAttributes("Resource", resource.Attributes()) document.AddAttributes("Scope", scopeToAttributes(scope)) @@ -159,7 +159,7 @@ func (m *encodeModel) encodeLogBodyMapMode(record plog.LogRecord, buf *bytes.Buf return nil } -func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) objmodel.Document { +func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope, idx esIndex) objmodel.Document { var document objmodel.Document // First, try to map resource-level attributes to ECS fields. @@ -180,6 +180,7 @@ func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.Lo semconv.AttributeExceptionEscaped: "event.error.exception.handled", } encodeAttributesECSMode(&document, record.Attributes(), recordAttrsConversionMap, resourceAttrsToPreserve) + addDataStreamAttributes(&document, "", idx) // Handle special cases. encodeLogAgentNameECSMode(&document, resource) @@ -222,12 +223,13 @@ func (m *encodeModel) hashDataPoint(dp dataPoint) uint32 { } } -func (m *encodeModel) encodeDataPointsECSMode(resource pcommon.Resource, dataPoints []dataPoint, validationErrors *[]error, buf *bytes.Buffer) (map[string]string, error) { +func (m *encodeModel) encodeDataPointsECSMode(resource pcommon.Resource, dataPoints []dataPoint, validationErrors *[]error, idx esIndex, buf *bytes.Buffer) (map[string]string, error) { dp0 := dataPoints[0] var document objmodel.Document encodeAttributesECSMode(&document, resource.Attributes(), resourceAttrsConversionMap, resourceAttrsToPreserve) document.AddTimestamp("@timestamp", dp0.Timestamp()) document.AddAttributes("", dp0.Attributes()) + addDataStreamAttributes(&document, "", idx) for _, dp := range dataPoints { value, err := dp.Value() @@ -242,12 +244,20 @@ func (m *encodeModel) encodeDataPointsECSMode(resource pcommon.Resource, dataPoi return document.DynamicTemplates(), err } -func (m *encodeModel) encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, buf *bytes.Buffer) (map[string]string, error) { +func addDataStreamAttributes(document *objmodel.Document, key string, idx esIndex) { + if idx.isDataStream() { + document.AddString(key+"data_stream.type", idx.Type) + document.AddString(key+"data_stream.dataset", idx.Dataset) + document.AddString(key+"data_stream.namespace", idx.Namespace) + } +} + +func (m *encodeModel) encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, idx esIndex, buf *bytes.Buffer) (map[string]string, error) { switch m.mode { case MappingOTel: - return serializeMetrics(resource, resourceSchemaURL, scope, scopeSchemaURL, dataPoints, validationErrors, buf) + return serializeMetrics(resource, resourceSchemaURL, scope, scopeSchemaURL, dataPoints, validationErrors, idx, buf) default: - return m.encodeDataPointsECSMode(resource, dataPoints, validationErrors, buf) + return m.encodeDataPointsECSMode(resource, dataPoints, validationErrors, idx, buf) } } @@ -483,20 +493,20 @@ func (dp numberDataPoint) Metric() pmetric.Metric { var errInvalidNumberDataPoint = errors.New("invalid number data point") -func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string, buf *bytes.Buffer) error { +func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx esIndex, buf *bytes.Buffer) error { var document objmodel.Document switch m.mode { case MappingOTel: - return serializeSpan(resource, resourceSchemaURL, scope, scopeSchemaURL, span, buf) + return serializeSpan(resource, resourceSchemaURL, scope, scopeSchemaURL, span, idx, buf) default: - document = m.encodeSpanDefaultMode(resource, span, scope) + document = m.encodeSpanDefaultMode(resource, span, scope, idx) } document.Dedup() err := document.Serialize(buf, m.dedot) return err } -func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) objmodel.Document { +func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope, idx esIndex) objmodel.Document { var document objmodel.Document document.AddTimestamp("@timestamp", span.StartTimestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used. document.AddTimestamp("EndTimestamp", span.EndTimestamp()) @@ -508,7 +518,7 @@ func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptra document.AddInt("TraceStatus", int64(span.Status().Code())) document.AddString("TraceStatusDescription", span.Status().Message()) document.AddString("Link", spanLinksToString(span.Links())) - m.encodeAttributes(&document, span.Attributes()) + m.encodeAttributes(&document, span.Attributes(), idx) document.AddAttributes("Resource", resource.Attributes()) m.encodeEvents(&document, span.Events()) document.AddInt("Duration", durationAsMicroseconds(span.StartTimestamp().AsTime(), span.EndTimestamp().AsTime())) // unit is microseconds @@ -516,21 +526,22 @@ func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptra return document } -func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, buf *bytes.Buffer) { +func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx esIndex, buf *bytes.Buffer) { if m.mode != MappingOTel { // Currently span events are stored separately only in OTel mapping mode. // In other modes, they are stored within the span document. return } - serializeSpanEvent(resource, resourceSchemaURL, scope, scopeSchemaURL, span, spanEvent, buf) + serializeSpanEvent(resource, resourceSchemaURL, scope, scopeSchemaURL, span, spanEvent, idx, buf) } -func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes pcommon.Map) { +func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes pcommon.Map, idx esIndex) { key := "Attributes" if m.mode == MappingRaw { key = "" } document.AddAttributes(key, attributes) + addDataStreamAttributes(document, key, idx) } func (m *encodeModel) encodeEvents(document *objmodel.Document, events ptrace.SpanEventSlice) { diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index bdafe4b60459c..2975d877cd1a6 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -57,7 +57,7 @@ func TestEncodeSpan(t *testing.T) { model := &encodeModel{dedot: false} td := mockResourceSpans() var buf bytes.Buffer - err := model.encodeSpan(td.ResourceSpans().At(0).Resource(), "", td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0), td.ResourceSpans().At(0).ScopeSpans().At(0).Scope(), "", &buf) + err := model.encodeSpan(td.ResourceSpans().At(0).Resource(), "", td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0), td.ResourceSpans().At(0).ScopeSpans().At(0).Scope(), "", esIndex{}, &buf) assert.NoError(t, err) assert.Equal(t, expectedSpanBody, buf.String()) } @@ -68,7 +68,7 @@ func TestEncodeLog(t *testing.T) { td := mockResourceLogs() td.ScopeLogs().At(0).LogRecords().At(0).SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Date(2023, 4, 19, 3, 4, 5, 6, time.UTC))) var buf bytes.Buffer - err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl(), &buf) + err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl(), esIndex{}, &buf) assert.NoError(t, err) assert.Equal(t, expectedLogBody, buf.String()) }) @@ -77,7 +77,7 @@ func TestEncodeLog(t *testing.T) { model := &encodeModel{dedot: false} td := mockResourceLogs() var buf bytes.Buffer - err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl(), &buf) + err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl(), esIndex{}, &buf) assert.NoError(t, err) assert.Equal(t, expectedLogBodyWithEmptyTimestamp, buf.String()) }) @@ -87,7 +87,7 @@ func TestEncodeLog(t *testing.T) { td := mockResourceLogs() td.Resource().Attributes().PutStr("foo.bar", "baz") var buf bytes.Buffer - err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl(), &buf) + err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl(), esIndex{}, &buf) require.NoError(t, err) require.Equal(t, expectedLogBodyDeDottedWithEmptyTimestamp, buf.String()) }) @@ -124,7 +124,7 @@ func TestEncodeMetric(t *testing.T) { for _, dataPoints := range groupedDataPoints { var buf bytes.Buffer errors := make([]error, 0) - _, err := model.encodeMetrics(rm.Resource(), rm.SchemaUrl(), sm.Scope(), sm.SchemaUrl(), dataPoints, &errors, &buf) + _, err := model.encodeMetrics(rm.Resource(), rm.SchemaUrl(), sm.Scope(), sm.SchemaUrl(), dataPoints, &errors, esIndex{}, &buf) require.Empty(t, errors, err) require.NoError(t, err) docsBytes = append(docsBytes, buf.Bytes()) @@ -151,6 +151,7 @@ func createTestMetrics(t *testing.T) pmetric.Metrics { require.NoError(t, err) metrics, err := metricsUnmarshaler.UnmarshalMetrics(metricBytes) require.NoError(t, err) + metrics.MarkReadOnly() return metrics } @@ -192,6 +193,7 @@ func mockResourceSpans() ptrace.Traces { event.SetTimestamp(pcommon.NewTimestampFromTime(tStart)) event.Attributes().PutStr("eventMockFoo", "foo") event.Attributes().PutStr("eventMockBar", "bar") + traces.MarkReadOnly() return traces } @@ -251,7 +253,7 @@ func TestEncodeAttributes(t *testing.T) { } doc := objmodel.Document{} - m.encodeAttributes(&doc, attributes) + m.encodeAttributes(&doc, attributes, esIndex{}) require.Equal(t, test.want(), doc) }) } @@ -312,7 +314,8 @@ func TestEncodeEvents(t *testing.T) { } func TestEncodeLogECSModeDuplication(t *testing.T) { - resource := pcommon.NewResource() + logs := plog.NewLogs() + resource := logs.ResourceLogs().AppendEmpty().Resource() err := resource.Attributes().FromRaw(map[string]any{ semconv.AttributeServiceName: "foo.bar", semconv.AttributeHostName: "localhost", @@ -340,20 +343,22 @@ func TestEncodeLogECSModeDuplication(t *testing.T) { require.NoError(t, err) observedTimestamp := pcommon.Timestamp(1710273641123456789) record.SetObservedTimestamp(observedTimestamp) + logs.MarkReadOnly() m := encodeModel{ mode: MappingECS, dedot: true, } var buf bytes.Buffer - err = m.encodeLog(resource, "", record, scope, "", &buf) + err = m.encodeLog(resource, "", record, scope, "", esIndex{}, &buf) require.NoError(t, err) assert.Equal(t, want, buf.String()) } func TestEncodeLogECSMode(t *testing.T) { - resource := pcommon.NewResource() + logs := plog.NewLogs() + resource := logs.ResourceLogs().AppendEmpty().Resource() err := resource.Attributes().FromRaw(map[string]any{ semconv.AttributeServiceName: "foo.bar", semconv.AttributeServiceVersion: "1.1.0", @@ -415,10 +420,11 @@ func TestEncodeLogECSMode(t *testing.T) { require.NoError(t, err) observedTimestamp := pcommon.Timestamp(1710273641123456789) record.SetObservedTimestamp(observedTimestamp) + logs.MarkReadOnly() var buf bytes.Buffer m := encodeModel{} - doc := m.encodeLogECSMode(resource, record, scope) + doc := m.encodeLogECSMode(resource, record, scope, esIndex{}) require.NoError(t, doc.Serialize(&buf, false)) require.JSONEq(t, `{ @@ -530,7 +536,8 @@ func TestEncodeLogECSModeAgentName(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { - resource := pcommon.NewResource() + logs := plog.NewLogs() + resource := logs.ResourceLogs().AppendEmpty().Resource() scope := pcommon.NewInstrumentationScope() record := plog.NewLogRecord() @@ -546,10 +553,11 @@ func TestEncodeLogECSModeAgentName(t *testing.T) { timestamp := pcommon.Timestamp(1710373859123456789) record.SetTimestamp(timestamp) + logs.MarkReadOnly() var buf bytes.Buffer m := encodeModel{} - doc := m.encodeLogECSMode(resource, record, scope) + doc := m.encodeLogECSMode(resource, record, scope, esIndex{}) require.NoError(t, doc.Serialize(&buf, false)) require.JSONEq(t, fmt.Sprintf(`{ "@timestamp": "2024-03-13T23:50:59.123456789Z", @@ -585,7 +593,8 @@ func TestEncodeLogECSModeAgentVersion(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { - resource := pcommon.NewResource() + logs := plog.NewLogs() + resource := logs.ResourceLogs().AppendEmpty().Resource() scope := pcommon.NewInstrumentationScope() record := plog.NewLogRecord() @@ -598,10 +607,11 @@ func TestEncodeLogECSModeAgentVersion(t *testing.T) { timestamp := pcommon.Timestamp(1710373859123456789) record.SetTimestamp(timestamp) + logs.MarkReadOnly() var buf bytes.Buffer m := encodeModel{} - doc := m.encodeLogECSMode(resource, record, scope) + doc := m.encodeLogECSMode(resource, record, scope, esIndex{}) require.NoError(t, doc.Serialize(&buf, false)) if test.expectedAgentVersion == "" { @@ -692,7 +702,8 @@ func TestEncodeLogECSModeHostOSType(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { - resource := pcommon.NewResource() + logs := plog.NewLogs() + resource := logs.ResourceLogs().AppendEmpty().Resource() scope := pcommon.NewInstrumentationScope() record := plog.NewLogRecord() @@ -708,7 +719,8 @@ func TestEncodeLogECSModeHostOSType(t *testing.T) { var buf bytes.Buffer m := encodeModel{} - doc := m.encodeLogECSMode(resource, record, scope) + logs.MarkReadOnly() + doc := m.encodeLogECSMode(resource, record, scope, esIndex{}) require.NoError(t, doc.Serialize(&buf, false)) expectedJSON := `{"@timestamp":"2024-03-13T23:50:59.123456789Z", "agent.name":"otlp"` @@ -759,7 +771,7 @@ func TestEncodeLogECSModeTimestamps(t *testing.T) { var buf bytes.Buffer m := encodeModel{} - doc := m.encodeLogECSMode(resource, record, scope) + doc := m.encodeLogECSMode(resource, record, scope, esIndex{}) require.NoError(t, doc.Serialize(&buf, false)) require.JSONEq(t, fmt.Sprintf( @@ -1119,11 +1131,10 @@ func TestEncodeLogOtelMode(t *testing.T) { for _, tc := range tests { record, scope, resource := createTestOTelLogRecord(t, tc.rec) - // This sets the data_stream values default or derived from the record/scope/resources - routeLogRecord(record.Attributes(), scope.Attributes(), resource.Attributes(), "", true, scope.Name()) + idx := routeLogRecord(record.Attributes(), scope.Attributes(), resource.Attributes(), "", true, scope.Name()) var buf bytes.Buffer - err := m.encodeLog(resource, tc.rec.Resource.SchemaURL, record, scope, tc.rec.Scope.SchemaURL, &buf) + err := m.encodeLog(resource, tc.rec.Resource.SchemaURL, record, scope, tc.rec.Scope.SchemaURL, idx, &buf) require.NoError(t, err) want := tc.rec @@ -1254,7 +1265,7 @@ func TestEncodeLogScalarObjectConflict(t *testing.T) { td.ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("foo", "scalar") td.ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("foo.bar", "baz") var buf bytes.Buffer - err := model.encodeLog(td.Resource(), "", td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), "", &buf) + err := model.encodeLog(td.Resource(), "", td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), "", esIndex{}, &buf) assert.NoError(t, err) encoded := buf.Bytes() @@ -1268,7 +1279,7 @@ func TestEncodeLogScalarObjectConflict(t *testing.T) { // If there is an attribute named "foo.value", then "foo" would be omitted rather than renamed. td.ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("foo.value", "foovalue") buf = bytes.Buffer{} - err = model.encodeLog(td.Resource(), "", td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), "", &buf) + err = model.encodeLog(td.Resource(), "", td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), "", esIndex{}, &buf) assert.NoError(t, err) encoded = buf.Bytes() diff --git a/exporter/elasticsearchexporter/pdata_serializer.go b/exporter/elasticsearchexporter/pdata_serializer.go index 5fbcc5c91fa0a..5c59ef8147474 100644 --- a/exporter/elasticsearchexporter/pdata_serializer.go +++ b/exporter/elasticsearchexporter/pdata_serializer.go @@ -19,7 +19,7 @@ import ( const tsLayout = "2006-01-02T15:04:05.000000000Z" -func serializeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, buf *bytes.Buffer) (map[string]string, error) { +func serializeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, idx esIndex, buf *bytes.Buffer) (map[string]string, error) { if len(dataPoints) == 0 { return nil, nil } @@ -35,7 +35,7 @@ func serializeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope writeTimestampField(v, "start_timestamp", dp0.StartTimestamp()) } writeStringFieldSkipDefault(v, "unit", dp0.Metric().Unit()) - writeDataStream(v, dp0.Attributes()) + writeDataStream(v, idx) writeAttributes(v, dp0.Attributes(), true) writeResource(v, resource, resourceSchemaURL, true) writeScope(v, scope, scopeSchemaURL, true) @@ -92,14 +92,14 @@ func serializeDataPoints(v *json.Visitor, dataPoints []dataPoint, validationErro return dynamicTemplates } -func serializeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, buf *bytes.Buffer) { +func serializeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, idx esIndex, buf *bytes.Buffer) { v := json.NewVisitor(buf) // Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1. // This is required to generate the correct dynamic mapping in ES. v.SetExplicitRadixPoint(true) _ = v.OnObjectStart(-1, structform.AnyType) writeTimestampField(v, "@timestamp", spanEvent.Timestamp()) - writeDataStream(v, spanEvent.Attributes()) + writeDataStream(v, idx) writeTraceIDField(v, span.TraceID()) writeSpanIDField(v, "span_id", span.SpanID()) writeIntFieldSkipDefault(v, "dropped_attributes_count", int64(spanEvent.DroppedAttributesCount())) @@ -119,14 +119,14 @@ func serializeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, sco _ = v.OnObjectFinished() } -func serializeSpan(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, span ptrace.Span, buf *bytes.Buffer) error { +func serializeSpan(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, span ptrace.Span, idx esIndex, buf *bytes.Buffer) error { v := json.NewVisitor(buf) // Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1. // This is required to generate the correct dynamic mapping in ES. v.SetExplicitRadixPoint(true) _ = v.OnObjectStart(-1, structform.AnyType) writeTimestampField(v, "@timestamp", span.StartTimestamp()) - writeDataStream(v, span.Attributes()) + writeDataStream(v, idx) writeTraceIDField(v, span.TraceID()) writeSpanIDField(v, "span_id", span.SpanID()) writeStringFieldSkipDefault(v, "trace_state", span.TraceState().AsRaw()) @@ -179,7 +179,7 @@ func serializeMap(m pcommon.Map, buf *bytes.Buffer) { writeMap(v, m, false) } -func serializeLog(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, record plog.LogRecord, buf *bytes.Buffer) error { +func serializeLog(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, record plog.LogRecord, idx esIndex, buf *bytes.Buffer) error { v := json.NewVisitor(buf) // Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1. // This is required to generate the correct dynamic mapping in ES. @@ -191,7 +191,7 @@ func serializeLog(resource pcommon.Resource, resourceSchemaURL string, scope pco } writeTimestampField(v, "@timestamp", docTimeStamp) writeTimestampField(v, "observed_timestamp", record.ObservedTimestamp()) - writeDataStream(v, record.Attributes()) + writeDataStream(v, idx) writeStringFieldSkipDefault(v, "severity_text", record.SeverityText()) writeIntFieldSkipDefault(v, "severity_number", int64(record.SeverityNumber())) writeTraceIDField(v, record.TraceID()) @@ -213,16 +213,15 @@ func serializeLog(resource pcommon.Resource, resourceSchemaURL string, scope pco return nil } -func writeDataStream(v *json.Visitor, attributes pcommon.Map) { +func writeDataStream(v *json.Visitor, idx esIndex) { + if !idx.isDataStream() { + return + } _ = v.OnKey("data_stream") _ = v.OnObjectStart(-1, structform.AnyType) - attributes.Range(func(k string, val pcommon.Value) bool { - if strings.HasPrefix(k, "data_stream.") && val.Type() == pcommon.ValueTypeStr { - writeStringFieldSkipDefault(v, k[12:], val.Str()) - } - return true - }) - + writeStringFieldSkipDefault(v, "type", idx.Type) + writeStringFieldSkipDefault(v, "dataset", idx.Dataset) + writeStringFieldSkipDefault(v, "namespace", idx.Namespace) _ = v.OnObjectFinished() } diff --git a/exporter/elasticsearchexporter/pdata_serializer_test.go b/exporter/elasticsearchexporter/pdata_serializer_test.go index 85ba952d140f0..6131ebbc6ee16 100644 --- a/exporter/elasticsearchexporter/pdata_serializer_test.go +++ b/exporter/elasticsearchexporter/pdata_serializer_test.go @@ -42,10 +42,7 @@ func TestSerializeLog(t *testing.T) { }, wantErr: false, expected: map[string]any{ "@timestamp": "0.0", "observed_timestamp": "0.0", - "data_stream": map[string]any{ - "type": "logs", - }, - "severity_text": "debug", + "severity_text": "debug", "resource": map[string]any{ "attributes": map[string]any{ "resource_map": map[string]any{ @@ -86,7 +83,6 @@ func TestSerializeLog(t *testing.T) { expected: map[string]any{ "@timestamp": "0.0", "observed_timestamp": "0.0", - "data_stream": map[string]any{}, "resource": map[string]any{}, "scope": map[string]any{}, "body": map[string]any{ @@ -103,7 +99,6 @@ func TestSerializeLog(t *testing.T) { expected: map[string]any{ "@timestamp": "0.0", "observed_timestamp": "0.0", - "data_stream": map[string]any{}, "resource": map[string]any{}, "scope": map[string]any{}, "body": map[string]any{ @@ -132,7 +127,6 @@ func TestSerializeLog(t *testing.T) { expected: map[string]any{ "@timestamp": "0.0", "observed_timestamp": "0.0", - "data_stream": map[string]any{}, "resource": map[string]any{}, "scope": map[string]any{}, "attributes": map[string]any{ @@ -159,7 +153,6 @@ func TestSerializeLog(t *testing.T) { "@timestamp": "0.0", "observed_timestamp": "0.0", "event_name": "bar", - "data_stream": map[string]any{}, "resource": map[string]any{}, "scope": map[string]any{}, "attributes": map[string]any{ @@ -176,7 +169,6 @@ func TestSerializeLog(t *testing.T) { expected: map[string]any{ "@timestamp": "1721314113467.654123", "observed_timestamp": "0.0", - "data_stream": map[string]any{}, "resource": map[string]any{}, "scope": map[string]any{}, }, @@ -184,13 +176,15 @@ func TestSerializeLog(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - resourceLogs := plog.NewResourceLogs() + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() record := scopeLogs.LogRecords().AppendEmpty() tt.logCustomizer(resourceLogs.Resource(), scopeLogs.Scope(), record) + logs.MarkReadOnly() var buf bytes.Buffer - err := serializeLog(resourceLogs.Resource(), "", scopeLogs.Scope(), "", record, &buf) + err := serializeLog(resourceLogs.Resource(), "", scopeLogs.Scope(), "", record, esIndex{}, &buf) if (err != nil) != tt.wantErr { t.Errorf("serializeLog() error = %v, wantErr %v", err, tt.wantErr) } @@ -209,7 +203,8 @@ func TestSerializeLog(t *testing.T) { } func TestSerializeMetricsConflict(t *testing.T) { - resourceMetrics := pmetric.NewResourceMetrics() + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics().AppendEmpty() scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() var dataPoints []dataPoint metric1 := scopeMetrics.Metrics().AppendEmpty() @@ -220,10 +215,11 @@ func TestSerializeMetricsConflict(t *testing.T) { dp.SetIntValue(42) dataPoints = append(dataPoints, newNumberDataPoint(m, dp)) } + metrics.MarkReadOnly() var validationErrors []error var buf bytes.Buffer - _, err := serializeMetrics(resourceMetrics.Resource(), "", scopeMetrics.Scope(), "", dataPoints, &validationErrors, &buf) + _, err := serializeMetrics(resourceMetrics.Resource(), "", scopeMetrics.Scope(), "", dataPoints, &validationErrors, esIndex{}, &buf) if err != nil { t.Errorf("serializeMetrics() error = %v", err) } @@ -240,10 +236,9 @@ func TestSerializeMetricsConflict(t *testing.T) { assert.Equal(t, fmt.Errorf("metric with name 'foo' has already been serialized in document with timestamp 1970-01-01T00:00:00.000000000Z"), validationErrors[0]) assert.Equal(t, map[string]any{ - "@timestamp": "0.0", - "data_stream": map[string]any{}, - "resource": map[string]any{}, - "scope": map[string]any{}, + "@timestamp": "0.0", + "resource": map[string]any{}, + "scope": map[string]any{}, "metrics": map[string]any{ "foo": json.Number("42"), },