From 65b7e8c3ff322ead9a8fa4c9df6cf1382eab006d Mon Sep 17 00:00:00 2001 From: Lanre Ade Date: Mon, 30 Mar 2026 16:30:09 -0400 Subject: [PATCH 1/7] feat: align ECS OTLP metric fallback with apm-data for raw metric dimensions --- .../internal/ecs/ecs_translation.go | 38 +++ .../internal/ecs/ecs_translation_test.go | 101 ++++++++ processor/elasticapmprocessor/processor.go | 57 +++++ .../elasticapmprocessor/processor_test.go | 229 ++++++++++++++++++ .../ecs/elastic_hostname/metrics_output.yaml | 3 + .../metrics_false_ecs_output.yaml | 3 + .../metrics_true_ecs_output.yaml | 3 + 7 files changed, 434 insertions(+) diff --git a/processor/elasticapmprocessor/internal/ecs/ecs_translation.go b/processor/elasticapmprocessor/internal/ecs/ecs_translation.go index 7a5ab8952..431799453 100644 --- a/processor/elasticapmprocessor/internal/ecs/ecs_translation.go +++ b/processor/elasticapmprocessor/internal/ecs/ecs_translation.go @@ -48,6 +48,44 @@ func TranslateLogRecordAttributes(attributes pcommon.Map) { translateAttributes(attributes, isSupportedLogRecordAttribute) } +// TranslateMetricDataPointAttributes applies the apm-data OTLP metric fallback +// for raw metric datapoint attributes in ECS mode. Existing labels.* / +// numeric_labels.* keys are sanitized in place, metric-specific special cases +// are preserved, and everything else is moved to labels.* / numeric_labels.*. +// +// The collector preserves data_stream.type in addition to the apm-data +// data_stream dataset/namespace handling, since datapoint-level routing depends +// on the full data_stream triple before export. 
+func TranslateMetricDataPointAttributes(attributes pcommon.Map) { + attributes.Range(func(k string, v pcommon.Value) bool { + if sanitize.IsLabelAttribute(k) { + sanitized := sanitize.HandleLabelAttributeKey(k) + if sanitized != k { + v.CopyTo(attributes.PutEmpty(sanitized)) + attributes.Remove(k) + } + return true + } + switch k { + case elasticattr.DataStreamDataset, + elasticattr.DataStreamNamespace, + elasticattr.DataStreamType, + elasticattr.EventDataset, + "event.module", + "system.process.cmdline", + "system.process.cpu.start_time", + "system.filesystem.mount_point", + "system.process.state", + string(semconv.UserNameKey): + return true + default: + setLabelAttributeValue(attributes, sanitize.HandleAttributeKey(k), v) + attributes.Remove(k) + } + return true + }) +} + func translateAttributes(attributes pcommon.Map, isSupported func(string) bool) { attributes.Range(func(k string, v pcommon.Value) bool { if sanitize.IsLabelAttribute(k) { diff --git a/processor/elasticapmprocessor/internal/ecs/ecs_translation_test.go b/processor/elasticapmprocessor/internal/ecs/ecs_translation_test.go index 2e2a013d9..632824008 100644 --- a/processor/elasticapmprocessor/internal/ecs/ecs_translation_test.go +++ b/processor/elasticapmprocessor/internal/ecs/ecs_translation_test.go @@ -296,6 +296,107 @@ func TestTranslateLogRecordAttributes(t *testing.T) { } } +func TestTranslateMetricDataPointAttributes(t *testing.T) { + tests := []struct { + name string + setAttrs func(pcommon.Map) + want map[string]any + wantAbsent []string + }{ + { + name: "raw otlp metric dimensions moved to labels", + setAttrs: func(attrs pcommon.Map) { + attrs.PutStr("http.request.method", "GET") + attrs.PutStr("http.route", "/api/users") + attrs.PutInt("http.response.status_code", 200) + attrs.PutStr("host", "server-01") + attrs.PutStr("state", "used") + }, + want: map[string]any{ + "labels.http_request_method": "GET", + "labels.http_route": "/api/users", + "numeric_labels.http_response_status_code": 
float64(200), + "labels.host": "server-01", + "labels.state": "used", + }, + wantAbsent: []string{ + "http.request.method", + "http.route", + "http.response.status_code", + "host", + "state", + }, + }, + { + name: "existing metric label keys are sanitized in place", + setAttrs: func(attrs pcommon.Map) { + attrs.PutStr("labels.http.request.method", "GET") + attrs.PutDouble("numeric_labels.http.response.status_code", 200) + }, + want: map[string]any{ + "labels.http_request_method": "GET", + "numeric_labels.http_response_status_code": 200.0, + }, + wantAbsent: []string{ + "labels.http.request.method", + "numeric_labels.http.response.status_code", + }, + }, + { + name: "metric special cases and routing attrs are preserved", + setAttrs: func(attrs pcommon.Map) { + attrs.PutStr(elasticattr.DataStreamDataset, "apm.internal") + attrs.PutStr(elasticattr.DataStreamNamespace, "default") + attrs.PutStr(elasticattr.DataStreamType, "metrics") + attrs.PutStr("host", "server-01") + attrs.PutStr("state", "used") + attrs.PutStr("system.process.cmdline", "/usr/bin/java") + attrs.PutStr("event.module", "system") + attrs.PutStr("user.name", "appuser") + }, + want: map[string]any{ + elasticattr.DataStreamDataset: "apm.internal", + elasticattr.DataStreamNamespace: "default", + elasticattr.DataStreamType: "metrics", + "labels.host": "server-01", + "labels.state": "used", + "system.process.cmdline": "/usr/bin/java", + "event.module": "system", + "user.name": "appuser", + }, + wantAbsent: []string{"host", "state"}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + attrs := pcommon.NewMap() + tc.setAttrs(attrs) + + TranslateMetricDataPointAttributes(attrs) + + for key, want := range tc.want { + got, ok := attrs.Get(key) + if !assert.True(t, ok, "expected %q to be present", key) { + continue + } + switch want := want.(type) { + case string: + assert.Equal(t, want, got.Str()) + case float64: + assert.InDelta(t, want, got.Double(), 1e-9) + default: + 
t.Fatalf("unsupported want type %T", want) + } + } + for _, key := range tc.wantAbsent { + _, ok := attrs.Get(key) + assert.False(t, ok, "expected %q to be absent", key) + } + }) + } +} + // TestSetLabelAttributeValue verifies that setLabelAttributeValue stores // supported value types under the correct labels.* / numeric_labels.* prefix // and rejects unsupported types (Map, Bytes, Empty). This matches diff --git a/processor/elasticapmprocessor/processor.go b/processor/elasticapmprocessor/processor.go index 1ef932fd2..015bf15ab 100644 --- a/processor/elasticapmprocessor/processor.go +++ b/processor/elasticapmprocessor/processor.go @@ -213,9 +213,11 @@ func newMetricProcessor(cfg *Config, next consumer.Metrics, logger *zap.Logger) ecsEnricherConfig.Resource.HostOSType.Enabled = true ecsEnricherConfig.Resource.ServiceName.Enabled = true ecsEnricherConfig.Resource.DefaultDeploymentEnvironment.Enabled = true + ecsEnricherConfig.Resource.DefaultServiceLanguage.Enabled = true intakeECSEnricherConfig := ecsEnricherConfig intakeECSEnricherConfig.Resource.HostOSType.Enabled = false + intakeECSEnricherConfig.Resource.DefaultServiceLanguage.Enabled = false return &MetricProcessor{ next: next, @@ -237,12 +239,15 @@ func (p *MetricProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics if ecsMode { enricher = p.ecsEnricher resourceMetrics := md.ResourceMetrics() + // ECS metric batches are assumed to be homogeneous by origin. We select + // the enricher from the first resource metric and apply it to the whole batch. 
if resourceMetrics.Len() > 0 && isIntakeECS(resourceMetrics.At(0).Resource()) { enricher = p.intakeECSEnricher } for i := 0; i < resourceMetrics.Len(); i++ { resourceMetric := resourceMetrics.At(i) resource := resourceMetric.Resource() + resourceIsIntake := isIntakeECS(resource) ecs.TranslateResourceMetadata(resource) ecs.ApplyResourceConventions(resource) routing.EncodeDataStream(resource, routing.DataStreamTypeMetrics, p.cfg.ServiceNameInDataStreamDataset) @@ -258,6 +263,9 @@ func (p *MetricProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics // Route internal metrics to appropriate data streams if needed. routeMetricsToDataStream(resourceMetric.ScopeMetrics(), hasServiceName) + if !resourceIsIntake && hasServiceName { + translateRawOTLPMetricDataPoints(resourceMetric.ScopeMetrics()) + } } } // When skipEnrichment is true, only enrich when mapping mode is ecs @@ -328,6 +336,55 @@ func routeMetricsToDataStream(scopeMetrics pmetric.ScopeMetricsSlice, hasService } } +func translateRawOTLPMetricDataPoints(scopeMetrics pmetric.ScopeMetricsSlice) { + for j := 0; j < scopeMetrics.Len(); j++ { + metrics := scopeMetrics.At(j).Metrics() + for k := 0; k < metrics.Len(); k++ { + metric := metrics.At(k) + switch metric.Type() { + case pmetric.MetricTypeGauge: + dataPoints := metric.Gauge().DataPoints() + for l := 0; l < dataPoints.Len(); l++ { + translateRawOTLPMetricDataPointAttributes(dataPoints.At(l).Attributes()) + } + case pmetric.MetricTypeSum: + dataPoints := metric.Sum().DataPoints() + for l := 0; l < dataPoints.Len(); l++ { + translateRawOTLPMetricDataPointAttributes(dataPoints.At(l).Attributes()) + } + case pmetric.MetricTypeHistogram: + dataPoints := metric.Histogram().DataPoints() + for l := 0; l < dataPoints.Len(); l++ { + translateRawOTLPMetricDataPointAttributes(dataPoints.At(l).Attributes()) + } + case pmetric.MetricTypeExponentialHistogram: + dataPoints := metric.ExponentialHistogram().DataPoints() + for l := 0; l < dataPoints.Len(); l++ { + 
translateRawOTLPMetricDataPointAttributes(dataPoints.At(l).Attributes()) + } + case pmetric.MetricTypeSummary: + dataPoints := metric.Summary().DataPoints() + for l := 0; l < dataPoints.Len(); l++ { + translateRawOTLPMetricDataPointAttributes(dataPoints.At(l).Attributes()) + } + } + } + } +} + +func translateRawOTLPMetricDataPointAttributes(attributes pcommon.Map) { + if _, ok := attributes.Get("metricset.name"); ok { + return + } + if _, ok := attributes.Get("metricset.interval"); ok { + return + } + if _, ok := attributes.Get("elasticsearch.mapping.hints"); ok { + return + } + ecs.TranslateMetricDataPointAttributes(attributes) +} + func (p *LogProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error { enricher := p.enricher ecsMode := isECS(ctx) diff --git a/processor/elasticapmprocessor/processor_test.go b/processor/elasticapmprocessor/processor_test.go index f3ec0efe3..c8e043224 100644 --- a/processor/elasticapmprocessor/processor_test.go +++ b/processor/elasticapmprocessor/processor_test.go @@ -498,6 +498,235 @@ func TestConsumeLogs_ECSAssumesHomogeneousBatchOrigin(t *testing.T) { assert.False(t, ok) } +func TestConsumeMetrics_ECSOTLPFallbacks(t *testing.T) { + ctx := client.NewContext(context.Background(), client.Info{ + Metadata: client.NewMetadata(map[string][]string{"x-elastic-mapping-mode": {"ecs"}}), + }) + + factory := NewFactory() + settings := processortest.NewNopSettings(metadata.Type) + next := &consumertest.MetricsSink{} + cfg := NewDefaultConfig().(*Config) + + mp, err := factory.CreateMetrics(ctx, settings, cfg, next) + require.NoError(t, err) + + metrics := pmetric.NewMetrics() + resourceMetric := metrics.ResourceMetrics().AppendEmpty() + resource := resourceMetric.Resource() + resource.Attributes().PutStr("service.name", "test-service") + resource.Attributes().PutStr(string(semconv.TelemetrySDKNameKey), "opentelemetry") + + scopeMetrics := resourceMetric.ScopeMetrics().AppendEmpty() + + httpMetric := scopeMetrics.Metrics().AppendEmpty() 
+ httpMetric.SetName("http.requests.total") + httpDP := httpMetric.SetEmptySum().DataPoints().AppendEmpty() + httpDP.SetIntValue(1) + httpAttrs := httpDP.Attributes() + httpAttrs.PutStr("http.request.method", "GET") + httpAttrs.PutStr("http.route", "/api/users") + httpAttrs.PutInt("http.response.status_code", 200) + + memoryMetric := scopeMetrics.Metrics().AppendEmpty() + memoryMetric.SetName("system.memory.usage") + memoryDP := memoryMetric.SetEmptyGauge().DataPoints().AppendEmpty() + memoryDP.SetDoubleValue(2048.5) + memoryAttrs := memoryDP.Attributes() + memoryAttrs.PutStr("host", "server-01") + memoryAttrs.PutStr("state", "used") + + require.NoError(t, mp.ConsumeMetrics(ctx, metrics)) + actual := next.AllMetrics()[0] + + require.Equal(t, 1, actual.ResourceMetrics().Len()) + actualResource := actual.ResourceMetrics().At(0).Resource().Attributes() + lang, ok := actualResource.Get(string(semconv.TelemetrySDKLanguageKey)) + require.True(t, ok) + assert.Equal(t, "unknown", lang.Str()) + + agentName, ok := actualResource.Get("agent.name") + require.True(t, ok) + assert.Equal(t, "opentelemetry", agentName.Str()) + + actualMetrics := actual.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + actualHTTPAttrs := actualMetrics.At(0).Sum().DataPoints().At(0).Attributes() + value, ok := actualHTTPAttrs.Get("labels.http_request_method") + require.True(t, ok) + assert.Equal(t, "GET", value.Str()) + value, ok = actualHTTPAttrs.Get("labels.http_route") + require.True(t, ok) + assert.Equal(t, "/api/users", value.Str()) + value, ok = actualHTTPAttrs.Get("numeric_labels.http_response_status_code") + require.True(t, ok) + assert.InDelta(t, 200, value.Double(), 1e-9) + _, ok = actualHTTPAttrs.Get("http.request.method") + assert.False(t, ok) + _, ok = actualHTTPAttrs.Get("http.route") + assert.False(t, ok) + _, ok = actualHTTPAttrs.Get("http.response.status_code") + assert.False(t, ok) + + actualMemoryAttrs := actualMetrics.At(1).Gauge().DataPoints().At(0).Attributes() + value, 
ok = actualMemoryAttrs.Get("labels.host") + require.True(t, ok) + assert.Equal(t, "server-01", value.Str()) + value, ok = actualMemoryAttrs.Get("labels.state") + require.True(t, ok) + assert.Equal(t, "used", value.Str()) + _, ok = actualMemoryAttrs.Get("host") + assert.False(t, ok) + _, ok = actualMemoryAttrs.Get("state") + assert.False(t, ok) +} + +func TestConsumeMetrics_ECSIntakeSkipsOTLPFallbacks(t *testing.T) { + ctx := client.NewContext(context.Background(), client.Info{ + Metadata: client.NewMetadata(map[string][]string{"x-elastic-mapping-mode": {"ecs"}}), + }) + + factory := NewFactory() + settings := processortest.NewNopSettings(metadata.Type) + next := &consumertest.MetricsSink{} + cfg := NewDefaultConfig().(*Config) + + mp, err := factory.CreateMetrics(ctx, settings, cfg, next) + require.NoError(t, err) + + metrics := pmetric.NewMetrics() + resourceMetric := metrics.ResourceMetrics().AppendEmpty() + resource := resourceMetric.Resource() + resource.Attributes().PutStr("service.name", "test-service") + resource.Attributes().PutStr(string(semconv.TelemetrySDKNameKey), "ElasticAPM") + + scopeMetrics := resourceMetric.ScopeMetrics().AppendEmpty() + metric := scopeMetrics.Metrics().AppendEmpty() + metric.SetName("http.requests.total") + dp := metric.SetEmptySum().DataPoints().AppendEmpty() + dp.SetIntValue(1) + attrs := dp.Attributes() + attrs.PutStr("http.request.method", "GET") + attrs.PutStr("http.route", "/api/users") + attrs.PutInt("http.response.status_code", 200) + attrs.PutStr("host", "server-01") + attrs.PutStr("state", "used") + + require.NoError(t, mp.ConsumeMetrics(ctx, metrics)) + actual := next.AllMetrics()[0] + + actualResource := actual.ResourceMetrics().At(0).Resource().Attributes() + _, ok := actualResource.Get(string(semconv.TelemetrySDKLanguageKey)) + assert.False(t, ok) + + actualAttrs := actual.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes() + value, ok := 
actualAttrs.Get("http.request.method") + require.True(t, ok) + assert.Equal(t, "GET", value.Str()) + value, ok = actualAttrs.Get("http.route") + require.True(t, ok) + assert.Equal(t, "/api/users", value.Str()) + value, ok = actualAttrs.Get("http.response.status_code") + require.True(t, ok) + assert.EqualValues(t, 200, value.Int()) + value, ok = actualAttrs.Get("host") + require.True(t, ok) + assert.Equal(t, "server-01", value.Str()) + value, ok = actualAttrs.Get("state") + require.True(t, ok) + assert.Equal(t, "used", value.Str()) + _, ok = actualAttrs.Get("labels.http_request_method") + assert.False(t, ok) + _, ok = actualAttrs.Get("labels.http_route") + assert.False(t, ok) + _, ok = actualAttrs.Get("numeric_labels.http_response_status_code") + assert.False(t, ok) + _, ok = actualAttrs.Get("labels.host") + assert.False(t, ok) + _, ok = actualAttrs.Get("labels.state") + assert.False(t, ok) +} + +func TestTranslateRawOTLPMetricDataPointAttributesSkipsAggregatedMetrics(t *testing.T) { + tests := []struct { + name string + markerKey string + markerVal func(pcommon.Map) + }{ + { + name: "metricset name present", + markerKey: "metricset.name", + markerVal: func(attrs pcommon.Map) { attrs.PutStr("metricset.name", "service_summary") }, + }, + { + name: "metricset interval present", + markerKey: "metricset.interval", + markerVal: func(attrs pcommon.Map) { attrs.PutStr("metricset.interval", "1m") }, + }, + { + name: "mapping hints present", + markerKey: "elasticsearch.mapping.hints", + markerVal: func(attrs pcommon.Map) { + hints := attrs.PutEmptySlice("elasticsearch.mapping.hints") + hints.AppendEmpty().SetStr("_doc_count") + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + attrs := pcommon.NewMap() + tc.markerVal(attrs) + attrs.PutStr("host", "server-01") + attrs.PutStr("state", "used") + + translateRawOTLPMetricDataPointAttributes(attrs) + + value, ok := attrs.Get("host") + require.True(t, ok) + assert.Equal(t, "server-01", value.Str()) 
+ value, ok = attrs.Get("state") + require.True(t, ok) + assert.Equal(t, "used", value.Str()) + _, ok = attrs.Get("labels.host") + assert.False(t, ok) + _, ok = attrs.Get("labels.state") + assert.False(t, ok) + }) + } +} + +func TestTranslateRawOTLPMetricDataPointAttributesPreservesRoutingAttrs(t *testing.T) { + attrs := pcommon.NewMap() + attrs.PutStr("data_stream.dataset", "apm.internal") + attrs.PutStr("data_stream.namespace", "default") + attrs.PutStr("data_stream.type", "metrics") + attrs.PutStr("host", "server-01") + attrs.PutStr("state", "used") + + translateRawOTLPMetricDataPointAttributes(attrs) + + value, ok := attrs.Get("data_stream.dataset") + require.True(t, ok) + assert.Equal(t, "apm.internal", value.Str()) + value, ok = attrs.Get("data_stream.namespace") + require.True(t, ok) + assert.Equal(t, "default", value.Str()) + value, ok = attrs.Get("data_stream.type") + require.True(t, ok) + assert.Equal(t, "metrics", value.Str()) + + value, ok = attrs.Get("labels.host") + require.True(t, ok) + assert.Equal(t, "server-01", value.Str()) + value, ok = attrs.Get("labels.state") + require.True(t, ok) + assert.Equal(t, "used", value.Str()) + _, ok = attrs.Get("host") + assert.False(t, ok) + _, ok = attrs.Get("state") + assert.False(t, ok) +} + // TestSkipEnrichmentMetrics tests that metrics are only enriched when skipEnrichment is false or when mapping mode is ecs func TestSkipEnrichmentMetrics(t *testing.T) { testCases := []struct { diff --git a/processor/elasticapmprocessor/testdata/ecs/elastic_hostname/metrics_output.yaml b/processor/elasticapmprocessor/testdata/ecs/elastic_hostname/metrics_output.yaml index 06e8046f9..6c0e34a44 100644 --- a/processor/elasticapmprocessor/testdata/ecs/elastic_hostname/metrics_output.yaml +++ b/processor/elasticapmprocessor/testdata/ecs/elastic_hostname/metrics_output.yaml @@ -34,6 +34,9 @@ resourceMetrics: - key: service.name value: stringValue: test-service + - key: telemetry.sdk.language + value: + stringValue: unknown 
scopeMetrics: - metrics: - gauge: diff --git a/processor/elasticapmprocessor/testdata/skip_enrichment/metrics_false_ecs_output.yaml b/processor/elasticapmprocessor/testdata/skip_enrichment/metrics_false_ecs_output.yaml index a4137d4de..c2ecbe491 100644 --- a/processor/elasticapmprocessor/testdata/skip_enrichment/metrics_false_ecs_output.yaml +++ b/processor/elasticapmprocessor/testdata/skip_enrichment/metrics_false_ecs_output.yaml @@ -25,6 +25,9 @@ resourceMetrics: - key: service.name value: stringValue: test-service + - key: telemetry.sdk.language + value: + stringValue: unknown scopeMetrics: - metrics: - gauge: diff --git a/processor/elasticapmprocessor/testdata/skip_enrichment/metrics_true_ecs_output.yaml b/processor/elasticapmprocessor/testdata/skip_enrichment/metrics_true_ecs_output.yaml index a4137d4de..c2ecbe491 100644 --- a/processor/elasticapmprocessor/testdata/skip_enrichment/metrics_true_ecs_output.yaml +++ b/processor/elasticapmprocessor/testdata/skip_enrichment/metrics_true_ecs_output.yaml @@ -25,6 +25,9 @@ resourceMetrics: - key: service.name value: stringValue: test-service + - key: telemetry.sdk.language + value: + stringValue: unknown scopeMetrics: - metrics: - gauge: From 8162569c521ff518adbd67add5d97b0f9dc3b9d6 Mon Sep 17 00:00:00 2001 From: Lanre Ade Date: Mon, 30 Mar 2026 17:01:21 -0400 Subject: [PATCH 2/7] feat: refactor metric enrichment --- .../internal/ecs/ecs_translation.go | 57 ++++---- .../internal/enrichments/config/config.go | 5 +- .../internal/enrichments/enricher.go | 4 + .../internal/enrichments/metric.go | 72 +++++++++++ .../internal/enrichments/metric_test.go | 121 +++++++++++++++++ .../internal/enrichments/resource.go | 2 +- processor/elasticapmprocessor/processor.go | 55 +------- .../elasticapmprocessor/processor_test.go | 122 ++++++++---------- 8 files changed, 281 insertions(+), 157 deletions(-) diff --git a/processor/elasticapmprocessor/internal/ecs/ecs_translation.go
b/processor/elasticapmprocessor/internal/ecs/ecs_translation.go index 431799453..f7c90ce6b 100644 --- a/processor/elasticapmprocessor/internal/ecs/ecs_translation.go +++ b/processor/elasticapmprocessor/internal/ecs/ecs_translation.go @@ -52,38 +52,8 @@ func TranslateLogRecordAttributes(attributes pcommon.Map) { // for raw metric datapoint attributes in ECS mode. Existing labels.* / // numeric_labels.* keys are sanitized in place, metric-specific special cases // are preserved, and everything else is moved to labels.* / numeric_labels.*. -// -// The collector preserves data_stream.type in addition to the apm-data -// data_stream dataset/namespace handling, since datapoint-level routing depends -// on the full data_stream triple before export. func TranslateMetricDataPointAttributes(attributes pcommon.Map) { - attributes.Range(func(k string, v pcommon.Value) bool { - if sanitize.IsLabelAttribute(k) { - sanitized := sanitize.HandleLabelAttributeKey(k) - if sanitized != k { - v.CopyTo(attributes.PutEmpty(sanitized)) - attributes.Remove(k) - } - return true - } - switch k { - case elasticattr.DataStreamDataset, - elasticattr.DataStreamNamespace, - elasticattr.DataStreamType, - elasticattr.EventDataset, - "event.module", - "system.process.cmdline", - "system.process.cpu.start_time", - "system.filesystem.mount_point", - "system.process.state", - string(semconv.UserNameKey): - return true - default: - setLabelAttributeValue(attributes, sanitize.HandleAttributeKey(k), v) - attributes.Remove(k) - } - return true - }) + translateAttributes(attributes, isSupportedMetricDataPointAttribute) } func translateAttributes(attributes pcommon.Map, isSupported func(string) bool) { @@ -192,6 +162,31 @@ func isSupportedLogRecordAttribute(attr string) bool { return false } +// isSupportedMetricDataPointAttribute mirrors the apm-data OTLP metric +// datapoint handling where a small set of fields are preserved as first-class +// values and the rest fall back to labels.* / numeric_labels.*. 
+// +// The collector preserves data_stream.type in addition to the apm-data +// data_stream dataset/namespace handling, since datapoint-level routing depends +// on the full data_stream triple before export. +func isSupportedMetricDataPointAttribute(attr string) bool { + switch attr { + case elasticattr.DataStreamDataset, + elasticattr.DataStreamNamespace, + elasticattr.DataStreamType, + elasticattr.EventDataset, + "event.module", + "system.process.cmdline", + "system.process.cpu.start_time", + "system.filesystem.mount_point", + "system.process.state", + string(semconv.UserNameKey): + return true + } + + return false +} + // isSupportedResourceAttribute returns true if the resource attribute is // supported by ECS and can be mapped directly. // Supported fields can include OTEL SemConv attributes or ECS specific attributes. diff --git a/processor/elasticapmprocessor/internal/enrichments/config/config.go b/processor/elasticapmprocessor/internal/enrichments/config/config.go index bade313d0..2cb998c45 100644 --- a/processor/elasticapmprocessor/internal/enrichments/config/config.go +++ b/processor/elasticapmprocessor/internal/enrichments/config/config.go @@ -118,8 +118,9 @@ type SpanEventConfig struct { // ElasticMetricConfig configures the enrichment attributes for metrics type ElasticMetricConfig struct { - ProcessorEvent AttributeConfig `mapstructure:"processor_event"` - MetricsetName AttributeConfig `mapstructure:"metricset_name"` + ProcessorEvent AttributeConfig `mapstructure:"processor_event"` + MetricsetName AttributeConfig `mapstructure:"metricset_name"` + TranslateUnsupportedAttributes AttributeConfig `mapstructure:"translate_unsupported_attributes"` } // ElasticLogConfig configures the enrichment attributes for logs diff --git a/processor/elasticapmprocessor/internal/enrichments/enricher.go b/processor/elasticapmprocessor/internal/enrichments/enricher.go index 2e625ab06..00a47a7bf 100644 --- a/processor/elasticapmprocessor/internal/enrichments/enricher.go 
+++ b/processor/elasticapmprocessor/internal/enrichments/enricher.go @@ -102,6 +102,10 @@ func (e *Enricher) EnrichMetrics(pl pmetric.Metrics) { for j := 0; j < scopeMetics.Len(); j++ { scopeMetric := scopeMetics.At(j) EnrichScope(scopeMetric.Scope(), e.Config) + metrics := scopeMetric.Metrics() + for k := 0; k < metrics.Len(); k++ { + EnrichMetricDataPoints(metrics.At(k), e.Config) + } } } } diff --git a/processor/elasticapmprocessor/internal/enrichments/metric.go b/processor/elasticapmprocessor/internal/enrichments/metric.go index d0f6e1f92..b816e4746 100644 --- a/processor/elasticapmprocessor/internal/enrichments/metric.go +++ b/processor/elasticapmprocessor/internal/enrichments/metric.go @@ -18,9 +18,11 @@ package enrichments // import "github.com/elastic/opentelemetry-collector-components/processor/elasticapmprocessor/internal/enrichments" import ( + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "github.com/elastic/opentelemetry-collector-components/internal/elasticattr" + "github.com/elastic/opentelemetry-collector-components/processor/elasticapmprocessor/internal/ecs" "github.com/elastic/opentelemetry-collector-components/processor/elasticapmprocessor/internal/enrichments/attribute" "github.com/elastic/opentelemetry-collector-components/processor/elasticapmprocessor/internal/enrichments/config" ) @@ -38,3 +40,73 @@ func EnrichMetric(metric pmetric.ResourceMetrics, cfg config.Config) { attribute.PutStr(resAttrs, elasticattr.MetricsetName, metricSetNameApp) } } + +func EnrichMetricDataPoints(metric pmetric.Metric, cfg config.Config) { + switch metric.Type() { + case pmetric.MetricTypeGauge: + dataPoints := metric.Gauge().DataPoints() + for i := 0; i < dataPoints.Len(); i++ { + enrichMetricDataPointAttributes(dataPoints.At(i).Attributes(), cfg) + } + case pmetric.MetricTypeSum: + dataPoints := metric.Sum().DataPoints() + for i := 0; i < dataPoints.Len(); i++ { + 
enrichMetricDataPointAttributes(dataPoints.At(i).Attributes(), cfg) + } + case pmetric.MetricTypeHistogram: + dataPoints := metric.Histogram().DataPoints() + for i := 0; i < dataPoints.Len(); i++ { + enrichMetricDataPointAttributes(dataPoints.At(i).Attributes(), cfg) + } + case pmetric.MetricTypeExponentialHistogram: + dataPoints := metric.ExponentialHistogram().DataPoints() + for i := 0; i < dataPoints.Len(); i++ { + enrichMetricDataPointAttributes(dataPoints.At(i).Attributes(), cfg) + } + case pmetric.MetricTypeSummary: + dataPoints := metric.Summary().DataPoints() + for i := 0; i < dataPoints.Len(); i++ { + enrichMetricDataPointAttributes(dataPoints.At(i).Attributes(), cfg) + } + } +} + +// enrichMetricDataPointAttributes applies the raw OTLP metric fallback during +// enrichment, analogous to the log-record fallback that is applied in +// EnrichLog. This happens later than apm-data's OTLP-to-APM conversion, so we +// must explicitly avoid relabeling attrs that identify aggregated metrics or +// influence exporter behavior. +func enrichMetricDataPointAttributes(attributes pcommon.Map, cfg config.Config) { + if !cfg.Metric.TranslateUnsupportedAttributes.Enabled { + return + } + if isAggregatedMetricDataPointAttributes(attributes) { + return + } + ecs.TranslateMetricDataPointAttributes(attributes) +} + +// isAggregatedMetricDataPointAttributes preserves aggregated-metric identity and +// exporter-only metadata before the raw OTLP fallback runs. +// +// In the collector we mutate OTel datapoints in place, so we must keep these attrs +// out of label fallback here. +// `elasticsearch.mapping.hints` is collector/exporter-specific metadata and has +// no apm-data equivalent, but it also needs to pass through untouched.
+func isAggregatedMetricDataPointAttributes(attributes pcommon.Map) bool { + if _, ok := attributes.Get("metricset.name"); ok { + return true + } + if _, ok := attributes.Get("metricset.interval"); ok { + return true + } + if isESMappingHint(attributes) { + return true + } + return false +} + +func isESMappingHint(attributes pcommon.Map) bool { + _, ok := attributes.Get("elasticsearch.mapping.hints") + return ok +} diff --git a/processor/elasticapmprocessor/internal/enrichments/metric_test.go b/processor/elasticapmprocessor/internal/enrichments/metric_test.go index 50e21b310..9285b8109 100644 --- a/processor/elasticapmprocessor/internal/enrichments/metric_test.go +++ b/processor/elasticapmprocessor/internal/enrichments/metric_test.go @@ -21,6 +21,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "github.com/elastic/opentelemetry-collector-components/internal/elasticattr" @@ -132,3 +134,122 @@ func TestEnrichMetric(t *testing.T) { }) } } + +func TestEnrichMetrics_TranslateUnsupportedAttributes(t *testing.T) { + cfg := config.Enabled() + cfg.Metric.TranslateUnsupportedAttributes.Enabled = true + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() + metric := scopeMetrics.Metrics().AppendEmpty() + metric.SetName("test.metric") + dp := metric.SetEmptyGauge().DataPoints().AppendEmpty() + dp.SetDoubleValue(1.0) + attrs := dp.Attributes() + attrs.PutStr("data_stream.dataset", "apm.internal") + attrs.PutStr("data_stream.namespace", "default") + attrs.PutStr("data_stream.type", "metrics") + attrs.PutStr("host", "server-01") + attrs.PutStr("state", "used") + attrs.PutStr("event.module", "system") + attrs.PutStr("system.process.cmdline", "/usr/bin/java") + + enricher := NewEnricher(cfg) + enricher.EnrichMetrics(metrics) + + 
actualAttrs := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Gauge().DataPoints().At(0).Attributes() + + value, ok := actualAttrs.Get("data_stream.dataset") + require.True(t, ok) + assert.Equal(t, "apm.internal", value.Str()) + value, ok = actualAttrs.Get("data_stream.namespace") + require.True(t, ok) + assert.Equal(t, "default", value.Str()) + value, ok = actualAttrs.Get("data_stream.type") + require.True(t, ok) + assert.Equal(t, "metrics", value.Str()) + value, ok = actualAttrs.Get("labels.host") + require.True(t, ok) + assert.Equal(t, "server-01", value.Str()) + value, ok = actualAttrs.Get("labels.state") + require.True(t, ok) + assert.Equal(t, "used", value.Str()) + value, ok = actualAttrs.Get("event.module") + require.True(t, ok) + assert.Equal(t, "system", value.Str()) + value, ok = actualAttrs.Get("system.process.cmdline") + require.True(t, ok) + assert.Equal(t, "/usr/bin/java", value.Str()) + _, ok = actualAttrs.Get("host") + assert.False(t, ok) + _, ok = actualAttrs.Get("state") + assert.False(t, ok) +} + +func TestEnrichMetricDataPoints_SkipsAggregatedMetricAttributes(t *testing.T) { + cfg := config.Enabled() + cfg.Metric.TranslateUnsupportedAttributes.Enabled = true + + metric := pmetric.NewMetric() + metric.SetName("service_summary") + dp := metric.SetEmptyGauge().DataPoints().AppendEmpty() + dp.SetDoubleValue(1.0) + attrs := dp.Attributes() + attrs.PutStr("metricset.name", "service_summary") + attrs.PutStr("host", "server-01") + attrs.PutStr("state", "used") + + EnrichMetricDataPoints(metric, cfg) + + actualAttrs := metric.Gauge().DataPoints().At(0).Attributes() + value, ok := actualAttrs.Get("host") + require.True(t, ok) + assert.Equal(t, "server-01", value.Str()) + value, ok = actualAttrs.Get("state") + require.True(t, ok) + assert.Equal(t, "used", value.Str()) + _, ok = actualAttrs.Get("labels.host") + assert.False(t, ok) + _, ok = actualAttrs.Get("labels.state") + assert.False(t, ok) +} + +func 
TestEnrichMetricDataPoints_SkipsMetricsWithMappingHints(t *testing.T) { + cfg := config.Enabled() + cfg.Metric.TranslateUnsupportedAttributes.Enabled = true + + metric := pmetric.NewMetric() + metric.SetName("transaction.duration.histogram") + dp := metric.SetEmptyGauge().DataPoints().AppendEmpty() + dp.SetDoubleValue(1.0) + attrs := dp.Attributes() + hints := attrs.PutEmptySlice("elasticsearch.mapping.hints") + hints.AppendEmpty().SetStr("_doc_count") + attrs.PutStr("host", "server-01") + + EnrichMetricDataPoints(metric, cfg) + + actualAttrs := metric.Gauge().DataPoints().At(0).Attributes() + value, ok := actualAttrs.Get("host") + require.True(t, ok) + assert.Equal(t, "server-01", value.Str()) + _, ok = actualAttrs.Get("labels.host") + assert.False(t, ok) +} + +func TestEnrichMetricDataPointAttributes_NoOpWhenDisabled(t *testing.T) { + cfg := config.Enabled() + cfg.Metric.TranslateUnsupportedAttributes.Enabled = false + + attrs := pcommon.NewMap() + attrs.PutStr("host", "server-01") + + enrichMetricDataPointAttributes(attrs, cfg) + + value, ok := attrs.Get("host") + require.True(t, ok) + assert.Equal(t, "server-01", value.Str()) + _, ok = attrs.Get("labels.host") + assert.False(t, ok) +} diff --git a/processor/elasticapmprocessor/internal/enrichments/resource.go b/processor/elasticapmprocessor/internal/enrichments/resource.go index 504f10f9d..49bbd4f1a 100644 --- a/processor/elasticapmprocessor/internal/enrichments/resource.go +++ b/processor/elasticapmprocessor/internal/enrichments/resource.go @@ -174,7 +174,7 @@ func (s *resourceEnrichmentContext) setServiceLanguage(resource pcommon.Resource // setDefaultServiceLanguage ensures telemetry.sdk.language is populated after // agent.name/version have already been derived. This preserves the // apm-data-compatible service.language.name alias without changing agent naming. -// The log processor enables this only for OTLP ECS log batches. 
+// The ECS log and metric processors enable this only for non-intake OTLP batches. func (s *resourceEnrichmentContext) setDefaultServiceLanguage(resource pcommon.Resource) { if s.telemetrySDKLanguage != "" { return diff --git a/processor/elasticapmprocessor/processor.go b/processor/elasticapmprocessor/processor.go index 015bf15ab..c91ad95a2 100644 --- a/processor/elasticapmprocessor/processor.go +++ b/processor/elasticapmprocessor/processor.go @@ -214,10 +214,12 @@ func newMetricProcessor(cfg *Config, next consumer.Metrics, logger *zap.Logger) ecsEnricherConfig.Resource.ServiceName.Enabled = true ecsEnricherConfig.Resource.DefaultDeploymentEnvironment.Enabled = true ecsEnricherConfig.Resource.DefaultServiceLanguage.Enabled = true + ecsEnricherConfig.Metric.TranslateUnsupportedAttributes.Enabled = true intakeECSEnricherConfig := ecsEnricherConfig intakeECSEnricherConfig.Resource.HostOSType.Enabled = false intakeECSEnricherConfig.Resource.DefaultServiceLanguage.Enabled = false + intakeECSEnricherConfig.Metric.TranslateUnsupportedAttributes.Enabled = false return &MetricProcessor{ next: next, @@ -247,7 +249,6 @@ func (p *MetricProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics for i := 0; i < resourceMetrics.Len(); i++ { resourceMetric := resourceMetrics.At(i) resource := resourceMetric.Resource() - resourceIsIntake := isIntakeECS(resource) ecs.TranslateResourceMetadata(resource) ecs.ApplyResourceConventions(resource) routing.EncodeDataStream(resource, routing.DataStreamTypeMetrics, p.cfg.ServiceNameInDataStreamDataset) @@ -263,9 +264,6 @@ func (p *MetricProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics // Route internal metrics to appropriate data streams if needed. 
routeMetricsToDataStream(resourceMetric.ScopeMetrics(), hasServiceName) - if !resourceIsIntake && hasServiceName { - translateRawOTLPMetricDataPoints(resourceMetric.ScopeMetrics()) - } } } // When skipEnrichment is true, only enrich when mapping mode is ecs @@ -336,55 +334,6 @@ func routeMetricsToDataStream(scopeMetrics pmetric.ScopeMetricsSlice, hasService } } -func translateRawOTLPMetricDataPoints(scopeMetrics pmetric.ScopeMetricsSlice) { - for j := 0; j < scopeMetrics.Len(); j++ { - metrics := scopeMetrics.At(j).Metrics() - for k := 0; k < metrics.Len(); k++ { - metric := metrics.At(k) - switch metric.Type() { - case pmetric.MetricTypeGauge: - dataPoints := metric.Gauge().DataPoints() - for l := 0; l < dataPoints.Len(); l++ { - translateRawOTLPMetricDataPointAttributes(dataPoints.At(l).Attributes()) - } - case pmetric.MetricTypeSum: - dataPoints := metric.Sum().DataPoints() - for l := 0; l < dataPoints.Len(); l++ { - translateRawOTLPMetricDataPointAttributes(dataPoints.At(l).Attributes()) - } - case pmetric.MetricTypeHistogram: - dataPoints := metric.Histogram().DataPoints() - for l := 0; l < dataPoints.Len(); l++ { - translateRawOTLPMetricDataPointAttributes(dataPoints.At(l).Attributes()) - } - case pmetric.MetricTypeExponentialHistogram: - dataPoints := metric.ExponentialHistogram().DataPoints() - for l := 0; l < dataPoints.Len(); l++ { - translateRawOTLPMetricDataPointAttributes(dataPoints.At(l).Attributes()) - } - case pmetric.MetricTypeSummary: - dataPoints := metric.Summary().DataPoints() - for l := 0; l < dataPoints.Len(); l++ { - translateRawOTLPMetricDataPointAttributes(dataPoints.At(l).Attributes()) - } - } - } - } -} - -func translateRawOTLPMetricDataPointAttributes(attributes pcommon.Map) { - if _, ok := attributes.Get("metricset.name"); ok { - return - } - if _, ok := attributes.Get("metricset.interval"); ok { - return - } - if _, ok := attributes.Get("elasticsearch.mapping.hints"); ok { - return - } - 
ecs.TranslateMetricDataPointAttributes(attributes) -} - func (p *LogProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error { enricher := p.enricher ecsMode := isECS(ctx) diff --git a/processor/elasticapmprocessor/processor_test.go b/processor/elasticapmprocessor/processor_test.go index c8e043224..77041f4c6 100644 --- a/processor/elasticapmprocessor/processor_test.go +++ b/processor/elasticapmprocessor/processor_test.go @@ -646,84 +646,66 @@ func TestConsumeMetrics_ECSIntakeSkipsOTLPFallbacks(t *testing.T) { assert.False(t, ok) } -func TestTranslateRawOTLPMetricDataPointAttributesSkipsAggregatedMetrics(t *testing.T) { - tests := []struct { - name string - markerKey string - markerVal func(pcommon.Map) - }{ - { - name: "metricset name present", - markerKey: "metricset.name", - markerVal: func(attrs pcommon.Map) { attrs.PutStr("metricset.name", "service_summary") }, - }, - { - name: "metricset interval present", - markerKey: "metricset.interval", - markerVal: func(attrs pcommon.Map) { attrs.PutStr("metricset.interval", "1m") }, - }, - { - name: "mapping hints present", - markerKey: "elasticsearch.mapping.hints", - markerVal: func(attrs pcommon.Map) { - hints := attrs.PutEmptySlice("elasticsearch.mapping.hints") - hints.AppendEmpty().SetStr("_doc_count") - }, - }, - } +func TestConsumeMetrics_ECSAssumesHomogeneousBatchOrigin(t *testing.T) { + ctx := client.NewContext(context.Background(), client.Info{ + Metadata: client.NewMetadata(map[string][]string{"x-elastic-mapping-mode": {"ecs"}}), + }) - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - attrs := pcommon.NewMap() - tc.markerVal(attrs) - attrs.PutStr("host", "server-01") - attrs.PutStr("state", "used") - - translateRawOTLPMetricDataPointAttributes(attrs) - - value, ok := attrs.Get("host") - require.True(t, ok) - assert.Equal(t, "server-01", value.Str()) - value, ok = attrs.Get("state") - require.True(t, ok) - assert.Equal(t, "used", value.Str()) - _, ok = attrs.Get("labels.host") - 
assert.False(t, ok) - _, ok = attrs.Get("labels.state") - assert.False(t, ok) - }) - } -} + factory := NewFactory() + settings := processortest.NewNopSettings(metadata.Type) + next := &consumertest.MetricsSink{} + cfg := NewDefaultConfig().(*Config) -func TestTranslateRawOTLPMetricDataPointAttributesPreservesRoutingAttrs(t *testing.T) { - attrs := pcommon.NewMap() - attrs.PutStr("data_stream.dataset", "apm.internal") - attrs.PutStr("data_stream.namespace", "default") - attrs.PutStr("data_stream.type", "metrics") - attrs.PutStr("host", "server-01") - attrs.PutStr("state", "used") + mp, err := factory.CreateMetrics(ctx, settings, cfg, next) + require.NoError(t, err) - translateRawOTLPMetricDataPointAttributes(attrs) + metrics := pmetric.NewMetrics() - value, ok := attrs.Get("data_stream.dataset") - require.True(t, ok) - assert.Equal(t, "apm.internal", value.Str()) - value, ok = attrs.Get("data_stream.namespace") - require.True(t, ok) - assert.Equal(t, "default", value.Str()) - value, ok = attrs.Get("data_stream.type") - require.True(t, ok) - assert.Equal(t, "metrics", value.Str()) + intakeResourceMetric := metrics.ResourceMetrics().AppendEmpty() + intakeResource := intakeResourceMetric.Resource() + intakeResource.Attributes().PutStr("service.name", "intake-service") + intakeResource.Attributes().PutStr(string(semconv.TelemetrySDKNameKey), "ElasticAPM") + intakeMetric := intakeResourceMetric.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + intakeMetric.SetName("intake.metric") + intakeMetric.SetEmptyGauge().DataPoints().AppendEmpty().SetDoubleValue(1.0) + + otlpResourceMetric := metrics.ResourceMetrics().AppendEmpty() + otlpResource := otlpResourceMetric.Resource() + otlpResource.Attributes().PutStr("service.name", "otlp-service") + otlpResource.Attributes().PutStr(string(semconv.TelemetrySDKNameKey), "opentelemetry") + otlpMetric := otlpResourceMetric.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + otlpMetric.SetName("http.requests.total") + otlpDP := 
otlpMetric.SetEmptySum().DataPoints().AppendEmpty() + otlpDP.SetIntValue(1) + otlpAttrs := otlpDP.Attributes() + otlpAttrs.PutStr("http.request.method", "GET") + otlpAttrs.PutStr("host", "server-01") + + require.NoError(t, mp.ConsumeMetrics(ctx, metrics)) + actual := next.AllMetrics()[0] + + require.Equal(t, 2, actual.ResourceMetrics().Len()) + + // Mixed-origin batches are not supported. The first resource metric determines + // which ECS metric enricher is used for the whole batch. + intakeAttrs := actual.ResourceMetrics().At(0).Resource().Attributes() + _, ok := intakeAttrs.Get(string(semconv.TelemetrySDKLanguageKey)) + assert.False(t, ok) - value, ok = attrs.Get("labels.host") + otlpAttrsAfter := actual.ResourceMetrics().At(1).Resource().Attributes() + _, ok = otlpAttrsAfter.Get(string(semconv.TelemetrySDKLanguageKey)) + assert.False(t, ok) + + actualDPAttrs := actual.ResourceMetrics().At(1).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes() + value, ok := actualDPAttrs.Get("http.request.method") require.True(t, ok) - assert.Equal(t, "server-01", value.Str()) - value, ok = attrs.Get("labels.state") + assert.Equal(t, "GET", value.Str()) + value, ok = actualDPAttrs.Get("host") require.True(t, ok) - assert.Equal(t, "used", value.Str()) - _, ok = attrs.Get("host") + assert.Equal(t, "server-01", value.Str()) + _, ok = actualDPAttrs.Get("labels.http_request_method") assert.False(t, ok) - _, ok = attrs.Get("state") + _, ok = actualDPAttrs.Get("labels.host") assert.False(t, ok) } From 069c70ea3ae0f01badec87d69cdcc72282397bee Mon Sep 17 00:00:00 2001 From: Lanre Ade Date: Mon, 30 Mar 2026 18:09:24 -0400 Subject: [PATCH 3/7] feat: refactor to handle truncation --- .../internal/ecs/ecs_translation.go | 161 +++++++++++------- .../internal/ecs/ecs_translation_test.go | 142 ++++++++++++++- .../internal/enrichments/metric.go | 2 +- .../internal/enrichments/metric_test.go | 53 ++++++ 4 files changed, 298 insertions(+), 60 deletions(-) diff --git 
a/processor/elasticapmprocessor/internal/ecs/ecs_translation.go b/processor/elasticapmprocessor/internal/ecs/ecs_translation.go index f7c90ce6b..4c83f5294 100644 --- a/processor/elasticapmprocessor/internal/ecs/ecs_translation.go +++ b/processor/elasticapmprocessor/internal/ecs/ecs_translation.go @@ -32,12 +32,20 @@ const ( ecsAttrOpenCensusExporterVersion = "opencensus.exporterversion" ) +type attrAction uint8 + +const ( + attrActionFallback attrAction = iota + attrActionPreserve + attrActionTruncateAndPreserve +) + // TranslateResourceMetadata normalizes resource attributes. // Sanitizes existing labels and numeric_labels keys. // Moves unsupported attributes to labels with a "labels." prefix (key sanitized), // and leaves supported ECS attributes unchanged. func TranslateResourceMetadata(resource pcommon.Resource) { - translateAttributes(resource.Attributes(), isSupportedResourceAttribute) + translateAttributes(resource.Attributes(), classifyResourceAttribute) } // TranslateLogRecordAttributes applies the apm-data OTLP fallback behaviour for @@ -45,7 +53,7 @@ func TranslateResourceMetadata(resource pcommon.Resource) { // unsupported attributes are moved to labels.* / numeric_labels.* with a // sanitized key. func TranslateLogRecordAttributes(attributes pcommon.Map) { - translateAttributes(attributes, isSupportedLogRecordAttribute) + translateAttributes(attributes, classifyLogRecordAttribute) } // TranslateMetricDataPointAttributes applies the apm-data OTLP metric fallback @@ -53,10 +61,10 @@ func TranslateLogRecordAttributes(attributes pcommon.Map) { // numeric_labels.* keys are sanitized in place, metric-specific special cases // are preserved, and everything else is moved to labels.* / numeric_labels.*. 
func TranslateMetricDataPointAttributes(attributes pcommon.Map) { - translateAttributes(attributes, isSupportedMetricDataPointAttribute) + translateAttributes(attributes, classifyMetricDataPointAttribute) } -func translateAttributes(attributes pcommon.Map, isSupported func(string) bool) { +func translateAttributes(attributes pcommon.Map, classify func(string, pcommon.Value) attrAction) { attributes.Range(func(k string, v pcommon.Value) bool { if sanitize.IsLabelAttribute(k) { sanitized := sanitize.HandleLabelAttributeKey(k) @@ -64,16 +72,36 @@ func translateAttributes(attributes pcommon.Map, isSupported func(string) bool) v.CopyTo(attributes.PutEmpty(sanitized)) attributes.Remove(k) } - } else if !isSupported(k) { + return true + } + + switch classify(k, v) { + case attrActionPreserve: + return true + case attrActionTruncateAndPreserve: + truncated := sanitize.Truncate(v.Str()) + if truncated != v.Str() { + attributes.PutStr(k, truncated) + } + return true + default: // Attributes not supported by ECS are moved to labels with a // labels./numeric_labels. prefix depending on their value type. setLabelAttributeValue(attributes, sanitize.HandleAttributeKey(k), v) attributes.Remove(k) + return true } - return true }) } +// shouldPreserveAndTruncateIfString determines if the value should be preserved, optionally truncating strings before preserving. +func shouldPreserveAndTruncateIfString(value pcommon.Value) attrAction { + if value.Type() == pcommon.ValueTypeStr { + return attrActionTruncateAndPreserve + } + return attrActionPreserve +} + // setLabelAttributeValue maps a value into labels.* / numeric_labels.*. 
// Elasticsearch label mappings only support flat scalar values and // homogeneous arrays thereof; Map, Bytes, and empty types cannot be @@ -135,13 +163,19 @@ func setLabelAttributeValue(attributes pcommon.Map, key string, value pcommon.Va } } -// isSupportedLogRecordAttribute is based on the OTLP log-record attribute switch +// classifyLogRecordAttribute is based on the OTLP log-record attribute switch // in apm-data/input/otlp/logs.go, which preserves exception.*, event.name, // event.domain, session.id, network.connection.type, and data_stream.* as // first-class fields and sends everything else through setLabel(replaceDots(k), ...). +// +// Unlike resource metadata and some metric special cases, apm-data does not +// truncate these preserved log-record fields. Truncation for log attributes only +// happens on the fallback label path via setLabel, which is mirrored here by +// setLabelAttributeValue. +// // This allowlist also keeps processor-added fields like processor.event, // error.id, and data_stream.type so they survive the collector-side translation pass. -func isSupportedLogRecordAttribute(attr string) bool { +func classifyLogRecordAttribute(attr string, _ pcommon.Value) attrAction { switch attr { case string(semconv26.ExceptionEscapedKey), string(semconv.ExceptionMessageKey), @@ -156,50 +190,58 @@ func isSupportedLogRecordAttribute(attr string) bool { "event.name", elasticattr.ProcessorEvent, elasticattr.SessionID: - return true + return attrActionPreserve } - return false + return attrActionFallback } -// isSupportedMetricDataPointAttribute mirrors the apm-data OTLP metric +// classifyMetricDataPointAttribute mirrors the apm-data OTLP metric // datapoint handling where a small set of fields are preserved as first-class // values and the rest fall back to labels.* / numeric_labels.*. 
// // The collector preserves data_stream.type in addition to the apm-data // data_stream dataset/namespace handling, since datapoint-level routing depends // on the full data_stream triple before export. -func isSupportedMetricDataPointAttribute(attr string) bool { +func classifyMetricDataPointAttribute(attr string, value pcommon.Value) attrAction { switch attr { case elasticattr.DataStreamDataset, elasticattr.DataStreamNamespace, elasticattr.DataStreamType, elasticattr.EventDataset, "event.module", - "system.process.cmdline", "system.process.cpu.start_time", + "system.process.state": + return attrActionPreserve + case "system.process.cmdline", "system.filesystem.mount_point", - "system.process.state", string(semconv.UserNameKey): - return true + return shouldPreserveAndTruncateIfString(value) } - return false + return attrActionFallback } -// isSupportedResourceAttribute returns true if the resource attribute is -// supported by ECS and can be mapped directly. -// Supported fields can include OTEL SemConv attributes or ECS specific attributes. +// classifyResourceAttribute returns the action required for a supported ECS +// resource attribute. Supported fields can include OTEL SemConv attributes or +// ECS specific attributes. +// // Fields are based on those found in the below areas: // 1. apm-data: https://github.com/elastic/apm-data/blob/main/input/otlp/metadata.go // 2. elasticapmintake receiver: https://github.com/elastic/opentelemetry-collector-components/tree/main/receiver/elasticapmintakereceiver/internal/mappers -func isSupportedResourceAttribute(attr string) bool { +// +// Where apm-data truncates a preserved resource string when populating the APM +// event, we mirror that here with attrActionTruncateAndPreserve so resource +// translation stays single-pass. 
+func classifyResourceAttribute(attr string, value pcommon.Value) attrAction { switch attr { // service.* - case string(semconv.ServiceNameKey), - string(semconv.ServiceVersionKey), - string(semconv.ServiceInstanceIDKey), - string(semconv.ServiceNamespaceKey), + case string(semconv.ServiceNameKey): + return attrActionPreserve + case string(semconv.ServiceVersionKey), + string(semconv.ServiceInstanceIDKey): + return shouldPreserveAndTruncateIfString(value) + case string(semconv.ServiceNamespaceKey), elasticattr.ServiceFrameworkName, elasticattr.ServiceFrameworkVersion, elasticattr.ServiceOriginID, @@ -207,27 +249,29 @@ func isSupportedResourceAttribute(attr string) bool { elasticattr.ServiceOriginVersion, elasticattr.ServiceTargetName, elasticattr.ServiceTargetType: - return true + return attrActionPreserve // deployment.* case string(semconv26.DeploymentEnvironmentKey), string(semconv.DeploymentEnvironmentNameKey): - return true + return shouldPreserveAndTruncateIfString(value) // telemetry.sdk.* case string(semconv.TelemetrySDKNameKey), string(semconv.TelemetrySDKVersionKey), - string(semconv.TelemetrySDKLanguageKey), - string(semconv.TelemetryDistroNameKey), + string(semconv.TelemetrySDKLanguageKey): + return shouldPreserveAndTruncateIfString(value) + case string(semconv.TelemetryDistroNameKey), string(semconv.TelemetryDistroVersionKey): - return true + return attrActionPreserve // cloud.* case string(semconv.CloudProviderKey), string(semconv.CloudAccountIDKey), string(semconv.CloudRegionKey), string(semconv.CloudAvailabilityZoneKey), - string(semconv.CloudPlatformKey), - elasticattr.CloudOriginAccountID, + string(semconv.CloudPlatformKey): + return shouldPreserveAndTruncateIfString(value) + case elasticattr.CloudOriginAccountID, elasticattr.CloudOriginProvider, elasticattr.CloudOriginRegion, elasticattr.CloudOriginServiceName, @@ -237,74 +281,77 @@ func isSupportedResourceAttribute(attr string) bool { elasticattr.CloudMachineType, elasticattr.CloudProjectID, 
elasticattr.CloudProjectName: - return true + return attrActionPreserve // container.* case string(semconv.ContainerNameKey), string(semconv.ContainerIDKey), string(semconv.ContainerImageNameKey), elasticattr.ContainerImageTag, - string(semconv.ContainerImageTagsKey), string(semconv.ContainerRuntimeKey): - return true + return shouldPreserveAndTruncateIfString(value) + case string(semconv.ContainerImageTagsKey): + return attrActionPreserve // k8s.* case string(semconv.K8SNamespaceNameKey), string(semconv.K8SNodeNameKey), string(semconv.K8SPodNameKey), string(semconv.K8SPodUIDKey): - return true + return shouldPreserveAndTruncateIfString(value) // host.* case string(semconv.HostNameKey), - elasticattr.HostHostName, // legacy hostname key for backwards compatibility string(semconv.HostIDKey), string(semconv.HostTypeKey), - string(semconv.HostArchKey), + string(semconv.HostArchKey): + return shouldPreserveAndTruncateIfString(value) + case elasticattr.HostHostName, string(semconv.HostIPKey), elasticattr.HostOSType: - return true + return attrActionPreserve // process.* - case string(semconv.ProcessPIDKey), - string(semconv.ProcessParentPIDKey), - string(semconv.ProcessExecutableNameKey), - string(semconv.ProcessCommandLineKey), + case string(semconv.ProcessCommandLineKey), string(semconv.ProcessExecutablePathKey), string(semconv.ProcessRuntimeNameKey), string(semconv.ProcessRuntimeVersionKey), string(semconv.ProcessOwnerKey): - return true + return shouldPreserveAndTruncateIfString(value) + case string(semconv.ProcessPIDKey), + string(semconv.ProcessParentPIDKey), + string(semconv.ProcessExecutableNameKey): + return attrActionPreserve // os.* case string(semconv.OSTypeKey), string(semconv.OSDescriptionKey), string(semconv.OSNameKey), string(semconv.OSVersionKey): - return true + return shouldPreserveAndTruncateIfString(value) // device.* case string(semconv.DeviceIDKey), string(semconv.DeviceModelIdentifierKey), string(semconv.DeviceModelNameKey), 
elasticattr.DeviceManufacturer: - return true + return shouldPreserveAndTruncateIfString(value) // data_stream.* case elasticattr.DataStreamDataset, elasticattr.DataStreamNamespace: - return true + return attrActionPreserve // user.* case string(semconv.UserIDKey), string(semconv.UserEmailKey), string(semconv.UserNameKey), elasticattr.UserDomain: - return true + return attrActionPreserve // user_agent.* case string(semconv.UserAgentOriginalKey): - return true + return attrActionPreserve // network.* case string(semconv.NetworkConnectionTypeKey), @@ -313,22 +360,22 @@ func isSupportedResourceAttribute(attr string) bool { string(semconv.NetworkCarrierMccKey), string(semconv.NetworkCarrierMncKey), string(semconv.NetworkCarrierIccKey): - return true + return attrActionPreserve // client.* case string(semconv.ClientAddressKey), string(semconv.ClientPortKey): - return true + return attrActionPreserve // source.* case string(semconv.SourceAddressKey), string(semconv.SourcePortKey), elasticattr.SourceNATIP: - return true + return attrActionPreserve // destination.* case elasticattr.DestinationIP: - return true + return attrActionPreserve // faas.* case string(semconv.FaaSInstanceKey), @@ -338,25 +385,25 @@ func isSupportedResourceAttribute(attr string) bool { string(semconv.FaaSColdstartKey), elasticattr.FaaSTriggerRequestID, elasticattr.FaaSExecution: - return true + return attrActionPreserve // Legacy OpenCensus attributes case ecsAttrOpenCensusExporterVersion: - return true + return attrActionPreserve // APM Agent enrichment case elasticattr.AgentName, elasticattr.AgentVersion, elasticattr.AgentEphemeralID, elasticattr.AgentActivationMethod: - return true + return attrActionPreserve // Metrics case elasticattr.MetricsetName: - return true + return attrActionPreserve } - return false + return attrActionFallback } func ApplyResourceConventions(resource pcommon.Resource) { diff --git a/processor/elasticapmprocessor/internal/ecs/ecs_translation_test.go 
b/processor/elasticapmprocessor/internal/ecs/ecs_translation_test.go index 632824008..94404bb6c 100644 --- a/processor/elasticapmprocessor/internal/ecs/ecs_translation_test.go +++ b/processor/elasticapmprocessor/internal/ecs/ecs_translation_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/elastic/opentelemetry-collector-components/internal/elasticattr" + "github.com/elastic/opentelemetry-collector-components/processor/elasticapmprocessor/internal/sanitize" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/pdata/pcommon" semconv "go.opentelemetry.io/otel/semconv/v1.25.0" @@ -33,6 +34,7 @@ func TestTranslateResourceMetadata(t *testing.T) { inputKey string inputVal string wantKey string + wantVal string wantAbsent string // if non-empty, assert this attribute key is removed after translation (e.g. sanitized key) }{ { @@ -123,12 +125,96 @@ func TestTranslateResourceMetadata(t *testing.T) { inputVal: "dotnet", wantKey: string(semconv.TelemetrySDKLanguageKey), }, + { + name: "supported service version truncated", + inputKey: string(semconv.ServiceVersionKey), + inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + wantKey: string(semconv.ServiceVersionKey), + wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)), + }, + { + name: "supported service instance id truncated", + inputKey: string(semconv.ServiceInstanceIDKey), + inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + wantKey: string(semconv.ServiceInstanceIDKey), + wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)), + }, + { + name: "supported deployment environment truncated", + inputKey: string(semconv.DeploymentEnvironmentKey), + inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + wantKey: string(semconv.DeploymentEnvironmentKey), + wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)), + }, + { + name: "supported telemetry sdk name truncated", + inputKey: 
string(semconv.TelemetrySDKNameKey), + inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + wantKey: string(semconv.TelemetrySDKNameKey), + wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)), + }, { name: "supported telemetry sdk version", inputKey: string(semconv.TelemetrySDKVersionKey), inputVal: "8.0.0", wantKey: string(semconv.TelemetrySDKVersionKey), }, + { + name: "supported cloud provider truncated", + inputKey: string(semconv.CloudProviderKey), + inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + wantKey: string(semconv.CloudProviderKey), + wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)), + }, + { + name: "supported container name truncated", + inputKey: string(semconv.ContainerNameKey), + inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + wantKey: string(semconv.ContainerNameKey), + wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)), + }, + { + name: "supported kubernetes namespace truncated", + inputKey: string(semconv.K8SNamespaceNameKey), + inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + wantKey: string(semconv.K8SNamespaceNameKey), + wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)), + }, + { + name: "supported host name truncated", + inputKey: string(semconv.HostNameKey), + inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + wantKey: string(semconv.HostNameKey), + wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)), + }, + { + name: "supported process command line truncated", + inputKey: string(semconv.ProcessCommandLineKey), + inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + wantKey: string(semconv.ProcessCommandLineKey), + wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)), + }, + { + name: "supported process owner truncated", + inputKey: string(semconv.ProcessOwnerKey), + inputVal: strings.Repeat("a", 
int(sanitize.StandardKeyWordLength)+1), + wantKey: string(semconv.ProcessOwnerKey), + wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)), + }, + { + name: "supported os description truncated", + inputKey: string(semconv.OSDescriptionKey), + inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + wantKey: string(semconv.OSDescriptionKey), + wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)), + }, + { + name: "supported device model name truncated", + inputKey: string(semconv.DeviceModelNameKey), + inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + wantKey: string(semconv.DeviceModelNameKey), + wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)), + }, { name: "supported process runtime name", inputKey: string(semconv.ProcessRuntimeNameKey), @@ -182,8 +268,12 @@ func TestTranslateResourceMetadata(t *testing.T) { if !ok { t.Fatalf("expected attribute %q to be present. all attrs %v", tc.wantKey, attrs.AsRaw()) } - if v.AsString() != tc.inputVal { - t.Errorf("attribute %q value = %q, want %q", tc.wantKey, v.AsString(), tc.inputVal) + wantVal := tc.inputVal + if tc.wantVal != "" { + wantVal = tc.wantVal + } + if v.AsString() != wantVal { + t.Errorf("attribute %q value = %q, want %q", tc.wantKey, v.AsString(), wantVal) } if tc.wantAbsent != "" { if _, ok := attrs.Get(tc.wantAbsent); ok { @@ -230,6 +320,34 @@ func TestTranslateLogRecordAttributes(t *testing.T) { elasticattr.DataStreamNamespace: "default", }, }, + { + name: "supported semantic fields are not truncated", + setAttrs: func(attrs pcommon.Map) { + longValue := strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1) + attrs.PutStr(string(semconv.ExceptionMessageKey), longValue) + attrs.PutStr(string(semconv.ExceptionStacktraceKey), longValue) + attrs.PutStr(string(semconv.ExceptionTypeKey), longValue) + attrs.PutStr("event.name", longValue) + attrs.PutStr("event.domain", longValue) + attrs.PutStr("session.id", longValue) + 
attrs.PutStr(string(semconv.NetworkConnectionTypeKey), longValue) + attrs.PutStr(elasticattr.DataStreamDataset, longValue) + attrs.PutStr(elasticattr.DataStreamNamespace, longValue) + attrs.PutStr(elasticattr.DataStreamType, longValue) + }, + want: map[string]any{ + string(semconv.ExceptionMessageKey): strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + string(semconv.ExceptionStacktraceKey): strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + string(semconv.ExceptionTypeKey): strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + "event.name": strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + "event.domain": strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + "session.id": strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + string(semconv.NetworkConnectionTypeKey): strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + elasticattr.DataStreamDataset: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + elasticattr.DataStreamNamespace: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + elasticattr.DataStreamType: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + }, + }, { name: "unsupported attributes moved to labels", setAttrs: func(attrs pcommon.Map) { @@ -351,6 +469,7 @@ func TestTranslateMetricDataPointAttributes(t *testing.T) { attrs.PutStr("host", "server-01") attrs.PutStr("state", "used") attrs.PutStr("system.process.cmdline", "/usr/bin/java") + attrs.PutStr("system.filesystem.mount_point", "/mnt/data") attrs.PutStr("event.module", "system") attrs.PutStr("user.name", "appuser") }, @@ -361,11 +480,30 @@ func TestTranslateMetricDataPointAttributes(t *testing.T) { "labels.host": "server-01", "labels.state": "used", "system.process.cmdline": "/usr/bin/java", + "system.filesystem.mount_point": "/mnt/data", "event.module": "system", "user.name": "appuser", }, wantAbsent: []string{"host", "state"}, }, + { + name: "metric special cases requiring truncation are truncated 
in place", + setAttrs: func(attrs pcommon.Map) { + longValue := strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1) + attrs.PutStr("system.process.cmdline", longValue) + attrs.PutStr("system.filesystem.mount_point", longValue) + attrs.PutStr("user.name", longValue) + attrs.PutStr("event.module", longValue) + attrs.PutStr("system.process.state", longValue) + }, + want: map[string]any{ + "system.process.cmdline": strings.Repeat("a", int(sanitize.StandardKeyWordLength)), + "system.filesystem.mount_point": strings.Repeat("a", int(sanitize.StandardKeyWordLength)), + "user.name": strings.Repeat("a", int(sanitize.StandardKeyWordLength)), + "event.module": strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + "system.process.state": strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1), + }, + }, } for _, tc := range tests { diff --git a/processor/elasticapmprocessor/internal/enrichments/metric.go b/processor/elasticapmprocessor/internal/enrichments/metric.go index b816e4746..9d447b29c 100644 --- a/processor/elasticapmprocessor/internal/enrichments/metric.go +++ b/processor/elasticapmprocessor/internal/enrichments/metric.go @@ -94,7 +94,7 @@ func enrichMetricDataPointAttributes(attributes pcommon.Map, cfg config.Config) // `elasticsearch.mapping.hints` is collector/exporter-specific metadata and has // no apm-data equivalent, but it also needs to passthrough untouched. 
func isAggregatedMetricDataPointAttributes(attributes pcommon.Map) bool { - if _, ok := attributes.Get("metricset.name"); ok { + if _, ok := attributes.Get(elasticattr.MetricsetName); ok { return true } if _, ok := attributes.Get("metricset.interval"); ok { diff --git a/processor/elasticapmprocessor/internal/enrichments/metric_test.go b/processor/elasticapmprocessor/internal/enrichments/metric_test.go index 9285b8109..e27a4291f 100644 --- a/processor/elasticapmprocessor/internal/enrichments/metric_test.go +++ b/processor/elasticapmprocessor/internal/enrichments/metric_test.go @@ -18,6 +18,7 @@ package enrichments import ( + "strings" "testing" "github.com/stretchr/testify/assert" @@ -27,6 +28,7 @@ import ( "github.com/elastic/opentelemetry-collector-components/internal/elasticattr" "github.com/elastic/opentelemetry-collector-components/processor/elasticapmprocessor/internal/enrichments/config" + "github.com/elastic/opentelemetry-collector-components/processor/elasticapmprocessor/internal/sanitize" ) func TestEnrichMetric(t *testing.T) { @@ -154,6 +156,8 @@ func TestEnrichMetrics_TranslateUnsupportedAttributes(t *testing.T) { attrs.PutStr("state", "used") attrs.PutStr("event.module", "system") attrs.PutStr("system.process.cmdline", "/usr/bin/java") + attrs.PutStr("system.filesystem.mount_point", "/mnt/data") + attrs.PutStr("user.name", "appuser") enricher := NewEnricher(cfg) enricher.EnrichMetrics(metrics) @@ -181,12 +185,61 @@ func TestEnrichMetrics_TranslateUnsupportedAttributes(t *testing.T) { value, ok = actualAttrs.Get("system.process.cmdline") require.True(t, ok) assert.Equal(t, "/usr/bin/java", value.Str()) + value, ok = actualAttrs.Get("system.filesystem.mount_point") + require.True(t, ok) + assert.Equal(t, "/mnt/data", value.Str()) + value, ok = actualAttrs.Get("user.name") + require.True(t, ok) + assert.Equal(t, "appuser", value.Str()) _, ok = actualAttrs.Get("host") assert.False(t, ok) _, ok = actualAttrs.Get("state") assert.False(t, ok) } +func 
TestEnrichMetrics_TruncatesPreservedMetricSpecialCaseAttributes(t *testing.T) { + cfg := config.Enabled() + cfg.Metric.TranslateUnsupportedAttributes.Enabled = true + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() + metric := scopeMetrics.Metrics().AppendEmpty() + metric.SetName("test.metric") + dp := metric.SetEmptyGauge().DataPoints().AppendEmpty() + dp.SetDoubleValue(1.0) + attrs := dp.Attributes() + longValue := strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1) + attrs.PutStr("system.process.cmdline", longValue) + attrs.PutStr("system.filesystem.mount_point", longValue) + attrs.PutStr("user.name", longValue) + attrs.PutStr("event.module", longValue) + attrs.PutStr("system.process.state", longValue) + + NewEnricher(cfg).EnrichMetrics(metrics) + + actualAttrs := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Gauge().DataPoints().At(0).Attributes() + expected := strings.Repeat("a", int(sanitize.StandardKeyWordLength)) + + value, ok := actualAttrs.Get("system.process.cmdline") + require.True(t, ok) + assert.Equal(t, expected, value.Str()) + value, ok = actualAttrs.Get("system.filesystem.mount_point") + require.True(t, ok) + assert.Equal(t, expected, value.Str()) + value, ok = actualAttrs.Get("user.name") + require.True(t, ok) + assert.Equal(t, expected, value.Str()) + + // These preserved attrs are intentionally not truncated in apm-data. 
+ value, ok = actualAttrs.Get("event.module") + require.True(t, ok) + assert.Equal(t, longValue, value.Str()) + value, ok = actualAttrs.Get("system.process.state") + require.True(t, ok) + assert.Equal(t, longValue, value.Str()) +} + func TestEnrichMetricDataPoints_SkipsAggregatedMetricAttributes(t *testing.T) { cfg := config.Enabled() cfg.Metric.TranslateUnsupportedAttributes.Enabled = true From b73f726702d63901b0a520719f7d825e48dbabb1 Mon Sep 17 00:00:00 2001 From: Lanre Ade Date: Mon, 30 Mar 2026 18:19:21 -0400 Subject: [PATCH 4/7] feat: simplify translation functions --- .../internal/ecs/ecs_translation.go | 455 +++++++----------- 1 file changed, 176 insertions(+), 279 deletions(-) diff --git a/processor/elasticapmprocessor/internal/ecs/ecs_translation.go b/processor/elasticapmprocessor/internal/ecs/ecs_translation.go index 4c83f5294..9b85eadd6 100644 --- a/processor/elasticapmprocessor/internal/ecs/ecs_translation.go +++ b/processor/elasticapmprocessor/internal/ecs/ecs_translation.go @@ -32,20 +32,124 @@ const ( ecsAttrOpenCensusExporterVersion = "opencensus.exporterversion" ) -type attrAction uint8 - -const ( - attrActionFallback attrAction = iota - attrActionPreserve - attrActionTruncateAndPreserve -) - // TranslateResourceMetadata normalizes resource attributes. // Sanitizes existing labels and numeric_labels keys. // Moves unsupported attributes to labels with a "labels." prefix (key sanitized), // and leaves supported ECS attributes unchanged. 
func TranslateResourceMetadata(resource pcommon.Resource) { - translateAttributes(resource.Attributes(), classifyResourceAttribute) + attributes := resource.Attributes() + attributes.Range(func(k string, v pcommon.Value) bool { + if sanitizeExistingLabelAttribute(attributes, k, v) { + return true + } + + switch k { + case string(semconv.ServiceNameKey), + string(semconv.ServiceNamespaceKey), + elasticattr.ServiceFrameworkName, + elasticattr.ServiceFrameworkVersion, + elasticattr.ServiceOriginID, + elasticattr.ServiceOriginName, + elasticattr.ServiceOriginVersion, + elasticattr.ServiceTargetName, + elasticattr.ServiceTargetType, + string(semconv.TelemetryDistroNameKey), + string(semconv.TelemetryDistroVersionKey), + elasticattr.CloudOriginAccountID, + elasticattr.CloudOriginProvider, + elasticattr.CloudOriginRegion, + elasticattr.CloudOriginServiceName, + elasticattr.CloudAccountName, + elasticattr.CloudInstanceID, + elasticattr.CloudInstanceName, + elasticattr.CloudMachineType, + elasticattr.CloudProjectID, + elasticattr.CloudProjectName, + string(semconv.ContainerImageTagsKey), + elasticattr.HostHostName, + string(semconv.HostIPKey), + elasticattr.HostOSType, + elasticattr.DataStreamDataset, + elasticattr.DataStreamNamespace, + string(semconv.UserIDKey), + string(semconv.UserEmailKey), + string(semconv.UserNameKey), + elasticattr.UserDomain, + string(semconv.UserAgentOriginalKey), + string(semconv.NetworkConnectionTypeKey), + string(semconv.NetworkConnectionSubtypeKey), + string(semconv.NetworkCarrierNameKey), + string(semconv.NetworkCarrierMccKey), + string(semconv.NetworkCarrierMncKey), + string(semconv.NetworkCarrierIccKey), + string(semconv.ClientAddressKey), + string(semconv.ClientPortKey), + string(semconv.SourceAddressKey), + string(semconv.SourcePortKey), + elasticattr.SourceNATIP, + elasticattr.DestinationIP, + string(semconv.FaaSInstanceKey), + string(semconv.FaaSNameKey), + string(semconv.FaaSVersionKey), + string(semconv.FaaSTriggerKey), + 
string(semconv.FaaSColdstartKey), + elasticattr.FaaSTriggerRequestID, + elasticattr.FaaSExecution, + ecsAttrOpenCensusExporterVersion, + elasticattr.AgentName, + elasticattr.AgentVersion, + elasticattr.AgentEphemeralID, + elasticattr.AgentActivationMethod, + elasticattr.MetricsetName, + string(semconv.ProcessPIDKey), + string(semconv.ProcessParentPIDKey), + string(semconv.ProcessExecutableNameKey): + return true + case string(semconv.ServiceVersionKey), + string(semconv.ServiceInstanceIDKey), + string(semconv26.DeploymentEnvironmentKey), + string(semconv.DeploymentEnvironmentNameKey), + string(semconv.TelemetrySDKNameKey), + string(semconv.TelemetrySDKVersionKey), + string(semconv.TelemetrySDKLanguageKey), + string(semconv.CloudProviderKey), + string(semconv.CloudAccountIDKey), + string(semconv.CloudRegionKey), + string(semconv.CloudAvailabilityZoneKey), + string(semconv.CloudPlatformKey), + string(semconv.ContainerNameKey), + string(semconv.ContainerIDKey), + string(semconv.ContainerImageNameKey), + elasticattr.ContainerImageTag, + string(semconv.ContainerRuntimeKey), + string(semconv.K8SNamespaceNameKey), + string(semconv.K8SNodeNameKey), + string(semconv.K8SPodNameKey), + string(semconv.K8SPodUIDKey), + string(semconv.HostNameKey), + string(semconv.HostIDKey), + string(semconv.HostTypeKey), + string(semconv.HostArchKey), + string(semconv.ProcessCommandLineKey), + string(semconv.ProcessExecutablePathKey), + string(semconv.ProcessRuntimeNameKey), + string(semconv.ProcessRuntimeVersionKey), + string(semconv.ProcessOwnerKey), + string(semconv.OSTypeKey), + string(semconv.OSDescriptionKey), + string(semconv.OSNameKey), + string(semconv.OSVersionKey), + string(semconv.DeviceIDKey), + string(semconv.DeviceModelIdentifierKey), + string(semconv.DeviceModelNameKey), + elasticattr.DeviceManufacturer: + truncatePreservedStringAttribute(attributes, k, v) + return true + default: + fallbackToLabelAttribute(attributes, k, v) + return true + } + }) } // 
TranslateLogRecordAttributes applies the apm-data OTLP fallback behaviour for @@ -53,7 +157,31 @@ func TranslateResourceMetadata(resource pcommon.Resource) { // unsupported attributes are moved to labels.* / numeric_labels.* with a // sanitized key. func TranslateLogRecordAttributes(attributes pcommon.Map) { - translateAttributes(attributes, classifyLogRecordAttribute) + attributes.Range(func(k string, v pcommon.Value) bool { + if sanitizeExistingLabelAttribute(attributes, k, v) { + return true + } + + switch k { + case string(semconv26.ExceptionEscapedKey), + string(semconv.ExceptionMessageKey), + string(semconv.ExceptionStacktraceKey), + string(semconv.ExceptionTypeKey), + string(semconv.NetworkConnectionTypeKey), + elasticattr.DataStreamDataset, + elasticattr.DataStreamNamespace, + elasticattr.DataStreamType, + elasticattr.ErrorID, + "event.domain", + "event.name", + elasticattr.ProcessorEvent, + elasticattr.SessionID: + return true + default: + fallbackToLabelAttribute(attributes, k, v) + return true + } + }) } // TranslateMetricDataPointAttributes applies the apm-data OTLP metric fallback @@ -61,45 +189,57 @@ func TranslateLogRecordAttributes(attributes pcommon.Map) { // numeric_labels.* keys are sanitized in place, metric-specific special cases // are preserved, and everything else is moved to labels.* / numeric_labels.*. 
func TranslateMetricDataPointAttributes(attributes pcommon.Map) { - translateAttributes(attributes, classifyMetricDataPointAttribute) -} - -func translateAttributes(attributes pcommon.Map, classify func(string, pcommon.Value) attrAction) { attributes.Range(func(k string, v pcommon.Value) bool { - if sanitize.IsLabelAttribute(k) { - sanitized := sanitize.HandleLabelAttributeKey(k) - if sanitized != k { - v.CopyTo(attributes.PutEmpty(sanitized)) - attributes.Remove(k) - } + if sanitizeExistingLabelAttribute(attributes, k, v) { return true } - switch classify(k, v) { - case attrActionPreserve: + switch k { + case elasticattr.DataStreamDataset, + elasticattr.DataStreamNamespace, + elasticattr.DataStreamType, + elasticattr.EventDataset, + "event.module", + "system.process.cpu.start_time", + "system.process.state": return true - case attrActionTruncateAndPreserve: - truncated := sanitize.Truncate(v.Str()) - if truncated != v.Str() { - attributes.PutStr(k, truncated) - } + case "system.process.cmdline", + "system.filesystem.mount_point", + string(semconv.UserNameKey): + truncatePreservedStringAttribute(attributes, k, v) return true default: - // Attributes not supported by ECS are moved to labels with a - // labels./numeric_labels. prefix depending on their value type. - setLabelAttributeValue(attributes, sanitize.HandleAttributeKey(k), v) - attributes.Remove(k) + fallbackToLabelAttribute(attributes, k, v) return true } }) } -// shouldPreserveAndTruncateIfString determines if the value should be preserved, optionally truncating strings before preserving. 
-func shouldPreserveAndTruncateIfString(value pcommon.Value) attrAction { - if value.Type() == pcommon.ValueTypeStr { - return attrActionTruncateAndPreserve +func sanitizeExistingLabelAttribute(attributes pcommon.Map, key string, value pcommon.Value) bool { + if !sanitize.IsLabelAttribute(key) { + return false } - return attrActionPreserve + sanitized := sanitize.HandleLabelAttributeKey(key) + if sanitized != key { + value.CopyTo(attributes.PutEmpty(sanitized)) + attributes.Remove(key) + } + return true +} + +func truncatePreservedStringAttribute(attributes pcommon.Map, key string, value pcommon.Value) { + if value.Type() != pcommon.ValueTypeStr { + return + } + truncated := sanitize.Truncate(value.Str()) + if truncated != value.Str() { + attributes.PutStr(key, truncated) + } +} + +func fallbackToLabelAttribute(attributes pcommon.Map, key string, value pcommon.Value) { + setLabelAttributeValue(attributes, sanitize.HandleAttributeKey(key), value) + attributes.Remove(key) } // setLabelAttributeValue maps a value into labels.* / numeric_labels.*. @@ -163,249 +303,6 @@ func setLabelAttributeValue(attributes pcommon.Map, key string, value pcommon.Va } } -// classifyLogRecordAttribute is based on the OTLP log-record attribute switch -// in apm-data/input/otlp/logs.go, which preserves exception.*, event.name, -// event.domain, session.id, network.connection.type, and data_stream.* as -// first-class fields and sends everything else through setLabel(replaceDots(k), ...). -// -// Unlike resource metadata and some metric special cases, apm-data does not -// truncate these preserved log-record fields. Truncation for log attributes only -// happens on the fallback label path via setLabel, which is mirrored here by -// setLabelAttributeValue. -// -// This allowlist also keeps processor-added fields like processor.event, -// error.id, and data_stream.type so they survive the collector-side translation pass. 
-func classifyLogRecordAttribute(attr string, _ pcommon.Value) attrAction { - switch attr { - case string(semconv26.ExceptionEscapedKey), - string(semconv.ExceptionMessageKey), - string(semconv.ExceptionStacktraceKey), - string(semconv.ExceptionTypeKey), - string(semconv.NetworkConnectionTypeKey), - elasticattr.DataStreamDataset, - elasticattr.DataStreamNamespace, - elasticattr.DataStreamType, - elasticattr.ErrorID, - "event.domain", - "event.name", - elasticattr.ProcessorEvent, - elasticattr.SessionID: - return attrActionPreserve - } - - return attrActionFallback -} - -// classifyMetricDataPointAttribute mirrors the apm-data OTLP metric -// datapoint handling where a small set of fields are preserved as first-class -// values and the rest fall back to labels.* / numeric_labels.*. -// -// The collector preserves data_stream.type in addition to the apm-data -// data_stream dataset/namespace handling, since datapoint-level routing depends -// on the full data_stream triple before export. -func classifyMetricDataPointAttribute(attr string, value pcommon.Value) attrAction { - switch attr { - case elasticattr.DataStreamDataset, - elasticattr.DataStreamNamespace, - elasticattr.DataStreamType, - elasticattr.EventDataset, - "event.module", - "system.process.cpu.start_time", - "system.process.state": - return attrActionPreserve - case "system.process.cmdline", - "system.filesystem.mount_point", - string(semconv.UserNameKey): - return shouldPreserveAndTruncateIfString(value) - } - - return attrActionFallback -} - -// classifyResourceAttribute returns the action required for a supported ECS -// resource attribute. Supported fields can include OTEL SemConv attributes or -// ECS specific attributes. -// -// Fields are based on those found in the below areas: -// 1. apm-data: https://github.com/elastic/apm-data/blob/main/input/otlp/metadata.go -// 2. 
elasticapmintake receiver: https://github.com/elastic/opentelemetry-collector-components/tree/main/receiver/elasticapmintakereceiver/internal/mappers -// -// Where apm-data truncates a preserved resource string when populating the APM -// event, we mirror that here with attrActionTruncateAndPreserve so resource -// translation stays single-pass. -func classifyResourceAttribute(attr string, value pcommon.Value) attrAction { - switch attr { - // service.* - case string(semconv.ServiceNameKey): - return attrActionPreserve - case string(semconv.ServiceVersionKey), - string(semconv.ServiceInstanceIDKey): - return shouldPreserveAndTruncateIfString(value) - case string(semconv.ServiceNamespaceKey), - elasticattr.ServiceFrameworkName, - elasticattr.ServiceFrameworkVersion, - elasticattr.ServiceOriginID, - elasticattr.ServiceOriginName, - elasticattr.ServiceOriginVersion, - elasticattr.ServiceTargetName, - elasticattr.ServiceTargetType: - return attrActionPreserve - - // deployment.* - case string(semconv26.DeploymentEnvironmentKey), string(semconv.DeploymentEnvironmentNameKey): - return shouldPreserveAndTruncateIfString(value) - - // telemetry.sdk.* - case string(semconv.TelemetrySDKNameKey), - string(semconv.TelemetrySDKVersionKey), - string(semconv.TelemetrySDKLanguageKey): - return shouldPreserveAndTruncateIfString(value) - case string(semconv.TelemetryDistroNameKey), - string(semconv.TelemetryDistroVersionKey): - return attrActionPreserve - - // cloud.* - case string(semconv.CloudProviderKey), - string(semconv.CloudAccountIDKey), - string(semconv.CloudRegionKey), - string(semconv.CloudAvailabilityZoneKey), - string(semconv.CloudPlatformKey): - return shouldPreserveAndTruncateIfString(value) - case elasticattr.CloudOriginAccountID, - elasticattr.CloudOriginProvider, - elasticattr.CloudOriginRegion, - elasticattr.CloudOriginServiceName, - elasticattr.CloudAccountName, - elasticattr.CloudInstanceID, - elasticattr.CloudInstanceName, - elasticattr.CloudMachineType, - 
elasticattr.CloudProjectID, - elasticattr.CloudProjectName: - return attrActionPreserve - - // container.* - case string(semconv.ContainerNameKey), - string(semconv.ContainerIDKey), - string(semconv.ContainerImageNameKey), - elasticattr.ContainerImageTag, - string(semconv.ContainerRuntimeKey): - return shouldPreserveAndTruncateIfString(value) - case string(semconv.ContainerImageTagsKey): - return attrActionPreserve - - // k8s.* - case string(semconv.K8SNamespaceNameKey), - string(semconv.K8SNodeNameKey), - string(semconv.K8SPodNameKey), - string(semconv.K8SPodUIDKey): - return shouldPreserveAndTruncateIfString(value) - - // host.* - case string(semconv.HostNameKey), - string(semconv.HostIDKey), - string(semconv.HostTypeKey), - string(semconv.HostArchKey): - return shouldPreserveAndTruncateIfString(value) - case elasticattr.HostHostName, - string(semconv.HostIPKey), - elasticattr.HostOSType: - return attrActionPreserve - - // process.* - case string(semconv.ProcessCommandLineKey), - string(semconv.ProcessExecutablePathKey), - string(semconv.ProcessRuntimeNameKey), - string(semconv.ProcessRuntimeVersionKey), - string(semconv.ProcessOwnerKey): - return shouldPreserveAndTruncateIfString(value) - case string(semconv.ProcessPIDKey), - string(semconv.ProcessParentPIDKey), - string(semconv.ProcessExecutableNameKey): - return attrActionPreserve - - // os.* - case string(semconv.OSTypeKey), - string(semconv.OSDescriptionKey), - string(semconv.OSNameKey), - string(semconv.OSVersionKey): - return shouldPreserveAndTruncateIfString(value) - - // device.* - case string(semconv.DeviceIDKey), - string(semconv.DeviceModelIdentifierKey), - string(semconv.DeviceModelNameKey), - elasticattr.DeviceManufacturer: - return shouldPreserveAndTruncateIfString(value) - - // data_stream.* - case elasticattr.DataStreamDataset, - elasticattr.DataStreamNamespace: - return attrActionPreserve - - // user.* - case string(semconv.UserIDKey), - string(semconv.UserEmailKey), - 
string(semconv.UserNameKey), - elasticattr.UserDomain: - return attrActionPreserve - - // user_agent.* - case string(semconv.UserAgentOriginalKey): - return attrActionPreserve - - // network.* - case string(semconv.NetworkConnectionTypeKey), - string(semconv.NetworkConnectionSubtypeKey), - string(semconv.NetworkCarrierNameKey), - string(semconv.NetworkCarrierMccKey), - string(semconv.NetworkCarrierMncKey), - string(semconv.NetworkCarrierIccKey): - return attrActionPreserve - - // client.* - case string(semconv.ClientAddressKey), - string(semconv.ClientPortKey): - return attrActionPreserve - - // source.* - case string(semconv.SourceAddressKey), - string(semconv.SourcePortKey), - elasticattr.SourceNATIP: - return attrActionPreserve - - // destination.* - case elasticattr.DestinationIP: - return attrActionPreserve - - // faas.* - case string(semconv.FaaSInstanceKey), - string(semconv.FaaSNameKey), - string(semconv.FaaSVersionKey), - string(semconv.FaaSTriggerKey), - string(semconv.FaaSColdstartKey), - elasticattr.FaaSTriggerRequestID, - elasticattr.FaaSExecution: - return attrActionPreserve - - // Legacy OpenCensus attributes - case ecsAttrOpenCensusExporterVersion: - return attrActionPreserve - - // APM Agent enrichment - case elasticattr.AgentName, - elasticattr.AgentVersion, - elasticattr.AgentEphemeralID, - elasticattr.AgentActivationMethod: - return attrActionPreserve - - // Metrics - case elasticattr.MetricsetName: - return attrActionPreserve - } - - return attrActionFallback -} - func ApplyResourceConventions(resource pcommon.Resource) { setHostnameFromKubernetes(resource) } From 416e86816e83be7226253d357e98d839df4a77e8 Mon Sep 17 00:00:00 2001 From: Lanre Ade Date: Wed, 1 Apr 2026 18:42:27 -0400 Subject: [PATCH 5/7] fix: service.framework.* discrepancies --- .../internal/enrichments/enricher.go | 3 +- .../internal/enrichments/metric.go | 32 +++++++++++----- .../internal/enrichments/metric_test.go | 37 +++++++++++++++++-- 
.../elasticapmprocessor/processor_test.go | 35 +++++++++++++++++- 4 files changed, 91 insertions(+), 16 deletions(-) diff --git a/processor/elasticapmprocessor/internal/enrichments/enricher.go b/processor/elasticapmprocessor/internal/enrichments/enricher.go index 00a47a7bf..493eddde9 100644 --- a/processor/elasticapmprocessor/internal/enrichments/enricher.go +++ b/processor/elasticapmprocessor/internal/enrichments/enricher.go @@ -102,9 +102,10 @@ func (e *Enricher) EnrichMetrics(pl pmetric.Metrics) { for j := 0; j < scopeMetics.Len(); j++ { scopeMetric := scopeMetics.At(j) EnrichScope(scopeMetric.Scope(), e.Config) + scopeAttrs := scopeMetric.Scope().Attributes() metrics := scopeMetric.Metrics() for k := 0; k < metrics.Len(); k++ { - EnrichMetricDataPoints(metrics.At(k), e.Config) + EnrichMetricDataPoints(metrics.At(k), scopeAttrs, e.Config) } } } diff --git a/processor/elasticapmprocessor/internal/enrichments/metric.go b/processor/elasticapmprocessor/internal/enrichments/metric.go index 9d447b29c..785cfd161 100644 --- a/processor/elasticapmprocessor/internal/enrichments/metric.go +++ b/processor/elasticapmprocessor/internal/enrichments/metric.go @@ -41,32 +41,32 @@ func EnrichMetric(metric pmetric.ResourceMetrics, cfg config.Config) { } } -func EnrichMetricDataPoints(metric pmetric.Metric, cfg config.Config) { +func EnrichMetricDataPoints(metric pmetric.Metric, scopeAttrs pcommon.Map, cfg config.Config) { switch metric.Type() { case pmetric.MetricTypeGauge: dataPoints := metric.Gauge().DataPoints() for i := 0; i < dataPoints.Len(); i++ { - enrichMetricDataPointAttributes(dataPoints.At(i).Attributes(), cfg) + enrichMetricDataPointAttributes(dataPoints.At(i).Attributes(), scopeAttrs, cfg) } case pmetric.MetricTypeSum: dataPoints := metric.Sum().DataPoints() for i := 0; i < dataPoints.Len(); i++ { - enrichMetricDataPointAttributes(dataPoints.At(i).Attributes(), cfg) + enrichMetricDataPointAttributes(dataPoints.At(i).Attributes(), scopeAttrs, cfg) } case 
pmetric.MetricTypeHistogram: dataPoints := metric.Histogram().DataPoints() for i := 0; i < dataPoints.Len(); i++ { - enrichMetricDataPointAttributes(dataPoints.At(i).Attributes(), cfg) + enrichMetricDataPointAttributes(dataPoints.At(i).Attributes(), scopeAttrs, cfg) } case pmetric.MetricTypeExponentialHistogram: dataPoints := metric.ExponentialHistogram().DataPoints() for i := 0; i < dataPoints.Len(); i++ { - enrichMetricDataPointAttributes(dataPoints.At(i).Attributes(), cfg) + enrichMetricDataPointAttributes(dataPoints.At(i).Attributes(), scopeAttrs, cfg) } case pmetric.MetricTypeSummary: dataPoints := metric.Summary().DataPoints() for i := 0; i < dataPoints.Len(); i++ { - enrichMetricDataPointAttributes(dataPoints.At(i).Attributes(), cfg) + enrichMetricDataPointAttributes(dataPoints.At(i).Attributes(), scopeAttrs, cfg) } } } @@ -76,14 +76,26 @@ func EnrichMetricDataPoints(metric pmetric.Metric, cfg config.Config) { // EnrichLog. This happens later than apm-data's OTLP-to-APM conversion, so we // must explicitly avoid relabeling attrs that identify aggregated metrics or // influence exporter behavior. 
-func enrichMetricDataPointAttributes(attributes pcommon.Map, cfg config.Config) { - if !cfg.Metric.TranslateUnsupportedAttributes.Enabled { +func enrichMetricDataPointAttributes(attributes pcommon.Map, scopeAttrs pcommon.Map, cfg config.Config) { + if isAggregatedMetricDataPointAttributes(attributes) { return } - if isAggregatedMetricDataPointAttributes(attributes) { + if cfg.Metric.TranslateUnsupportedAttributes.Enabled { + ecs.TranslateMetricDataPointAttributes(attributes) + projectMetricFrameworkAttributes(attributes, scopeAttrs, cfg) + } +} + +func projectMetricFrameworkAttributes(attributes pcommon.Map, scopeAttrs pcommon.Map, cfg config.Config) { + if !cfg.Scope.ServiceFrameworkName.Enabled { return } - ecs.TranslateMetricDataPointAttributes(attributes) + if value, ok := scopeAttrs.Get(elasticattr.ServiceFrameworkName); ok { + attribute.PutStr(attributes, elasticattr.ServiceFrameworkName, value.Str()) + } + if value, ok := scopeAttrs.Get(elasticattr.ServiceFrameworkVersion); ok { + attribute.PutStr(attributes, elasticattr.ServiceFrameworkVersion, value.Str()) + } } // isAggregatedMetricDataPointAttributes preserves aggregated-metric identity and diff --git a/processor/elasticapmprocessor/internal/enrichments/metric_test.go b/processor/elasticapmprocessor/internal/enrichments/metric_test.go index e27a4291f..fc7f8c95c 100644 --- a/processor/elasticapmprocessor/internal/enrichments/metric_test.go +++ b/processor/elasticapmprocessor/internal/enrichments/metric_test.go @@ -144,6 +144,8 @@ func TestEnrichMetrics_TranslateUnsupportedAttributes(t *testing.T) { metrics := pmetric.NewMetrics() resourceMetrics := metrics.ResourceMetrics().AppendEmpty() scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() + scopeMetrics.Scope().SetName("metrics-instrumentation") + scopeMetrics.Scope().SetVersion("1.0.0") metric := scopeMetrics.Metrics().AppendEmpty() metric.SetName("test.metric") dp := metric.SetEmptyGauge().DataPoints().AppendEmpty() @@ -191,6 +193,12 @@ func 
TestEnrichMetrics_TranslateUnsupportedAttributes(t *testing.T) { value, ok = actualAttrs.Get("user.name") require.True(t, ok) assert.Equal(t, "appuser", value.Str()) + value, ok = actualAttrs.Get(elasticattr.ServiceFrameworkName) + require.True(t, ok) + assert.Equal(t, "metrics-instrumentation", value.Str()) + value, ok = actualAttrs.Get(elasticattr.ServiceFrameworkVersion) + require.True(t, ok) + assert.Equal(t, "1.0.0", value.Str()) _, ok = actualAttrs.Get("host") assert.False(t, ok) _, ok = actualAttrs.Get("state") @@ -253,7 +261,12 @@ func TestEnrichMetricDataPoints_SkipsAggregatedMetricAttributes(t *testing.T) { attrs.PutStr("host", "server-01") attrs.PutStr("state", "used") - EnrichMetricDataPoints(metric, cfg) + scope := pcommon.NewInstrumentationScope() + scope.SetName("github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector") + scope.SetVersion("1.0.0") + scopeAttrs := scope.Attributes() + + EnrichMetricDataPoints(metric, scopeAttrs, cfg) actualAttrs := metric.Gauge().DataPoints().At(0).Attributes() value, ok := actualAttrs.Get("host") @@ -266,6 +279,10 @@ func TestEnrichMetricDataPoints_SkipsAggregatedMetricAttributes(t *testing.T) { assert.False(t, ok) _, ok = actualAttrs.Get("labels.state") assert.False(t, ok) + _, ok = actualAttrs.Get(elasticattr.ServiceFrameworkName) + assert.False(t, ok) + _, ok = actualAttrs.Get(elasticattr.ServiceFrameworkVersion) + assert.False(t, ok) } func TestEnrichMetricDataPoints_SkipsMetricsWithMappingHints(t *testing.T) { @@ -281,7 +298,12 @@ func TestEnrichMetricDataPoints_SkipsMetricsWithMappingHints(t *testing.T) { hints.AppendEmpty().SetStr("_doc_count") attrs.PutStr("host", "server-01") - EnrichMetricDataPoints(metric, cfg) + scope := pcommon.NewInstrumentationScope() + scope.SetName("github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector") + scope.SetVersion("1.0.0") + scopeAttrs := scope.Attributes() + + EnrichMetricDataPoints(metric, 
scopeAttrs, cfg) actualAttrs := metric.Gauge().DataPoints().At(0).Attributes() value, ok := actualAttrs.Get("host") @@ -289,6 +311,8 @@ func TestEnrichMetricDataPoints_SkipsMetricsWithMappingHints(t *testing.T) { assert.Equal(t, "server-01", value.Str()) _, ok = actualAttrs.Get("labels.host") assert.False(t, ok) + _, ok = actualAttrs.Get(elasticattr.ServiceFrameworkName) + assert.False(t, ok) } func TestEnrichMetricDataPointAttributes_NoOpWhenDisabled(t *testing.T) { @@ -298,11 +322,18 @@ func TestEnrichMetricDataPointAttributes_NoOpWhenDisabled(t *testing.T) { attrs := pcommon.NewMap() attrs.PutStr("host", "server-01") - enrichMetricDataPointAttributes(attrs, cfg) + scope := pcommon.NewInstrumentationScope() + scope.SetName("metrics-instrumentation") + scope.SetVersion("1.0.0") + scopeAttrs := scope.Attributes() + + enrichMetricDataPointAttributes(attrs, scopeAttrs, cfg) value, ok := attrs.Get("host") require.True(t, ok) assert.Equal(t, "server-01", value.Str()) _, ok = attrs.Get("labels.host") assert.False(t, ok) + _, ok = attrs.Get(elasticattr.ServiceFrameworkName) + assert.False(t, ok) } diff --git a/processor/elasticapmprocessor/processor_test.go b/processor/elasticapmprocessor/processor_test.go index 77041f4c6..9d42c4f3d 100644 --- a/processor/elasticapmprocessor/processor_test.go +++ b/processor/elasticapmprocessor/processor_test.go @@ -39,6 +39,7 @@ import ( "go.opentelemetry.io/collector/processor/processortest" semconv "go.opentelemetry.io/otel/semconv/v1.27.0" + "github.com/elastic/opentelemetry-collector-components/internal/elasticattr" "github.com/elastic/opentelemetry-collector-components/processor/elasticapmprocessor/internal/metadata" ) @@ -518,6 +519,8 @@ func TestConsumeMetrics_ECSOTLPFallbacks(t *testing.T) { resource.Attributes().PutStr(string(semconv.TelemetrySDKNameKey), "opentelemetry") scopeMetrics := resourceMetric.ScopeMetrics().AppendEmpty() + scopeMetrics.Scope().SetName("metrics-instrumentation") + 
scopeMetrics.Scope().SetVersion("1.0.0") httpMetric := scopeMetrics.Metrics().AppendEmpty() httpMetric.SetName("http.requests.total") @@ -560,6 +563,12 @@ func TestConsumeMetrics_ECSOTLPFallbacks(t *testing.T) { value, ok = actualHTTPAttrs.Get("numeric_labels.http_response_status_code") require.True(t, ok) assert.InDelta(t, 200, value.Double(), 1e-9) + value, ok = actualHTTPAttrs.Get(elasticattr.ServiceFrameworkName) + require.True(t, ok) + assert.Equal(t, "metrics-instrumentation", value.Str()) + value, ok = actualHTTPAttrs.Get(elasticattr.ServiceFrameworkVersion) + require.True(t, ok) + assert.Equal(t, "1.0.0", value.Str()) _, ok = actualHTTPAttrs.Get("http.request.method") assert.False(t, ok) _, ok = actualHTTPAttrs.Get("http.route") @@ -574,6 +583,12 @@ func TestConsumeMetrics_ECSOTLPFallbacks(t *testing.T) { value, ok = actualMemoryAttrs.Get("labels.state") require.True(t, ok) assert.Equal(t, "used", value.Str()) + value, ok = actualMemoryAttrs.Get(elasticattr.ServiceFrameworkName) + require.True(t, ok) + assert.Equal(t, "metrics-instrumentation", value.Str()) + value, ok = actualMemoryAttrs.Get(elasticattr.ServiceFrameworkVersion) + require.True(t, ok) + assert.Equal(t, "1.0.0", value.Str()) _, ok = actualMemoryAttrs.Get("host") assert.False(t, ok) _, ok = actualMemoryAttrs.Get("state") @@ -600,6 +615,8 @@ func TestConsumeMetrics_ECSIntakeSkipsOTLPFallbacks(t *testing.T) { resource.Attributes().PutStr(string(semconv.TelemetrySDKNameKey), "ElasticAPM") scopeMetrics := resourceMetric.ScopeMetrics().AppendEmpty() + scopeMetrics.Scope().SetName("metrics-instrumentation") + scopeMetrics.Scope().SetVersion("1.0.0") metric := scopeMetrics.Metrics().AppendEmpty() metric.SetName("http.requests.total") dp := metric.SetEmptySum().DataPoints().AppendEmpty() @@ -644,6 +661,10 @@ func TestConsumeMetrics_ECSIntakeSkipsOTLPFallbacks(t *testing.T) { assert.False(t, ok) _, ok = actualAttrs.Get("labels.state") assert.False(t, ok) + _, ok = 
actualAttrs.Get(elasticattr.ServiceFrameworkName) + assert.False(t, ok) + _, ok = actualAttrs.Get(elasticattr.ServiceFrameworkVersion) + assert.False(t, ok) } func TestConsumeMetrics_ECSAssumesHomogeneousBatchOrigin(t *testing.T) { @@ -665,7 +686,10 @@ func TestConsumeMetrics_ECSAssumesHomogeneousBatchOrigin(t *testing.T) { intakeResource := intakeResourceMetric.Resource() intakeResource.Attributes().PutStr("service.name", "intake-service") intakeResource.Attributes().PutStr(string(semconv.TelemetrySDKNameKey), "ElasticAPM") - intakeMetric := intakeResourceMetric.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + intakeScopeMetrics := intakeResourceMetric.ScopeMetrics().AppendEmpty() + intakeScopeMetrics.Scope().SetName("metrics-instrumentation") + intakeScopeMetrics.Scope().SetVersion("1.0.0") + intakeMetric := intakeScopeMetrics.Metrics().AppendEmpty() intakeMetric.SetName("intake.metric") intakeMetric.SetEmptyGauge().DataPoints().AppendEmpty().SetDoubleValue(1.0) @@ -673,7 +697,10 @@ func TestConsumeMetrics_ECSAssumesHomogeneousBatchOrigin(t *testing.T) { otlpResource := otlpResourceMetric.Resource() otlpResource.Attributes().PutStr("service.name", "otlp-service") otlpResource.Attributes().PutStr(string(semconv.TelemetrySDKNameKey), "opentelemetry") - otlpMetric := otlpResourceMetric.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + otlpScopeMetrics := otlpResourceMetric.ScopeMetrics().AppendEmpty() + otlpScopeMetrics.Scope().SetName("metrics-instrumentation") + otlpScopeMetrics.Scope().SetVersion("1.0.0") + otlpMetric := otlpScopeMetrics.Metrics().AppendEmpty() otlpMetric.SetName("http.requests.total") otlpDP := otlpMetric.SetEmptySum().DataPoints().AppendEmpty() otlpDP.SetIntValue(1) @@ -707,6 +734,10 @@ func TestConsumeMetrics_ECSAssumesHomogeneousBatchOrigin(t *testing.T) { assert.False(t, ok) _, ok = actualDPAttrs.Get("labels.host") assert.False(t, ok) + _, ok = actualDPAttrs.Get(elasticattr.ServiceFrameworkName) + assert.False(t, ok) + _, ok = 
actualDPAttrs.Get(elasticattr.ServiceFrameworkVersion) + assert.False(t, ok) } // TestSkipEnrichmentMetrics tests that metrics are only enriched when skipEnrichment is false or when mapping mode is ecs From eeb1c8cd56370fa11bbae97b316d83e5e0e3c34c Mon Sep 17 00:00:00 2001 From: Lanre Ade Date: Thu, 2 Apr 2026 11:11:45 -0400 Subject: [PATCH 6/7] chore: format case comparison order alphabetically --- .../internal/ecs/ecs_translation.go | 158 +++++++++--------- 1 file changed, 79 insertions(+), 79 deletions(-) diff --git a/processor/elasticapmprocessor/internal/ecs/ecs_translation.go b/processor/elasticapmprocessor/internal/ecs/ecs_translation.go index 8a6197571..9b29caa41 100644 --- a/processor/elasticapmprocessor/internal/ecs/ecs_translation.go +++ b/processor/elasticapmprocessor/internal/ecs/ecs_translation.go @@ -44,106 +44,106 @@ func TranslateResourceMetadata(resource pcommon.Resource) { } switch k { - case string(semconv.ServiceNameKey), - string(semconv.ServiceNamespaceKey), - elasticattr.ServiceFrameworkName, - elasticattr.ServiceFrameworkVersion, - elasticattr.ServiceOriginID, - elasticattr.ServiceOriginName, - elasticattr.ServiceOriginVersion, - elasticattr.ServiceTargetName, - elasticattr.ServiceTargetType, - string(semconv.TelemetryDistroNameKey), - string(semconv.TelemetryDistroVersionKey), - elasticattr.CloudOriginAccountID, - elasticattr.CloudOriginProvider, - elasticattr.CloudOriginRegion, - elasticattr.CloudOriginServiceName, + case elasticattr.AgentActivationMethod, + elasticattr.AgentEphemeralID, + elasticattr.AgentName, + elasticattr.AgentVersion, elasticattr.CloudAccountName, elasticattr.CloudInstanceID, elasticattr.CloudInstanceName, elasticattr.CloudMachineType, + elasticattr.CloudOriginAccountID, + elasticattr.CloudOriginProvider, + elasticattr.CloudOriginRegion, + elasticattr.CloudOriginServiceName, elasticattr.CloudProjectID, elasticattr.CloudProjectName, - string(semconv.ContainerImageTagsKey), - elasticattr.HostHostName, - 
string(semconv.HostIPKey), - elasticattr.HostOSType, elasticattr.DataStreamDataset, elasticattr.DataStreamNamespace, elasticattr.DataStreamType, - string(semconv.UserIDKey), - string(semconv.UserEmailKey), - string(semconv.UserNameKey), + elasticattr.DestinationIP, + elasticattr.FaaSExecution, + elasticattr.FaaSTriggerRequestID, + elasticattr.HostHostName, + elasticattr.HostOSType, + elasticattr.MetricsetName, + elasticattr.ServiceFrameworkName, + elasticattr.ServiceFrameworkVersion, + elasticattr.ServiceOriginID, + elasticattr.ServiceOriginName, + elasticattr.ServiceOriginVersion, + elasticattr.ServiceTargetName, + elasticattr.ServiceTargetType, + elasticattr.SourceNATIP, elasticattr.UserDomain, - string(semconv.UserAgentOriginalKey), - string(semconv.NetworkConnectionTypeKey), - string(semconv.NetworkConnectionSubtypeKey), - string(semconv.NetworkCarrierNameKey), - string(semconv.NetworkCarrierMccKey), - string(semconv.NetworkCarrierMncKey), - string(semconv.NetworkCarrierIccKey), string(semconv.ClientAddressKey), string(semconv.ClientPortKey), - string(semconv.SourceAddressKey), - string(semconv.SourcePortKey), - elasticattr.SourceNATIP, - elasticattr.DestinationIP, + string(semconv.ContainerImageTagsKey), + string(semconv.FaaSColdstartKey), string(semconv.FaaSInstanceKey), string(semconv.FaaSNameKey), - string(semconv.FaaSVersionKey), string(semconv.FaaSTriggerKey), - string(semconv.FaaSColdstartKey), - elasticattr.FaaSTriggerRequestID, - elasticattr.FaaSExecution, - ecsAttrOpenCensusExporterVersion, - elasticattr.AgentName, - elasticattr.AgentVersion, - elasticattr.AgentEphemeralID, - elasticattr.AgentActivationMethod, - elasticattr.MetricsetName, - string(semconv.ProcessPIDKey), + string(semconv.FaaSVersionKey), + string(semconv.HostIPKey), + string(semconv.NetworkCarrierIccKey), + string(semconv.NetworkCarrierMccKey), + string(semconv.NetworkCarrierMncKey), + string(semconv.NetworkCarrierNameKey), + string(semconv.NetworkConnectionSubtypeKey), + 
string(semconv.NetworkConnectionTypeKey), + string(semconv.ProcessExecutableNameKey), string(semconv.ProcessParentPIDKey), - string(semconv.ProcessExecutableNameKey): + string(semconv.ProcessPIDKey), + string(semconv.ServiceNameKey), + string(semconv.ServiceNamespaceKey), + string(semconv.SourceAddressKey), + string(semconv.SourcePortKey), + string(semconv.TelemetryDistroNameKey), + string(semconv.TelemetryDistroVersionKey), + string(semconv.UserAgentOriginalKey), + string(semconv.UserEmailKey), + string(semconv.UserIDKey), + string(semconv.UserNameKey), + ecsAttrOpenCensusExporterVersion: return true - case string(semconv.ServiceVersionKey), - string(semconv.ServiceInstanceIDKey), - string(semconv26.DeploymentEnvironmentKey), - string(semconv.DeploymentEnvironmentNameKey), - string(semconv.TelemetrySDKNameKey), - string(semconv.TelemetrySDKVersionKey), - string(semconv.TelemetrySDKLanguageKey), - string(semconv.CloudProviderKey), + case elasticattr.ContainerImageTag, + elasticattr.DeviceManufacturer, string(semconv.CloudAccountIDKey), - string(semconv.CloudRegionKey), string(semconv.CloudAvailabilityZoneKey), string(semconv.CloudPlatformKey), - string(semconv.ContainerNameKey), + string(semconv.CloudProviderKey), + string(semconv.CloudRegionKey), string(semconv.ContainerIDKey), string(semconv.ContainerImageNameKey), - elasticattr.ContainerImageTag, + string(semconv.ContainerNameKey), string(semconv.ContainerRuntimeKey), + string(semconv26.DeploymentEnvironmentKey), + string(semconv.DeploymentEnvironmentNameKey), + string(semconv.DeviceIDKey), + string(semconv.DeviceModelIdentifierKey), + string(semconv.DeviceModelNameKey), + string(semconv.HostArchKey), + string(semconv.HostIDKey), + string(semconv.HostNameKey), + string(semconv.HostTypeKey), string(semconv.K8SNamespaceNameKey), string(semconv.K8SNodeNameKey), string(semconv.K8SPodNameKey), string(semconv.K8SPodUIDKey), - string(semconv.HostNameKey), - string(semconv.HostIDKey), - string(semconv.HostTypeKey), - 
string(semconv.HostArchKey), + string(semconv.OSDescriptionKey), + string(semconv.OSNameKey), + string(semconv.OSTypeKey), + string(semconv.OSVersionKey), string(semconv.ProcessCommandLineKey), string(semconv.ProcessExecutablePathKey), + string(semconv.ProcessOwnerKey), string(semconv.ProcessRuntimeNameKey), string(semconv.ProcessRuntimeVersionKey), - string(semconv.ProcessOwnerKey), - string(semconv.OSTypeKey), - string(semconv.OSDescriptionKey), - string(semconv.OSNameKey), - string(semconv.OSVersionKey), - string(semconv.DeviceIDKey), - string(semconv.DeviceModelIdentifierKey), - string(semconv.DeviceModelNameKey), - elasticattr.DeviceManufacturer: + string(semconv.ServiceInstanceIDKey), + string(semconv.ServiceVersionKey), + string(semconv.TelemetrySDKLanguageKey), + string(semconv.TelemetrySDKNameKey), + string(semconv.TelemetrySDKVersionKey): truncatePreservedStringAttribute(attributes, k, v) return true default: @@ -164,19 +164,19 @@ func TranslateLogRecordAttributes(attributes pcommon.Map) { } switch k { - case string(semconv26.ExceptionEscapedKey), + case elasticattr.DataStreamDataset, + elasticattr.DataStreamNamespace, + elasticattr.DataStreamType, + elasticattr.ErrorID, + elasticattr.ProcessorEvent, + elasticattr.SessionID, + string(semconv26.ExceptionEscapedKey), string(semconv.ExceptionMessageKey), string(semconv.ExceptionStacktraceKey), string(semconv.ExceptionTypeKey), string(semconv.NetworkConnectionTypeKey), - elasticattr.DataStreamDataset, - elasticattr.DataStreamNamespace, - elasticattr.DataStreamType, - elasticattr.ErrorID, "event.domain", - "event.name", - elasticattr.ProcessorEvent, - elasticattr.SessionID: + "event.name": return true default: fallbackToLabelAttribute(attributes, k, v) @@ -204,9 +204,9 @@ func TranslateMetricDataPointAttributes(attributes pcommon.Map) { "system.process.cpu.start_time", "system.process.state": return true - case "system.process.cmdline", + case string(semconv.UserNameKey), "system.filesystem.mount_point", - 
string(semconv.UserNameKey): + "system.process.cmdline": truncatePreservedStringAttribute(attributes, k, v) return true default: From bbf7f5f5940f0b8b75a106ced3a2c7707a22993e Mon Sep 17 00:00:00 2001 From: Lanre Ade Date: Fri, 10 Apr 2026 15:59:11 -0400 Subject: [PATCH 7/7] Add telemetry.sdk.elastic_export_timestamp key --- processor/elasticapmprocessor/internal/ecs/ecs_translation.go | 1 + 1 file changed, 1 insertion(+) diff --git a/processor/elasticapmprocessor/internal/ecs/ecs_translation.go b/processor/elasticapmprocessor/internal/ecs/ecs_translation.go index 9b29caa41..9a523bc15 100644 --- a/processor/elasticapmprocessor/internal/ecs/ecs_translation.go +++ b/processor/elasticapmprocessor/internal/ecs/ecs_translation.go @@ -100,6 +100,7 @@ func TranslateResourceMetadata(resource pcommon.Resource) { string(semconv.SourcePortKey), string(semconv.TelemetryDistroNameKey), string(semconv.TelemetryDistroVersionKey), + "telemetry.sdk.elastic_export_timestamp", string(semconv.UserAgentOriginalKey), string(semconv.UserEmailKey), string(semconv.UserIDKey),