Skip to content

Commit

Permalink
[chore] Apply, items cache logic to profiles
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Jan 23, 2025
1 parent ad36545 commit 8d41c26
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 47 deletions.
16 changes: 11 additions & 5 deletions exporter/exporterhelper/xexporterhelper/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ var (
)

type profilesRequest struct {
pd pprofile.Profiles
pusher xconsumer.ConsumeProfilesFunc
pd pprofile.Profiles
pusher xconsumer.ConsumeProfilesFunc
cachedItemsCount int
}

func newProfilesRequest(pd pprofile.Profiles, pusher xconsumer.ConsumeProfilesFunc) exporterhelper.Request {
return &profilesRequest{
pd: pd,
pusher: pusher,
pd: pd,
pusher: pusher,
cachedItemsCount: pd.SampleCount(),
}
}

Expand Down Expand Up @@ -66,7 +68,11 @@ func (req *profilesRequest) Export(ctx context.Context) error {
}

func (req *profilesRequest) ItemsCount() int {
return req.pd.SampleCount()
return req.cachedItemsCount
}

func (req *profilesRequest) setCachedItemsCount(count int) {
req.cachedItemsCount = count
}

type profileExporter struct {
Expand Down
12 changes: 10 additions & 2 deletions exporter/exporterhelper/xexporterhelper/profiles_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Ma
}

if cfg.MaxSizeItems == 0 {
req.setCachedItemsCount(req.ItemsCount() + req2.ItemsCount())
req2.setCachedItemsCount(0)
req2.pd.ResourceProfiles().MoveAndAppendTo(req.pd.ResourceProfiles())
return []exporterhelper.Request{req}, nil
}
Expand All @@ -43,6 +45,8 @@ func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Ma
if destReq == nil {
destReq = srcReq
} else {
destReq.setCachedItemsCount(srcCount)
srcReq.setCachedItemsCount(0)
srcReq.pd.ResourceProfiles().MoveAndAppendTo(destReq.pd.ResourceProfiles())
}
capacityLeft -= srcCount
Expand All @@ -51,13 +55,17 @@ func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Ma

