Skip to content

Commit 3cbe092

Browse files
committed
Consolidate merge splitting for the case we have a max limit and the case we don't
1 parent 42b83a7 commit 3cbe092

File tree

1 file changed

+28
-64
lines changed

1 file changed

+28
-64
lines changed

exporter/internal/queue/default_batcher.go

+28-64
Original file line numberDiff line numberDiff line change
@@ -43,77 +43,41 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
4343

4444
qb.currentBatchMu.Lock()
4545

46-
if qb.batchCfg.MaxSizeItems > 0 {
47-
var reqList []internal.Request
48-
var mergeSplitErr error
49-
if qb.currentBatch == nil || qb.currentBatch.req == nil {
50-
qb.resetTimer()
51-
reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, nil)
52-
} else {
53-
reqList, mergeSplitErr = qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, req)
54-
}
46+
var reqList []internal.Request
47+
var mergeSplitErr error
48+
if qb.currentBatch == nil || qb.currentBatch.req == nil {
49+
qb.resetTimer()
50+
reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, nil)
51+
} else {
52+
reqList, mergeSplitErr = qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, req)
53+
}
5554

56-
if mergeSplitErr != nil || reqList == nil {
57-
qb.queue.OnProcessingFinished(idx, mergeSplitErr)
58-
qb.currentBatchMu.Unlock()
59-
continue
60-
}
55+
if mergeSplitErr != nil || reqList == nil {
56+
qb.queue.OnProcessingFinished(idx, mergeSplitErr)
57+
qb.currentBatchMu.Unlock()
58+
continue
59+
}
6160

62-
// If there was a split, we flush everything immediately.
63-
if reqList[0].ItemsCount() >= qb.batchCfg.MinSizeItems || len(reqList) > 1 {
64-
qb.currentBatch = nil
65-
qb.currentBatchMu.Unlock()
66-
for i := 0; i < len(reqList); i++ {
67-
qb.flush(batch{
68-
req: reqList[i],
69-
ctx: ctx,
70-
idxList: []uint64{idx},
71-
})
72-
// TODO: handle partial failure
73-
}
74-
qb.resetTimer()
75-
} else {
76-
qb.currentBatch = &batch{
77-
req: reqList[0],
61+
// If there was a split, we flush everything immediately.
62+
if reqList[0].ItemsCount() >= qb.batchCfg.MinSizeItems || len(reqList) > 1 {
63+
qb.currentBatch = nil
64+
qb.currentBatchMu.Unlock()
65+
for i := 0; i < len(reqList); i++ {
66+
qb.flush(batch{
67+
req: reqList[i],
7868
ctx: ctx,
7969
idxList: []uint64{idx},
80-
}
81-
qb.currentBatchMu.Unlock()
70+
})
71+
// TODO: handle partial failure
8272
}
73+
qb.resetTimer()
8374
} else {
84-
if qb.currentBatch == nil || qb.currentBatch.req == nil {
85-
qb.resetTimer()
86-
qb.currentBatch = &batch{
87-
req: req,
88-
ctx: ctx,
89-
idxList: []uint64{idx},
90-
}
91-
} else {
92-
// TODO: consolidate implementation for the cases where MaxSizeConfig is specified and the case where it is not specified
93-
mergedReq, mergeErr := qb.currentBatch.req.MergeSplit(qb.currentBatch.ctx, qb.batchCfg.MaxSizeConfig, req)
94-
if mergeErr != nil {
95-
qb.queue.OnProcessingFinished(idx, mergeErr)
96-
qb.currentBatchMu.Unlock()
97-
continue
98-
}
99-
qb.currentBatch = &batch{
100-
req: mergedReq[0],
101-
ctx: qb.currentBatch.ctx,
102-
idxList: append(qb.currentBatch.idxList, idx),
103-
}
104-
}
105-
106-
if qb.currentBatch.req.ItemsCount() >= qb.batchCfg.MinSizeItems {
107-
batchToFlush := *qb.currentBatch
108-
qb.currentBatch = nil
109-
qb.currentBatchMu.Unlock()
110-
111-
// flush() blocks until successfully started a goroutine for flushing.
112-
qb.flush(batchToFlush)
113-
qb.resetTimer()
114-
} else {
115-
qb.currentBatchMu.Unlock()
75+
qb.currentBatch = &batch{
76+
req: reqList[0],
77+
ctx: ctx,
78+
idxList: []uint64{idx},
11679
}
80+
qb.currentBatchMu.Unlock()
11781
}
11882
}
11983
}()

0 commit comments

Comments
 (0)