Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ BUILD_INFO_IMPORT_PATH=go.opentelemetry.io/collector/internal/version
all: gotidy test build

test:
for dir in $(GODIRS); do (cd $${dir}; $(GOCMD) test --tags=assert ./...) || exit 1; done
for dir in $(GODIRS); do (cd $${dir}; $(GOCMD) test -parallel=4 -race --tags=assert ./...) || exit 1; done

fmt:
for dir in $(GODIRS); do (cd $${dir}; $(GOCMD) fmt ./...) || exit 1; done
Expand Down
23 changes: 15 additions & 8 deletions collector/processor/concurrentbatchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ type shard struct {

pending []pendingItem

totalSent int
// totalSent is a monotonic count of items used to match
// request and response.
totalSent int64

tracer trace.TracerProvider
}
Expand Down Expand Up @@ -323,28 +325,29 @@ func (b *shard) resetTimer() {
}

func (b *shard) sendItems(trigger trigger) {
sent, req := b.batch.splitBatch(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed)
split, req := b.batch.splitBatch(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed)
bytes := int64(b.batch.sizeBytes(req))
toSend := int64(split)

var waiters []chan error
var countItems []int
var contexts []context.Context

numItemsBefore := b.totalSent
numItemsAfter := b.totalSent + sent
numItemsAfter := b.totalSent + toSend

// The current batch can contain items from several different producers. Ensure each producer gets a response back.
for len(b.pending) > 0 && numItemsBefore < numItemsAfter {
// Waiter only had some items in the current batch
if numItemsBefore+b.pending[0].numItems > numItemsAfter {
partialSent := numItemsAfter - numItemsBefore
if numItemsBefore+int64(b.pending[0].numItems) > numItemsAfter {
partialSent := int(numItemsAfter - numItemsBefore)
b.pending[0].numItems -= partialSent
numItemsBefore += partialSent
numItemsBefore += int64(partialSent)
waiters = append(waiters, b.pending[0].respCh)
contexts = append(contexts, b.pending[0].parentCtx)
countItems = append(countItems, partialSent)
} else { // waiter gets a complete response.
numItemsBefore += b.pending[0].numItems
numItemsBefore += int64(b.pending[0].numItems)
waiters = append(waiters, b.pending[0].respCh)
contexts = append(contexts, b.pending[0].parentCtx)
countItems = append(countItems, b.pending[0].numItems)
Expand Down Expand Up @@ -389,7 +392,7 @@ func (b *shard) sendItems(trigger trigger) {
if err != nil {
b.processor.logger.Warn("Sender failed", zap.Error(err))
} else {
b.processor.telemetry.record(latency, trigger, int64(sent), bytes)
b.processor.telemetry.record(latency, trigger, toSend, bytes)
}
}()

Expand Down Expand Up @@ -457,6 +460,10 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error {
item.count = telem.LogRecordCount()
}

if item.count == 0 {
return nil
}

bytes := int64(b.batch.sizeBytes(data))

if bytes > b.processor.limitBytes {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ func TestBatchProcessorSpansPanicRecover(t *testing.T) {
// until batch size reached to unblock.
wg.Add(1)
go func() {
err = bp.ConsumeTraces(context.Background(), td)
defer wg.Done()
err := bp.ConsumeTraces(context.Background(), td)
assert.Contains(t, err.Error(), "testing panic")
wg.Done()
}()
}

Expand Down Expand Up @@ -168,9 +168,9 @@ func TestBatchProcessorMetricsPanicRecover(t *testing.T) {
md.ResourceMetrics().At(0).CopyTo(sentResourceMetrics.AppendEmpty())
wg.Add(1)
go func() {
err = bp.ConsumeMetrics(context.Background(), md)
defer wg.Done()
err := bp.ConsumeMetrics(context.Background(), md)
assert.Contains(t, err.Error(), "testing panic")
wg.Done()
}()
}

Expand Down Expand Up @@ -202,9 +202,9 @@ func TestBatchProcessorLogsPanicRecover(t *testing.T) {
ld.ResourceLogs().At(0).CopyTo(sentResourceLogs.AppendEmpty())
wg.Add(1)
go func() {
err = bp.ConsumeLogs(context.Background(), ld)
defer wg.Done()
err := bp.ConsumeLogs(context.Background(), ld)
assert.Contains(t, err.Error(), "testing panic")
wg.Done()
}()
}

Expand Down Expand Up @@ -307,9 +307,9 @@ func TestBatchProcessorCancelContext(t *testing.T) {
// until batch size reached to unblock.
wg.Add(1)
go func() {
err = bp.ConsumeTraces(ctx, td)
defer wg.Done()
err := bp.ConsumeTraces(ctx, td)
assert.Contains(t, err.Error(), "context canceled")
wg.Done()
}()
}

Expand Down Expand Up @@ -389,8 +389,8 @@ func TestBatchProcessorUnbrokenParentContext(t *testing.T) {
// until batch size reached to unblock.
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, bp.ConsumeTraces(bg, td))
wg.Done()
}()
}
wg.Wait()
Expand Down Expand Up @@ -481,8 +481,8 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) {
// until batch size reached to unblock.
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, bp.ConsumeTraces(callCtxs[num], td))
wg.Done()
}()
}
wg.Wait()
Expand Down Expand Up @@ -528,17 +528,17 @@ func TestBatchProcessorSpansDelivered(t *testing.T) {
// until batch size reached to unblock.
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, batcher.ConsumeTraces(context.Background(), td))
wg.Done()
}()
}

