Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] Apply, items cache logic to profiles #12171

Merged
merged 1 commit into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions exporter/exporterhelper/xexporterhelper/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ var (
)

type profilesRequest struct {
pd pprofile.Profiles
pusher xconsumer.ConsumeProfilesFunc
pd pprofile.Profiles
pusher xconsumer.ConsumeProfilesFunc
cachedItemsCount int
}

func newProfilesRequest(pd pprofile.Profiles, pusher xconsumer.ConsumeProfilesFunc) exporterhelper.Request {
return &profilesRequest{
pd: pd,
pusher: pusher,
pd: pd,
pusher: pusher,
cachedItemsCount: pd.SampleCount(),
}
}

Expand Down Expand Up @@ -66,7 +68,11 @@ func (req *profilesRequest) Export(ctx context.Context) error {
}

func (req *profilesRequest) ItemsCount() int {
return req.pd.SampleCount()
return req.cachedItemsCount
}

func (req *profilesRequest) setCachedItemsCount(count int) {
req.cachedItemsCount = count
}

type profileExporter struct {
Expand Down
12 changes: 10 additions & 2 deletions exporter/exporterhelper/xexporterhelper/profiles_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Ma
}

if cfg.MaxSizeItems == 0 {
req.setCachedItemsCount(req.ItemsCount() + req2.ItemsCount())
req2.setCachedItemsCount(0)
req2.pd.ResourceProfiles().MoveAndAppendTo(req.pd.ResourceProfiles())
return []exporterhelper.Request{req}, nil
}
Expand All @@ -43,6 +45,8 @@ func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Ma
if destReq == nil {
destReq = srcReq
} else {
destReq.setCachedItemsCount(destReq.ItemsCount() + srcCount)
srcReq.setCachedItemsCount(0)
srcReq.pd.ResourceProfiles().MoveAndAppendTo(destReq.pd.ResourceProfiles())
}
capacityLeft -= srcCount
Expand All @@ -51,13 +55,17 @@ func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Ma

