Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 23 additions & 16 deletions exporter/exporterhelper/internal/queuebatch/partition_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhel

import (
"context"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -69,12 +68,8 @@ func (qb *partitionBatcher) Consume(ctx context.Context, req request.Request, do
if qb.currentBatch == nil {
reqList, mergeSplitErr := req.MergeSplit(ctx, int(qb.cfg.MaxSize), qb.cfg.Sizer, nil)
if mergeSplitErr != nil {
qb.logger.Warn(mergeSplitErr.Error())
if !strings.Contains(mergeSplitErr.Error(), "partial success") {
done.OnDone(mergeSplitErr)
qb.currentBatchMu.Unlock()
return
}
// Do not return in case of error if there are data, try to export as much as possible.
qb.logger.Warn("Failed to split request.", zap.Error(mergeSplitErr))
}

if len(reqList) == 0 {
Expand All @@ -84,8 +79,16 @@ func (qb *partitionBatcher) Consume(ctx context.Context, req request.Request, do
}

// If more than one flush is required for this request, call done only when all flushes are done.
if len(reqList) > 1 {
done = newRefCountDone(done, int64(len(reqList)))
numRefs := len(reqList)
// Need to also inform about the mergeSplitErr, consider the errored data as 1 batch.
if mergeSplitErr != nil {
numRefs++
}
if numRefs > 1 {
done = newRefCountDone(done, int64(numRefs))
if mergeSplitErr != nil {
done.OnDone(mergeSplitErr)
}
}

// We have at least one result in the reqList. Last in the list may not have enough data to be flushed.
Expand Down Expand Up @@ -113,12 +116,8 @@ func (qb *partitionBatcher) Consume(ctx context.Context, req request.Request, do
reqList, mergeSplitErr := qb.currentBatch.req.MergeSplit(ctx, int(qb.cfg.MaxSize), qb.cfg.Sizer, req)
// If failed to merge signal all Done callbacks from the current batch as well as the current request and reset the current batch.
if mergeSplitErr != nil {
qb.logger.Warn(mergeSplitErr.Error())
if !strings.Contains(mergeSplitErr.Error(), "partial success") {
done.OnDone(mergeSplitErr)
qb.currentBatchMu.Unlock()
return
}
// Do not return in case of error if there are data, try to export as much as possible.
qb.logger.Warn("Failed to split request.", zap.Error(mergeSplitErr))
}

if len(reqList) == 0 {
Expand All @@ -128,8 +127,16 @@ func (qb *partitionBatcher) Consume(ctx context.Context, req request.Request, do
}

// If more than one flush is required for this request, call done only when all flushes are done.
if len(reqList) > 1 {
numRefs := len(reqList)
// Need to also inform about the mergeSplitErr, consider the errored data as 1 batch.
if mergeSplitErr != nil {
numRefs++
}
if numRefs > 1 {
done = newRefCountDone(done, int64(len(reqList)))
if mergeSplitErr != nil {
done.OnDone(mergeSplitErr)
}
}

// We have at least one result in the reqList, if more results here is what that means:
Expand Down
143 changes: 42 additions & 101 deletions exporter/exporterhelper/internal/queuebatch/partition_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package queuebatch
import (
"context"
"errors"
"fmt"
"runtime"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -382,53 +381,26 @@ func TestPartitionBatcher_MergeError(t *testing.T) {
assert.EqualValues(t, 0, done.success.Load())
}

type customPartialErrorRequest struct {
*requesttest.FakeRequest
failureCount int
maxSize int
}

func (r *customPartialErrorRequest) MergeSplit(_ context.Context, _ int, _ request.SizerType, _ request.Request) ([]request.Request, error) {
return nil, fmt.Errorf("partial success: failed to split request: size is greater than max size. size: %d, max_size: %d. Failed: %d",
r.ItemsCount(),
r.maxSize,
r.ItemsCount(),
)
}

func (r *customPartialErrorRequest) ItemsCount() int {
return r.FakeRequest.ItemsCount()
}

func (r *customPartialErrorRequest) OnError(err error) request.Request {
return r.FakeRequest.OnError(err)
}

func TestShardBatcher_PartialSuccessError(t *testing.T) {
func TestPartitionBatcher_PartialSuccessError(t *testing.T) {
cfg := BatchConfig{
FlushTimeout: 0,
MinSize: 0,
MaxSize: 10,
Sizer: request.SizerTypeBytes,
MinSize: 10,
MaxSize: 15,
}

core, observed := observer.New(zap.WarnLevel)
logger := zap.New(core)
sink := requesttest.NewSink()
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(1), sink.Export, logger)
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
require.NoError(t, ba.Shutdown(context.Background()))
})

done := newFakeDone()

req := &customPartialErrorRequest{
maxSize: 10,
FakeRequest: &requesttest.FakeRequest{
Items: 100,
Bytes: 100,
},
failureCount: 42,
req := &requesttest.FakeRequest{
Items: 100,
Bytes: 100,
MergeErr: errors.New("split error"),
MergeErrResult: []request.Request{&requesttest.FakeRequest{Items: 10, Bytes: 15}},
}
ba.Consume(context.Background(), req, done)

Expand All @@ -439,91 +411,60 @@ func TestShardBatcher_PartialSuccessError(t *testing.T) {
}
log := logs[0]
return log.Level == zap.WarnLevel &&
log.Message == "partial success: failed to split request: size is greater than max size. size: 100, max_size: 10. Failed: 100"
log.Message == "Failed to split request."
}, time.Second, 10*time.Millisecond)

// Verify that done callback was called with the error
assert.Eventually(t, func() bool {
return done.errors.Load() == 1
}, time.Second, 10*time.Millisecond)
require.NoError(t, ba.Shutdown(context.Background()))

// Verify that done callback was called with the returned batch and error for the split.
assert.Equal(t, int64(1), done.errors.Load())
assert.Equal(t, 1, sink.RequestsCount())
assert.Equal(t, 10, sink.ItemsCount())
assert.Equal(t, 15, sink.BytesCount())
}

func TestShardBatcher_PartialSuccessError_WithLogs(t *testing.T) {
func TestSPartitionBatcher_PartialSuccessError_AfterOkRequest(t *testing.T) {
cfg := BatchConfig{
FlushTimeout: 0,
Sizer: request.SizerTypeBytes,
MinSize: 10,
MaxSize: 15,
}

core, logs := observer.New(zap.WarnLevel)
core, observed := observer.New(zap.WarnLevel)
logger := zap.New(core)

sink := requesttest.NewSink()
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(1), sink.Export, logger)
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
require.NoError(t, ba.Shutdown(context.Background()))
})

done := newFakeDone()
req := &customPartialErrorRequest{
FakeRequest: &requesttest.FakeRequest{
Items: 8,
MergeErr: errors.New("partial success: failed to split request: size is greater than max size. size: 3, max_size: 10. Failed: 3"),
},
failureCount: 42,
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 5, Bytes: 5}, done)
req := &requesttest.FakeRequest{
Items: 100,
Bytes: 100,
MergeErr: errors.New("split error"),
MergeErrResult: []request.Request{&requesttest.FakeRequest{Items: 10, Bytes: 15}},
}

ba.Consume(context.Background(), req, done)

assert.Eventually(t, func() bool {
return done.errors.Load() == 1
}, 1*time.Second, 10*time.Millisecond)

allLogs := logs.All()
require.Len(t, allLogs, 1)
assert.Contains(t, allLogs[0].Message, "partial success: failed to split request:")
}

func TestShardBatcher_PartialSuccessError_WithFailureCountAndReason(t *testing.T) {
core, observedLogs := observer.New(zap.WarnLevel)
logger := zap.New(core)

cfg := BatchConfig{
FlushTimeout: 0,
MinSize: 5,
MaxSize: 10,
}

sink := requesttest.NewSink()
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(1), sink.Export, logger)
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
require.NoError(t, ba.Shutdown(context.Background()))
})

// First add a request to create a current batch
done1 := newFakeDone()
req1 := &requesttest.FakeRequest{
Items: 3,
}
ba.Consume(context.Background(), req1, done1)

// Now add a request that will trigger the partial success error
done2 := newFakeDone()
req2 := &requesttest.FakeRequest{
Items: 3,
MergeErr: errors.New("partial success: failed to split request: size is greater than max size. size: 3, max_size: 10. Failed: 3"),
}
ba.Consume(context.Background(), req2, done2)

assert.Eventually(t, func() bool {
return observedLogs.Len() > 0
logs := observed.All()
if len(logs) == 0 {
return false
}
log := logs[0]
return log.Level == zap.WarnLevel &&
log.Message == "Failed to split request."
}, time.Second, 10*time.Millisecond)

allLogs := observedLogs.All()
require.Len(t, allLogs, 1)
assert.Contains(t, allLogs[0].Message, "failed to split request")
assert.EqualValues(t, 1, done2.errors.Load())
require.NoError(t, ba.Shutdown(context.Background()))

// Verify that done callback was called with the success for the returned batch and error for the split.
assert.Equal(t, int64(1), done.errors.Load())
assert.Equal(t, int64(1), done.success.Load())
assert.Equal(t, 1, sink.RequestsCount())
assert.Equal(t, 10, sink.ItemsCount())
assert.Equal(t, 15, sink.BytesCount())
}

