From b03a61aa1f409096d71f133ba5736658a34b888e Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Thu, 13 Jun 2024 12:00:43 -0500 Subject: [PATCH] [processor/transform] Add 'transform.flatten.logs' feature gate (#33338) This PR proposes a feature gate which would enable the Flatten/Unflatten behavior described [here](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32080#issuecomment-2120764953). This was discussed in the Collector SIG recently and it was mentioned that a feature gate would be a reasonable way to implement this. One immediate question: Should this be purely a feature gate, or should there be a config option on the processor which fails `Validate` if the feature gate is not set? --------- Co-authored-by: Curtis Robert Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> Co-authored-by: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> --- .chloggen/transform-flatten-logs.yaml | 28 ++++ cmd/otelcontribcol/builder-config.yaml | 1 + cmd/otelcontribcol/go.mod | 3 + connector/datadogconnector/go.mod | 2 + exporter/datadogexporter/go.mod | 3 + .../datadogexporter/integrationtest/go.mod | 2 + go.mod | 3 + processor/transformprocessor/README.md | 25 +++ processor/transformprocessor/config.go | 18 +++ processor/transformprocessor/factory.go | 2 +- processor/transformprocessor/go.mod | 4 + .../internal/logs/processor.go | 9 +- .../internal/logs/processor_test.go | 10 +- .../transformprocessor/processor_test.go | 147 ++++++++++++++++++ .../testdata/logs/expected-with-flatten.yaml | 27 ++++ .../logs/expected-without-flatten.yaml | 20 +++ .../testdata/logs/input.yaml | 23 +++ 17 files changed, 320 insertions(+), 7 deletions(-) create mode 100644 .chloggen/transform-flatten-logs.yaml create mode 100644 processor/transformprocessor/processor_test.go create mode 100644 processor/transformprocessor/testdata/logs/expected-with-flatten.yaml create mode 100644 processor/transformprocessor/testdata/logs/expected-without-flatten.yaml create mode 100644 processor/transformprocessor/testdata/logs/input.yaml diff --git a/.chloggen/transform-flatten-logs.yaml b/.chloggen/transform-flatten-logs.yaml new file mode 100644 index 000000000000..0f3d76368188 --- /dev/null +++ b/.chloggen/transform-flatten-logs.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/transform + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add `transform.flatten.logs` featuregate to give each log record a distinct resource and scope. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32080] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This option is useful when applying transformations which alter the resource or scope. e.g. `set(resource.attributes["to"], attributes["from"])`, which may otherwise result in unexpected behavior. Using this option typically incurs a performance penalty as the processor must compute many hashes and create copies of resource and scope information for every log record. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/otelcontribcol/builder-config.yaml b/cmd/otelcontribcol/builder-config.yaml index fbad43a10bec..82476f3e1215 100644 --- a/cmd/otelcontribcol/builder-config.yaml +++ b/cmd/otelcontribcol/builder-config.yaml @@ -479,3 +479,4 @@ replaces: - github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/s3provider => ../../confmap/provider/s3provider - github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/secretsmanagerprovider => ../../confmap/provider/secretsmanagerprovider - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../pkg/sampling + - github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil => ../../internal/pdatautil diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index 89c64f749517..7828090b6cb5 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -623,6 +623,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders v0.102.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery v0.102.0 // indirect @@ -1297,3 +1298,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provid replace github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/secretsmanagerprovider => ../../confmap/provider/secretsmanagerprovider replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../pkg/sampling + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil => ../../internal/pdatautil diff --git a/connector/datadogconnector/go.mod b/connector/datadogconnector/go.mod index 0e40df474a0a..c22be6ebed40 100644 --- a/connector/datadogconnector/go.mod +++ b/connector/datadogconnector/go.mod @@ -336,3 +336,5 @@ replace github.com/openshift/api v3.9.0+incompatible => github.com/openshift/api replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor => ../../processor/transformprocessor replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../pkg/sampling + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil => ../../internal/pdatautil diff --git a/exporter/datadogexporter/go.mod b/exporter/datadogexporter/go.mod index 90d46e041d6c..f3c77b77ed94 100644 --- a/exporter/datadogexporter/go.mod +++ b/exporter/datadogexporter/go.mod @@ -249,6 +249,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.102.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.102.0 // indirect @@ -430,3 +431,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/stor replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor => ../../processor/transformprocessor replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../pkg/sampling + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil => ../../internal/pdatautil diff --git a/exporter/datadogexporter/integrationtest/go.mod b/exporter/datadogexporter/integrationtest/go.mod index 7fdabda6a36a..96ab696567f1 100644 --- a/exporter/datadogexporter/integrationtest/go.mod +++ b/exporter/datadogexporter/integrationtest/go.mod @@ -345,3 +345,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prome replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor => ../../../processor/transformprocessor replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../../pkg/sampling + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil => ../../../internal/pdatautil diff --git a/go.mod b/go.mod index d731d5616cbc..d77127245605 100644 --- a/go.mod +++ b/go.mod @@ -578,6 +578,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders v0.102.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery v0.102.0 // indirect @@ -1237,3 +1238,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/acke replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkenterprisereceiver => ./receiver/splunkenterprisereceiver replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ./pkg/sampling + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil => ./internal/pdatautil diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index 02ccd3ddfc42..daab05325aec 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -529,3 +529,28 @@ The transform processor uses the [OpenTelemetry Transformation Language](https:/ - Although the OTTL allows the `set` function to be used with `metric.data_type`, its implementation in the transform processor is NOOP. To modify a data type you must use a function specific to that purpose. - [Identity Conflict](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/standard-warnings.md#identity-conflict): Transformation of metrics have the potential to affect the identity of a metric leading to an Identity Crisis. Be especially cautious when transforming metric name and when reducing/changing existing attributes. Adding new attributes is safe. - [Orphaned Telemetry](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/standard-warnings.md#orphaned-telemetry): The processor allows you to modify `span_id`, `trace_id`, and `parent_span_id` for traces and `span_id`, and `trace_id` logs. Modifying these fields could lead to orphaned spans or logs. + +## Feature Gate + +### `transform.flatten.logs` + +The `transform.flatten.logs` [feature gate](https://github.com/open-telemetry/opentelemetry-collector/blob/main/featuregate/README.md#collector-feature-gates) enables the `flatten_data` configuration option (default `false`). With `flatten_data: true`, the processor provides each log record with a distinct copy of its resource and scope. Then, after applying all transformations, the log records are regrouped by resource and scope. + +This option is useful when applying transformations which alter the resource or scope. e.g. `set(resource.attributes["to"], attributes["from"])`, which may otherwise result in unexpected behavior. Using this option typically incurs a performance penalty as the processor must compute many hashes and create copies of resource and scope information for every log record. + +The feature is currently only available for log processing. + +#### Example Usage + +`config.yaml`: + + ```yaml + transform: + flatten_data: true + log_statements: + - context: log + statements: + - set(resource.attributes["to"], attributes["from"]) + ``` + + Run collector: `./otelcol --config config.yaml --feature-gates=transform.flatten.logs` diff --git a/processor/transformprocessor/config.go b/processor/transformprocessor/config.go index c81700201aad..8c184510ed5a 100644 --- a/processor/transformprocessor/config.go +++ b/processor/transformprocessor/config.go @@ -4,7 +4,10 @@ package transformprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor" import ( + "errors" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/featuregate" "go.uber.org/multierr" "go.uber.org/zap" @@ -15,6 +18,15 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces" ) +var ( + flatLogsFeatureGate = featuregate.GlobalRegistry().MustRegister("transform.flatten.logs", featuregate.StageAlpha, + featuregate.WithRegisterDescription("Flatten log data prior to transformation so every record has a unique copy of the resource and scope. Regroups logs based on resource and scope after transformations."), + featuregate.WithRegisterFromVersion("v0.103.0"), + featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32080#issuecomment-2120764953"), + ) + errFlatLogsGateDisabled = errors.New("'flatten_data' requires the 'transform.flatten.logs' feature gate to be enabled") +) + // Config defines the configuration for the processor. type Config struct { // ErrorMode determines how the processor reacts to errors that occur while processing a statement. @@ -27,6 +39,8 @@ type Config struct { TraceStatements []common.ContextStatements `mapstructure:"trace_statements"` MetricStatements []common.ContextStatements `mapstructure:"metric_statements"` LogStatements []common.ContextStatements `mapstructure:"log_statements"` + + FlattenData bool `mapstructure:"flatten_data"` } var _ component.Config = (*Config)(nil) @@ -73,5 +87,9 @@ func (c *Config) Validate() error { } } + if c.FlattenData && !flatLogsFeatureGate.IsEnabled() { + errors = multierr.Append(errors, errFlatLogsGateDisabled) + } + return errors } diff --git a/processor/transformprocessor/factory.go b/processor/transformprocessor/factory.go index a22856fbe9c1..7138907dc303 100644 --- a/processor/transformprocessor/factory.go +++ b/processor/transformprocessor/factory.go @@ -49,7 +49,7 @@ func createLogsProcessor( ) (processor.Logs, error) { oCfg := cfg.(*Config) - proc, err := logs.NewProcessor(oCfg.LogStatements, oCfg.ErrorMode, set.TelemetrySettings) + proc, err := logs.NewProcessor(oCfg.LogStatements, oCfg.ErrorMode, oCfg.FlattenData, set.TelemetrySettings) if err != nil { return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err) } diff --git a/processor/transformprocessor/go.mod b/processor/transformprocessor/go.mod index 874a4ebdf9e5..6fd8e9029d38 100644 --- a/processor/transformprocessor/go.mod +++ b/processor/transformprocessor/go.mod @@ -5,6 +5,8 @@ go 1.21.0 require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.102.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.102.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil v0.102.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.102.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.102.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.102.0 github.com/stretchr/testify v1.9.0 @@ -86,3 +88,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/common => ../../internal/common replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter => ../../internal/filter + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil => ../../internal/pdatautil diff --git a/processor/transformprocessor/internal/logs/processor.go b/processor/transformprocessor/internal/logs/processor.go index d9502e97762f..e2b184f3c8d7 100644 --- a/processor/transformprocessor/internal/logs/processor.go +++ b/processor/transformprocessor/internal/logs/processor.go @@ -12,6 +12,7 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) @@ -19,9 +20,10 @@ import ( type Processor struct { contexts []consumer.Logs logger *zap.Logger + flatMode bool } -func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.ErrorMode, settings component.TelemetrySettings) (*Processor, error) { +func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.ErrorMode, flatMode bool, settings component.TelemetrySettings) (*Processor, error) { pc, err := common.NewLogParserCollection(settings, common.WithLogParser(LogFunctions()), common.WithLogErrorMode(errorMode)) if err != nil { return nil, err @@ -44,10 +46,15 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E return &Processor{ contexts: contexts, logger: settings.Logger, + flatMode: flatMode, }, nil } func (p *Processor) ProcessLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { + if p.flatMode { + pdatautil.FlattenLogs(ld.ResourceLogs()) + defer pdatautil.GroupByResourceLogs(ld.ResourceLogs()) + } for _, c := range p.contexts { err := c.ConsumeLogs(ctx, ld) if err != nil { diff --git a/processor/transformprocessor/internal/logs/processor_test.go b/processor/transformprocessor/internal/logs/processor_test.go index f401093b7d30..60a7c79105ea 100644 --- a/processor/transformprocessor/internal/logs/processor_test.go +++ b/processor/transformprocessor/internal/logs/processor_test.go @@ -49,7 +49,7 @@ func Test_ProcessLogs_ResourceContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructLogs() - processor, err := NewProcessor([]common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, ottl.IgnoreError, false, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessLogs(context.Background(), td) @@ -84,7 +84,7 @@ func Test_ProcessLogs_ScopeContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructLogs() - processor, err := NewProcessor([]common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, ottl.IgnoreError, false, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessLogs(context.Background(), td) @@ -338,7 +338,7 @@ func Test_ProcessLogs_LogContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructLogs() - processor, err := NewProcessor([]common.ContextStatements{{Context: "log", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "log", Statements: []string{tt.statement}}}, ottl.IgnoreError, false, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessLogs(context.Background(), td) @@ -455,7 +455,7 @@ func Test_ProcessLogs_MixContext(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { td := constructLogs() - processor, err := NewProcessor(tt.contextStatments, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor(tt.contextStatments, ottl.IgnoreError, false, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessLogs(context.Background(), td) @@ -488,7 +488,7 @@ func Test_ProcessTraces_Error(t *testing.T) { for _, tt := range tests { t.Run(string(tt.context), func(t *testing.T) { td := constructLogs() - processor, err := NewProcessor([]common.ContextStatements{{Context: tt.context, Statements: []string{`set(attributes["test"], ParseJSON(1))`}}}, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: tt.context, Statements: []string{`set(attributes["test"], ParseJSON(1))`}}}, ottl.PropagateError, false, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessLogs(context.Background(), td) diff --git a/processor/transformprocessor/processor_test.go b/processor/transformprocessor/processor_test.go new file mode 100644 index 000000000000..a617c2728729 --- /dev/null +++ b/processor/transformprocessor/processor_test.go @@ -0,0 +1,147 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transformprocessor + +import ( + "context" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/processor/processortest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" +) + +func TestFlattenDataDisabledByDefault(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + assert.False(t, oCfg.FlattenData) + assert.NoError(t, oCfg.Validate()) +} + +func TestFlattenDataRequiresGate(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.FlattenData = true + assert.Equal(t, errFlatLogsGateDisabled, oCfg.Validate()) +} + +func TestProcessLogsWithoutFlatten(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.LogStatements = []common.ContextStatements{ + { + Context: "log", + Statements: []string{ + `set(resource.attributes["host.name"], attributes["host.name"])`, + `delete_key(attributes, "host.name")`, + }, + }, + } + sink := new(consumertest.LogsSink) + p, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopSettings(), oCfg, sink) + require.NoError(t, err) + + input, err := golden.ReadLogs(filepath.Join("testdata", "logs", "input.yaml")) + require.NoError(t, err) + expected, err := golden.ReadLogs(filepath.Join("testdata", "logs", "expected-without-flatten.yaml")) + require.NoError(t, err) + + assert.NoError(t, p.ConsumeLogs(context.Background(), input)) + + actual := sink.AllLogs() + require.Len(t, actual, 1) + + assert.NoError(t, plogtest.CompareLogs(expected, actual[0])) +} + +func TestProcessLogsWithFlatten(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.FlattenData = true + oCfg.LogStatements = []common.ContextStatements{ + { + Context: "log", + Statements: []string{ + `set(resource.attributes["host.name"], attributes["host.name"])`, + `delete_key(attributes, "host.name")`, + }, + }, + } + sink := new(consumertest.LogsSink) + p, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopSettings(), oCfg, sink) + require.NoError(t, err) + + input, err := golden.ReadLogs(filepath.Join("testdata", "logs", "input.yaml")) + require.NoError(t, err) + expected, err := golden.ReadLogs(filepath.Join("testdata", "logs", "expected-with-flatten.yaml")) + require.NoError(t, err) + + assert.NoError(t, p.ConsumeLogs(context.Background(), input)) + + actual := sink.AllLogs() + require.Len(t, actual, 1) + + assert.NoError(t, plogtest.CompareLogs(expected, actual[0])) +} + +func BenchmarkLogsWithoutFlatten(b *testing.B) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.LogStatements = []common.ContextStatements{ + { + Context: "log", + Statements: []string{ + `set(resource.attributes["host.name"], attributes["host.name"])`, + `delete_key(attributes, "host.name")`, + }, + }, + } + sink := new(consumertest.LogsSink) + p, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopSettings(), oCfg, sink) + require.NoError(b, err) + + input, err := golden.ReadLogs(filepath.Join("testdata", "logs", "input.yaml")) + require.NoError(b, err) + + for n := 0; n < b.N; n++ { + assert.NoError(b, p.ConsumeLogs(context.Background(), input)) + } +} + +func BenchmarkLogsWithFlatten(b *testing.B) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.FlattenData = true + oCfg.LogStatements = []common.ContextStatements{ + { + Context: "log", + Statements: []string{ + `set(resource.attributes["host.name"], attributes["host.name"])`, + `delete_key(attributes, "host.name")`, + }, + }, + } + sink := new(consumertest.LogsSink) + p, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopSettings(), oCfg, sink) + require.NoError(b, err) + + input, err := golden.ReadLogs(filepath.Join("testdata", "logs", "input.yaml")) + require.NoError(b, err) + + for n := 0; n < b.N; n++ { + assert.NoError(b, p.ConsumeLogs(context.Background(), input)) + } +} diff --git a/processor/transformprocessor/testdata/logs/expected-with-flatten.yaml b/processor/transformprocessor/testdata/logs/expected-with-flatten.yaml new file mode 100644 index 000000000000..e9d036e217ed --- /dev/null +++ b/processor/transformprocessor/testdata/logs/expected-with-flatten.yaml @@ -0,0 +1,27 @@ +resourceLogs: + - resource: + attributes: + - key: host.name + value: + stringValue: HOST.ONE + scopeLogs: + - logRecords: + - body: + stringValue: "hello one" + attributes: + - key: log.file.name + value: + stringValue: one.log + - resource: + attributes: + - key: host.name + value: + stringValue: HOST.TWO + scopeLogs: + - logRecords: + - body: + stringValue: "hello two" + attributes: + - key: log.file.name + value: + stringValue: two.log diff --git a/processor/transformprocessor/testdata/logs/expected-without-flatten.yaml b/processor/transformprocessor/testdata/logs/expected-without-flatten.yaml new file mode 100644 index 000000000000..37803c4aefd1 --- /dev/null +++ b/processor/transformprocessor/testdata/logs/expected-without-flatten.yaml @@ -0,0 +1,20 @@ +resourceLogs: + - resource: + attributes: + - key: host.name + value: + stringValue: HOST.TWO + scopeLogs: + - logRecords: + - body: + stringValue: "hello one" + attributes: + - key: log.file.name + value: + stringValue: one.log + - body: + stringValue: "hello two" + attributes: + - key: log.file.name + value: + stringValue: two.log diff --git a/processor/transformprocessor/testdata/logs/input.yaml b/processor/transformprocessor/testdata/logs/input.yaml new file mode 100644 index 000000000000..811bbc572bbb --- /dev/null +++ b/processor/transformprocessor/testdata/logs/input.yaml @@ -0,0 +1,23 @@ +resourceLogs: + - resource: + attributes: + scopeLogs: + - logRecords: + - body: + stringValue: "hello one" + attributes: + - key: host.name + value: + stringValue: HOST.ONE + - key: log.file.name + value: + stringValue: one.log + - body: + stringValue: "hello two" + attributes: + - key: host.name + value: + stringValue: HOST.TWO + - key: log.file.name + value: + stringValue: two.log