for {
extractedProfiles := extractProfiles(srcReq.pd, capacityLeft)
if extractedProfiles.SampleCount() == 0 {
extractedCount := extractedProfiles.SampleCount()
if extractedCount == 0 {
break
}

capacityLeft -= extractedProfiles.SampleCount()
if destReq == nil {
destReq = &profilesRequest{pd: extractedProfiles, pusher: srcReq.pusher}
destReq = newProfilesRequest(extractedProfiles, srcReq.pusher).(*profilesRequest)
} else {
destReq.setCachedItemsCount(destReq.ItemsCount() + extractedCount)
srcReq.setCachedItemsCount(srcReq.ItemsCount() - extractedCount)
extractedProfiles.ResourceProfiles().MoveAndAppendTo(destReq.pd.ResourceProfiles())
}
// Create new batch once capacity is reached.
Expand Down
77 changes: 37 additions & 40 deletions exporter/exporterhelper/xexporterhelper/profiles_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ package xexporterhelper

import (
"context"
"fmt"
"os"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -19,18 +17,17 @@ import (
)

func TestMergeProfiles(t *testing.T) {
pr1 := &profilesRequest{pd: testdata.GenerateProfiles(2)}
pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
pr1 := newProfilesRequest(testdata.GenerateProfiles(2), nil)
pr2 := newProfilesRequest(testdata.GenerateProfiles(3), nil)
res, err := pr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, pr2)
require.NoError(t, err)
assert.Len(t, res, 1)
fmt.Fprintf(os.Stdout, "%#v\n", res[0].(*profilesRequest).pd)
assert.Equal(t, 5, res[0].(*profilesRequest).pd.SampleCount())
assert.Equal(t, 5, res[0].ItemsCount())
}

func TestMergeProfilesInvalidInput(t *testing.T) {
pr1 := &dummyRequest{}
pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
pr2 := newProfilesRequest(testdata.GenerateProfiles(3), nil)
_, err := pr2.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, pr1)
assert.Error(t, err)
}
Expand All @@ -41,78 +38,78 @@ func TestMergeSplitProfiles(t *testing.T) {
cfg exporterbatcher.MaxSizeConfig
pr1 exporterhelper.Request
pr2 exporterhelper.Request
expected []*profilesRequest
expected []exporterhelper.Request
}{
{
name: "both_requests_empty",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
pr1: &profilesRequest{pd: pprofile.NewProfiles()},
pr2: &profilesRequest{pd: pprofile.NewProfiles()},
expected: []*profilesRequest{{pd: pprofile.NewProfiles()}},
pr1: newProfilesRequest(pprofile.NewProfiles(), nil),
pr2: newProfilesRequest(pprofile.NewProfiles(), nil),
expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles(), nil)},
},
{
name: "first_request_empty",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
pr1: &profilesRequest{pd: pprofile.NewProfiles()},
pr2: &profilesRequest{pd: testdata.GenerateProfiles(5)},
expected: []*profilesRequest{{pd: testdata.GenerateProfiles(5)}},
pr1: newProfilesRequest(pprofile.NewProfiles(), nil),
pr2: newProfilesRequest(testdata.GenerateProfiles(5), nil),
expected: []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(5), nil)},
},
{
name: "first_empty_second_nil",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
pr1: &profilesRequest{pd: pprofile.NewProfiles()},
pr1: newProfilesRequest(pprofile.NewProfiles(), nil),
pr2: nil,
expected: []*profilesRequest{{pd: pprofile.NewProfiles()}},
expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles(), nil)},
},
{
name: "merge_only",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
pr1: &profilesRequest{pd: testdata.GenerateProfiles(4)},
pr2: &profilesRequest{pd: testdata.GenerateProfiles(6)},
expected: []*profilesRequest{{pd: func() pprofile.Profiles {
pr1: newProfilesRequest(testdata.GenerateProfiles(4), nil),
pr2: newProfilesRequest(testdata.GenerateProfiles(6), nil),
expected: []exporterhelper.Request{newProfilesRequest(func() pprofile.Profiles {
profiles := testdata.GenerateProfiles(4)
testdata.GenerateProfiles(6).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles())
return profiles
}()}},
}(), nil)},
},
{
name: "split_only",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4},
pr1: &profilesRequest{pd: testdata.GenerateProfiles(10)},
pr1: newProfilesRequest(testdata.GenerateProfiles(10), nil),
pr2: nil,
expected: []*profilesRequest{
{pd: testdata.GenerateProfiles(4)},
{pd: testdata.GenerateProfiles(4)},
{pd: testdata.GenerateProfiles(2)},
expected: []exporterhelper.Request{
newProfilesRequest(testdata.GenerateProfiles(4), nil),
newProfilesRequest(testdata.GenerateProfiles(4), nil),
newProfilesRequest(testdata.GenerateProfiles(2), nil),
},
},
{
name: "merge_and_split",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
pr1: &profilesRequest{pd: testdata.GenerateProfiles(8)},
pr2: &profilesRequest{pd: testdata.GenerateProfiles(20)},
expected: []*profilesRequest{
{pd: func() pprofile.Profiles {
pr1: newProfilesRequest(testdata.GenerateProfiles(8), nil),
pr2: newProfilesRequest(testdata.GenerateProfiles(20), nil),
expected: []exporterhelper.Request{
newProfilesRequest(func() pprofile.Profiles {
profiles := testdata.GenerateProfiles(8)
testdata.GenerateProfiles(2).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles())
return profiles
}()},
{pd: testdata.GenerateProfiles(10)},
{pd: testdata.GenerateProfiles(8)},
}(), nil),
newProfilesRequest(testdata.GenerateProfiles(10), nil),
newProfilesRequest(testdata.GenerateProfiles(8), nil),
},
},
{
name: "scope_profiles_split",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4},
pr1: &profilesRequest{pd: func() pprofile.Profiles {
pr1: newProfilesRequest(func() pprofile.Profiles {
return testdata.GenerateProfiles(6)
}()},
}(), nil),
pr2: nil,
expected: []*profilesRequest{
{pd: testdata.GenerateProfiles(4)},
{pd: func() pprofile.Profiles {
expected: []exporterhelper.Request{
newProfilesRequest(testdata.GenerateProfiles(4), nil),
newProfilesRequest(func() pprofile.Profiles {
return testdata.GenerateProfiles(2)
}()},
}(), nil),
},
},
}
Expand All @@ -122,15 +119,15 @@ func TestMergeSplitProfiles(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, len(tt.expected), len(res))
for i, r := range res {
assert.Equal(t, tt.expected[i], r.(*profilesRequest))
assert.Equal(t, tt.expected[i].(*profilesRequest).pd, r.(*profilesRequest).pd)
}
})
}
}

func TestMergeSplitProfilesInvalidInput(t *testing.T) {
r1 := &dummyRequest{}
r2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
r2 := newProfilesRequest(testdata.GenerateProfiles(3), nil)
_, err := r2.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, r1)
assert.Error(t, err)
}
Expand Down

0 comments on commit 8d41c26

Please sign in to comment.