type fakeDone struct {
Expand Down
15 changes: 8 additions & 7 deletions exporter/exporterhelper/internal/requesttest/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ func (e errorPartial) Error() string {
}

type FakeRequest struct {
Items int
Bytes int
Partial int
MergeErr error
Delay time.Duration
Items int
Bytes int
Partial int
MergeErr error
MergeErrResult []request.Request
Delay time.Duration
}

func (r *FakeRequest) OnError(err error) request.Request {
Expand All @@ -45,13 +46,13 @@ func (r *FakeRequest) ItemsCount() int {

func (r *FakeRequest) MergeSplit(_ context.Context, maxSize int, szt request.SizerType, r2 request.Request) ([]request.Request, error) {
if r.MergeErr != nil {
return nil, r.MergeErr
return r.MergeErrResult, r.MergeErr
}

if r2 != nil {
fr2 := r2.(*FakeRequest)
if fr2.MergeErr != nil {
return nil, fr2.MergeErr
return fr2.MergeErrResult, fr2.MergeErr
}
fr2.mergeTo(r)
}
Expand Down
27 changes: 6 additions & 21 deletions exporter/exporterhelper/logs_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,29 +50,14 @@ func (req *logsRequest) mergeTo(dst *logsRequest, sz sizer.LogsSizer) {

func (req *logsRequest) split(maxSize int, sz sizer.LogsSizer) ([]Request, error) {
var res []Request
var ld plog.Logs
rmSize := -1

previousSize := req.size(sz)

for req.size(sz) > maxSize && rmSize != 0 {
ld, rmSize = extractLogs(req.ld, maxSize, sz)
if ld.LogRecordCount() > 0 {
req.setCachedSize(req.size(sz) - rmSize)
res = append(res, newLogsRequest(ld))
for req.size(sz) > maxSize {
ld, removedSize := extractLogs(req.ld, maxSize, sz)
if ld.LogRecordCount() == 0 {
return res, fmt.Errorf("one log record size is greater than max size, dropping items: %d", req.ld.LogRecordCount())
}
req.setCachedSize(req.size(sz) - removedSize)
res = append(res, newLogsRequest(ld))
}

if req.size(sz) == previousSize && req.size(sz) > maxSize {
err := fmt.Errorf(
"partial success: failed to split logs request: size is greater than max size. size: %d, max_size: %d. Failed: %d",
req.size(sz),
maxSize,
req.ld.LogRecordCount(),
)
return res, err
}

res = append(res, req)
return res, nil
}
Expand Down
25 changes: 21 additions & 4 deletions exporter/exporterhelper/logs_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,16 +233,33 @@ func TestMergeSplitLogsBasedOnByteSize(t *testing.T) {
expected: []Request{},
expectPartialError: true,
},
{
name: "splittable_then_unsplittable_log",
szt: RequestSizerTypeBytes,
maxSize: 1000,
lr1: newLogsRequest(func() plog.Logs {
ld := testdata.GenerateLogs(2)
ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr(string(make([]byte, 10)))
ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Body().SetStr(string(make([]byte, 1001)))
return ld
}()),
lr2: nil,
expected: []Request{newLogsRequest(func() plog.Logs {
ld := testdata.GenerateLogs(1)
ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr(string(make([]byte, 10)))
return ld
}())},
expectPartialError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
res, err := tt.lr1.MergeSplit(context.Background(), tt.maxSize, tt.szt, tt.lr2)
if tt.expectPartialError {
require.Error(t, err)
assert.Contains(t, err.Error(), "partial success: failed to split logs request: size is greater than max size")
return
require.ErrorContains(t, err, "one log record size is greater than max size, dropping")
} else {
require.NoError(t, err)
}
require.NoError(t, err)
assert.Len(t, res, len(tt.expected))
for i := range res {
assert.Equal(t, tt.expected[i].(*logsRequest).ld, res[i].(*logsRequest).ld)
Expand Down
Loading
Loading