diff --git a/.chloggen/fix-ub-batch-processor.yaml b/.chloggen/fix-ub-batch-processor.yaml new file mode 100644 index 00000000000..9468c96fb8a --- /dev/null +++ b/.chloggen/fix-ub-batch-processor.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: batchprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix UB in batch processor when trying to read bytes size after adding request to pipeline + +# One or more tracking issues or pull requests related to the change +issues: [13698] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: This bug only happens id detailed metrics are enabled and also an async (sending queue enabled) exporter that mutates data is configure. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index d0ffe4148b3..0c43fb6f17b 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -260,14 +260,8 @@ func (b *shard[T]) resetTimer() { func (b *shard[T]) sendItems(trigger trigger) { sent, req := b.batch.split(b.processor.sendBatchMaxSize) - err := b.batch.export(b.exportCtx, req) - if err != nil { - b.processor.logger.Warn("Sender failed", zap.Error(err)) - return - } - var bytes int bpt := b.processor.telemetry - + var bytes int // Check if the instrument is enabled to calculate the size of the batch in bytes. // See https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric/internal/x#readme-instrument-enabled batchSendSizeBytes := bpt.telemetryBuilder.ProcessorBatchBatchSendSizeBytes @@ -276,6 +270,11 @@ func (b *shard[T]) sendItems(trigger trigger) { bytes = b.batch.sizeBytes(req) } + err := b.batch.export(b.exportCtx, req) + if err != nil { + b.processor.logger.Warn("Sender failed", zap.Error(err)) + return + } bpt.record(trigger, int64(sent), int64(bytes)) } @@ -442,6 +441,7 @@ func newBatchTraces(nextConsumer consumer.Traces) *batchTraces { // add updates current batchTraces by adding new TraceData object func (bt *batchTraces) add(td ptrace.Traces) { + defer pref.UnrefTraces(td) newSpanCount := td.SpanCount() if newSpanCount == 0 { return @@ -449,7 +449,6 @@ func (bt *batchTraces) add(td ptrace.Traces) { bt.spanCount += newSpanCount td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans()) - pref.UnrefTraces(td) } func (bt *batchTraces) sizeBytes(td ptrace.Traces) int { @@ -521,13 +520,13 @@ func (bm *batchMetrics) itemCount() int { } func (bm *batchMetrics) add(md pmetric.Metrics) { + defer pref.UnrefMetrics(md) newDataPointCount := md.DataPointCount() if newDataPointCount == 0 { return } bm.dataPointCount += newDataPointCount md.ResourceMetrics().MoveAndAppendTo(bm.metricData.ResourceMetrics()) - pref.UnrefMetrics(md) } type batchLogs struct { @@ -571,11 +570,11 @@ func (bl *batchLogs) itemCount() int { } func (bl *batchLogs) add(ld plog.Logs) { + defer pref.UnrefLogs(ld) newLogsCount := ld.LogRecordCount() if newLogsCount == 0 { return } bl.logCount += newLogsCount ld.ResourceLogs().MoveAndAppendTo(bl.logData.ResourceLogs()) - pref.UnrefLogs(ld) }