diff --git a/.chloggen/elasticsearchexporter_otel-mode-traces.yaml b/.chloggen/elasticsearchexporter_otel-mode-traces.yaml new file mode 100644 index 000000000000..8796ac2161ba --- /dev/null +++ b/.chloggen/elasticsearchexporter_otel-mode-traces.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add OTel mapping mode for traces + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34588, 34590] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + Add OTel mapping mode support for traces, without span events. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.chloggen/elasticsearchexporter_otel-mode-update.yaml b/.chloggen/elasticsearchexporter_otel-mode-update.yaml new file mode 100644 index 000000000000..45026fd1ceb5 --- /dev/null +++ b/.chloggen/elasticsearchexporter_otel-mode-update.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Update OTel mapping mode for logs and metrics; Remove trace_flags + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34472] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + Update logs and metrics OTel mapping mode to always emit "scope" and zero int, but not emit empty strings for known fields. + Breaking change to remove trace_flags from logs. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 1ea18a46e754..339c7c637623 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -355,7 +355,7 @@ func (e *elasticsearchExporter) pushTraceData( spans := scopeSpan.Spans() for k := 0; k < spans.Len(); k++ { span := spans.At(k) - if err := e.pushTraceRecord(ctx, resource, span, scope, session); err != nil { + if err := e.pushTraceRecord(ctx, resource, il.SchemaUrl(), span, scope, scopeSpan.SchemaUrl(), session); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -377,8 +377,10 @@ func (e *elasticsearchExporter) pushTraceData( func (e *elasticsearchExporter) pushTraceRecord( ctx context.Context, resource pcommon.Resource, + resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, + scopeSchemaURL string, bulkIndexerSession bulkIndexerSession, ) error { fIndex := e.index @@ -394,7 +396,7 @@ func (e *elasticsearchExporter) pushTraceRecord( fIndex = formattedIndex } - document, err := e.model.encodeSpan(resource, span, scope) + document, err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL) if err != nil { return fmt.Errorf("failed to encode trace record: %w", err) } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 6d1de038abb3..7c12cee62960 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -312,7 +312,7 @@ func TestExporterLogs(t *testing.T) { expected := []itemRequest{ { Action: []byte(`{"create":{"_index":"logs-attr.dataset.otel-resource.attribute.namespace"}}`), - Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0,"schema_url":""},"severity_number":0,"trace_flags":0}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0}`), }, } @@ -821,19 +821,19 @@ func TestExporterMetrics(t *testing.T) { expected := []itemRequest{ { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.foo":"histogram"}}}`), - Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[1,2,3,4],"values":[0.5,1.5,2.5,3]}},"resource":{"dropped_attributes_count":0,"schema_url":""}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[1,2,3,4],"values":[0.5,1.5,2.5,3]}},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`), }, { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.foo":"histogram"}}}`), - Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[4,5,6,7],"values":[2,4.5,5.5,6]}},"resource":{"dropped_attributes_count":0,"schema_url":""}}`), + Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[4,5,6,7],"values":[2,4.5,5.5,6]}},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`), }, { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.sum":"gauge_double"}}}`), - Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.sum":1.5},"resource":{"dropped_attributes_count":0,"schema_url":""},"start_timestamp":"1970-01-01T02:00:00.000000000Z"}`), + Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.sum":1.5},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"start_timestamp":"1970-01-01T02:00:00.000000000Z"}`), }, { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.summary":"summary_metrics"}}}`), - Document: []byte(`{"@timestamp":"1970-01-01T03:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.summary":{"sum":1.5,"value_count":1}},"resource":{"dropped_attributes_count":0,"schema_url":""},"start_timestamp":"1970-01-01T03:00:00.000000000Z"}`), + Document: []byte(`{"@timestamp":"1970-01-01T03:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.summary":{"sum":1.5,"value_count":1}},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"start_timestamp":"1970-01-01T03:00:00.000000000Z"}`), }, } @@ -1031,6 +1031,68 @@ func TestExporterTraces(t *testing.T) { )) rec.WaitItems(1) }) + + t.Run("otel mode", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { + cfg.TracesDynamicIndex.Enabled = true + cfg.Mapping.Mode = "otel" + }) + + traces := ptrace.NewTraces() + resourceSpans := traces.ResourceSpans() + rs := resourceSpans.AppendEmpty() + + span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("name") + span.SetTraceID(pcommon.NewTraceIDEmpty()) + span.SetSpanID(pcommon.NewSpanIDEmpty()) + span.SetFlags(1) + span.SetDroppedAttributesCount(2) + span.SetDroppedEventsCount(3) + span.SetDroppedLinksCount(4) + span.TraceState().FromRaw("foo") + span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(3600, 0))) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Unix(7200, 0))) + + scopeAttr := span.Attributes() + fillResourceAttributeMap(scopeAttr, map[string]string{ + "attr.foo": "attr.bar", + }) + + resAttr := rs.Resource().Attributes() + fillResourceAttributeMap(resAttr, map[string]string{ + "resource.foo": "resource.bar", + }) + + spanLink := span.Links().AppendEmpty() + spanLink.SetTraceID(pcommon.NewTraceIDEmpty()) + spanLink.SetSpanID(pcommon.NewSpanIDEmpty()) + spanLink.SetFlags(10) + spanLink.SetDroppedAttributesCount(11) + spanLink.TraceState().FromRaw("bar") + fillResourceAttributeMap(spanLink.Attributes(), map[string]string{ + "link.attr.foo": "link.attr.bar", + }) + + mustSendTraces(t, exporter, traces) + + rec.WaitItems(1) + + expected := []itemRequest{ + { + Action: []byte(`{"create":{"_index":"traces-generic.otel-default"}}`), + Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","attributes":{"attr.foo":"attr.bar"},"data_stream":{"dataset":"generic.otel","namespace":"default","type":"traces"},"dropped_attributes_count":2,"dropped_events_count":3,"dropped_links_count":4,"duration":3600000000000,"kind":"Unspecified","links":[{"attributes":{"link.attr.foo":"link.attr.bar"},"dropped_attributes_count":11,"span_id":"","trace_id":"","trace_state":"bar"}],"name":"name","resource":{"attributes":{"resource.foo":"resource.bar"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"status":{"code":"Unset"},"trace_state":"foo"}`), + }, + } + + assertItemsEqual(t, expected, rec.Items(), false) + }) } // TestExporterAuth verifies that the Elasticsearch exporter supports diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index c98f412d7b03..bdf030bfc282 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -65,7 +65,7 @@ var resourceAttrsToPreserve = map[string]bool{ type mappingModel interface { encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string) ([]byte, error) - encodeSpan(pcommon.Resource, ptrace.Span, pcommon.InstrumentationScope) ([]byte, error) + encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string) ([]byte, error) upsertMetricDataPointValue(map[uint32]objmodel.Document, pcommon.Resource, string, pcommon.InstrumentationScope, string, pmetric.Metric, dataPoint, pcommon.Value) error encodeDocument(objmodel.Document) ([]byte, error) } @@ -131,8 +131,6 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo return document } -var datastreamKeys = []string{dataStreamType, dataStreamDataset, dataStreamNamespace} - func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string) objmodel.Document { var document objmodel.Document @@ -146,76 +144,13 @@ func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchem document.AddTraceID("trace_id", record.TraceID()) document.AddSpanID("span_id", record.SpanID()) - document.AddInt("trace_flags", int64(record.Flags())) document.AddString("severity_text", record.SeverityText()) document.AddInt("severity_number", int64(record.SeverityNumber())) document.AddInt("dropped_attributes_count", int64(record.DroppedAttributesCount())) - // At this point the data_stream attributes are expected to be in the record attributes, - // updated by the router. - // Move them to the top of the document and remove them from the record - attributeMap := record.Attributes() - - forEachDataStreamKey := func(fn func(key string)) { - for _, key := range datastreamKeys { - fn(key) - } - } - - forEachDataStreamKey(func(key string) { - if value, exists := attributeMap.Get(key); exists { - document.AddAttribute(key, value) - attributeMap.Remove(key) - } - }) - - document.AddAttributes("attributes", attributeMap) - - // Resource - resourceMapVal := pcommon.NewValueMap() - resourceMap := resourceMapVal.Map() - resourceMap.PutStr("schema_url", resourceSchemaURL) - resourceMap.PutInt("dropped_attributes_count", int64(resource.DroppedAttributesCount())) - resourceAttrMap := resourceMap.PutEmptyMap("attributes") - - resource.Attributes().CopyTo(resourceAttrMap) - - // Remove data_stream attributes from the resources attributes if present - forEachDataStreamKey(func(key string) { - resourceAttrMap.Remove(key) - }) - - document.Add("resource", objmodel.ValueFromAttribute(resourceMapVal)) - - // Scope - scopeMapVal := pcommon.NewValueMap() - scopeMap := scopeMapVal.Map() - if scope.Name() != "" { - scopeMap.PutStr("name", scope.Name()) - } - if scope.Version() != "" { - scopeMap.PutStr("version", scope.Version()) - } - if scopeSchemaURL != "" { - scopeMap.PutStr("schema_url", scopeSchemaURL) - } - if scope.DroppedAttributesCount() > 0 { - scopeMap.PutInt("dropped_attributes_count", int64(scope.DroppedAttributesCount())) - } - scopeAttributes := scope.Attributes() - if scopeAttributes.Len() > 0 { - scopeAttrMap := scopeMap.PutEmptyMap("attributes") - scopeAttributes.CopyTo(scopeAttrMap) - - // Remove data_stream attributes from the scope attributes if present - forEachDataStreamKey(func(key string) { - scopeAttrMap.Remove(key) - }) - } - - if scopeMap.Len() > 0 { - document.Add("scope", objmodel.ValueFromAttribute(scopeMapVal)) - } + m.encodeAttributesOTelMode(&document, record.Attributes()) + m.encodeResourceOTelMode(&document, resource, resourceSchemaURL) + m.encodeScopeOTelMode(&document, scope, scopeSchemaURL) // Body setOTelLogBody(&document, record.Body()) @@ -349,71 +284,9 @@ func (m *encodeModel) upsertMetricDataPointValueOTelMode(documents map[uint32]ob } document.AddString("unit", metric.Unit()) - // At this point the data_stream attributes are expected to be in the record attributes, - // updated by the router. - // Move them to the top of the document and remove them from the record - attributeMap := dp.Attributes() - - forEachDataStreamKey := func(fn func(key string)) { - for _, key := range datastreamKeys { - fn(key) - } - } - - forEachDataStreamKey(func(key string) { - if val, exists := attributeMap.Get(key); exists { - document.AddAttribute(key, val) - attributeMap.Remove(key) - } - }) - - document.AddAttributes("attributes", attributeMap) - - // Resource - resourceMapVal := pcommon.NewValueMap() - resourceMap := resourceMapVal.Map() - resourceMap.PutStr("schema_url", resourceSchemaURL) - resourceMap.PutInt("dropped_attributes_count", int64(resource.DroppedAttributesCount())) - resourceAttrMap := resourceMap.PutEmptyMap("attributes") - - resource.Attributes().CopyTo(resourceAttrMap) - - // Remove data_stream attributes from the resources attributes if present - forEachDataStreamKey(func(key string) { - resourceAttrMap.Remove(key) - }) - - document.Add("resource", objmodel.ValueFromAttribute(resourceMapVal)) - - // Scope - scopeMapVal := pcommon.NewValueMap() - scopeMap := scopeMapVal.Map() - if scope.Name() != "" { - scopeMap.PutStr("name", scope.Name()) - } - if scope.Version() != "" { - scopeMap.PutStr("version", scope.Version()) - } - if scopeSchemaURL != "" { - scopeMap.PutStr("schema_url", scopeSchemaURL) - } - if scope.DroppedAttributesCount() > 0 { - scopeMap.PutInt("dropped_attributes_count", int64(scope.DroppedAttributesCount())) - } - scopeAttributes := scope.Attributes() - if scopeAttributes.Len() > 0 { - scopeAttrMap := scopeMap.PutEmptyMap("attributes") - scopeAttributes.CopyTo(scopeAttrMap) - - // Remove data_stream attributes from the scope attributes if present - forEachDataStreamKey(func(key string) { - scopeAttrMap.Remove(key) - }) - } - - if scopeMap.Len() > 0 { - document.Add("scope", objmodel.ValueFromAttribute(scopeMapVal)) - } + m.encodeAttributesOTelMode(&document, dp.Attributes()) + m.encodeResourceOTelMode(&document, resource, resourceSchemaURL) + m.encodeScopeOTelMode(&document, scope, scopeSchemaURL) } switch value.Type() { @@ -544,7 +417,124 @@ func numberToValue(dp pmetric.NumberDataPoint) (pcommon.Value, error) { return pcommon.Value{}, errInvalidNumberDataPoint } -func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) ([]byte, error) { +func (m *encodeModel) encodeResourceOTelMode(document *objmodel.Document, resource pcommon.Resource, resourceSchemaURL string) { + resourceMapVal := pcommon.NewValueMap() + resourceMap := resourceMapVal.Map() + if resourceSchemaURL != "" { + resourceMap.PutStr("schema_url", resourceSchemaURL) + } + resourceMap.PutInt("dropped_attributes_count", int64(resource.DroppedAttributesCount())) + resourceAttrMap := resourceMap.PutEmptyMap("attributes") + resource.Attributes().CopyTo(resourceAttrMap) + resourceAttrMap.RemoveIf(func(key string, _ pcommon.Value) bool { + switch key { + case dataStreamType, dataStreamDataset, dataStreamNamespace: + return true + } + return false + }) + + document.Add("resource", objmodel.ValueFromAttribute(resourceMapVal)) +} + +func (m *encodeModel) encodeScopeOTelMode(document *objmodel.Document, scope pcommon.InstrumentationScope, scopeSchemaURL string) { + scopeMapVal := pcommon.NewValueMap() + scopeMap := scopeMapVal.Map() + if scope.Name() != "" { + scopeMap.PutStr("name", scope.Name()) + } + if scope.Version() != "" { + scopeMap.PutStr("version", scope.Version()) + } + if scopeSchemaURL != "" { + scopeMap.PutStr("schema_url", scopeSchemaURL) + } + scopeMap.PutInt("dropped_attributes_count", int64(scope.DroppedAttributesCount())) + scopeAttrMap := scopeMap.PutEmptyMap("attributes") + scope.Attributes().CopyTo(scopeAttrMap) + scopeAttrMap.RemoveIf(func(key string, _ pcommon.Value) bool { + switch key { + case dataStreamType, dataStreamDataset, dataStreamNamespace: + return true + } + return false + }) + document.Add("scope", objmodel.ValueFromAttribute(scopeMapVal)) +} + +func (m *encodeModel) encodeAttributesOTelMode(document *objmodel.Document, attributeMap pcommon.Map) { + attributeMap.RemoveIf(func(key string, val pcommon.Value) bool { + switch key { + case dataStreamType, dataStreamDataset, dataStreamNamespace: + // At this point the data_stream attributes are expected to be in the record attributes, + // updated by the router. + // Move them to the top of the document and remove them from the record + document.AddAttribute(key, val) + return true + } + return false + }) + document.AddAttributes("attributes", attributeMap) +} + +func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string) ([]byte, error) { + var document objmodel.Document + switch m.mode { + case MappingOTel: + document = m.encodeSpanOTelMode(resource, resourceSchemaURL, span, scope, scopeSchemaURL) + default: + document = m.encodeSpanDefaultMode(resource, span, scope) + } + document.Dedup() + var buf bytes.Buffer + err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel) + return buf.Bytes(), err +} + +func (m *encodeModel) encodeSpanOTelMode(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string) objmodel.Document { + var document objmodel.Document + document.AddTimestamp("@timestamp", span.StartTimestamp()) + document.AddTraceID("trace_id", span.TraceID()) + document.AddSpanID("span_id", span.SpanID()) + document.AddString("trace_state", span.TraceState().AsRaw()) + document.AddSpanID("parent_span_id", span.ParentSpanID()) + document.AddString("name", span.Name()) + document.AddString("kind", span.Kind().String()) + document.AddInt("duration", int64(span.EndTimestamp()-span.StartTimestamp())) + + m.encodeAttributesOTelMode(&document, span.Attributes()) + + document.AddInt("dropped_attributes_count", int64(span.DroppedAttributesCount())) + document.AddInt("dropped_events_count", int64(span.DroppedEventsCount())) + + links := pcommon.NewValueSlice() + linkSlice := links.SetEmptySlice() + spanLinks := span.Links() + for i := 0; i < spanLinks.Len(); i++ { + linkMap := linkSlice.AppendEmpty().SetEmptyMap() + spanLink := spanLinks.At(i) + linkMap.PutStr("trace_id", spanLink.TraceID().String()) + linkMap.PutStr("span_id", spanLink.SpanID().String()) + linkMap.PutStr("trace_state", spanLink.TraceState().AsRaw()) + mAttr := linkMap.PutEmptyMap("attributes") + spanLink.Attributes().CopyTo(mAttr) + linkMap.PutInt("dropped_attributes_count", int64(spanLink.DroppedAttributesCount())) + } + document.AddAttribute("links", links) + + document.AddInt("dropped_links_count", int64(span.DroppedLinksCount())) + document.AddString("status.message", span.Status().Message()) + document.AddString("status.code", span.Status().Code().String()) + + m.encodeResourceOTelMode(&document, resource, resourceSchemaURL) + m.encodeScopeOTelMode(&document, scope, scopeSchemaURL) + + // TODO: add span events to log data streams + + return document +} + +func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) objmodel.Document { var document objmodel.Document document.AddTimestamp("@timestamp", span.StartTimestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used. document.AddTimestamp("EndTimestamp", span.EndTimestamp()) @@ -561,12 +551,7 @@ func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, sc m.encodeEvents(&document, span.Events()) document.AddInt("Duration", durationAsMicroseconds(span.StartTimestamp().AsTime(), span.EndTimestamp().AsTime())) // unit is microseconds document.AddAttributes("Scope", scopeToAttributes(scope)) - document.Dedup() - - var buf bytes.Buffer - // OTel serialization is not supported for traces yet - err := document.Serialize(&buf, m.dedot, false) - return buf.Bytes(), err + return document } func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes pcommon.Map) { diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index 37b9d3788e82..eb9aa1b5453c 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -54,7 +54,7 @@ var expectedLogBodyDeDottedWithEmptyTimestamp = `{"@timestamp":"1970-01-01T00:00 func TestEncodeSpan(t *testing.T) { model := &encodeModel{dedot: false} td := mockResourceSpans() - spanByte, err := model.encodeSpan(td.ResourceSpans().At(0).Resource(), td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0), td.ResourceSpans().At(0).ScopeSpans().At(0).Scope()) + spanByte, err := model.encodeSpan(td.ResourceSpans().At(0).Resource(), "", td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0), td.ResourceSpans().At(0).ScopeSpans().At(0).Scope(), "") assert.NoError(t, err) assert.Equal(t, expectedSpanBody, string(spanByte)) } @@ -892,11 +892,10 @@ type OTelRecord struct { SpanID OTelSpanID `json:"span_id"` Timestamp time.Time `json:"@timestamp"` ObservedTimestamp time.Time `json:"observed_timestamp"` - TraceFlags uint32 `json:"trace_flags"` SeverityNumber int32 `json:"severity_number"` SeverityText string `json:"severity_text"` Attributes map[string]any `json:"attributes"` - DroppedAttributesCount uint32 `json:"dropped_attrbutes_count"` + DroppedAttributesCount uint32 `json:"dropped_attributes_count"` Scope OTelScope `json:"scope"` Resource OTelResource `json:"resource"` Datastream OTelRecordDatastream `json:"data_stream"` @@ -912,14 +911,14 @@ type OTelScope struct { Name string `json:"name"` Version string `json:"version"` Attributes map[string]any `json:"attributes"` - DroppedAttributesCount uint32 `json:"dropped_attrbutes_count"` - Schema string `json:"schema"` + DroppedAttributesCount uint32 `json:"dropped_attributes_count"` + SchemaURL string `json:"schema_url"` } type OTelResource struct { Attributes map[string]any `json:"attributes"` - DroppedAttributesCount uint32 `json:"dropped_attrbutes_count"` - Schema string `json:"schema"` + DroppedAttributesCount uint32 `json:"dropped_attributes_count"` + SchemaURL string `json:"schema_url"` } type OTelSpanID pcommon.SpanID @@ -1059,7 +1058,7 @@ func TestEncodeLogOtelMode(t *testing.T) { // This sets the data_stream values default or derived from the record/scope/resources routeLogRecord(record, scope, resource, "", true) - b, err := m.encodeLog(resource, tc.rec.Resource.Schema, record, scope, tc.rec.Scope.Schema) + b, err := m.encodeLog(resource, tc.rec.Resource.SchemaURL, record, scope, tc.rec.Scope.SchemaURL) require.NoError(t, err) want := tc.rec @@ -1084,7 +1083,6 @@ func createTestOTelLogRecord(t *testing.T, rec OTelRecord) (plog.LogRecord, pcom record.SetTraceID(pcommon.TraceID(rec.TraceID)) record.SetSpanID(pcommon.SpanID(rec.SpanID)) - record.SetFlags(plog.LogRecordFlags(rec.TraceFlags)) record.SetSeverityNumber(plog.SeverityNumber(rec.SeverityNumber)) record.SetSeverityText(rec.SeverityText) record.SetDroppedAttributesCount(rec.DroppedAttributesCount) @@ -1138,7 +1136,6 @@ func buildOTelRecordTestData(t *testing.T, fn func(OTelRecord) OTelRecord) OTelR "severity_number": 17, "severity_text": "ERROR", "span_id": "0102030405060708", - "trace_flags": 1234, "trace_id": "01020304050607080900010203040506" }`