[chore] Fix MergeSplit logic, update tests to use Request
Signed-off-by: Bogdan Drutu <[email protected]>
bogdandrutu committed Jan 23, 2025
1 parent faf7758 commit 7e52446
Showing 6 changed files with 70 additions and 63 deletions.
6 changes: 4 additions & 2 deletions exporter/exporterhelper/logs_batch.go
@@ -24,6 +24,8 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz
}

if cfg.MaxSizeItems == 0 {
+ req.setCachedItemsCount(req.ItemsCount() + req2.ItemsCount())
+ req2.setCachedItemsCount(0)
req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs())
return []Request{req}, nil
}
@@ -43,9 +45,9 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz
if destReq == nil {
destReq = srcReq
} else {
- srcReq.ld.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
- destReq.setCachedItemsCount(srcCount)
+ destReq.setCachedItemsCount(destReq.ItemsCount() + srcCount)
srcReq.setCachedItemsCount(0)
+ srcReq.ld.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
}
capacityLeft -= srcCount
continue
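For context, the corrected bookkeeping adds the source's item count onto the destination's existing count (instead of overwriting it) and zeroes the source's cached count once its payload has been moved. A minimal sketch of that pattern, using simplified stand-in types rather than the real exporterhelper request types:

package main

import "fmt"

// fakeRequest stands in for the exporterhelper request types, which wrap a
// pdata payload and cache their item count so ItemsCount() stays cheap.
type fakeRequest struct {
	items       []string
	cachedCount int
}

func (r *fakeRequest) ItemsCount() int           { return r.cachedCount }
func (r *fakeRequest) setCachedItemsCount(n int) { r.cachedCount = n }

// mergeInto mirrors the corrected logic: accumulate the source count onto the
// destination, zero the source, then move the payload across.
func mergeInto(dest, src *fakeRequest) {
	srcCount := src.ItemsCount()
	dest.setCachedItemsCount(dest.ItemsCount() + srcCount) // previously the count was overwritten with srcCount alone
	src.setCachedItemsCount(0)
	dest.items = append(dest.items, src.items...)
	src.items = nil
}

func main() {
	dest := &fakeRequest{items: []string{"a", "b"}, cachedCount: 2}
	src := &fakeRequest{items: []string{"c", "d", "e"}, cachedCount: 3}
	mergeInto(dest, src)
	fmt.Println(dest.ItemsCount(), src.ItemsCount()) // 5 0
}

With the old overwrite, the merged request would report only the source's count even though it holds both payloads, which is the kind of miscount this commit fixes.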
40 changes: 20 additions & 20 deletions exporter/exporterhelper/logs_batch_test.go
@@ -21,7 +21,7 @@ func TestMergeLogs(t *testing.T) {
lr2 := newLogsRequest(testdata.GenerateLogs(3), nil)
res, err := lr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, lr2)
require.NoError(t, err)
- require.Equal(t, 5, res[0].(*logsRequest).ld.LogRecordCount())
+ require.Equal(t, 5, res[0].ItemsCount())
}

func TestMergeLogsInvalidInput(t *testing.T) {
@@ -37,64 +37,64 @@ func TestMergeSplitLogs(t *testing.T) {
cfg exporterbatcher.MaxSizeConfig
lr1 internal.Request
lr2 internal.Request
- expected []*logsRequest
+ expected []Request
}{
{
name: "both_requests_empty",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
lr1: newLogsRequest(plog.NewLogs(), nil),
lr2: newLogsRequest(plog.NewLogs(), nil),
- expected: []*logsRequest{newLogsRequest(plog.NewLogs(), nil).(*logsRequest)},
+ expected: []Request{newLogsRequest(plog.NewLogs(), nil)},
},
{
name: "first_request_empty",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
lr1: newLogsRequest(plog.NewLogs(), nil),
lr2: newLogsRequest(testdata.GenerateLogs(5), nil),
- expected: []*logsRequest{newLogsRequest(testdata.GenerateLogs(5), nil).(*logsRequest)},
+ expected: []Request{newLogsRequest(testdata.GenerateLogs(5), nil)},
},
{
name: "first_empty_second_nil",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
lr1: newLogsRequest(plog.NewLogs(), nil),
lr2: nil,
- expected: []*logsRequest{newLogsRequest(plog.NewLogs(), nil).(*logsRequest)},
+ expected: []Request{newLogsRequest(plog.NewLogs(), nil)},
},
{
name: "merge_only",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
lr1: newLogsRequest(testdata.GenerateLogs(4), nil),
lr2: newLogsRequest(testdata.GenerateLogs(6), nil),
- expected: []*logsRequest{newLogsRequest(func() plog.Logs {
+ expected: []Request{newLogsRequest(func() plog.Logs {
logs := testdata.GenerateLogs(4)
testdata.GenerateLogs(6).ResourceLogs().MoveAndAppendTo(logs.ResourceLogs())
return logs
- }(), nil).(*logsRequest)},
+ }(), nil)},
},
{
name: "split_only",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4},
lr1: newLogsRequest(plog.NewLogs(), nil),
lr2: newLogsRequest(testdata.GenerateLogs(10), nil),
- expected: []*logsRequest{
- newLogsRequest(testdata.GenerateLogs(4), nil).(*logsRequest),
- newLogsRequest(testdata.GenerateLogs(4), nil).(*logsRequest),
- newLogsRequest(testdata.GenerateLogs(2), nil).(*logsRequest),
+ expected: []Request{
+ newLogsRequest(testdata.GenerateLogs(4), nil),
+ newLogsRequest(testdata.GenerateLogs(4), nil),
+ newLogsRequest(testdata.GenerateLogs(2), nil),
},
},
{
name: "merge_and_split",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
lr1: newLogsRequest(testdata.GenerateLogs(8), nil),
lr2: newLogsRequest(testdata.GenerateLogs(20), nil),
- expected: []*logsRequest{
+ expected: []Request{
newLogsRequest(func() plog.Logs {
logs := testdata.GenerateLogs(8)
testdata.GenerateLogs(2).ResourceLogs().MoveAndAppendTo(logs.ResourceLogs())
return logs
- }(), nil).(*logsRequest),
- newLogsRequest(testdata.GenerateLogs(10), nil).(*logsRequest),
- newLogsRequest(testdata.GenerateLogs(8), nil).(*logsRequest),
+ }(), nil),
+ newLogsRequest(testdata.GenerateLogs(10), nil),
+ newLogsRequest(testdata.GenerateLogs(8), nil),
},
},
{
@@ -106,14 +106,14 @@ func TestMergeSplitLogs(t *testing.T) {
return ld
}(), nil),
lr2: newLogsRequest(testdata.GenerateLogs(2), nil),
- expected: []*logsRequest{
- newLogsRequest(testdata.GenerateLogs(4), nil).(*logsRequest),
+ expected: []Request{
+ newLogsRequest(testdata.GenerateLogs(4), nil),
newLogsRequest(func() plog.Logs {
ld := testdata.GenerateLogs(0)
ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().AppendEmpty().Body().SetStr("extra log")
testdata.GenerateLogs(2).ResourceLogs().MoveAndAppendTo(ld.ResourceLogs())
return ld
- }(), nil).(*logsRequest),
+ }(), nil),
},
},
}
@@ -122,8 +122,8 @@ func TestMergeSplitLogs(t *testing.T) {
res, err := tt.lr1.MergeSplit(context.Background(), tt.cfg, tt.lr2)
require.NoError(t, err)
assert.Equal(t, len(tt.expected), len(res))
- for i, r := range res {
- assert.Equal(t, tt.expected[i].ld, r.(*logsRequest).ld)
+ for i := range res {
+ assert.Equal(t, tt.expected[i], res[i])
}
})
}
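The merge_and_split expectations above follow from the max-size arithmetic: the first batch is topped up to the limit, full batches follow, and the remainder closes the list. A small illustrative sketch of that arithmetic only (not the exporterhelper implementation):

package main

import "fmt"

// splitSizes returns the expected batch sizes after merging two requests and
// splitting at maxSize items. Purely illustrative arithmetic.
func splitSizes(first, second, maxSize int) []int {
	total := first + second
	var sizes []int
	for total > 0 {
		n := maxSize
		if total < maxSize {
			n = total
		}
		sizes = append(sizes, n)
		total -= n
	}
	return sizes
}

func main() {
	// Mirrors the merge_and_split case: 8 + 20 log records with MaxSizeItems 10.
	fmt.Println(splitSizes(8, 20, 10)) // [10 10 8]
}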
4 changes: 3 additions & 1 deletion exporter/exporterhelper/metrics_batch.go
@@ -24,6 +24,8 @@ func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Max
}

if cfg.MaxSizeItems == 0 {
+ req.setCachedItemsCount(req.ItemsCount() + req2.ItemsCount())
+ req2.setCachedItemsCount(0)
req2.md.ResourceMetrics().MoveAndAppendTo(req.md.ResourceMetrics())
return []Request{req}, nil
}
@@ -43,7 +45,7 @@ func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Max
if destReq == nil {
destReq = srcReq
} else {
- destReq.setCachedItemsCount(srcCount)
+ destReq.setCachedItemsCount(destReq.ItemsCount() + srcCount)
srcReq.setCachedItemsCount(0)
srcReq.md.ResourceMetrics().MoveAndAppendTo(destReq.md.ResourceMetrics())
}
39 changes: 20 additions & 19 deletions exporter/exporterhelper/metrics_batch_test.go
@@ -20,7 +20,8 @@ func TestMergeMetrics(t *testing.T) {
mr2 := newMetricsRequest(testdata.GenerateMetrics(3), nil)
res, err := mr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, mr2)
require.NoError(t, err)
- assert.Equal(t, 5, res[0].(*metricsRequest).md.MetricCount())
+ // Every metric has 2 data points.
+ assert.Equal(t, 2*5, res[0].ItemsCount())
}

func TestMergeMetricsInvalidInput(t *testing.T) {
@@ -36,64 +37,64 @@ func TestMergeSplitMetrics(t *testing.T) {
cfg exporterbatcher.MaxSizeConfig
mr1 Request
mr2 Request
- expected []*metricsRequest
+ expected []Request
}{
{
name: "both_requests_empty",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
mr1: newMetricsRequest(pmetric.NewMetrics(), nil),
mr2: newMetricsRequest(pmetric.NewMetrics(), nil),
- expected: []*metricsRequest{newMetricsRequest(pmetric.NewMetrics(), nil).(*metricsRequest)},
+ expected: []Request{newMetricsRequest(pmetric.NewMetrics(), nil)},
},
{
name: "first_request_empty",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
mr1: newMetricsRequest(pmetric.NewMetrics(), nil),
mr2: newMetricsRequest(testdata.GenerateMetrics(5), nil),
- expected: []*metricsRequest{newMetricsRequest(testdata.GenerateMetrics(5), nil).(*metricsRequest)},
+ expected: []Request{newMetricsRequest(testdata.GenerateMetrics(5), nil)},
},
{
name: "first_empty_second_nil",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
mr1: newMetricsRequest(pmetric.NewMetrics(), nil),
mr2: nil,
- expected: []*metricsRequest{newMetricsRequest(pmetric.NewMetrics(), nil).(*metricsRequest)},
+ expected: []Request{newMetricsRequest(pmetric.NewMetrics(), nil)},
},
{
name: "merge_only",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 60},
mr1: newMetricsRequest(testdata.GenerateMetrics(10), nil),
mr2: newMetricsRequest(testdata.GenerateMetrics(14), nil),
- expected: []*metricsRequest{newMetricsRequest(func() pmetric.Metrics {
+ expected: []Request{newMetricsRequest(func() pmetric.Metrics {
metrics := testdata.GenerateMetrics(10)
testdata.GenerateMetrics(14).ResourceMetrics().MoveAndAppendTo(metrics.ResourceMetrics())
return metrics
- }(), nil).(*metricsRequest)},
+ }(), nil)},
},
{
name: "split_only",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 14},
mr1: newMetricsRequest(pmetric.NewMetrics(), nil),
mr2: newMetricsRequest(testdata.GenerateMetrics(15), nil), // 15 metrics, 30 data points
- expected: []*metricsRequest{
- newMetricsRequest(testdata.GenerateMetrics(7), nil).(*metricsRequest), // 7 metrics, 14 data points
- newMetricsRequest(testdata.GenerateMetrics(7), nil).(*metricsRequest), // 7 metrics, 14 data points
- newMetricsRequest(testdata.GenerateMetrics(1), nil).(*metricsRequest), // 1 metric, 2 data points
+ expected: []Request{
+ newMetricsRequest(testdata.GenerateMetrics(7), nil), // 7 metrics, 14 data points
+ newMetricsRequest(testdata.GenerateMetrics(7), nil), // 7 metrics, 14 data points
+ newMetricsRequest(testdata.GenerateMetrics(1), nil), // 1 metric, 2 data points
},
},
{
name: "split_and_merge",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 28},
mr1: newMetricsRequest(testdata.GenerateMetrics(7), nil), // 7 metrics, 14 data points
mr2: newMetricsRequest(testdata.GenerateMetrics(25), nil), // 25 metrics, 50 data points
- expected: []*metricsRequest{
+ expected: []Request{
newMetricsRequest(func() pmetric.Metrics {
metrics := testdata.GenerateMetrics(7)
testdata.GenerateMetrics(7).ResourceMetrics().MoveAndAppendTo(metrics.ResourceMetrics())
return metrics
- }(), nil).(*metricsRequest),
- newMetricsRequest(testdata.GenerateMetrics(14), nil).(*metricsRequest), // 14 metrics, 28 data points
- newMetricsRequest(testdata.GenerateMetrics(4), nil).(*metricsRequest), // 4 metrics, 8 data points
+ }(), nil),
+ newMetricsRequest(testdata.GenerateMetrics(14), nil), // 14 metrics, 28 data points
+ newMetricsRequest(testdata.GenerateMetrics(4), nil), // 4 metrics, 8 data points
},
},
{
@@ -107,13 +108,13 @@ func TestMergeSplitMetrics(t *testing.T) {
return md
}(), nil),
mr2: nil,
- expected: []*metricsRequest{
- newMetricsRequest(testdata.GenerateMetrics(4), nil).(*metricsRequest),
+ expected: []Request{
+ newMetricsRequest(testdata.GenerateMetrics(4), nil),
newMetricsRequest(func() pmetric.Metrics {
md := testdata.GenerateMetrics(4)
md.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().SetName("extra scope")
return md
- }(), nil).(*metricsRequest),
+ }(), nil),
},
},
}
@@ -123,7 +124,7 @@ func TestMergeSplitMetrics(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, len(tt.expected), len(res))
for i := range res {
- assert.Equal(t, tt.expected[i].md, res[i].(*metricsRequest).md)
+ assert.Equal(t, tt.expected[i], res[i])
}
})
}
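The updated TestMergeMetrics assertion counts data points rather than metrics: a metrics request's ItemsCount() is its data-point total, and the testdata generator produces two data points per metric, hence 2*5. The same arithmetic with plain pmetric (a hedged sketch; assumes the standard pdata module at go.opentelemetry.io/collector/pdata):

package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pmetric"
)

func main() {
	md := pmetric.NewMetrics()
	sm := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
	for i := 0; i < 5; i++ {
		// One gauge metric with two data points, matching the
		// two-points-per-metric shape noted in the test comment above.
		g := sm.Metrics().AppendEmpty().SetEmptyGauge()
		g.DataPoints().AppendEmpty().SetIntValue(int64(i))
		g.DataPoints().AppendEmpty().SetIntValue(int64(i))
	}
	fmt.Println(md.MetricCount(), md.DataPointCount()) // 5 10
}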
4 changes: 3 additions & 1 deletion exporter/exporterhelper/traces_batch.go
@@ -24,6 +24,8 @@ func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxS
}

if cfg.MaxSizeItems == 0 {
+ req.setCachedItemsCount(req.ItemsCount() + req2.ItemsCount())
+ req2.setCachedItemsCount(0)
req2.td.ResourceSpans().MoveAndAppendTo(req.td.ResourceSpans())
return []Request{req}, nil
}
@@ -43,7 +45,7 @@ func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxS
if destReq == nil {
destReq = srcReq
} else {
- destReq.setCachedItemsCount(srcCount)
+ destReq.setCachedItemsCount(destReq.ItemsCount() + srcCount)
srcReq.setCachedItemsCount(0)
srcReq.td.ResourceSpans().MoveAndAppendTo(destReq.td.ResourceSpans())
}