From 06e6a9d0a18e4e933741c9912eddc4820e64e68d Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Thu, 18 Dec 2025 11:49:08 +0100 Subject: [PATCH 1/7] don't duplicate stack in profiles testdata --- pdata/testdata/profile.go | 18 ++++++++++-------- service/internal/graph/obs_test.go | 16 ++++++++-------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/pdata/testdata/profile.go b/pdata/testdata/profile.go index 76f5429c422..d5eef46c335 100644 --- a/pdata/testdata/profile.go +++ b/pdata/testdata/profile.go @@ -50,12 +50,13 @@ func fillProfileOne(dic pprofile.ProfilesDictionary, profile pprofile.Profile) { loc := pprofile.NewLocation() loc.SetAddress(1) - id, _ := pprofile.SetLocation(dic.LocationTable(), loc) - stack := dic.StackTable().AppendEmpty() - stack.LocationIndices().Append(id) + locID, _ := pprofile.SetLocation(dic.LocationTable(), loc) + stack := pprofile.NewStack() + stack.LocationIndices().Append(locID) + stackID, _ := pprofile.SetStack(dic.StackTable(), stack) sample := profile.Samples().AppendEmpty() - sample.SetStackIndex(1) + sample.SetStackIndex(stackID) sample.Values().Append(4) sample.AttributeIndices().Append(0) } @@ -67,12 +68,13 @@ func fillProfileTwo(dic pprofile.ProfilesDictionary, profile pprofile.Profile) { loc := pprofile.NewLocation() loc.SetAddress(2) - id, _ := pprofile.SetLocation(dic.LocationTable(), loc) - stack := dic.StackTable().AppendEmpty() - stack.LocationIndices().Append(id) + locID, _ := pprofile.SetLocation(dic.LocationTable(), loc) + stack := pprofile.NewStack() + stack.LocationIndices().Append(locID) + stackID, _ := pprofile.SetStack(dic.StackTable(), stack) sample := profile.Samples().AppendEmpty() - sample.SetStackIndex(1) + sample.SetStackIndex(stackID) sample.Values().Append(9) sample.AttributeIndices().Append(0) } diff --git a/service/internal/graph/obs_test.go b/service/internal/graph/obs_test.go index bd18c3d5092..dba12eaad74 100644 --- a/service/internal/graph/obs_test.go +++ b/service/internal/graph/obs_test.go @@ -548,7 +548,7 @@ func TestComponentInstrumentation(t *testing.T) { "otelcol.receiver.produced.size": simpleMetric{ attribute.NewSet( attribute.String(obsconsumer.ComponentOutcome, "success"), - ): 1118, + ): 1035, }, }, attribute.NewSet( @@ -570,12 +570,12 @@ func TestComponentInstrumentation(t *testing.T) { "otelcol.processor.consumed.size": simpleMetric{ attribute.NewSet( attribute.String(obsconsumer.ComponentOutcome, "success"), - ): 1118, + ): 1035, }, "otelcol.processor.produced.size": simpleMetric{ attribute.NewSet( attribute.String(obsconsumer.ComponentOutcome, "success"), - ): 1118, + ): 1035, }, }, attribute.NewSet( @@ -602,17 +602,17 @@ func TestComponentInstrumentation(t *testing.T) { "otelcol.connector.consumed.size": simpleMetric{ attribute.NewSet( attribute.String(obsconsumer.ComponentOutcome, "success"), - ): 1118, + ): 1035, }, "otelcol.connector.produced.size": simpleMetric{ attribute.NewSet( attribute.String(obsconsumer.ComponentOutcome, "success"), attribute.String("otelcol.pipeline.id", "profiles/right"), - ): 587, + ): 542, attribute.NewSet( attribute.String(obsconsumer.ComponentOutcome, "success"), attribute.String("otelcol.pipeline.id", "profiles/left"), - ): 531, + ): 493, }, }, attribute.NewSet( @@ -628,7 +628,7 @@ func TestComponentInstrumentation(t *testing.T) { "otelcol.exporter.consumed.size": simpleMetric{ attribute.NewSet( attribute.String(obsconsumer.ComponentOutcome, "success"), - ): 587, + ): 542, }, }, attribute.NewSet( @@ -644,7 +644,7 @@ func TestComponentInstrumentation(t *testing.T) { "otelcol.exporter.consumed.size": simpleMetric{ attribute.NewSet( attribute.String(obsconsumer.ComponentOutcome, "success"), - ): 531, + ): 493, }, }, } From d1c6d14c3a92e41de528ca1ad2c41d890f35b065 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Thu, 18 Dec 2025 13:46:42 +0100 Subject: [PATCH 2/7] bring back profiles batching --- .../xexporterhelper/profiles_batch.go | 21 ++++++------------- .../xexporterhelper/profiles_batch_test.go | 5 ++--- 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch.go b/exporter/exporterhelper/xexporterhelper/profiles_batch.go index 3ee8d7a5c90..d4585eb8961 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch.go @@ -33,18 +33,10 @@ func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, szt expor if !ok { return nil, errors.New("invalid input type") } - // TODO(13106): handle merging of profiles (and change the indice tables with their new indices) - // req2.mergeTo(req, sz) - - // If no limit we can simply merge the new request into the current and return. - if maxSize == 0 { - return []Request{req, req2}, nil + err := req2.mergeTo(req, sz) + if err != nil { + return nil, fmt.Errorf("failed merging profiles; %w", err) } - - sp1, err1 := req.split(maxSize, sz) - sp2, err2 := req2.split(maxSize, sz) - - return append(sp1, sp2...), errors.Join(err1, err2) } // If no limit we can simply merge the new request into the current and return. @@ -54,14 +46,13 @@ func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, szt expor return req.split(maxSize, sz) } -// TODO(13106): handle merging of profiles (and change the indice tables with their new indices) -/*func (req *profilesRequest) mergeTo(dst *profilesRequest, sz sizer.ProfilesSizer) { +func (req *profilesRequest) mergeTo(dst *profilesRequest, sz sizer.ProfilesSizer) error { if sz != nil { dst.setCachedSize(dst.size(sz) + req.size(sz)) req.setCachedSize(0) } - req.pd.ResourceProfiles().MoveAndAppendTo(dst.pd.ResourceProfiles()) -}*/ + return req.pd.MergeTo(dst.pd) +} func (req *profilesRequest) split(maxSize int, sz sizer.ProfilesSizer) ([]Request, error) { var res []Request diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go index 218f22fb197..f90d4de68e6 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go @@ -22,9 +22,8 @@ func TestMergeProfiles(t *testing.T) { pr2 := newProfilesRequest(testdata.GenerateProfiles(3)) res, err := pr1.MergeSplit(context.Background(), 0, exporterhelper.RequestSizerTypeItems, pr2) require.NoError(t, err) - assert.Len(t, res, 2) - assert.Equal(t, 2, res[0].ItemsCount()) - assert.Equal(t, 3, res[1].ItemsCount()) + assert.Len(t, res, 1) + assert.Equal(t, 5, res[0].ItemsCount()) } func TestMergeProfilesInvalidInput(t *testing.T) { From 223806bedec404fc8de8e598c9f3e22a58439ed4 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Thu, 18 Dec 2025 13:47:53 +0100 Subject: [PATCH 3/7] reenable the benchmarks --- .../exporterhelper/xexporterhelper/profiles_batch_test.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go index f90d4de68e6..8683af92326 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go @@ -325,8 +325,6 @@ func TestMergeSplitManySmallLogs(t *testing.T) { } func BenchmarkSplittingBasedOnByteSizeManySmallProfiles(b *testing.B) { - b.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)") - // All requests merge into a single batch. b.ReportAllocs() for b.Loop() { @@ -341,13 +339,11 @@ func BenchmarkSplittingBasedOnByteSizeManySmallProfiles(b *testing.B) { ) merged = append(merged[0:len(merged)-1], res...) } - assert.Len(b, merged, 1) + assert.Len(b, merged, 2) } } func BenchmarkSplittingBasedOnByteSizeManyProfilesSlightlyAboveLimit(b *testing.B) { - b.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)") - // Every incoming request results in a split. b.ReportAllocs() for b.Loop() { @@ -368,8 +364,6 @@ func BenchmarkSplittingBasedOnByteSizeManyProfilesSlightlyAboveLimit(b *testing. } func BenchmarkSplittingBasedOnByteSizeHugeProfiles(b *testing.B) { - b.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)") - // One request splits into many batches. b.ReportAllocs() for b.Loop() { From 147b656304a19923a6f6aec300616506790e95c9 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Thu, 18 Dec 2025 14:19:06 +0100 Subject: [PATCH 4/7] copy the dictionary when splitting --- exporter/exporterhelper/xexporterhelper/profiles_batch.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch.go b/exporter/exporterhelper/xexporterhelper/profiles_batch.go index d4585eb8961..2a837a745eb 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch.go @@ -74,6 +74,8 @@ func extractProfiles(srcProfiles pprofile.Profiles, capacity int, sz sizer.Profi destProfiles := pprofile.NewProfiles() capacityLeft := capacity - sz.ProfilesSize(destProfiles) removedSize := 0 + + srcProfiles.Dictionary().CopyTo(destProfiles.Dictionary()) srcProfiles.ResourceProfiles().RemoveIf(func(srcRP pprofile.ResourceProfiles) bool { // If the no more capacity left just return. if capacityLeft == 0 { From 918b01eb6718af7a9a0ff0f7af47daa7c1c7602d Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Thu, 18 Dec 2025 14:19:17 +0100 Subject: [PATCH 5/7] reenable all tests --- .../xexporterhelper/profiles_batch_test.go | 36 +++++++++---------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go index 8683af92326..d38642f63e4 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go @@ -33,8 +33,6 @@ func TestMergeProfilesInvalidInput(t *testing.T) { } func TestMergeSplitProfiles(t *testing.T) { - t.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)") - tests := []struct { name string szt exporterhelper.RequestSizerType @@ -52,12 +50,16 @@ func TestMergeSplitProfiles(t *testing.T) { expected: []Request{newProfilesRequest(pprofile.NewProfiles())}, }, { - name: "first_request_empty", - szt: exporterhelper.RequestSizerTypeItems, - maxSize: 10, - pr1: newProfilesRequest(pprofile.NewProfiles()), - pr2: newProfilesRequest(testdata.GenerateProfiles(5)), - expected: []Request{newProfilesRequest(testdata.GenerateProfiles(5))}, + name: "first_request_empty", + szt: exporterhelper.RequestSizerTypeItems, + maxSize: 10, + pr1: newProfilesRequest(testdata.GenerateProfiles(0)), + pr2: newProfilesRequest(testdata.GenerateProfiles(5)), + expected: []Request{newProfilesRequest(func() pprofile.Profiles { + profiles := testdata.GenerateProfiles(0) + testdata.GenerateProfiles(5).MergeTo(profiles) + return profiles + }())}, }, { name: "first_empty_second_nil", @@ -136,8 +138,6 @@ func TestMergeSplitProfiles(t *testing.T) { } func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { - t.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)") - tests := []struct { name string szt exporterhelper.RequestSizerType @@ -252,7 +252,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { { name: "merge_only", szt: exporterhelper.RequestSizerTypeBytes, - maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(11)), + maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(13)), pr1: newProfilesRequest(testdata.GenerateProfiles(4)), pr2: newProfilesRequest(testdata.GenerateProfiles(6)), expected: []Request{newProfilesRequest(func() pprofile.Profiles { @@ -265,12 +265,12 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { name: "split_only", szt: exporterhelper.RequestSizerTypeBytes, maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(4)), - pr1: newProfilesRequest(pprofile.NewProfiles()), + pr1: newProfilesRequest(testdata.GenerateProfiles(0)), pr2: newProfilesRequest(testdata.GenerateProfiles(10)), expected: []Request{ newProfilesRequest(testdata.GenerateProfiles(4)), - newProfilesRequest(testdata.GenerateProfiles(4)), - newProfilesRequest(testdata.GenerateProfiles(2)), + newProfilesRequest(testdata.GenerateProfiles(5)), + newProfilesRequest(testdata.GenerateProfiles(1)), }, }, { @@ -282,11 +282,11 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { expected: []Request{ newProfilesRequest(func() pprofile.Profiles { profiles := testdata.GenerateProfiles(7) - testdata.GenerateProfiles(2).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) + testdata.GenerateProfiles(3).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) return profiles }()), - newProfilesRequest(testdata.GenerateProfiles(10)), - newProfilesRequest(testdata.GenerateProfiles(9)), + newProfilesRequest(testdata.GenerateProfiles(11)), + newProfilesRequest(testdata.GenerateProfiles(7)), }, }, } @@ -312,8 +312,6 @@ func TestExtractProfiles(t *testing.T) { } func TestMergeSplitManySmallLogs(t *testing.T) { - t.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)") - // All requests merge into a single batch. merged := []Request{newProfilesRequest(testdata.GenerateProfiles(1))} for range 1000 { From 0ed008e0a1259795d0a421a37521f887cb197814 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Thu, 18 Dec 2025 14:22:06 +0100 Subject: [PATCH 6/7] add changelog entry --- .chloggen/profiles-reenable-batching.yaml | 25 +++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 .chloggen/profiles-reenable-batching.yaml diff --git a/.chloggen/profiles-reenable-batching.yaml b/.chloggen/profiles-reenable-batching.yaml new file mode 100644 index 00000000000..2ba8998e7b5 --- /dev/null +++ b/.chloggen/profiles-reenable-batching.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/otlp) +component: pkg/xexporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Reenable batching for profiles + +# One or more tracking issues or pull requests related to the change +issues: [14313] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] From b7e20fa89d7d70fdf5f5a2788fb088e9590c2dab Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Thu, 18 Dec 2025 14:33:56 +0100 Subject: [PATCH 7/7] fix linter --- exporter/exporterhelper/xexporterhelper/profiles_batch_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go index d38642f63e4..3aea36f6b26 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go @@ -57,7 +57,7 @@ func TestMergeSplitProfiles(t *testing.T) { pr2: newProfilesRequest(testdata.GenerateProfiles(5)), expected: []Request{newProfilesRequest(func() pprofile.Profiles { profiles := testdata.GenerateProfiles(0) - testdata.GenerateProfiles(5).MergeTo(profiles) + _ = testdata.GenerateProfiles(5).MergeTo(profiles) return profiles }())}, },