From 50de4a2f61f70d60f639da3784168063ddc15078 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 23 Jan 2025 03:02:30 -0800 Subject: [PATCH] [chore] Apply, items cache logic to profiles (#12171) No need for a changelog since the cache items was added this release. Signed-off-by: Bogdan Drutu --- .../xexporterhelper/profiles.go | 16 ++-- .../xexporterhelper/profiles_batch.go | 12 ++- .../xexporterhelper/profiles_batch_test.go | 77 +++++++++---------- 3 files changed, 58 insertions(+), 47 deletions(-) diff --git a/exporter/exporterhelper/xexporterhelper/profiles.go b/exporter/exporterhelper/xexporterhelper/profiles.go index d9eb55b3ef7..34afd2959b7 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles.go +++ b/exporter/exporterhelper/xexporterhelper/profiles.go @@ -28,14 +28,16 @@ var ( ) type profilesRequest struct { - pd pprofile.Profiles - pusher xconsumer.ConsumeProfilesFunc + pd pprofile.Profiles + pusher xconsumer.ConsumeProfilesFunc + cachedItemsCount int } func newProfilesRequest(pd pprofile.Profiles, pusher xconsumer.ConsumeProfilesFunc) exporterhelper.Request { return &profilesRequest{ - pd: pd, - pusher: pusher, + pd: pd, + pusher: pusher, + cachedItemsCount: pd.SampleCount(), } } @@ -66,7 +68,11 @@ func (req *profilesRequest) Export(ctx context.Context) error { } func (req *profilesRequest) ItemsCount() int { - return req.pd.SampleCount() + return req.cachedItemsCount +} + +func (req *profilesRequest) setCachedItemsCount(count int) { + req.cachedItemsCount = count } type profileExporter struct { diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch.go b/exporter/exporterhelper/xexporterhelper/profiles_batch.go index 2aa47c95043..da7d251a7e5 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch.go @@ -24,6 +24,8 @@ func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Ma } if cfg.MaxSizeItems == 0 { + req.setCachedItemsCount(req.ItemsCount() + req2.ItemsCount()) + req2.setCachedItemsCount(0) req2.pd.ResourceProfiles().MoveAndAppendTo(req.pd.ResourceProfiles()) return []exporterhelper.Request{req}, nil } @@ -43,6 +45,8 @@ func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Ma if destReq == nil { destReq = srcReq } else { + destReq.setCachedItemsCount(destReq.ItemsCount() + srcCount) + srcReq.setCachedItemsCount(0) srcReq.pd.ResourceProfiles().MoveAndAppendTo(destReq.pd.ResourceProfiles()) } capacityLeft -= srcCount @@ -51,13 +55,17 @@ func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Ma for { extractedProfiles := extractProfiles(srcReq.pd, capacityLeft) - if extractedProfiles.SampleCount() == 0 { + extractedCount := extractedProfiles.SampleCount() + if extractedCount == 0 { break } + capacityLeft -= extractedProfiles.SampleCount() if destReq == nil { - destReq = &profilesRequest{pd: extractedProfiles, pusher: srcReq.pusher} + destReq = newProfilesRequest(extractedProfiles, srcReq.pusher).(*profilesRequest) } else { + destReq.setCachedItemsCount(destReq.ItemsCount() + extractedCount) + srcReq.setCachedItemsCount(srcReq.ItemsCount() - extractedCount) extractedProfiles.ResourceProfiles().MoveAndAppendTo(destReq.pd.ResourceProfiles()) } // Create new batch once capacity is reached. diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go index 2981d11830b..291a18b0c63 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go @@ -5,8 +5,6 @@ package xexporterhelper import ( "context" - "fmt" - "os" "testing" "github.com/stretchr/testify/assert" @@ -19,18 +17,17 @@ import ( ) func TestMergeProfiles(t *testing.T) { - pr1 := &profilesRequest{pd: testdata.GenerateProfiles(2)} - pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)} + pr1 := newProfilesRequest(testdata.GenerateProfiles(2), nil) + pr2 := newProfilesRequest(testdata.GenerateProfiles(3), nil) res, err := pr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, pr2) require.NoError(t, err) assert.Len(t, res, 1) - fmt.Fprintf(os.Stdout, "%#v\n", res[0].(*profilesRequest).pd) - assert.Equal(t, 5, res[0].(*profilesRequest).pd.SampleCount()) + assert.Equal(t, 5, res[0].ItemsCount()) } func TestMergeProfilesInvalidInput(t *testing.T) { pr1 := &dummyRequest{} - pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)} + pr2 := newProfilesRequest(testdata.GenerateProfiles(3), nil) _, err := pr2.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, pr1) assert.Error(t, err) } @@ -41,78 +38,78 @@ func TestMergeSplitProfiles(t *testing.T) { cfg exporterbatcher.MaxSizeConfig pr1 exporterhelper.Request pr2 exporterhelper.Request - expected []*profilesRequest + expected []exporterhelper.Request }{ { name: "both_requests_empty", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - pr1: &profilesRequest{pd: pprofile.NewProfiles()}, - pr2: &profilesRequest{pd: pprofile.NewProfiles()}, - expected: []*profilesRequest{{pd: pprofile.NewProfiles()}}, + pr1: newProfilesRequest(pprofile.NewProfiles(), nil), + pr2: newProfilesRequest(pprofile.NewProfiles(), nil), + expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles(), nil)}, }, { name: "first_request_empty", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - pr1: &profilesRequest{pd: pprofile.NewProfiles()}, - pr2: &profilesRequest{pd: testdata.GenerateProfiles(5)}, - expected: []*profilesRequest{{pd: testdata.GenerateProfiles(5)}}, + pr1: newProfilesRequest(pprofile.NewProfiles(), nil), + pr2: newProfilesRequest(testdata.GenerateProfiles(5), nil), + expected: []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(5), nil)}, }, { name: "first_empty_second_nil", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - pr1: &profilesRequest{pd: pprofile.NewProfiles()}, + pr1: newProfilesRequest(pprofile.NewProfiles(), nil), pr2: nil, - expected: []*profilesRequest{{pd: pprofile.NewProfiles()}}, + expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles(), nil)}, }, { name: "merge_only", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - pr1: &profilesRequest{pd: testdata.GenerateProfiles(4)}, - pr2: &profilesRequest{pd: testdata.GenerateProfiles(6)}, - expected: []*profilesRequest{{pd: func() pprofile.Profiles { + pr1: newProfilesRequest(testdata.GenerateProfiles(4), nil), + pr2: newProfilesRequest(testdata.GenerateProfiles(6), nil), + expected: []exporterhelper.Request{newProfilesRequest(func() pprofile.Profiles { profiles := testdata.GenerateProfiles(4) testdata.GenerateProfiles(6).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) return profiles - }()}}, + }(), nil)}, }, { name: "split_only", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4}, - pr1: &profilesRequest{pd: testdata.GenerateProfiles(10)}, + pr1: newProfilesRequest(testdata.GenerateProfiles(10), nil), pr2: nil, - expected: []*profilesRequest{ - {pd: testdata.GenerateProfiles(4)}, - {pd: testdata.GenerateProfiles(4)}, - {pd: testdata.GenerateProfiles(2)}, + expected: []exporterhelper.Request{ + newProfilesRequest(testdata.GenerateProfiles(4), nil), + newProfilesRequest(testdata.GenerateProfiles(4), nil), + newProfilesRequest(testdata.GenerateProfiles(2), nil), }, }, { name: "merge_and_split", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - pr1: &profilesRequest{pd: testdata.GenerateProfiles(8)}, - pr2: &profilesRequest{pd: testdata.GenerateProfiles(20)}, - expected: []*profilesRequest{ - {pd: func() pprofile.Profiles { + pr1: newProfilesRequest(testdata.GenerateProfiles(8), nil), + pr2: newProfilesRequest(testdata.GenerateProfiles(20), nil), + expected: []exporterhelper.Request{ + newProfilesRequest(func() pprofile.Profiles { profiles := testdata.GenerateProfiles(8) testdata.GenerateProfiles(2).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) return profiles - }()}, - {pd: testdata.GenerateProfiles(10)}, - {pd: testdata.GenerateProfiles(8)}, + }(), nil), + newProfilesRequest(testdata.GenerateProfiles(10), nil), + newProfilesRequest(testdata.GenerateProfiles(8), nil), }, }, { name: "scope_profiles_split", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4}, - pr1: &profilesRequest{pd: func() pprofile.Profiles { + pr1: newProfilesRequest(func() pprofile.Profiles { return testdata.GenerateProfiles(6) - }()}, + }(), nil), pr2: nil, - expected: []*profilesRequest{ - {pd: testdata.GenerateProfiles(4)}, - {pd: func() pprofile.Profiles { + expected: []exporterhelper.Request{ + newProfilesRequest(testdata.GenerateProfiles(4), nil), + newProfilesRequest(func() pprofile.Profiles { return testdata.GenerateProfiles(2) - }()}, + }(), nil), }, }, } @@ -122,7 +119,7 @@ func TestMergeSplitProfiles(t *testing.T) { require.NoError(t, err) assert.Equal(t, len(tt.expected), len(res)) for i, r := range res { - assert.Equal(t, tt.expected[i], r.(*profilesRequest)) + assert.Equal(t, tt.expected[i], r) } }) } @@ -130,7 +127,7 @@ func TestMergeSplitProfiles(t *testing.T) { func TestMergeSplitProfilesInvalidInput(t *testing.T) { r1 := &dummyRequest{} - r2 := &profilesRequest{pd: testdata.GenerateProfiles(3)} + r2 := newProfilesRequest(testdata.GenerateProfiles(3), nil) _, err := r2.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, r1) assert.Error(t, err) }