diff --git a/.chloggen/ottl-profilesample.yaml b/.chloggen/ottl-profilesample.yaml new file mode 100644 index 0000000000000..f028710f9ef5b --- /dev/null +++ b/.chloggen/ottl-profilesample.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/ottl + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add OTTL support for sample submessage of OTel Profiling signal. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [40161] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/pkg/ottl/contexts/internal/ctxprofilesample/context.go b/pkg/ottl/contexts/internal/ctxprofilesample/context.go new file mode 100644 index 0000000000000..b0fdb3f8edbb2 --- /dev/null +++ b/pkg/ottl/contexts/internal/ctxprofilesample/context.go @@ -0,0 +1,16 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ctxprofilesample // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxprofilesample" + +import "go.opentelemetry.io/collector/pdata/pprofile" + +const ( + Name = "profilesample" + DocRef = "https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlprofilesample" +) + +type Context interface { + GetProfileSample() pprofile.Sample + GetProfilesDictionary() pprofile.ProfilesDictionary +} diff --git a/pkg/ottl/contexts/internal/ctxprofilesample/profilesample.go b/pkg/ottl/contexts/internal/ctxprofilesample/profilesample.go new file mode 100644 index 0000000000000..b471504380c68 --- /dev/null +++ b/pkg/ottl/contexts/internal/ctxprofilesample/profilesample.go @@ -0,0 +1,218 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ctxprofilesample // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxprofilesample" + +import ( + "context" + "errors" + "math" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pprofile" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxerror" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxutil" +) + +var ( + errMaxValueExceed = errors.New("exceeded max value") + errInvalidValueType = errors.New("invalid value type") +) + +func PathGetSetter[K Context](path ottl.Path[K]) (ottl.GetSetter[K], error) { + if path == nil { + return nil, ctxerror.New("nil", "nil", Name, DocRef) + } + switch path.Name() { + case "locations_start_index": + return accessLocationsStartIndex[K](), nil + case "locations_length": + return accessLocationsLength[K](), nil + case "values": + return accessValues[K](), nil + case "attribute_indices": + return accessAttributeIndices[K](), nil + case "link_index": + return accessLinkIndex[K](), nil + case "timestamps_unix_nano": + return accessTimestampsUnixNano[K](), nil + case "timestamps": + return accessTimestamps[K](), nil + case "attributes": + if path.Keys() == nil { + return accessAttributes[K](), nil + } + return accessAttributesKey(path.Keys()), nil + default: + return nil, ctxerror.New(path.Name(), path.String(), Name, DocRef) + } +} + +func accessLocationsStartIndex[K Context]() ottl.StandardGetSetter[K] { + return ottl.StandardGetSetter[K]{ + Getter: func(_ context.Context, tCtx K) (any, error) { + return int64(tCtx.GetProfileSample().LocationsStartIndex()), nil + }, + Setter: func(_ context.Context, tCtx K, val any) error { + if v, ok := val.(int64); ok { + if v >= math.MaxInt32 { + return errMaxValueExceed + } + tCtx.GetProfileSample().SetLocationsStartIndex(int32(v)) + return nil + } + return errInvalidValueType + }, + } +} + +func accessLocationsLength[K Context]() ottl.StandardGetSetter[K] { + return ottl.StandardGetSetter[K]{ + Getter: func(_ context.Context, tCtx K) (any, error) { + return int64(tCtx.GetProfileSample().LocationsLength()), nil + }, + Setter: func(_ context.Context, tCtx K, val any) error { + if v, ok := val.(int64); ok { + if v >= math.MaxInt32 { + return errMaxValueExceed + } + tCtx.GetProfileSample().SetLocationsLength(int32(v)) + return nil + } + return errInvalidValueType + }, + } +} + +func accessValues[K Context]() ottl.StandardGetSetter[K] { + return ottl.StandardGetSetter[K]{ + Getter: func(_ context.Context, tCtx K) (any, error) { + return ctxutil.GetCommonIntSliceValues[int64](tCtx.GetProfileSample().Value()), nil + }, + Setter: func(_ context.Context, tCtx K, val any) error { + return ctxutil.SetCommonIntSliceValues[int64](tCtx.GetProfileSample().Value(), val) + }, + } +} + +func accessAttributeIndices[K Context]() ottl.StandardGetSetter[K] { + return ottl.StandardGetSetter[K]{ + Getter: func(_ context.Context, tCtx K) (any, error) { + return ctxutil.GetCommonIntSliceValues[int32](tCtx.GetProfileSample().AttributeIndices()), nil + }, + Setter: func(_ context.Context, tCtx K, val any) error { + return ctxutil.SetCommonIntSliceValues[int32](tCtx.GetProfileSample().AttributeIndices(), val) + }, + } +} + +func accessLinkIndex[K Context]() ottl.StandardGetSetter[K] { + return ottl.StandardGetSetter[K]{ + Getter: func(_ context.Context, tCtx K) (any, error) { + return int64(tCtx.GetProfileSample().LinkIndex()), nil + }, + Setter: func(_ context.Context, tCtx K, val any) error { + if v, ok := val.(int64); ok { + if v >= math.MaxInt32 { + return errMaxValueExceed + } + tCtx.GetProfileSample().SetLinkIndex(int32(v)) + return nil + } + return errInvalidValueType + }, + } +} + +func accessTimestampsUnixNano[K Context]() ottl.StandardGetSetter[K] { + return ottl.StandardGetSetter[K]{ + Getter: func(_ context.Context, tCtx K) (any, error) { + return ctxutil.GetCommonIntSliceValues[uint64](tCtx.GetProfileSample().TimestampsUnixNano()), nil + }, + Setter: func(_ context.Context, tCtx K, val any) error { + return ctxutil.SetCommonIntSliceValues[uint64](tCtx.GetProfileSample().TimestampsUnixNano(), val) + }, + } +} + +func accessTimestamps[K Context]() ottl.StandardGetSetter[K] { + return ottl.StandardGetSetter[K]{ + Getter: func(_ context.Context, tCtx K) (any, error) { + var ts []time.Time + for _, t := range tCtx.GetProfileSample().TimestampsUnixNano().All() { + ts = append(ts, time.Unix(0, int64(t)).UTC()) + } + return ts, nil + }, + Setter: func(_ context.Context, tCtx K, val any) error { + if ts, ok := val.([]time.Time); ok { + tCtx.GetProfileSample().TimestampsUnixNano().FromRaw([]uint64{}) + for _, t := range ts { + tCtx.GetProfileSample().TimestampsUnixNano().Append(uint64(t.UTC().UnixNano())) + } + } + return nil + }, + } +} + +func accessAttributes[K Context]() ottl.StandardGetSetter[K] { + return ottl.StandardGetSetter[K]{ + Getter: func(_ context.Context, tCtx K) (any, error) { + return pprofile.FromAttributeIndices(tCtx.GetProfilesDictionary().AttributeTable(), tCtx.GetProfileSample()), nil + }, + Setter: func(_ context.Context, tCtx K, val any) error { + m, err := ctxutil.GetMap(val) + if err != nil { + return err + } + tCtx.GetProfileSample().AttributeIndices().FromRaw([]int32{}) + for k, v := range m.All() { + if err := pprofile.PutAttribute(tCtx.GetProfilesDictionary().AttributeTable(), tCtx.GetProfileSample(), k, v); err != nil { + return err + } + } + return nil + }, + } +} + +func accessAttributesKey[K Context](key []ottl.Key[K]) ottl.StandardGetSetter[K] { + return ottl.StandardGetSetter[K]{ + Getter: func(ctx context.Context, tCtx K) (any, error) { + return ctxutil.GetMapValue[K](ctx, tCtx, pprofile.FromAttributeIndices(tCtx.GetProfilesDictionary().AttributeTable(), tCtx.GetProfileSample()), key) + }, + Setter: func(ctx context.Context, tCtx K, val any) error { + newKey, err := ctxutil.GetMapKeyName(ctx, tCtx, key[0]) + if err != nil { + return err + } + v := getAttributeValue(tCtx, *newKey) + if err := ctxutil.SetIndexableValue[K](ctx, tCtx, v, val, key[1:]); err != nil { + return err + } + return pprofile.PutAttribute(tCtx.GetProfilesDictionary().AttributeTable(), tCtx.GetProfileSample(), *newKey, v) + }, + } +} + +func getAttributeValue[K Context](tCtx K, key string) pcommon.Value { + // Find the index of the attribute in the profile's attribute indices + // and return the corresponding value from the attribute table. + table := tCtx.GetProfilesDictionary().AttributeTable() + indices := tCtx.GetProfileSample().AttributeIndices().AsRaw() + + for _, tableIndex := range indices { + attr := table.At(int(tableIndex)) + if attr.Key() == key { + v := pcommon.NewValueEmpty() + attr.Value().CopyTo(v) + return v + } + } + + return pcommon.NewValueEmpty() +} diff --git a/pkg/ottl/contexts/internal/ctxprofilesample/profilesample_test.go b/pkg/ottl/contexts/internal/ctxprofilesample/profilesample_test.go new file mode 100644 index 0000000000000..0caafee2a838e --- /dev/null +++ b/pkg/ottl/contexts/internal/ctxprofilesample/profilesample_test.go @@ -0,0 +1,100 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ctxprofilesample // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxprofilesample" + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pprofile" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/pathtest" +) + +func TestPathGetSetter(t *testing.T) { + tsNow := time.Now().UTC() + tests := []struct { + path string + val any + keys []ottl.Key[*profileSampleContext] + }{ + { + path: "locations_start_index", + val: int64(42), + }, + { + path: "locations_length", + val: int64(43), + }, + { + path: "values", + val: []int64{73, 74, 75}, + }, + { + path: "attribute_indices", + val: []int64{97, 98, 99}, + }, + { + path: "link_index", + val: int64(44), + }, + { + path: "timestamps_unix_nano", + val: []int64{tsNow.Unix(), 2, 3}, + }, + { + path: "timestamps", + val: []time.Time{tsNow}, + }, + } + + for _, tt := range tests { + t.Run(tt.path, func(t *testing.T) { + pathParts := strings.Split(tt.path, " ") + path := &pathtest.Path[*profileSampleContext]{N: pathParts[0]} + if tt.keys != nil { + path.KeySlice = tt.keys + } + if len(pathParts) > 1 { + path.NextPath = &pathtest.Path[*profileSampleContext]{N: pathParts[1]} + } + + sample := pprofile.NewSample() + dictionary := pprofile.NewProfilesDictionary() + + accessor, err := PathGetSetter(path) + require.NoError(t, err) + + err = accessor.Set(context.Background(), newProfileSampleContext(sample, dictionary), tt.val) + require.NoError(t, err) + + got, err := accessor.Get(context.Background(), newProfileSampleContext(sample, dictionary)) + require.NoError(t, err) + + assert.Equal(t, tt.val, got) + }) + } +} + +type profileSampleContext struct { + sample pprofile.Sample + dictionary pprofile.ProfilesDictionary +} + +func (p *profileSampleContext) GetProfilesDictionary() pprofile.ProfilesDictionary { + return p.dictionary +} + +func (p *profileSampleContext) GetProfileSample() pprofile.Sample { + return p.sample +} + +func newProfileSampleContext(sample pprofile.Sample, dictionary pprofile.ProfilesDictionary) *profileSampleContext { + return &profileSampleContext{sample: sample, dictionary: dictionary} +} diff --git a/pkg/ottl/contexts/internal/logprofile/logging.go b/pkg/ottl/contexts/internal/logprofile/logging.go index ef008f0090676..2b910c773e4f2 100644 --- a/pkg/ottl/contexts/internal/logprofile/logging.go +++ b/pkg/ottl/contexts/internal/logprofile/logging.go @@ -90,9 +90,14 @@ func (p Profile) MarshalLogObject(encoder zapcore.ObjectEncoder) error { joinedErr = errors.Join(joinedErr, err) joinedErr = errors.Join(joinedErr, encoder.AddArray("sample_type", vts)) - ss, err := newSamples(p, p.Sample()) - joinedErr = errors.Join(joinedErr, err) - joinedErr = errors.Join(joinedErr, encoder.AddArray("sample", ss)) + samples := p.Sample() + for _, s := range samples.All() { + joinedErr = errors.Join(joinedErr, encoder.AddObject("sample", ProfileSample{ + s, + p.Profile, + p.Dictionary, + })) + } encoder.AddInt64("time_nanos", int64(p.Time())) encoder.AddInt64("duration_nanos", int64(p.Duration())) @@ -124,60 +129,36 @@ func (p Profile) MarshalLogObject(encoder zapcore.ObjectEncoder) error { return joinedErr } -type samples []sample - -func (ss samples) MarshalLogArray(encoder zapcore.ArrayEncoder) error { - var joinedErr error - for _, s := range ss { - joinedErr = errors.Join(joinedErr, encoder.AppendObject(s)) - } - return joinedErr +type ProfileSample struct { + pprofile.Sample + Profile pprofile.Profile + Dictionary pprofile.ProfilesDictionary } -func newSamples(p Profile, sampleSlice pprofile.SampleSlice) (samples, error) { +func (s ProfileSample) MarshalLogObject(encoder zapcore.ObjectEncoder) error { var joinedErr error - ss := make(samples, 0, sampleSlice.Len()) - for i := range sampleSlice.Len() { - s, err := newSample(p, sampleSlice.At(i)) - joinedErr = errors.Join(joinedErr, err) - ss = append(ss, s) - } - return ss, joinedErr -} -type sample struct { - timestamps timestamps - attributes attributes - locations locations - values values - link *link -} + locs, err := getLocations(s.Dictionary, s.Profile.LocationIndices().AsRaw(), s.LocationsStartIndex(), s.LocationsLength()) + joinedErr = errors.Join(joinedErr, err) + joinedErr = errors.Join(joinedErr, encoder.AddArray("locations", locs)) -func newSample(p Profile, ps pprofile.Sample) (sample, error) { - var s sample - var err, joinedErr error + values := newValues(s.Value()) + joinedErr = errors.Join(joinedErr, encoder.AddArray("values", values)) - s.timestamps = newTimestamps(ps.TimestampsUnixNano()) - s.values = newValues(ps.Value()) - s.attributes, err = newAttributes(p.Dictionary, ps.AttributeIndices()) - joinedErr = errors.Join(joinedErr, err) - s.locations, err = getLocations(p.Dictionary, p.LocationIndices().AsRaw(), - ps.LocationsStartIndex(), ps.LocationsLength()) + ats, err := newAttributes(s.Dictionary, s.AttributeIndices()) joinedErr = errors.Join(joinedErr, err) - if ps.HasLinkIndex() { // optional - l, err := getLink(p.Dictionary, ps.LinkIndex()) + joinedErr = errors.Join(joinedErr, encoder.AddArray("attributes", ats)) + + if s.HasLinkIndex() { + l, err := getLink(s.Dictionary, s.LinkIndex()) joinedErr = errors.Join(joinedErr, err) - s.link = &l + joinedErr = errors.Join(joinedErr, encoder.AddObject("link", l)) } - return s, joinedErr -} + ts := newTimestamps(s.TimestampsUnixNano()) + joinedErr = errors.Join(joinedErr, encoder.AddArray("timestamps_unix_nano", ts)) -func (s sample) MarshalLogObject(encoder zapcore.ObjectEncoder) error { - err := encoder.AddArray("timestamps_unix_nano", s.timestamps) - err = errors.Join(err, encoder.AddArray("attributes", s.attributes)) - err = errors.Join(err, encoder.AddArray("locations", s.locations)) - return errors.Join(err, encoder.AddArray("values", s.values)) + return joinedErr } type valueTypes []valueType diff --git a/pkg/ottl/contexts/internal/logprofile/logging_test.go b/pkg/ottl/contexts/internal/logprofile/logging_test.go index f94f045060917..9c6ec1b8f7fcc 100644 --- a/pkg/ottl/contexts/internal/logprofile/logging_test.go +++ b/pkg/ottl/contexts/internal/logprofile/logging_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pprofile" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -26,7 +27,41 @@ func TestProfile_MarshalLogObject(t *testing.T) { profile: func() (pprofile.ProfilesDictionary, pprofile.Profile) { dic := pprofile.NewProfilesDictionary() p := &pprofiletest.Profile{ - ProfileID: pprofile.ProfileID([]byte("profileid1111111")), + ProfileID: pprofile.ProfileID([]byte("profileid1111111")), + SampleType: []pprofiletest.ValueType{ + { + Typ: "test", + Unit: "foobar", + }, + }, + Sample: []pprofiletest.Sample{ + { + Locations: []pprofiletest.Location{ + { + Address: 0x42, + }, + }, + Value: []int64{73}, + Link: &pprofiletest.Link{ + TraceID: pcommon.TraceID{ + 0xf, 0xe, 0xd, 0xc, 0xb, 0xa, 0x9, 0x8, + 0x7, 0x6, 0x5, 0x4, 0x3, 0x2, 0x1, 0x0, + }, + SpanID: pcommon.SpanID{0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7}, + }, + }, + { + Locations: []pprofiletest.Location{ + { + Address: 0x43, + }, + }, + Value: []int64{74}, + Attributes: []pprofiletest.Attribute{ + {Key: "sample2", Value: "value2"}, + }, + }, + }, Attributes: []pprofiletest.Attribute{{Key: "container-attr1", Value: "value1"}}, Comment: []string{"comment1", "comment2"}, } diff --git a/pkg/ottl/contexts/ottlprofilesample/README.md b/pkg/ottl/contexts/ottlprofilesample/README.md new file mode 100644 index 0000000000000..8618dc6f71eab --- /dev/null +++ b/pkg/ottl/contexts/ottlprofilesample/README.md @@ -0,0 +1,33 @@ +# Profile Sample Context + +> [!NOTE] +> This documentation applies only to version `0.132.0` and later. Information on earlier versions is not available. + +The Profile Sample Context is a Context implementation for [pdata Profiles](https://github.com/open-telemetry/opentelemetry-collector/tree/main/pdata/pprofile), the collector's internal representation for OTLP profile data. This Context should be used when interacted with OTLP profiles. + +## Paths +In general, the Profile Sample Context supports accessing pdata using the field names from the [profiles proto](https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/profiles/v1development/profiles.proto). All integers are returned and set via `int64`. All doubles are returned and set via `float64`. + +The following paths are supported. + +| path | field accessed | type | +|------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------| +| cache | the value of the current transform context's temporary cache. cache can be used as a temporary placeholder for data during complex transformations | pcommon.Map | +| cache\[""\] | the value of an item in cache. Supports multiple indexes to access nested fields. | string, bool, int64, float64, pcommon.Map, pcommon.Slice, []byte or nil | +| resource | resource of the profile being processed | pcommon.Resource | +| resource.attributes | resource attributes of the profile being processed | pcommon.Map | +| resource.attributes\[""\] | the value of the resource attribute of the profile being processed. Supports multiple indexes to access nested fields. | string, bool, int64, float64, pcommon.Map, pcommon.Slice, []byte or nil | +| instrumentation_scope | instrumentation scope of the profile being processed | pcommon.InstrumentationScope | +| instrumentation_scope.name | name of the instrumentation scope of the profile being processed | string | +| instrumentation_scope.version | version of the instrumentation scope of the profile being processed | string | +| instrumentation_scope.attributes | instrumentation scope attributes of the data point being processed | pcommon.Map | +| instrumentation_scope.attributes\[""\] | the value of the instrumentation scope attribute of the data point being processed. Supports multiple indexes to access nested fields. | string, bool, int64, float64, pcommon.Map, pcommon.Slice, []byte or nil | +| profilesample.attributes | attributes of the profile being processed | pcommon.Map | +| profilesample.attributes\[""\] | the value of the attribute of the profile being processed. Supports multiple indexes to access nested fields. | string, bool, int64, float64, pcommon.Map, pcommon.Slice, []byte or nil | +| profilesample.locations_start_index | the locations start index into the ProfilesDictionary of the sample being processed. | int64 | +| profilesample.locations_length | the locations length of the sample being processed. | int64 | +| profilesample.values | the values of the sample being processed. | []int64 | +| profilesample.link_index | the link index into the ProfilesDictionary of the sample being processed. | int64 | +| profilesample.timestamps_unix_nano | the timestamps in unix nano associated with the sample being processed. | []int64 | +| profilesample.timestamps | the timestamps in `time.Time` associated with the sample being processed. | []time.Time | +| profilesample.attribute_indices | the attribute indices of the sample being processed. | []int64 | diff --git a/pkg/ottl/contexts/ottlprofilesample/package_test.go b/pkg/ottl/contexts/ottlprofilesample/package_test.go new file mode 100644 index 0000000000000..1db8ded68bc5a --- /dev/null +++ b/pkg/ottl/contexts/ottlprofilesample/package_test.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ottlprofilesample + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/pkg/ottl/contexts/ottlprofilesample/profilesample.go b/pkg/ottl/contexts/ottlprofilesample/profilesample.go new file mode 100644 index 0000000000000..e0291ef160141 --- /dev/null +++ b/pkg/ottl/contexts/ottlprofilesample/profilesample.go @@ -0,0 +1,203 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ottlprofilesample // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlprofilesample" + +import ( + "errors" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pprofile" + "go.uber.org/zap/zapcore" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxcache" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxcommon" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxprofile" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxprofilesample" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxresource" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxscope" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/logging" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/logprofile" +) + +// ContextName is the name of the context for profiles. +// Experimental: *NOTE* this constant is subject to change or removal in the future. +const ContextName = ctxprofilesample.Name + +var ( + _ ctxresource.Context = TransformContext{} + _ ctxscope.Context = TransformContext{} + _ ctxprofilesample.Context = TransformContext{} + _ ctxprofile.Context = TransformContext{} + _ zapcore.ObjectMarshaler = TransformContext{} +) + +// TransformContext represents a profile and its associated hierarchy. +type TransformContext struct { + sample pprofile.Sample + profile pprofile.Profile + dictionary pprofile.ProfilesDictionary + instrumentationScope pcommon.InstrumentationScope + resource pcommon.Resource + cache pcommon.Map + scopeProfiles pprofile.ScopeProfiles + resourceProfiles pprofile.ResourceProfiles +} + +// MarshalLogObject serializes the profile into a zapcore.ObjectEncoder for logging. +func (tCtx TransformContext) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + err := encoder.AddObject("resource", logging.Resource(tCtx.resource)) + err = errors.Join(err, encoder.AddObject("scope", logging.InstrumentationScope(tCtx.instrumentationScope))) + err = errors.Join(err, encoder.AddObject("sample", logprofile.ProfileSample{Sample: tCtx.sample, Dictionary: tCtx.dictionary})) + err = errors.Join(err, encoder.AddObject("cache", logging.Map(tCtx.cache))) + return err +} + +// TransformContextOption represents an option for configuring a TransformContext. +type TransformContextOption func(*TransformContext) + +// NewTransformContext creates a new TransformContext with the provided parameters. +func NewTransformContext(sample pprofile.Sample, profile pprofile.Profile, dictionary pprofile.ProfilesDictionary, instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, scopeProfiles pprofile.ScopeProfiles, resourceProfiles pprofile.ResourceProfiles, options ...TransformContextOption) TransformContext { + tc := TransformContext{ + sample: sample, + profile: profile, + dictionary: dictionary, + instrumentationScope: instrumentationScope, + resource: resource, + cache: pcommon.NewMap(), + scopeProfiles: scopeProfiles, + resourceProfiles: resourceProfiles, + } + for _, opt := range options { + opt(&tc) + } + return tc +} + +// GetProfile returns the profile from the TransformContext. +func (tCtx TransformContext) GetProfile() pprofile.Profile { + return tCtx.profile +} + +// GetProfileSample returns the sample from the TransformContext. +func (tCtx TransformContext) GetProfileSample() pprofile.Sample { + return tCtx.sample +} + +// GetProfilesDictionary returns the profiles dictionary from the TransformContext. +func (tCtx TransformContext) GetProfilesDictionary() pprofile.ProfilesDictionary { + return tCtx.dictionary +} + +// GetInstrumentationScope returns the instrumentation scope from the TransformContext. +func (tCtx TransformContext) GetInstrumentationScope() pcommon.InstrumentationScope { + return tCtx.instrumentationScope +} + +// GetResource returns the resource from the TransformContext. +func (tCtx TransformContext) GetResource() pcommon.Resource { + return tCtx.resource +} + +// GetScopeSchemaURLItem returns the scope schema URL item from the TransformContext. +func (tCtx TransformContext) GetScopeSchemaURLItem() ctxcommon.SchemaURLItem { + return tCtx.scopeProfiles +} + +// GetResourceSchemaURLItem returns the resource schema URL item from the TransformContext. +func (tCtx TransformContext) GetResourceSchemaURLItem() ctxcommon.SchemaURLItem { + return tCtx.resourceProfiles +} + +// NewParser creates a new profile parser with the provided functions and options. +func NewParser(functions map[string]ottl.Factory[TransformContext], telemetrySettings component.TelemetrySettings, options ...ottl.Option[TransformContext]) (ottl.Parser[TransformContext], error) { + return ctxcommon.NewParser( + functions, + telemetrySettings, + pathExpressionParser(getCache), + parseEnum, + options..., + ) +} + +// EnablePathContextNames enables the support for path's context names on statements. +// When this option is configured, all statement's paths must have a valid context prefix, +// otherwise an error is reported. +// +// Experimental: *NOTE* this option is subject to change or removal in the future. +func EnablePathContextNames() ottl.Option[TransformContext] { + return func(p *ottl.Parser[TransformContext]) { + ottl.WithPathContextNames[TransformContext]([]string{ + ctxprofilesample.Name, + ctxscope.LegacyName, + ctxscope.Name, + ctxresource.Name, + })(p) + } +} + +// StatementSequenceOption represents an option for configuring a statement sequence. +type StatementSequenceOption func(*ottl.StatementSequence[TransformContext]) + +// WithStatementSequenceErrorMode sets the error mode for a statement sequence. +func WithStatementSequenceErrorMode(errorMode ottl.ErrorMode) StatementSequenceOption { + return func(s *ottl.StatementSequence[TransformContext]) { + ottl.WithStatementSequenceErrorMode[TransformContext](errorMode)(s) + } +} + +// NewStatementSequence creates a new statement sequence with the provided statements and options. +func NewStatementSequence(statements []*ottl.Statement[TransformContext], telemetrySettings component.TelemetrySettings, options ...StatementSequenceOption) ottl.StatementSequence[TransformContext] { + s := ottl.NewStatementSequence(statements, telemetrySettings) + for _, op := range options { + op(&s) + } + return s +} + +// ConditionSequenceOption represents an option for configuring a condition sequence. +type ConditionSequenceOption func(*ottl.ConditionSequence[TransformContext]) + +// WithConditionSequenceErrorMode sets the error mode for a condition sequence. +func WithConditionSequenceErrorMode(errorMode ottl.ErrorMode) ConditionSequenceOption { + return func(c *ottl.ConditionSequence[TransformContext]) { + ottl.WithConditionSequenceErrorMode[TransformContext](errorMode)(c) + } +} + +// NewConditionSequence creates a new condition sequence with the provided conditions and options. +func NewConditionSequence(conditions []*ottl.Condition[TransformContext], telemetrySettings component.TelemetrySettings, options ...ConditionSequenceOption) ottl.ConditionSequence[TransformContext] { + c := ottl.NewConditionSequence(conditions, telemetrySettings) + for _, op := range options { + op(&c) + } + return c +} + +func parseEnum(val *ottl.EnumSymbol) (*ottl.Enum, error) { + if val != nil { + return nil, fmt.Errorf("enum symbol, %s, not found", *val) + } + return nil, errors.New("enum symbol not provided") +} + +func getCache(tCtx TransformContext) pcommon.Map { + return tCtx.cache +} + +func pathExpressionParser(cacheGetter ctxcache.Getter[TransformContext]) ottl.PathExpressionParser[TransformContext] { + return ctxcommon.PathExpressionParser( + ctxprofilesample.Name, + ctxprofilesample.DocRef, + cacheGetter, + map[string]ottl.PathExpressionParser[TransformContext]{ + ctxresource.Name: ctxresource.PathGetSetter[TransformContext], + ctxscope.Name: ctxscope.PathGetSetter[TransformContext], + ctxscope.LegacyName: ctxscope.PathGetSetter[TransformContext], + ctxprofile.Name: ctxprofile.PathGetSetter[TransformContext], + ctxprofilesample.Name: ctxprofilesample.PathGetSetter[TransformContext], + }) +} diff --git a/pkg/ottl/contexts/ottlprofilesample/profilesample_test.go b/pkg/ottl/contexts/ottlprofilesample/profilesample_test.go new file mode 100644 index 0000000000000..2c78938e6f82d --- /dev/null +++ b/pkg/ottl/contexts/ottlprofilesample/profilesample_test.go @@ -0,0 +1,210 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ottlprofilesample // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlprofilesample" + +import ( + "context" + "slices" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pprofile" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxprofilesample" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/pathtest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottltest" +) + +func Test_newPathGetSetter_Cache(t *testing.T) { + newCache := pcommon.NewMap() + newCache.PutStr("temp", "value") + + tests := []struct { + name string + path ottl.Path[TransformContext] + orig any + newVal any + modified func(sample pprofile.Sample, cache pcommon.Map) + }{ + { + name: "link_index", + path: &pathtest.Path[TransformContext]{ + N: "link_index", + }, + orig: int64(42), + newVal: int64(43), + modified: func(sample pprofile.Sample, _ pcommon.Map) { + sample.SetLinkIndex(43) + }, + }, + { + name: "cache", + path: &pathtest.Path[TransformContext]{ + N: "cache", + }, + orig: pcommon.NewMap(), + newVal: newCache, + modified: func(_ pprofile.Sample, cache pcommon.Map) { + newCache.CopyTo(cache) + }, + }, + { + name: "cache access", + path: &pathtest.Path[TransformContext]{ + N: "cache", + KeySlice: []ottl.Key[TransformContext]{ + &pathtest.Key[TransformContext]{ + S: ottltest.Strp("temp"), + }, + }, + }, + orig: nil, + newVal: "new value", + modified: func(_ pprofile.Sample, cache pcommon.Map) { + cache.PutStr("temp", "new value") + }, + }, + } + // Copy all tests cases and sets the path.Context value to the generated ones. + // It ensures all exiting field access also work when the path context is set. + for _, tt := range slices.Clone(tests) { + testWithContext := tt + testWithContext.name = "with_path_context:" + tt.name + pathWithContext := *tt.path.(*pathtest.Path[TransformContext]) + pathWithContext.C = ctxprofilesample.Name + testWithContext.path = ottl.Path[TransformContext](&pathWithContext) + tests = append(tests, testWithContext) + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testCache := pcommon.NewMap() + cacheGetter := func(_ TransformContext) pcommon.Map { + return testCache + } + accessor, err := pathExpressionParser(cacheGetter)(tt.path) + assert.NoError(t, err) + + profileSample, profile := createProfileSampleTelemetry() + + tCtx := NewTransformContext(profileSample, profile, pprofile.NewProfilesDictionary(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pprofile.NewScopeProfiles(), pprofile.NewResourceProfiles()) + got, err := accessor.Get(context.Background(), tCtx) + assert.NoError(t, err) + assert.Equal(t, tt.orig, got) + + tCtx = NewTransformContext(profileSample, pprofile.NewProfile(), pprofile.NewProfilesDictionary(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pprofile.NewScopeProfiles(), pprofile.NewResourceProfiles()) + err = accessor.Set(context.Background(), tCtx, tt.newVal) + assert.NoError(t, err) + + exProfileSample, exProfile := createProfileSampleTelemetry() + exCache := pcommon.NewMap() + tt.modified(exProfileSample, exCache) + + assert.Equal(t, exProfile, profile) + assert.Equal(t, exProfileSample, profileSample) + assert.Equal(t, exCache, testCache) + }) + } +} + +func Test_newPathGetSetter_higherContextPath(t *testing.T) { + resource := pcommon.NewResource() + resource.Attributes().PutStr("foo", "bar") + + instrumentationScope := pcommon.NewInstrumentationScope() + instrumentationScope.SetName("instrumentation_scope") + + profile := pprofile.NewProfile() + profile.SetDroppedAttributesCount(42) + + ctx := NewTransformContext(pprofile.NewSample(), profile, pprofile.NewProfilesDictionary(), instrumentationScope, resource, pprofile.NewScopeProfiles(), pprofile.NewResourceProfiles()) + + tests := []struct { + name string + path ottl.Path[TransformContext] + expected any + }{ + { + name: "resource", + path: &pathtest.Path[TransformContext]{N: "resource", NextPath: &pathtest.Path[TransformContext]{ + N: "attributes", + KeySlice: []ottl.Key[TransformContext]{ + &pathtest.Key[TransformContext]{ + S: ottltest.Strp("foo"), + }, + }, + }}, + expected: "bar", + }, + { + name: "resource with context", + path: &pathtest.Path[TransformContext]{C: "resource", N: "attributes", KeySlice: []ottl.Key[TransformContext]{ + &pathtest.Key[TransformContext]{ + S: ottltest.Strp("foo"), + }, + }}, + expected: "bar", + }, + { + name: "instrumentation_scope", + path: &pathtest.Path[TransformContext]{N: "instrumentation_scope", NextPath: &pathtest.Path[TransformContext]{N: "name"}}, + expected: instrumentationScope.Name(), + }, + { + name: "instrumentation_scope with context", + path: &pathtest.Path[TransformContext]{C: "instrumentation_scope", N: "name"}, + expected: instrumentationScope.Name(), + }, + { + name: "scope", + path: &pathtest.Path[TransformContext]{N: "scope", NextPath: &pathtest.Path[TransformContext]{N: "name"}}, + expected: instrumentationScope.Name(), + }, + { + name: "scope with context", + path: &pathtest.Path[TransformContext]{C: "scope", N: "name"}, + expected: instrumentationScope.Name(), + }, + } + + testCache := pcommon.NewMap() + cacheGetter := func(_ TransformContext) pcommon.Map { + return testCache + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + accessor, err := pathExpressionParser(cacheGetter)(tt.path) + assert.NoError(t, err) + + got, err := accessor.Get(context.Background(), ctx) + assert.NoError(t, err) + assert.Equal(t, tt.expected, got) + }) + } +} + +func createProfileSampleTelemetry() (pprofile.Sample, pprofile.Profile) { + profile := pprofile.NewProfile() + sample := profile.Sample().AppendEmpty() + sample.SetLinkIndex(42) + sample.SetLocationsStartIndex(73) + sample.SetLocationsLength(97) + + timestamps := sample.TimestampsUnixNano() + if timestamps.Len() == 0 { + timestamps.EnsureCapacity(1) + timestamps.Append(uint64(time.Now().Unix())) + } + + values := sample.Value() + if values.Len() == 0 { + values.EnsureCapacity(1) + values.Append(3) + } + + return sample, profile +}