Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][exporter] Consolidate merge splitting for the case where maxLimit is set and the case it's not #12104

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 28 additions & 64 deletions exporter/internal/queue/default_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,77 +43,41 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {

qb.currentBatchMu.Lock()

if qb.batchCfg.MaxSizeItems > 0 {
var reqList []internal.Request
var mergeSplitErr error
if qb.currentBatch == nil || qb.currentBatch.req == nil {
qb.resetTimer()
reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, nil)
} else {
reqList, mergeSplitErr = qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, req)
}
var reqList []internal.Request
var mergeSplitErr error
if qb.currentBatch == nil || qb.currentBatch.req == nil {
qb.resetTimer()
reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, nil)
} else {
reqList, mergeSplitErr = qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, req)
}

if mergeSplitErr != nil || reqList == nil {
qb.queue.OnProcessingFinished(idx, mergeSplitErr)
qb.currentBatchMu.Unlock()
continue
}
if mergeSplitErr != nil || reqList == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure you want to do anything if error, unless I am missing something.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A slice never compare to nil, you must most likely compare len to 0

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm. In Go a slice can be nil and the comparison is valid. Note however that nil != []type{}.
https://go.dev/tour/moretypes/12

qb.queue.OnProcessingFinished(idx, mergeSplitErr)
qb.currentBatchMu.Unlock()
continue
}

// If there was a split, we flush everything immediately.
if reqList[0].ItemsCount() >= qb.batchCfg.MinSizeItems || len(reqList) > 1 {
qb.currentBatch = nil
qb.currentBatchMu.Unlock()
for i := 0; i < len(reqList); i++ {
qb.flush(batch{
req: reqList[i],
ctx: ctx,
idxList: []uint64{idx},
})
// TODO: handle partial failure
}
qb.resetTimer()
} else {
qb.currentBatch = &batch{
req: reqList[0],
// If there was a split, we flush everything immediately.
if reqList[0].ItemsCount() >= qb.batchCfg.MinSizeItems || len(reqList) > 1 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if len is 0, get a segfault here?

qb.currentBatch = nil
qb.currentBatchMu.Unlock()
for i := 0; i < len(reqList); i++ {
qb.flush(batch{
req: reqList[i],
ctx: ctx,
idxList: []uint64{idx},
}
qb.currentBatchMu.Unlock()
})
// TODO: handle partial failure
}
qb.resetTimer()
} else {
if qb.currentBatch == nil || qb.currentBatch.req == nil {
qb.resetTimer()
qb.currentBatch = &batch{
req: req,
ctx: ctx,
idxList: []uint64{idx},
}
} else {
// TODO: consolidate implementation for the cases where MaxSizeConfig is specified and the case where it is not specified
mergedReq, mergeErr := qb.currentBatch.req.MergeSplit(qb.currentBatch.ctx, qb.batchCfg.MaxSizeConfig, req)
if mergeErr != nil {
qb.queue.OnProcessingFinished(idx, mergeErr)
qb.currentBatchMu.Unlock()
continue
}
qb.currentBatch = &batch{
req: mergedReq[0],
ctx: qb.currentBatch.ctx,
idxList: append(qb.currentBatch.idxList, idx),
}
}

if qb.currentBatch.req.ItemsCount() >= qb.batchCfg.MinSizeItems {
batchToFlush := *qb.currentBatch
qb.currentBatch = nil
qb.currentBatchMu.Unlock()

// flush() blocks until successfully started a goroutine for flushing.
qb.flush(batchToFlush)
qb.resetTimer()
} else {
qb.currentBatchMu.Unlock()
qb.currentBatch = &batch{
req: reqList[0],
ctx: ctx,
idxList: []uint64{idx},
}
qb.currentBatchMu.Unlock()
}
}
}()
Expand Down
4 changes: 4 additions & 0 deletions exporter/internal/requesttest/fake_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (r *FakeRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeC

maxItems := cfg.MaxSizeItems
if maxItems == 0 {
if r2 == nil {
return []internal.Request{r}, nil
}

fr2 := r2.(*FakeRequest)
if fr2.MergeErr != nil {
return nil, fr2.MergeErr
Expand Down
Loading