diff --git a/.chloggen/profiles-reenable-batching.yaml b/.chloggen/profiles-reenable-batching.yaml new file mode 100644 index 00000000000..2ba8998e7b5 --- /dev/null +++ b/.chloggen/profiles-reenable-batching.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/otlp) +component: pkg/xexporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Reenable batching for profiles + +# One or more tracking issues or pull requests related to the change +issues: [14313] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch.go b/exporter/exporterhelper/xexporterhelper/profiles_batch.go index 3ee8d7a5c90..2a837a745eb 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch.go @@ -33,18 +33,10 @@ func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, szt expor if !ok { return nil, errors.New("invalid input type") } - // TODO(13106): handle merging of profiles (and change the indice tables with their new indices) - // req2.mergeTo(req, sz) - - // If no limit we can simply merge the new request into the current and return. - if maxSize == 0 { - return []Request{req, req2}, nil + err := req2.mergeTo(req, sz) + if err != nil { + return nil, fmt.Errorf("failed merging profiles; %w", err) } - - sp1, err1 := req.split(maxSize, sz) - sp2, err2 := req2.split(maxSize, sz) - - return append(sp1, sp2...), errors.Join(err1, err2) } // If no limit we can simply merge the new request into the current and return. @@ -54,14 +46,13 @@ func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, szt expor return req.split(maxSize, sz) } -// TODO(13106): handle merging of profiles (and change the indice tables with their new indices) -/*func (req *profilesRequest) mergeTo(dst *profilesRequest, sz sizer.ProfilesSizer) { +func (req *profilesRequest) mergeTo(dst *profilesRequest, sz sizer.ProfilesSizer) error { if sz != nil { dst.setCachedSize(dst.size(sz) + req.size(sz)) req.setCachedSize(0) } - req.pd.ResourceProfiles().MoveAndAppendTo(dst.pd.ResourceProfiles()) -}*/ + return req.pd.MergeTo(dst.pd) +} func (req *profilesRequest) split(maxSize int, sz sizer.ProfilesSizer) ([]Request, error) { var res []Request @@ -83,6 +74,8 @@ func extractProfiles(srcProfiles pprofile.Profiles, capacity int, sz sizer.Profi destProfiles := pprofile.NewProfiles() capacityLeft := capacity - sz.ProfilesSize(destProfiles) removedSize := 0 + + srcProfiles.Dictionary().CopyTo(destProfiles.Dictionary()) srcProfiles.ResourceProfiles().RemoveIf(func(srcRP pprofile.ResourceProfiles) bool { // If the no more capacity left just return. if capacityLeft == 0 { diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go index 218f22fb197..3aea36f6b26 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go @@ -22,9 +22,8 @@ func TestMergeProfiles(t *testing.T) { pr2 := newProfilesRequest(testdata.GenerateProfiles(3)) res, err := pr1.MergeSplit(context.Background(), 0, exporterhelper.RequestSizerTypeItems, pr2) require.NoError(t, err) - assert.Len(t, res, 2) - assert.Equal(t, 2, res[0].ItemsCount()) - assert.Equal(t, 3, res[1].ItemsCount()) + assert.Len(t, res, 1) + assert.Equal(t, 5, res[0].ItemsCount()) } func TestMergeProfilesInvalidInput(t *testing.T) { @@ -34,8 +33,6 @@ func TestMergeProfilesInvalidInput(t *testing.T) { } func TestMergeSplitProfiles(t *testing.T) { - t.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)") - tests := []struct { name string szt exporterhelper.RequestSizerType @@ -53,12 +50,16 @@ func TestMergeSplitProfiles(t *testing.T) { expected: []Request{newProfilesRequest(pprofile.NewProfiles())}, }, { - name: "first_request_empty", - szt: exporterhelper.RequestSizerTypeItems, - maxSize: 10, - pr1: newProfilesRequest(pprofile.NewProfiles()), - pr2: newProfilesRequest(testdata.GenerateProfiles(5)), - expected: []Request{newProfilesRequest(testdata.GenerateProfiles(5))}, + name: "first_request_empty", + szt: exporterhelper.RequestSizerTypeItems, + maxSize: 10, + pr1: newProfilesRequest(testdata.GenerateProfiles(0)), + pr2: newProfilesRequest(testdata.GenerateProfiles(5)), + expected: []Request{newProfilesRequest(func() pprofile.Profiles { + profiles := testdata.GenerateProfiles(0) + _ = testdata.GenerateProfiles(5).MergeTo(profiles) + return profiles + }())}, }, { name: "first_empty_second_nil", @@ -137,8 +138,6 @@ func TestMergeSplitProfiles(t *testing.T) { } func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { - t.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)") - tests := []struct { name string szt exporterhelper.RequestSizerType @@ -253,7 +252,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { { name: "merge_only", szt: exporterhelper.RequestSizerTypeBytes, - maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(11)), + maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(13)), pr1: newProfilesRequest(testdata.GenerateProfiles(4)), pr2: newProfilesRequest(testdata.GenerateProfiles(6)), expected: []Request{newProfilesRequest(func() pprofile.Profiles { @@ -266,12 +265,12 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { name: "split_only", szt: exporterhelper.RequestSizerTypeBytes, maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(4)), - pr1: newProfilesRequest(pprofile.NewProfiles()), + pr1: newProfilesRequest(testdata.GenerateProfiles(0)), pr2: newProfilesRequest(testdata.GenerateProfiles(10)), expected: []Request{ newProfilesRequest(testdata.GenerateProfiles(4)), - newProfilesRequest(testdata.GenerateProfiles(4)), - newProfilesRequest(testdata.GenerateProfiles(2)), + newProfilesRequest(testdata.GenerateProfiles(5)), + newProfilesRequest(testdata.GenerateProfiles(1)), }, }, { @@ -283,11 +282,11 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { expected: []Request{ newProfilesRequest(func() pprofile.Profiles { profiles := testdata.GenerateProfiles(7) - testdata.GenerateProfiles(2).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) + testdata.GenerateProfiles(3).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) return profiles }()), - newProfilesRequest(testdata.GenerateProfiles(10)), - newProfilesRequest(testdata.GenerateProfiles(9)), + newProfilesRequest(testdata.GenerateProfiles(11)), + newProfilesRequest(testdata.GenerateProfiles(7)), }, }, } @@ -313,8 +312,6 @@ func TestExtractProfiles(t *testing.T) { } func TestMergeSplitManySmallLogs(t *testing.T) { - t.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)") - // All requests merge into a single batch. merged := []Request{newProfilesRequest(testdata.GenerateProfiles(1))} for range 1000 { @@ -326,8 +323,6 @@ func TestMergeSplitManySmallLogs(t *testing.T) { } func BenchmarkSplittingBasedOnByteSizeManySmallProfiles(b *testing.B) { - b.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)") - // All requests merge into a single batch. b.ReportAllocs() for b.Loop() { @@ -342,13 +337,11 @@ func BenchmarkSplittingBasedOnByteSizeManySmallProfiles(b *testing.B) { ) merged = append(merged[0:len(merged)-1], res...) } - assert.Len(b, merged, 1) + assert.Len(b, merged, 2) } } func BenchmarkSplittingBasedOnByteSizeManyProfilesSlightlyAboveLimit(b *testing.B) { - b.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)") - // Every incoming request results in a split. b.ReportAllocs() for b.Loop() { @@ -369,8 +362,6 @@ func BenchmarkSplittingBasedOnByteSizeManyProfilesSlightlyAboveLimit(b *testing. } func BenchmarkSplittingBasedOnByteSizeHugeProfiles(b *testing.B) { - b.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)") - // One request splits into many batches. b.ReportAllocs() for b.Loop() {