for {
extractedProfiles := extractProfiles(srcReq.pd, capacityLeft)
if extractedProfiles.SampleCount() == 0 {
extractedCount := extractedProfiles.SampleCount()
if extractedCount == 0 {
break
}

capacityLeft -= extractedProfiles.SampleCount()
if destReq == nil {
destReq = &profilesRequest{pd: extractedProfiles, pusher: srcReq.pusher}
destReq = newProfilesRequest(extractedProfiles, srcReq.pusher).(*profilesRequest)
} else {
destReq.setCachedItemsCount(destReq.ItemsCount() + extractedCount)
srcReq.setCachedItemsCount(srcReq.ItemsCount() - extractedCount)
extractedProfiles.ResourceProfiles().MoveAndAppendTo(destReq.pd.ResourceProfiles())
}
// Create new batch once capacity is reached.
Expand Down
77 changes: 37 additions & 40 deletions exporter/exporterhelper/xexporterhelper/profiles_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ package xexporterhelper

import (
"context"
"fmt"
"os"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -19,18 +17,17 @@ import (
)

func TestMergeProfiles(t *testing.T) {
pr1 := &profilesRequest{pd: testdata.GenerateProfiles(2)}
pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
pr1 := newProfilesRequest(testdata.GenerateProfiles(2), nil)
pr2 := newProfilesRequest(testdata.GenerateProfiles(3), nil)
res, err := pr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, pr2)
require.NoError(t, err)
assert.Len(t, res, 1)
fmt.Fprintf(os.Stdout, "%#v\n", res[0].(*profilesRequest).pd)
assert.Equal(t, 5, res[0].(*profilesRequest).pd.SampleCount())
assert.Equal(t, 5, res[0].ItemsCount())
}

func TestMergeProfilesInvalidInput(t *testing.T) {
pr1 := &dummyRequest{}
pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
pr2 := newProfilesRequest(testdata.GenerateProfiles(3), nil)
_, err := pr2.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, pr1)
assert.Error(t, err)
}
Expand All @@ -41,78 +38,78 @@ func TestMergeSplitProfiles(t *testing.T) {
cfg exporterbatcher.MaxSizeConfig
pr1 exporterhelper.Request
pr2 exporterhelper.Request
expected []*profilesRequest
expected []exporterhelper.Request
}{
{
name: "both_requests_empty",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
pr1: &profilesRequest{pd: pprofile.NewProfiles()},
pr2: &profilesRequest{pd: pprofile.NewProfiles()},
expected: []*profilesRequest{{pd: pprofile.NewProfiles()}},
pr1: newProfilesRequest(pprofile.NewProfiles(), nil),
pr2: newProfilesRequest(pprofile.NewProfiles(), nil),
expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles(), nil)},
},
{
name: "first_request_empty",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
pr1: &profilesRequest{pd: pprofile.NewProfiles()},
pr2: &profilesRequest{pd: testdata.GenerateProfiles(5)},
expected: []*profilesRequest{{pd: testdata.GenerateProfiles(5)}},
pr1: newProfilesRequest(pprofile.NewProfiles(), nil),
pr2: newProfilesRequest(testdata.GenerateProfiles(5), nil),
expected: []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(5), nil)},
},
{
name: "first_empty_second_nil",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
pr1: &profilesRequest{pd: pprofile.NewProfiles()},
pr1: newProfilesRequest(pprofile.NewProfiles(), nil),
pr2: nil,
expected: []*profilesRequest{{pd: pprofile.NewProfiles()}},
expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles(), nil)},
},
{
name: "merge_only",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
pr1: &profilesRequest{pd: testdata.GenerateProfiles(4)},
pr2: &profilesRequest{pd: testdata.GenerateProfiles(6)},
expected: []*profilesRequest{{pd: func() pprofile.Profiles {
pr1: newProfilesRequest(testdata.GenerateProfiles(4), nil),
pr2: newProfilesRequest(testdata.GenerateProfiles(6), nil),
expected: []exporterhelper.Request{newProfilesRequest(func() pprofile.Profiles {
profiles := testdata.GenerateProfiles(4)
testdata.GenerateProfiles(6).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles())
return profiles
}()}},
}(), nil)},
},
{
name: "split_only",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4},
pr1: &profilesRequest{pd: testdata.GenerateProfiles(10)},
pr1: newProfilesRequest(testdata.GenerateProfiles(10), nil),
pr2: nil,
expected: []*profilesRequest{
{pd: testdata.GenerateProfiles(4)},
{pd: testdata.GenerateProfiles(4)},
{pd: testdata.GenerateProfiles(2)},
expected: []exporterhelper.Request{
newProfilesRequest(testdata.GenerateProfiles(4), nil),
newProfilesRequest(testdata.GenerateProfiles(4), nil),
newProfilesRequest(testdata.GenerateProfiles(2), nil),
},
},
{
name: "merge_and_split",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
pr1: &profilesRequest{pd: testdata.GenerateProfiles(8)},
pr2: &profilesRequest{pd: testdata.GenerateProfiles(20)},
expected: []*profilesRequest{
{pd: func() pprofile.Profiles {
pr1: newProfilesRequest(testdata.GenerateProfiles(8), nil),
pr2: newProfilesRequest(testdata.GenerateProfiles(20), nil),
expected: []exporterhelper.Request{
newProfilesRequest(func() pprofile.Profiles {
profiles := testdata.GenerateProfiles(8)
testdata.GenerateProfiles(2).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles())
return profiles
}()},
{pd: testdata.GenerateProfiles(10)},
{pd: testdata.GenerateProfiles(8)},
}(), nil),
newProfilesRequest(testdata.GenerateProfiles(10), nil),
newProfilesRequest(testdata.GenerateProfiles(8), nil),
},
},
{
name: "scope_profiles_split",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4},
pr1: &profilesRequest{pd: func() pprofile.Profiles {
pr1: newProfilesRequest(func() pprofile.Profiles {
return testdata.GenerateProfiles(6)
}()},
}(), nil),
pr2: nil,
expected: []*profilesRequest{
{pd: testdata.GenerateProfiles(4)},
{pd: func() pprofile.Profiles {
expected: []exporterhelper.Request{
newProfilesRequest(testdata.GenerateProfiles(4), nil),
newProfilesRequest(func() pprofile.Profiles {
return testdata.GenerateProfiles(2)
}()},
}(), nil),
},
},
}
Expand All @@ -122,15 +119,15 @@ func TestMergeSplitProfiles(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, len(tt.expected), len(res))
for i, r := range res {
assert.Equal(t, tt.expected[i], r.(*profilesRequest))
assert.Equal(t, tt.expected[i], r)
}
})
}
}

func TestMergeSplitProfilesInvalidInput(t *testing.T) {
r1 := &dummyRequest{}
r2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
r2 := newProfilesRequest(testdata.GenerateProfiles(3), nil)
_, err := r2.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, r1)
assert.Error(t, err)
}
Expand Down
Loading