Skip to content
408 changes: 193 additions & 215 deletions processor/elasticapmprocessor/internal/ecs/ecs_translation.go

Large diffs are not rendered by default.

243 changes: 241 additions & 2 deletions processor/elasticapmprocessor/internal/ecs/ecs_translation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"

"github.com/elastic/opentelemetry-collector-components/internal/elasticattr"
"github.com/elastic/opentelemetry-collector-components/processor/elasticapmprocessor/internal/sanitize"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
Expand All @@ -33,6 +34,7 @@ func TestTranslateResourceMetadata(t *testing.T) {
inputKey string
inputVal string
wantKey string
wantVal string
wantAbsent string // if non-empty, assert this attribute key is removed after translation (e.g. sanitized key)
}{
{
Expand Down Expand Up @@ -123,12 +125,96 @@ func TestTranslateResourceMetadata(t *testing.T) {
inputVal: "dotnet",
wantKey: string(semconv.TelemetrySDKLanguageKey),
},
{
name: "supported service version truncated",
inputKey: string(semconv.ServiceVersionKey),
inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
wantKey: string(semconv.ServiceVersionKey),
wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)),
},
{
name: "supported service instance id truncated",
inputKey: string(semconv.ServiceInstanceIDKey),
inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
wantKey: string(semconv.ServiceInstanceIDKey),
wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)),
},
{
name: "supported deployment environment truncated",
inputKey: string(semconv.DeploymentEnvironmentKey),
inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
wantKey: string(semconv.DeploymentEnvironmentKey),
wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)),
},
{
name: "supported telemetry sdk name truncated",
inputKey: string(semconv.TelemetrySDKNameKey),
inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
wantKey: string(semconv.TelemetrySDKNameKey),
wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)),
},
{
name: "supported telemetry sdk version",
inputKey: string(semconv.TelemetrySDKVersionKey),
inputVal: "8.0.0",
wantKey: string(semconv.TelemetrySDKVersionKey),
},
{
name: "supported cloud provider truncated",
inputKey: string(semconv.CloudProviderKey),
inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
wantKey: string(semconv.CloudProviderKey),
wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)),
},
{
name: "supported container name truncated",
inputKey: string(semconv.ContainerNameKey),
inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
wantKey: string(semconv.ContainerNameKey),
wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)),
},
{
name: "supported kubernetes namespace truncated",
inputKey: string(semconv.K8SNamespaceNameKey),
inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
wantKey: string(semconv.K8SNamespaceNameKey),
wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)),
},
{
name: "supported host name truncated",
inputKey: string(semconv.HostNameKey),
inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
wantKey: string(semconv.HostNameKey),
wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)),
},
{
name: "supported process command line truncated",
inputKey: string(semconv.ProcessCommandLineKey),
inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
wantKey: string(semconv.ProcessCommandLineKey),
wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)),
},
{
name: "supported process owner truncated",
inputKey: string(semconv.ProcessOwnerKey),
inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
wantKey: string(semconv.ProcessOwnerKey),
wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)),
},
{
name: "supported os description truncated",
inputKey: string(semconv.OSDescriptionKey),
inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
wantKey: string(semconv.OSDescriptionKey),
wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)),
},
{
name: "supported device model name truncated",
inputKey: string(semconv.DeviceModelNameKey),
inputVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
wantKey: string(semconv.DeviceModelNameKey),
wantVal: strings.Repeat("a", int(sanitize.StandardKeyWordLength)),
},
{
name: "supported process runtime name",
inputKey: string(semconv.ProcessRuntimeNameKey),
Expand Down Expand Up @@ -182,8 +268,12 @@ func TestTranslateResourceMetadata(t *testing.T) {
if !ok {
t.Fatalf("expected attribute %q to be present. all attrs %v", tc.wantKey, attrs.AsRaw())
}
if v.AsString() != tc.inputVal {
t.Errorf("attribute %q value = %q, want %q", tc.wantKey, v.AsString(), tc.inputVal)
wantVal := tc.inputVal
if tc.wantVal != "" {
wantVal = tc.wantVal
}
if v.AsString() != wantVal {
t.Errorf("attribute %q value = %q, want %q", tc.wantKey, v.AsString(), wantVal)
}
if tc.wantAbsent != "" {
if _, ok := attrs.Get(tc.wantAbsent); ok {
Expand Down Expand Up @@ -230,6 +320,34 @@ func TestTranslateLogRecordAttributes(t *testing.T) {
elasticattr.DataStreamNamespace: "default",
},
},
{
name: "supported semantic fields are not truncated",
setAttrs: func(attrs pcommon.Map) {
longValue := strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1)
attrs.PutStr(string(semconv.ExceptionMessageKey), longValue)
attrs.PutStr(string(semconv.ExceptionStacktraceKey), longValue)
attrs.PutStr(string(semconv.ExceptionTypeKey), longValue)
attrs.PutStr("event.name", longValue)
attrs.PutStr("event.domain", longValue)
attrs.PutStr("session.id", longValue)
attrs.PutStr(string(semconv.NetworkConnectionTypeKey), longValue)
attrs.PutStr(elasticattr.DataStreamDataset, longValue)
attrs.PutStr(elasticattr.DataStreamNamespace, longValue)
attrs.PutStr(elasticattr.DataStreamType, longValue)
},
want: map[string]any{
string(semconv.ExceptionMessageKey): strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
string(semconv.ExceptionStacktraceKey): strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
string(semconv.ExceptionTypeKey): strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
"event.name": strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
"event.domain": strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
"session.id": strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
string(semconv.NetworkConnectionTypeKey): strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
elasticattr.DataStreamDataset: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
elasticattr.DataStreamNamespace: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
elasticattr.DataStreamType: strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
},
},
{
name: "unsupported attributes moved to labels",
setAttrs: func(attrs pcommon.Map) {
Expand Down Expand Up @@ -296,6 +414,127 @@ func TestTranslateLogRecordAttributes(t *testing.T) {
}
}

func TestTranslateMetricDataPointAttributes(t *testing.T) {
tests := []struct {
name string
setAttrs func(pcommon.Map)
want map[string]any
wantAbsent []string
}{
{
name: "raw otlp metric dimensions moved to labels",
setAttrs: func(attrs pcommon.Map) {
attrs.PutStr("http.request.method", "GET")
attrs.PutStr("http.route", "/api/users")
attrs.PutInt("http.response.status_code", 200)
attrs.PutStr("host", "server-01")
attrs.PutStr("state", "used")
},
want: map[string]any{
"labels.http_request_method": "GET",
"labels.http_route": "/api/users",
"numeric_labels.http_response_status_code": float64(200),
"labels.host": "server-01",
"labels.state": "used",
},
wantAbsent: []string{
"http.request.method",
"http.route",
"http.response.status_code",
"host",
"state",
},
},
{
name: "existing metric label keys are sanitized in place",
setAttrs: func(attrs pcommon.Map) {
attrs.PutStr("labels.http.request.method", "GET")
attrs.PutDouble("numeric_labels.http.response.status_code", 200)
},
want: map[string]any{
"labels.http_request_method": "GET",
"numeric_labels.http_response_status_code": 200.0,
},
wantAbsent: []string{
"labels.http.request.method",
"numeric_labels.http.response.status_code",
},
},
{
name: "metric special cases and routing attrs are preserved",
setAttrs: func(attrs pcommon.Map) {
attrs.PutStr(elasticattr.DataStreamDataset, "apm.internal")
attrs.PutStr(elasticattr.DataStreamNamespace, "default")
attrs.PutStr(elasticattr.DataStreamType, "metrics")
attrs.PutStr("host", "server-01")
attrs.PutStr("state", "used")
attrs.PutStr("system.process.cmdline", "/usr/bin/java")
attrs.PutStr("system.filesystem.mount_point", "/mnt/data")
attrs.PutStr("event.module", "system")
attrs.PutStr("user.name", "appuser")
},
want: map[string]any{
elasticattr.DataStreamDataset: "apm.internal",
elasticattr.DataStreamNamespace: "default",
elasticattr.DataStreamType: "metrics",
"labels.host": "server-01",
"labels.state": "used",
"system.process.cmdline": "/usr/bin/java",
"system.filesystem.mount_point": "/mnt/data",
"event.module": "system",
"user.name": "appuser",
},
wantAbsent: []string{"host", "state"},
},
{
name: "metric special cases requiring truncation are truncated in place",
setAttrs: func(attrs pcommon.Map) {
longValue := strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1)
attrs.PutStr("system.process.cmdline", longValue)
attrs.PutStr("system.filesystem.mount_point", longValue)
attrs.PutStr("user.name", longValue)
attrs.PutStr("event.module", longValue)
attrs.PutStr("system.process.state", longValue)
},
want: map[string]any{
"system.process.cmdline": strings.Repeat("a", int(sanitize.StandardKeyWordLength)),
"system.filesystem.mount_point": strings.Repeat("a", int(sanitize.StandardKeyWordLength)),
"user.name": strings.Repeat("a", int(sanitize.StandardKeyWordLength)),
"event.module": strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
"system.process.state": strings.Repeat("a", int(sanitize.StandardKeyWordLength)+1),
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
attrs := pcommon.NewMap()
tc.setAttrs(attrs)

TranslateMetricDataPointAttributes(attrs)

for key, want := range tc.want {
got, ok := attrs.Get(key)
if !assert.True(t, ok, "expected %q to be present", key) {
continue
}
switch want := want.(type) {
case string:
assert.Equal(t, want, got.Str())
case float64:
assert.InDelta(t, want, got.Double(), 1e-9)
default:
t.Fatalf("unsupported want type %T", want)
}
}
for _, key := range tc.wantAbsent {
_, ok := attrs.Get(key)
assert.False(t, ok, "expected %q to be absent", key)
}
})
}
}

// TestSetLabelAttributeValue verifies that setLabelAttributeValue stores
// supported value types under the correct labels.* / numeric_labels.* prefix
// and rejects unsupported types (Map, Bytes, Empty). This matches
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ type SpanEventConfig struct {

// ElasticMetricConfig configures the enrichment attributes for metrics
type ElasticMetricConfig struct {
ProcessorEvent AttributeConfig `mapstructure:"processor_event"`
MetricsetName AttributeConfig `mapstructure:"metricset_name"`
ProcessorEvent AttributeConfig `mapstructure:"processor_event"`
MetricsetName AttributeConfig `mapstructure:"metricset_name"`
TranslateUnsupportedAttributes AttributeConfig `mapstructure:"translate_unsupported_attributes"`
}

// ElasticLogConfig configures the enrichment attributes for logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ func (e *Enricher) EnrichMetrics(pl pmetric.Metrics) {
for j := 0; j < scopeMetics.Len(); j++ {
scopeMetric := scopeMetics.At(j)
EnrichScope(scopeMetric.Scope(), e.Config)
scopeAttrs := scopeMetric.Scope().Attributes()
metrics := scopeMetric.Metrics()
for k := 0; k < metrics.Len(); k++ {
EnrichMetricDataPoints(metrics.At(k), scopeAttrs, e.Config)
}
}
}
}
Expand Down
Loading