Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/tp-functions-per-context.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/transform

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix issue where the metric context was using datapoint functions.

# One or more tracking issues related to the change
issues: [16251]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
14 changes: 7 additions & 7 deletions processor/transformprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ func (c *Config) Validate() error {
}

if len(c.Traces.Statements) > 0 {
ottlspanp := ottlspan.NewParser(traces.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
ottlspanp := ottlspan.NewParser(traces.SpanFunctions(), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := ottlspanp.ParseStatements(c.Traces.Statements)
if err != nil {
return err
}
}

if len(c.TraceStatements) > 0 {
pc, err := common.NewTraceParserCollection(traces.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
pc, err := common.NewTraceParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithSpanParser(traces.SpanFunctions()), common.WithSpanEventParser(traces.SpanEventFunctions()))
if err != nil {
return err
}
Expand All @@ -81,15 +81,15 @@ func (c *Config) Validate() error {
}

if len(c.Metrics.Statements) > 0 {
ottldatapointp := ottldatapoint.NewParser(metrics.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := ottldatapointp.ParseStatements(c.Metrics.Statements)
ottlmetricsp := ottldatapoint.NewParser(metrics.DataPointFunctions(), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := ottlmetricsp.ParseStatements(c.Metrics.Statements)
if err != nil {
return err
}
}

if len(c.MetricStatements) > 0 {
pc, err := common.NewMetricParserCollection(metrics.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
pc, err := common.NewMetricParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithMetricParser(metrics.MetricFunctions()), common.WithDataPointParser(metrics.DataPointFunctions()))
if err != nil {
return err
}
Expand All @@ -102,15 +102,15 @@ func (c *Config) Validate() error {
}

if len(c.Logs.Statements) > 0 {
ottllogsp := ottllog.NewParser(logs.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
ottllogsp := ottllog.NewParser(logs.LogFunctions(), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := ottllogsp.ParseStatements(c.Logs.Statements)
if err != nil {
return err
}
}

if len(c.LogStatements) > 0 {
pc, err := common.NewLogParserCollection(logs.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
pc, err := common.NewLogParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithLogParser(logs.LogFunctions()))
if err != nil {
return err
}
Expand Down
10 changes: 8 additions & 2 deletions processor/transformprocessor/internal/common/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,20 @@ type LogParserCollection struct {

type LogParserCollectionOption func(*LogParserCollection) error

func NewLogParserCollection(functions map[string]interface{}, settings component.TelemetrySettings, options ...LogParserCollectionOption) (*LogParserCollection, error) {
func WithLogParser(functions map[string]interface{}) LogParserCollectionOption {
return func(lp *LogParserCollection) error {
lp.logParser = ottllog.NewParser(functions, lp.settings)
return nil
}
}

func NewLogParserCollection(settings component.TelemetrySettings, options ...LogParserCollectionOption) (*LogParserCollection, error) {
lpc := &LogParserCollection{
parserCollection: parserCollection{
settings: settings,
resourceParser: ottlresource.NewParser(ResourceFunctions(), settings),
scopeParser: ottlscope.NewParser(ScopeFunctions(), settings),
},
logParser: ottllog.NewParser(functions, settings),
}

for _, op := range options {
Expand Down
18 changes: 15 additions & 3 deletions processor/transformprocessor/internal/common/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,27 @@ type MetricParserCollection struct {

type MetricParserCollectionOption func(*MetricParserCollection) error

func NewMetricParserCollection(functions map[string]interface{}, settings component.TelemetrySettings, options ...MetricParserCollectionOption) (*MetricParserCollection, error) {
func WithMetricParser(functions map[string]interface{}) MetricParserCollectionOption {
return func(mp *MetricParserCollection) error {
mp.metricParser = ottlmetric.NewParser(functions, mp.settings)
return nil
}
}

func WithDataPointParser(functions map[string]interface{}) MetricParserCollectionOption {
return func(mp *MetricParserCollection) error {
mp.dataPointParser = ottldatapoint.NewParser(functions, mp.settings)
return nil
}
}

func NewMetricParserCollection(settings component.TelemetrySettings, options ...MetricParserCollectionOption) (*MetricParserCollection, error) {
mpc := &MetricParserCollection{
parserCollection: parserCollection{
settings: settings,
resourceParser: ottlresource.NewParser(ResourceFunctions(), settings),
scopeParser: ottlscope.NewParser(ScopeFunctions(), settings),
},
metricParser: ottlmetric.NewParser(functions, settings),
dataPointParser: ottldatapoint.NewParser(functions, settings),
}

for _, op := range options {
Expand Down
17 changes: 15 additions & 2 deletions processor/transformprocessor/internal/common/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,27 @@ type TraceParserCollection struct {

type TraceParserCollectionOption func(*TraceParserCollection) error

func NewTraceParserCollection(functions map[string]interface{}, settings component.TelemetrySettings, options ...TraceParserCollectionOption) (*TraceParserCollection, error) {
func WithSpanParser(functions map[string]interface{}) TraceParserCollectionOption {
return func(tp *TraceParserCollection) error {
tp.spanParser = ottlspan.NewParser(functions, tp.settings)
return nil
}
}

func WithSpanEventParser(functions map[string]interface{}) TraceParserCollectionOption {
return func(tp *TraceParserCollection) error {
tp.spanEventParser = ottlspanevent.NewParser(functions, tp.settings)
return nil
}
}

func NewTraceParserCollection(settings component.TelemetrySettings, options ...TraceParserCollectionOption) (*TraceParserCollection, error) {
tpc := &TraceParserCollection{
parserCollection: parserCollection{
settings: settings,
resourceParser: ottlresource.NewParser(ResourceFunctions(), settings),
scopeParser: ottlscope.NewParser(ScopeFunctions(), settings),
},
spanParser: ottlspan.NewParser(functions, settings),
}

for _, op := range options {
Expand Down
2 changes: 1 addition & 1 deletion processor/transformprocessor/internal/logs/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func Functions() map[string]interface{} {
func LogFunctions() map[string]interface{} {
// No logs-only functions yet.
return common.Functions[ottllog.TransformContext]()
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func Test_DefaultFunctions(t *testing.T) {
func Test_LogFunctions(t *testing.T) {
expected := common.Functions[ottllog.TransformContext]()
actual := Functions()
actual := LogFunctions()
require.Equal(t, len(expected), len(actual))
for k := range actual {
assert.Contains(t, expected, k)
Expand Down
4 changes: 2 additions & 2 deletions processor/transformprocessor/internal/logs/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Processor struct {

func NewProcessor(statements []string, contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) {
if len(statements) > 0 {
ottlp := ottllog.NewParser(Functions(), settings)
ottlp := ottllog.NewParser(LogFunctions(), settings)
parsedStatements, err := ottlp.ParseStatements(statements)
if err != nil {
return nil, err
Expand All @@ -44,7 +44,7 @@ func NewProcessor(statements []string, contextStatements []common.ContextStateme
}, nil
}

pc, err := common.NewLogParserCollection(Functions(), settings)
pc, err := common.NewLogParserCollection(settings, common.WithLogParser(LogFunctions()))
if err != nil {
return nil, err
}
Expand Down
13 changes: 9 additions & 4 deletions processor/transformprocessor/internal/metrics/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ package metrics // import "github.com/open-telemetry/opentelemetry-collector-con

import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

// registry is a map of names to functions for metrics pipelines
var registry = map[string]interface{}{
var datapointRegistry = map[string]interface{}{
"convert_sum_to_gauge": convertSumToGauge,
"convert_gauge_to_sum": convertGaugeToSum,
"convert_summary_sum_val_to_sum": convertSummarySumValToSum,
Expand All @@ -30,10 +31,14 @@ var registry = map[string]interface{}{
func init() {
// Init metrics registry with default functions common to all signals
for k, v := range common.Functions[ottldatapoint.TransformContext]() {
registry[k] = v
datapointRegistry[k] = v
}
}

func Functions() map[string]interface{} {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the root of the issue: the metrics processor was calling this function to initialize the MetricsParserCollection, which was then using the registry to parse metric statements. The issue is that these functions are datapoint context functions, so when used with metric statements things would panic if a metric path was used that isn't in datapoints, such as name.

return registry
func DataPointFunctions() map[string]interface{} {
return datapointRegistry
}

func MetricFunctions() map[string]interface{} {
return common.Functions[ottlmetric.TransformContext]()
}
14 changes: 12 additions & 2 deletions processor/transformprocessor/internal/metrics/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,30 @@ import (
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func Test_DefaultFunctions(t *testing.T) {
func Test_DataPointFunctions(t *testing.T) {
expected := common.Functions[ottldatapoint.TransformContext]()
expected["convert_sum_to_gauge"] = convertSumToGauge
expected["convert_gauge_to_sum"] = convertGaugeToSum
expected["convert_summary_sum_val_to_sum"] = convertSummarySumValToSum
expected["convert_summary_count_val_to_sum"] = convertSummaryCountValToSum

actual := Functions()
actual := DataPointFunctions()

require.Equal(t, len(expected), len(actual))
for k := range actual {
assert.Contains(t, expected, k)
}
}

func Test_MetricFunctions(t *testing.T) {
expected := common.Functions[ottlmetric.TransformContext]()
actual := MetricFunctions()
require.Equal(t, len(expected), len(actual))
for k := range actual {
assert.Contains(t, expected, k)
}
}
4 changes: 2 additions & 2 deletions processor/transformprocessor/internal/metrics/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Processor struct {

func NewProcessor(statements []string, contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) {
if len(statements) > 0 {
ottlp := ottldatapoint.NewParser(Functions(), settings)
ottlp := ottldatapoint.NewParser(DataPointFunctions(), settings)
parsedStatements, err := ottlp.ParseStatements(statements)
if err != nil {
return nil, err
Expand All @@ -45,7 +45,7 @@ func NewProcessor(statements []string, contextStatements []common.ContextStateme
}, nil
}

pc, err := common.NewMetricParserCollection(Functions(), settings)
pc, err := common.NewMetricParserCollection(settings, common.WithMetricParser(MetricFunctions()), common.WithDataPointParser(DataPointFunctions()))
if err != nil {
return nil, err
}
Expand Down
8 changes: 7 additions & 1 deletion processor/transformprocessor/internal/traces/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ package traces // import "github.com/open-telemetry/opentelemetry-collector-cont

import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func Functions() map[string]interface{} {
func SpanFunctions() map[string]interface{} {
// No trace-only functions yet.
return common.Functions[ottlspan.TransformContext]()
}

func SpanEventFunctions() map[string]interface{} {
// No trace-only functions yet.
return common.Functions[ottlspanevent.TransformContext]()
}
14 changes: 12 additions & 2 deletions processor/transformprocessor/internal/traces/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,22 @@ import (
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func Test_DefaultFunctions(t *testing.T) {
func Test_SpanFunctions(t *testing.T) {
expected := common.Functions[ottlspan.TransformContext]()
actual := Functions()
actual := SpanFunctions()
require.Equal(t, len(expected), len(actual))
for k := range actual {
assert.Contains(t, expected, k)
}
}

func Test_SpanEventFunctions(t *testing.T) {
expected := common.Functions[ottlspanevent.TransformContext]()
actual := SpanEventFunctions()
require.Equal(t, len(expected), len(actual))
for k := range actual {
assert.Contains(t, expected, k)
Expand Down
4 changes: 2 additions & 2 deletions processor/transformprocessor/internal/traces/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Processor struct {

func NewProcessor(statements []string, contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) {
if len(statements) > 0 {
ottlp := ottlspan.NewParser(Functions(), settings)
ottlp := ottlspan.NewParser(SpanFunctions(), settings)
parsedStatements, err := ottlp.ParseStatements(statements)
if err != nil {
return nil, err
Expand All @@ -44,7 +44,7 @@ func NewProcessor(statements []string, contextStatements []common.ContextStateme
}, nil
}

pc, err := common.NewTraceParserCollection(Functions(), settings)
pc, err := common.NewTraceParserCollection(settings, common.WithSpanParser(SpanFunctions()), common.WithSpanEventParser(SpanEventFunctions()))
if err != nil {
return nil, err
}
Expand Down