36 commits
93e2fec
- copied processorIDCounter
mahendrabishnoi2 Aug 4, 2025
9de991b
- added hook to call configureSelfObservability on BatchProcessor cre…
mahendrabishnoi2 Aug 4, 2025
8d1edbf
- run `make precommit`
mahendrabishnoi2 Aug 4, 2025
ec0b55e
Merge branch 'main' into logs-batch-processor-metrics
mahendrabishnoi2 Aug 17, 2025
385fd9f
flatten setup for self-observability to make it transparent
mahendrabishnoi2 Aug 17, 2025
5c405d7
add metricsExporter, a wrapper to record successful log processed metric
mahendrabishnoi2 Aug 17, 2025
1366caa
integrate with metricsExporter when self observability is enabled
mahendrabishnoi2 Aug 17, 2025
b2dd092
don't record metric when a log is added to queue, update comment to r…
mahendrabishnoi2 Aug 17, 2025
4f24dff
update CHANGELOG.md and README.md (sdk/log/internal/x/README.md)
mahendrabishnoi2 Aug 17, 2025
c4bf81a
Merge branch 'main' into logs-batch-processor-metrics
mahendrabishnoi2 Oct 12, 2025
4cbaff1
self observability -> observability
mahendrabishnoi2 Oct 12, 2025
9d42e5c
instrumentation implementation in a separate observ package as per ne…
mahendrabishnoi2 Oct 12, 2025
3a35f8e
remove the wrapped exporter
mahendrabishnoi2 Oct 12, 2025
96de62a
use newly created BLP abstraction for observability
mahendrabishnoi2 Oct 12, 2025
001ddb6
re-add metricsExporter with newly created struct (BLP) for observabili…
mahendrabishnoi2 Oct 12, 2025
9371ede
use generated counter package for component names
mahendrabishnoi2 Oct 12, 2025
f3a214a
test cases for BLP
mahendrabishnoi2 Oct 12, 2025
a931308
make precommit
mahendrabishnoi2 Oct 12, 2025
df7b8f0
Merge branch 'main' into logs-batch-processor-metrics
mahendrabishnoi2 Jan 28, 2026
7c2fd2b
update CHANGELOG.md
mahendrabishnoi2 Jan 28, 2026
7238dd4
remove duplicate ScopeName
mahendrabishnoi2 Jan 28, 2026
3f2c046
Merge branch 'main' into logs-batch-processor-metrics
mahendrabishnoi2 Apr 10, 2026
a3d80da
update semconv version
mahendrabishnoi2 Apr 10, 2026
bff6fee
check `b.processed.Enabled(ctx)` before updating metrics
mahendrabishnoi2 Apr 10, 2026
5d57d29
add tests
mahendrabishnoi2 Apr 10, 2026
a12f99f
fix CHANGELOG.md
mahendrabishnoi2 Apr 10, 2026
d7fcefa
bypass lint for uint64 to int64 conversion as metric expects int64
mahendrabishnoi2 Apr 10, 2026
055f5ec
fix ci failures - add view to ignore batch log processor metrics
mahendrabishnoi2 Apr 10, 2026
eebda4b
make precommit
mahendrabishnoi2 Apr 10, 2026
84e81c4
minor refactoring in benchmark tests to reduce verbosity/duplicate code
mahendrabishnoi2 Apr 12, 2026
68025cc
minor refactoring in benchmark tests to reduce verbosity/duplicate code
mahendrabishnoi2 Apr 12, 2026
c82c8db
Merge branch 'main' into logs-batch-processor-metrics
mahendrabishnoi2 Apr 12, 2026
81872b0
Merge branch 'main' into logs-batch-processor-metrics
mahendrabishnoi2 Apr 16, 2026
52a7300
address review comments: fix nil reg panic in Shutdown, changelog wor…
mahendrabishnoi2 Apr 16, 2026
170e336
Merge branch 'main' into logs-batch-processor-metrics
mahendrabishnoi2 Apr 24, 2026
b41cc36
Merge branch 'main' into logs-batch-processor-metrics
mahendrabishnoi2 Apr 30, 2026
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -35,6 +35,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add experimental support for splitting metric data across multiple batches in `go.opentelemetry.io/otel/sdk/metric`.
Set `OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE=<max_size>` to enable for all periodic readers.
See `go.opentelemetry.io/otel/sdk/metric/internal/x` for feature documentation. (#8071)
- Add experimental observability metrics to the BatchProcessor in `go.opentelemetry.io/otel/sdk/log`. (#7124)
- Add `WithDefaultAttributes` to `go.opentelemetry.io/otel/metric/x` to support setting default attributes on instruments. (#8135)
- Add `Settable` to `go.opentelemetry.io/otel/metric/x` to allow reusing attribute options. (#8178)
- Add experimental self-observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#8194)
49 changes: 41 additions & 8 deletions sdk/log/batch.go
@@ -11,7 +11,10 @@ import (
"sync/atomic"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/log/internal/counter"
"go.opentelemetry.io/otel/sdk/log/internal/observ"
)

const (
@@ -98,6 +101,9 @@ type BatchProcessor struct {
// stopped holds the stopped state of the BatchProcessor.
stopped atomic.Bool

// inst is the instrumentation for observability (nil when disabled).
inst *observ.BLP

noCmp [0]func() //nolint: unused // This is indeed used.
}

@@ -111,6 +117,31 @@ func NewBatchProcessor(exporter Exporter, opts ...BatchProcessorOption) *BatchPr
// Do not panic on nil export.
exporter = defaultNoopExporter
}

b := &BatchProcessor{
q: newQueue(cfg.maxQSize.Value),
batchSize: cfg.expMaxBatchSize.Value,
pollTrigger: make(chan struct{}, 1),
pollKill: make(chan struct{}),
}

var err error
b.inst, err = observ.NewBLP(
counter.NextExporterID(),
func() int64 { return int64(b.q.Len()) },
int64(cfg.maxQSize.Value),
)
Comment on lines +128 to +133
Copilot AI Apr 16, 2026

counter.NextExporterID() is evaluated unconditionally before observ.NewBLP(...) can check whether observability is enabled, so the global ID counter is incremented (atomic op) even when OTEL_GO_X_OBSERVABILITY is disabled. This adds avoidable overhead in the default (disabled) path and makes component IDs depend on how many batch processors were created while disabled. Consider gating the ID generation and NewBLP call behind the feature-flag check (or moving ID generation inside NewBLP after it confirms observability is enabled).

if err != nil {
otel.Handle(err)
}

// Wrap exporter with metrics recording if observability is enabled.
// This must be the innermost wrapper (closest to user exporter) to record
// metrics just before calling the actual exporter.
if b.inst != nil {
exporter = newMetricsExporter(exporter, b.inst)
}
Comment on lines +128 to +143
Copilot AI Apr 14, 2026

observ.NewBLP(...) may return a non-nil b.inst together with a non-nil err (best-effort init). Right now the code logs the error via otel.Handle(err) but still wraps the exporter and later calls b.inst.Shutdown(), which can lead to runtime panics if the instrumentation is only partially initialized (e.g., callback registration failed). Consider treating any non-nil error as a signal to disable this instrumentation instance (b.inst = nil), or adjust NewBLP so it only returns a non-nil instance when it is safe to use/shutdown.


// Order is important here. Wrap the timeoutExporter with the chunkExporter
// to ensure each export completes in timeout (instead of all chunked
// exports).
@@ -119,15 +150,9 @@ func NewBatchProcessor(exporter Exporter, opts ...BatchProcessorOption) *BatchPr
// appropriately on export.
exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value)

b := &BatchProcessor{
exporter: newBufferExporter(exporter, cfg.expBufferSize.Value),

q: newQueue(cfg.maxQSize.Value),
batchSize: cfg.expMaxBatchSize.Value,
pollTrigger: make(chan struct{}, 1),
pollKill: make(chan struct{}),
}
b.exporter = newBufferExporter(exporter, cfg.expBufferSize.Value)
b.pollDone = b.poll(cfg.expInterval.Value)

return b
}

@@ -143,6 +168,8 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
defer close(done)
defer ticker.Stop()

ctx := context.Background()

for {
select {
case <-ticker.C:
@@ -153,6 +180,9 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
}

if d := b.q.Dropped(); d > 0 {
if b.inst != nil {
b.inst.ProcessedQueueFull(ctx, int64(d)) //nolint: gosec
}
pellared marked this conversation as resolved.
global.Warn("dropped log records", "dropped", d)
}

@@ -225,6 +255,9 @@ func (b *BatchProcessor) Shutdown(ctx context.Context) error {

// Flush remaining queued before exporter shutdown.
err := b.exporter.Export(ctx, b.q.Flush())
if b.inst != nil {
err = errors.Join(err, b.inst.Shutdown())
Member

When records have overflowed the ring buffer since the last poll iteration, Shutdown closes pollKill and can exit the poll goroutine without ever consuming q.Dropped(). Because ProcessedQueueFull is only driven from that goroutine, an application that shuts down under backpressure can silently miss otel.sdk.processor.log.processed{error.type="queue_full"} increments for the last dropped log records, so the new observability metric under-reports exactly the shutdown path users care about most.

I think that addressing this would be very hard and I would rather create an issue for tracking this.
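A hedged sketch of one possible mitigation, assuming the queue's `Dropped` method returns the drops since the previous call and resets the count with an atomic swap. Under that assumption, the shutdown path could make one final `Dropped` call after the poll goroutine exits without double counting. `ring`, `reporter`, and `report` are hypothetical simplifications, not SDK code, and records that overflow concurrently with the final drain could still be missed.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// ring is a hypothetical bounded queue that drops the oldest record when full.
type ring struct {
	mu      sync.Mutex
	buf     []int
	limit   int
	dropped atomic.Uint64
}

func (q *ring) Enqueue(v int) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.buf) == q.limit {
		q.buf = q.buf[1:] // overwrite the oldest record
		q.dropped.Add(1)
	}
	q.buf = append(q.buf, v)
}

// Dropped returns the drops since the last call and resets the count, so each
// drop is reported exactly once, whichever goroutine reads it.
func (q *ring) Dropped() uint64 { return q.dropped.Swap(0) }

// reporter stands in for the queue_full increments of the processed counter.
type reporter struct{ queueFull uint64 }

func (r *reporter) report(q *ring) {
	if d := q.Dropped(); d > 0 {
		r.queueFull += d
	}
}

func main() {
	q := &ring{limit: 2}
	r := &reporter{}
	for i := 0; i < 3; i++ {
		q.Enqueue(i) // the third enqueue drops one record
	}
	r.report(q) // a poll iteration observes the drop
	for i := 0; i < 2; i++ {
		q.Enqueue(i) // two more drops before shutdown
	}
	r.report(q) // final drain on the shutdown path
	fmt.Println(r.queueFull) // 3
}
```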

Member Author

}
return errors.Join(err, b.exporter.Shutdown(ctx))
}

206 changes: 206 additions & 0 deletions sdk/log/batch_test.go
@@ -20,8 +20,18 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log/internal/counter"
"go.opentelemetry.io/otel/sdk/log/internal/observ"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
semconv "go.opentelemetry.io/otel/semconv/v1.40.0"
"go.opentelemetry.io/otel/semconv/v1.40.0/otelconv"
)

type concurrentBuffer struct {
@@ -673,3 +683,199 @@ func BenchmarkBatchProcessorOnEmit(b *testing.B) {
_ = err
})
}

const blpComponentID int64 = 0

func TestBatchProcessorMetricsDisabled(t *testing.T) {
t.Setenv("OTEL_GO_X_OBSERVABILITY", "false")

Copilot AI Apr 16, 2026

This test mutates the global exporter ID counter via counter.SetExporterID(...) but does not restore the previous value. Since Go test execution order across tests in the package is not guaranteed, this can leak state into other tests that create batch processors and makes failures order-dependent. Consider saving the previous counter value and restoring it in t.Cleanup(...).

Suggested change
origExporterID := counter.ExporterID()
t.Cleanup(func() { counter.SetExporterID(origExporterID) })

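Copilot's suggested change generalizes to a small helper that captures the old value and registers its restoration before overwriting it. The sketch below uses a hypothetical package-level variable with `ExporterID`/`SetExporterID` accessors in place of the generated `counter` package; `setForTest` accepts any function with the signature of `(*testing.T).Cleanup`.

```go
package main

import "fmt"

// exporterID is a hypothetical package-global counter standing in for the
// state behind the generated counter package.
var exporterID int64

// ExporterID and SetExporterID are hypothetical accessors.
func ExporterID() int64 { return exporterID }

func SetExporterID(v int64) { exporterID = v }

// setForTest saves the current value, registers a restore via cleanup, then
// sets v. In a test, pass t.Cleanup as the cleanup argument.
func setForTest(cleanup func(func()), v int64) {
	orig := ExporterID()
	cleanup(func() { SetExporterID(orig) })
	SetExporterID(v)
}

func main() {
	exporterID = 7
	var restores []func()
	setForTest(func(f func()) { restores = append(restores, f) }, 0)
	fmt.Println(ExporterID()) // 0

	for _, f := range restores {
		f() // simulate the test framework running cleanups
	}
	fmt.Println(ExporterID()) // 7
}
```

In a test this would be called as `setForTest(t.Cleanup, blpComponentID)`.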
Member Author

This follows the same pattern as other package tests: the counter is set to 0.

counter.SetExporterID(blpComponentID)

orig := otel.GetMeterProvider()
t.Cleanup(func() { otel.SetMeterProvider(orig) })

reader := sdkmetric.NewManualReader()
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
otel.SetMeterProvider(mp)

e := newTestExporter(nil)
bp := NewBatchProcessor(
e,
WithMaxQueueSize(2),
WithExportMaxBatchSize(2),
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
ctx := t.Context()

r := new(Record)
r.SetBody(log.BoolValue(true))
require.NoError(t, bp.OnEmit(ctx, r))
require.NoError(t, bp.ForceFlush(ctx))

var rm metricdata.ResourceMetrics
require.NoError(t, reader.Collect(ctx, &rm))
for _, sm := range rm.ScopeMetrics {
assert.NotEqual(t, observ.ScopeName, sm.Scope.Name,
"observ metrics should not be present when disabled")
}

e.Stop()
require.NoError(t, bp.Shutdown(ctx))
}

func TestBatchProcessorMetrics(t *testing.T) {
counter.SetExporterID(blpComponentID)

t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
Comment on lines +727 to +730
Copilot AI Apr 16, 2026

This test mutates the global exporter ID counter via counter.SetExporterID(...) but does not restore the previous value, which can leak global state into other tests in the package depending on execution order. Consider saving the previous counter value and restoring it in t.Cleanup(...).

Member Author

same as above


origLogger := global.GetLogger()
t.Cleanup(func() { global.SetLogger(origLogger) })
buf := new(concurrentBuffer)
stdr.SetVerbosity(1)
global.SetLogger(stdr.New(stdlog.New(buf, "", 0)))

orig := otel.GetMeterProvider()
t.Cleanup(func() { otel.SetMeterProvider(orig) })

reader := sdkmetric.NewManualReader()
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
otel.SetMeterProvider(mp)

e := newTestExporter(nil)
e.ExportTrigger = make(chan struct{})
bp := NewBatchProcessor(
e,
WithMaxQueueSize(1),
WithExportMaxBatchSize(1),
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
ctx := t.Context()

r := new(Record)
r.SetBody(log.BoolValue(true))
require.NoError(t, bp.OnEmit(ctx, r))
require.Eventually(t, func() bool {
return e.ExportN() > 0
}, 2*time.Second, time.Microsecond, "export not started")

assertBLPMetrics(t, reader,
blpQCap(1),
blpQSize(0),
blpProcessed(blpDPt(blpSet(), 1)),
)

require.NoError(t, bp.OnEmit(ctx, r))
require.Eventually(t, func() bool {
return len(bp.exporter.input) == cap(bp.exporter.input)
}, 2*time.Second, time.Microsecond, "buffer channel not filled")

require.NoError(t, bp.OnEmit(ctx, r))
require.NoError(t, bp.OnEmit(ctx, r))

wantMsg := `"level"=1 "msg"="dropped log records" "dropped"=1`
require.Eventually(t, func() bool {
return strings.Contains(buf.String(), wantMsg)
}, 2*time.Second, time.Microsecond, "drop not detected")

assertBLPMetrics(t, reader,
blpQCap(1),
blpQSize(1),
blpProcessed(
blpDPt(blpSet(), 1),
blpDPt(blpSet(observ.ErrQueueFull), 1),
),
)

close(e.ExportTrigger)
e.Stop()
require.NoError(t, bp.Shutdown(ctx))
}

func blpSet(attrs ...attribute.KeyValue) attribute.Set {
return attribute.NewSet(append([]attribute.KeyValue{
semconv.OTelComponentTypeBatchingLogProcessor,
observ.BLPComponentName(blpComponentID),
}, attrs...)...)
}

func blpDPt(set attribute.Set, value int64) metricdata.DataPoint[int64] {
return metricdata.DataPoint[int64]{Attributes: set, Value: value}
}

func blpQCap(v int64) metricdata.Metrics {
return metricdata.Metrics{
Name: otelconv.SDKProcessorLogQueueCapacity{}.Name(),
Description: otelconv.SDKProcessorLogQueueCapacity{}.Description(),
Unit: otelconv.SDKProcessorLogQueueCapacity{}.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: false,
DataPoints: []metricdata.DataPoint[int64]{{Attributes: blpSet(), Value: v}},
},
}
}

func blpQSize(v int64) metricdata.Metrics {
return metricdata.Metrics{
Name: otelconv.SDKProcessorLogQueueSize{}.Name(),
Description: otelconv.SDKProcessorLogQueueSize{}.Description(),
Unit: otelconv.SDKProcessorLogQueueSize{}.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: false,
DataPoints: []metricdata.DataPoint[int64]{{Attributes: blpSet(), Value: v}},
},
}
}

func blpProcessed(dPts ...metricdata.DataPoint[int64]) metricdata.Metrics {
return metricdata.Metrics{
Name: otelconv.SDKProcessorLogProcessed{}.Name(),
Description: otelconv.SDKProcessorLogProcessed{}.Description(),
Unit: otelconv.SDKProcessorLogProcessed{}.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: dPts,
},
}
}

func assertBLPMetrics(
t *testing.T,
reader sdkmetric.Reader,
wantMetrics ...metricdata.Metrics,
) {
t.Helper()

var rm metricdata.ResourceMetrics
require.NoError(t, reader.Collect(t.Context(), &rm))

var found bool
var gotScope metricdata.ScopeMetrics
for _, sm := range rm.ScopeMetrics {
if sm.Scope.Name == observ.ScopeName {
gotScope = sm
found = true
break
}
}
require.True(t, found, "observ scope %q not found in collected metrics", observ.ScopeName)

metricdatatest.AssertEqual(
t,
metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: observ.ScopeName,
Version: sdk.Version(),
SchemaURL: observ.SchemaURL,
},
Metrics: wantMetrics,
},
gotScope,
metricdatatest.IgnoreTimestamp(),
metricdatatest.IgnoreExemplars(),
)
}
24 changes: 24 additions & 0 deletions sdk/log/exporter.go
@@ -12,6 +12,7 @@ import (
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/log/internal/observ"
)

// Exporter handles the delivery of log records to external receivers.
@@ -324,3 +325,26 @@ func (e *bufferExporter) Shutdown(ctx context.Context) error {
}
return e.Exporter.Shutdown(ctx)
}

// metricsExporter wraps an Exporter to record log processing metrics
// just before calling the wrapped exporter.
type metricsExporter struct {
Exporter
inst *observ.BLP
}

// newMetricsExporter creates a metricsExporter that wraps the given exporter.
func newMetricsExporter(exporter Exporter, inst *observ.BLP) Exporter {
return &metricsExporter{
Exporter: exporter,
inst: inst,
}
}

// Export records the number of log records as a metric, then forwards
// them to the wrapped Exporter. Errors returned by the wrapped exporter
// are not recorded here; per the specification, export failures are
// measured by the exporter itself.
func (e *metricsExporter) Export(ctx context.Context, records []Record) error {
e.inst.Processed(ctx, int64(len(records)))
return e.Exporter.Export(ctx, records)
}
31 changes: 31 additions & 0 deletions sdk/log/internal/counter/counter.go

Some generated files are not rendered by default.