// Added to test logic that check for empty resources.
td := ptrace.NewTraces()
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, batcher.ConsumeTraces(context.Background(), td))
wg.Done()
}()

wg.Wait()
Expand Down Expand Up @@ -580,17 +580,17 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
}
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, batcher.ConsumeTraces(context.Background(), td))
wg.Done()
}()
}

// Added to test logic that check for empty resources.
td := ptrace.NewTraces()
wg.Add(1)
go func() {
defer wg.Done()
require.NoError(t, batcher.ConsumeTraces(context.Background(), td))
wg.Done()
}()

// shutdown will flush any remaining spans
Expand Down Expand Up @@ -633,8 +633,8 @@ func testBatchProcessorSentBySize(t *testing.T, tel testTelemetry) {
sizeSum += sizer.TracesSize(td)
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, batcher.ConsumeTraces(context.Background(), td))
wg.Done()
}()
}

Expand Down Expand Up @@ -694,8 +694,8 @@ func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry) {
// this should be a noerr but need to separate triggerTimeout from triggerShutdown
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, batcher.ConsumeTraces(context.Background(), td))
wg.Done()
}()
}

Expand Down Expand Up @@ -738,8 +738,8 @@ func TestBatchProcessorSentByTimeout(t *testing.T) {
td := testdata.GenerateTraces(spansPerRequest)
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, batcher.ConsumeTraces(context.Background(), td))
wg.Done()
}()
}

Expand Down Expand Up @@ -784,8 +784,8 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) {
td := testdata.GenerateTraces(spansPerRequest)
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, batcher.ConsumeTraces(context.Background(), td))
wg.Done()
}()
}

Expand Down Expand Up @@ -827,17 +827,17 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) {
md.ResourceMetrics().At(0).CopyTo(sentResourceMetrics.AppendEmpty())
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md))
wg.Done()
}()
}

// Added to test case with empty resources sent.
md := pmetric.NewMetrics()
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md))
wg.Done()
}()

wg.Wait()
Expand Down Expand Up @@ -890,8 +890,8 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry) {
size += sizer.MetricsSize(md)
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md))
wg.Done()
}()
}
wg.Wait()
Expand Down Expand Up @@ -960,8 +960,8 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) {
md := testdata.GenerateMetrics(metricsPerRequest)
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md))
wg.Done()
}()
}

Expand Down Expand Up @@ -1005,8 +1005,8 @@ func TestBatchMetricProcessor_Shutdown(t *testing.T) {
md := testdata.GenerateMetrics(metricsPerRequest)
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md))
wg.Done()
}()
}

Expand Down Expand Up @@ -1167,17 +1167,17 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) {
ld.ResourceLogs().At(0).CopyTo(sentResourceLogs.AppendEmpty())
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
wg.Done()
}()
}

// Added to test case with empty resources sent.
ld := plog.NewLogs()
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
wg.Done()
}()

wg.Wait()
Expand Down Expand Up @@ -1228,8 +1228,8 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry) {
size += sizer.LogsSize(ld)
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
wg.Done()
}()
}
wg.Wait()
Expand Down Expand Up @@ -1278,8 +1278,8 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) {
ld := testdata.GenerateLogs(logsPerRequest)
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
wg.Done()
}()
}

Expand Down Expand Up @@ -1323,8 +1323,8 @@ func TestBatchLogProcessor_Shutdown(t *testing.T) {
ld := testdata.GenerateLogs(logsPerRequest)
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
wg.Done()
}()
}

Expand Down Expand Up @@ -1380,8 +1380,8 @@ func verifyTracesDoesNotProduceAfterShutdown(t *testing.T, factory processor.Fac
for i := 0; i < generatedCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, proc.ConsumeTraces(context.Background(), testdata.GenerateTraces(1)))
wg.Done()
}()
}

Expand Down Expand Up @@ -1482,8 +1482,8 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) {
expectByContext[num] += spansPerRequest
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, batcher.ConsumeTraces(callCtxs[num], td))
wg.Done()
}()
}

Expand Down Expand Up @@ -1546,8 +1546,8 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) {

wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, batcher.ConsumeTraces(ctx, td))
wg.Done()
}()
}

Expand All @@ -1561,9 +1561,9 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) {

wg.Add(1)
go func() {
defer wg.Done()
err := batcher.ConsumeTraces(ctx, td)
assert.ErrorIs(t, err, errTooManyBatchers)
wg.Done()
}()

wg.Wait()
Expand Down
Loading