diff --git a/CHANGELOG.md b/CHANGELOG.md index e0306d6dcd5..8e226fb308f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add experimental support for splitting metric data across multiple batches in `go.opentelemetry.io/otel/sdk/metric`. Set `OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE=` to enable for all periodic readers. See `go.opentelemetry.io/otel/sdk/metric/internal/x` for feature documentation. (#8071) +- Add experimental observability metrics to the `BatchProcessor` in `go.opentelemetry.io/otel/sdk/log`. (#7124) - Add `WithDefaultAttributes` to `go.opentelemetry.io/otel/metric/x` to support setting default attributes on instruments. (#8135) - Add `Settable` to `go.opentelemetry.io/otel/metric/x` to allow reusing attribute options. (#8178) - Add experimental self-observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#8194) diff --git a/sdk/log/batch.go b/sdk/log/batch.go index a03248eba2d..b35e768e750 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -11,7 +11,10 @@ import ( "sync/atomic" "time" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/internal/global" + "go.opentelemetry.io/otel/sdk/log/internal/counter" + "go.opentelemetry.io/otel/sdk/log/internal/observ" ) const ( @@ -98,6 +101,9 @@ type BatchProcessor struct { // stopped holds the stopped state of the BatchProcessor. stopped atomic.Bool + // inst is the instrumentation for observability (nil when disabled). + inst *observ.BLP + noCmp [0]func() //nolint: unused // This is indeed used. } @@ -111,6 +117,31 @@ func NewBatchProcessor(exporter Exporter, opts ...BatchProcessorOption) *BatchPr // Do not panic on nil export.
exporter = defaultNoopExporter } + + b := &BatchProcessor{ + q: newQueue(cfg.maxQSize.Value), + batchSize: cfg.expMaxBatchSize.Value, + pollTrigger: make(chan struct{}, 1), + pollKill: make(chan struct{}), + } + + var err error + b.inst, err = observ.NewBLP( + counter.NextExporterID(), + func() int64 { return int64(b.q.Len()) }, + int64(cfg.maxQSize.Value), + ) + if err != nil { + otel.Handle(err) + } + + // Wrap exporter with metrics recording if observability is enabled. + // This must be the innermost wrapper (closest to user exporter) to record + // metrics just before calling the actual exporter. + if b.inst != nil { + exporter = newMetricsExporter(exporter, b.inst) + } + // Order is important here. Wrap the timeoutExporter with the chunkExporter // to ensure each export completes in timeout (instead of all chunked // exports). @@ -119,15 +150,9 @@ func NewBatchProcessor(exporter Exporter, opts ...BatchProcessorOption) *BatchPr // appropriately on export. exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value) - b := &BatchProcessor{ - exporter: newBufferExporter(exporter, cfg.expBufferSize.Value), - - q: newQueue(cfg.maxQSize.Value), - batchSize: cfg.expMaxBatchSize.Value, - pollTrigger: make(chan struct{}, 1), - pollKill: make(chan struct{}), - } + b.exporter = newBufferExporter(exporter, cfg.expBufferSize.Value) b.pollDone = b.poll(cfg.expInterval.Value) + return b } @@ -143,6 +168,8 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { defer close(done) defer ticker.Stop() + ctx := context.Background() + for { select { case <-ticker.C: @@ -153,6 +180,9 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { } if d := b.q.Dropped(); d > 0 { + if b.inst != nil { + b.inst.ProcessedQueueFull(ctx, int64(d)) //nolint: gosec + } global.Warn("dropped log records", "dropped", d) } @@ -225,6 +255,9 @@ func (b *BatchProcessor) Shutdown(ctx context.Context) error { // Flush remaining queued before exporter 
shutdown. err := b.exporter.Export(ctx, b.q.Flush()) + if b.inst != nil { + err = errors.Join(err, b.inst.Shutdown()) + } return errors.Join(err, b.exporter.Shutdown(ctx)) } diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go index 81636f0be56..32a4419ad2a 100644 --- a/sdk/log/batch_test.go +++ b/sdk/log/batch_test.go @@ -20,8 +20,18 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/log" + "go.opentelemetry.io/otel/sdk" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/log/internal/counter" + "go.opentelemetry.io/otel/sdk/log/internal/observ" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + semconv "go.opentelemetry.io/otel/semconv/v1.40.0" + "go.opentelemetry.io/otel/semconv/v1.40.0/otelconv" ) type concurrentBuffer struct { @@ -673,3 +683,199 @@ func BenchmarkBatchProcessorOnEmit(b *testing.B) { _ = err }) } + +const blpComponentID int64 = 0 + +func TestBatchProcessorMetricsDisabled(t *testing.T) { + t.Setenv("OTEL_GO_X_OBSERVABILITY", "false") + + counter.SetExporterID(blpComponentID) + + orig := otel.GetMeterProvider() + t.Cleanup(func() { otel.SetMeterProvider(orig) }) + + reader := sdkmetric.NewManualReader() + mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + otel.SetMeterProvider(mp) + + e := newTestExporter(nil) + bp := NewBatchProcessor( + e, + WithMaxQueueSize(2), + WithExportMaxBatchSize(2), + WithExportInterval(time.Hour), + WithExportTimeout(time.Hour), + ) + ctx := t.Context() + + r := new(Record) + r.SetBody(log.BoolValue(true)) + require.NoError(t, bp.OnEmit(ctx, r)) + require.NoError(t, bp.ForceFlush(ctx)) + + var rm metricdata.ResourceMetrics + require.NoError(t, reader.Collect(ctx, &rm)) + for _, sm := range rm.ScopeMetrics { + 
assert.NotEqual(t, observ.ScopeName, sm.Scope.Name, + "observ metrics should not be present when disabled") + } + + e.Stop() + require.NoError(t, bp.Shutdown(ctx)) +} + +func TestBatchProcessorMetrics(t *testing.T) { + counter.SetExporterID(blpComponentID) + + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + origLogger := global.GetLogger() + t.Cleanup(func() { global.SetLogger(origLogger) }) + buf := new(concurrentBuffer) + stdr.SetVerbosity(1) + global.SetLogger(stdr.New(stdlog.New(buf, "", 0))) + + orig := otel.GetMeterProvider() + t.Cleanup(func() { otel.SetMeterProvider(orig) }) + + reader := sdkmetric.NewManualReader() + mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + otel.SetMeterProvider(mp) + + e := newTestExporter(nil) + e.ExportTrigger = make(chan struct{}) + bp := NewBatchProcessor( + e, + WithMaxQueueSize(1), + WithExportMaxBatchSize(1), + WithExportInterval(time.Hour), + WithExportTimeout(time.Hour), + ) + ctx := t.Context() + + r := new(Record) + r.SetBody(log.BoolValue(true)) + require.NoError(t, bp.OnEmit(ctx, r)) + require.Eventually(t, func() bool { + return e.ExportN() > 0 + }, 2*time.Second, time.Microsecond, "export not started") + + assertBLPMetrics(t, reader, + blpQCap(1), + blpQSize(0), + blpProcessed(blpDPt(blpSet(), 1)), + ) + + require.NoError(t, bp.OnEmit(ctx, r)) + require.Eventually(t, func() bool { + return len(bp.exporter.input) == cap(bp.exporter.input) + }, 2*time.Second, time.Microsecond, "buffer channel not filled") + + require.NoError(t, bp.OnEmit(ctx, r)) + require.NoError(t, bp.OnEmit(ctx, r)) + + wantMsg := `"level"=1 "msg"="dropped log records" "dropped"=1` + require.Eventually(t, func() bool { + return strings.Contains(buf.String(), wantMsg) + }, 2*time.Second, time.Microsecond, "drop not detected") + + assertBLPMetrics(t, reader, + blpQCap(1), + blpQSize(1), + blpProcessed( + blpDPt(blpSet(), 1), + blpDPt(blpSet(observ.ErrQueueFull), 1), + ), + ) + + close(e.ExportTrigger) + e.Stop() + require.NoError(t, 
bp.Shutdown(ctx)) +} + +func blpSet(attrs ...attribute.KeyValue) attribute.Set { + return attribute.NewSet(append([]attribute.KeyValue{ + semconv.OTelComponentTypeBatchingLogProcessor, + observ.BLPComponentName(blpComponentID), + }, attrs...)...) +} + +func blpDPt(set attribute.Set, value int64) metricdata.DataPoint[int64] { + return metricdata.DataPoint[int64]{Attributes: set, Value: value} +} + +func blpQCap(v int64) metricdata.Metrics { + return metricdata.Metrics{ + Name: otelconv.SDKProcessorLogQueueCapacity{}.Name(), + Description: otelconv.SDKProcessorLogQueueCapacity{}.Description(), + Unit: otelconv.SDKProcessorLogQueueCapacity{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{{Attributes: blpSet(), Value: v}}, + }, + } +} + +func blpQSize(v int64) metricdata.Metrics { + return metricdata.Metrics{ + Name: otelconv.SDKProcessorLogQueueSize{}.Name(), + Description: otelconv.SDKProcessorLogQueueSize{}.Description(), + Unit: otelconv.SDKProcessorLogQueueSize{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{{Attributes: blpSet(), Value: v}}, + }, + } +} + +func blpProcessed(dPts ...metricdata.DataPoint[int64]) metricdata.Metrics { + return metricdata.Metrics{ + Name: otelconv.SDKProcessorLogProcessed{}.Name(), + Description: otelconv.SDKProcessorLogProcessed{}.Description(), + Unit: otelconv.SDKProcessorLogProcessed{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dPts, + }, + } +} + +func assertBLPMetrics( + t *testing.T, + reader sdkmetric.Reader, + wantMetrics ...metricdata.Metrics, +) { + t.Helper() + + var rm metricdata.ResourceMetrics + require.NoError(t, reader.Collect(t.Context(), &rm)) + + var found bool + var gotScope metricdata.ScopeMetrics + for _, sm := range 
rm.ScopeMetrics { + if sm.Scope.Name == observ.ScopeName { + gotScope = sm + found = true + break + } + } + require.True(t, found, "observ scope %q not found in collected metrics", observ.ScopeName) + + metricdatatest.AssertEqual( + t, + metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: observ.ScopeName, + Version: sdk.Version(), + SchemaURL: observ.SchemaURL, + }, + Metrics: wantMetrics, + }, + gotScope, + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreExemplars(), + ) +} diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go index a9d3c439ba3..e80fcbbf79d 100644 --- a/sdk/log/exporter.go +++ b/sdk/log/exporter.go @@ -12,6 +12,7 @@ import ( "time" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/log/internal/observ" ) // Exporter handles the delivery of log records to external receivers. @@ -324,3 +325,26 @@ func (e *bufferExporter) Shutdown(ctx context.Context) error { } return e.Exporter.Shutdown(ctx) } + +// metricsExporter wraps an Exporter to record log processing metrics +// just before calling the wrapped exporter. +type metricsExporter struct { + Exporter + inst *observ.BLP +} + +// newMetricsExporter creates a metricsExporter that wraps the given exporter. +func newMetricsExporter(exporter Exporter, inst *observ.BLP) Exporter { + return &metricsExporter{ + Exporter: exporter, + inst: inst, + } +} + +// Export records the number of log records as a metric and then forwards +// them to the wrapped Exporter. The error returned by the wrapped Exporter is +// not recorded: per the specification, it is measured by the exporter itself. +func (e *metricsExporter) Export(ctx context.Context, records []Record) error { + e.inst.Processed(ctx, int64(len(records))) + return e.Exporter.Export(ctx, records) +} diff --git a/sdk/log/internal/counter/counter.go b/sdk/log/internal/counter/counter.go new file mode 100644 index 00000000000..4eed9ae9c4e --- /dev/null +++ b/sdk/log/internal/counter/counter.go @@ -0,0 +1,31 @@ +// Code generated by gotmpl.
DO NOT MODIFY. +// source: internal/shared/counter/counter.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package counter provides a simple counter for generating unique IDs. +// +// This package is used to generate unique IDs while allowing testing packages +// to reset the counter. +package counter // import "go.opentelemetry.io/otel/sdk/log/internal/counter" + +import "sync/atomic" + +// exporterN is a global 0-based count of the number of exporters created. +var exporterN atomic.Int64 + +// NextExporterID returns the next unique ID for an exporter. +func NextExporterID() int64 { + const inc = 1 + return exporterN.Add(inc) - inc +} + +// SetExporterID sets the exporter ID counter to v and returns the previous +// value. +// +// This function is useful for testing purposes, allowing you to reset the +// counter. It should not be used in production code. +func SetExporterID(v int64) int64 { + return exporterN.Swap(v) +} diff --git a/sdk/log/internal/counter/counter_test.go b/sdk/log/internal/counter/counter_test.go new file mode 100644 index 00000000000..f3e380d3325 --- /dev/null +++ b/sdk/log/internal/counter/counter_test.go @@ -0,0 +1,65 @@ +// Code generated by gotmpl. DO NOT MODIFY. 
+// source: internal/shared/counter/counter_test.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package counter + +import ( + "sync" + "testing" +) + +func TestNextExporterID(t *testing.T) { + SetExporterID(0) + + var expected int64 + for range 10 { + id := NextExporterID() + if id != expected { + t.Errorf("NextExporterID() = %d; want %d", id, expected) + } + expected++ + } +} + +func TestSetExporterID(t *testing.T) { + SetExporterID(0) + + prev := SetExporterID(42) + if prev != 0 { + t.Errorf("SetExporterID(42) returned %d; want 0", prev) + } + + id := NextExporterID() + if id != 42 { + t.Errorf("NextExporterID() = %d; want 42", id) + } +} + +func TestNextExporterIDConcurrentSafe(t *testing.T) { + SetExporterID(0) + + const goroutines = 100 + const increments = 10 + + var wg sync.WaitGroup + wg.Add(goroutines) + + for range goroutines { + go func() { + defer wg.Done() + for range increments { + NextExporterID() + } + }() + } + + wg.Wait() + + expected := int64(goroutines * increments) + if id := NextExporterID(); id != expected { + t.Errorf("NextExporterID() = %d; want %d", id, expected) + } +} \ No newline at end of file diff --git a/sdk/log/internal/gen.go b/sdk/log/internal/gen.go index dee3f808f95..f7bc509bc32 100644 --- a/sdk/log/internal/gen.go +++ b/sdk/log/internal/gen.go @@ -6,3 +6,6 @@ package internal // import "go.opentelemetry.io/otel/sdk/log/internal" //go:generate gotmpl --body=../../../internal/shared/x/x.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/sdk/log\" }" --out=x/x.go //go:generate gotmpl --body=../../../internal/shared/x/x_test.go.tmpl "--data={}" --out=x/x_test.go + +//go:generate gotmpl --body=../../../internal/shared/counter/counter.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/sdk/log\" }" --out=counter/counter.go +//go:generate gotmpl --body=../../../internal/shared/counter/counter_test.go.tmpl "--data={}" --out=counter/counter_test.go diff --git 
a/sdk/log/internal/observ/batch_log_processor.go b/sdk/log/internal/observ/batch_log_processor.go new file mode 100644 index 00000000000..02b6388952e --- /dev/null +++ b/sdk/log/internal/observ/batch_log_processor.go @@ -0,0 +1,128 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package observ // import "go.opentelemetry.io/otel/sdk/log/internal/observ" + +import ( + "context" + "errors" + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk" + "go.opentelemetry.io/otel/sdk/log/internal/x" + semconv "go.opentelemetry.io/otel/semconv/v1.40.0" + "go.opentelemetry.io/otel/semconv/v1.40.0/otelconv" +) + +const ( + // SchemaURL is the schema URL of the instrumentation. + SchemaURL = semconv.SchemaURL +) + +// ErrQueueFull is the error type attribute with the value "queue_full". +var ErrQueueFull = otelconv.SDKProcessorLogProcessed{}.AttrErrorType("queue_full") + +// BLPComponentName returns the component name attribute for a +// BatchLogProcessor with the given ID. +func BLPComponentName(id int64) attribute.KeyValue { + t := otelconv.ComponentTypeBatchingLogProcessor + name := fmt.Sprintf("%s/%d", t, id) + return semconv.OTelComponentName(name) +} + +// BLP is the instrumentation for an OTel SDK BatchLogProcessor. +type BLP struct { + reg metric.Registration + + processed metric.Int64Counter + processedOpts []metric.AddOption + processedQueueFullOpts []metric.AddOption +} + +// NewBLP creates a new BatchLogProcessor instrumentation. +// It returns a nil BLP and a nil error if observability is not enabled.
+func NewBLP(id int64, qLen func() int64, qMax int64) (*BLP, error) { + if !x.Observability.Enabled() { + return nil, nil + } + + meter := otel.GetMeterProvider().Meter( + ScopeName, + metric.WithInstrumentationVersion(sdk.Version()), + metric.WithSchemaURL(SchemaURL), + ) + + var err error + qCap, e := otelconv.NewSDKProcessorLogQueueCapacity(meter) + if e != nil { + e = fmt.Errorf("failed to create BLP queue capacity metric: %w", e) + err = errors.Join(err, e) + } + qCapInst := qCap.Inst() + + qSize, e := otelconv.NewSDKProcessorLogQueueSize(meter) + if e != nil { + e = fmt.Errorf("failed to create BLP queue size metric: %w", e) + err = errors.Join(err, e) + } + qSizeInst := qSize.Inst() + + cmpntT := semconv.OTelComponentTypeBatchingLogProcessor + cmpnt := BLPComponentName(id) + set := attribute.NewSet(cmpnt, cmpntT) + + // Register callback for async metrics + obsOpts := []metric.ObserveOption{metric.WithAttributeSet(set)} + reg, e := meter.RegisterCallback( + func(_ context.Context, o metric.Observer) error { + o.ObserveInt64(qSizeInst, qLen(), obsOpts...) + o.ObserveInt64(qCapInst, qMax, obsOpts...) 
+ return nil + }, + qSizeInst, + qCapInst, + ) + if e != nil { + e = fmt.Errorf("failed to register BLP queue size/capacity callback: %w", e) + err = errors.Join(err, e) + } + + processed, e := otelconv.NewSDKProcessorLogProcessed(meter) + if e != nil { + e = fmt.Errorf("failed to create BLP processed logs metric: %w", e) + err = errors.Join(err, e) + } + + processedOpts := []metric.AddOption{metric.WithAttributeSet(set)} + setWithError := attribute.NewSet(cmpnt, cmpntT, ErrQueueFull) + processedQueueFullOpts := []metric.AddOption{metric.WithAttributeSet(setWithError)} + + return &BLP{ + reg: reg, + processed: processed.Inst(), + processedOpts: processedOpts, + processedQueueFullOpts: processedQueueFullOpts, + }, err +} + +func (b *BLP) Shutdown() error { + if b.reg == nil { + return nil + } + return b.reg.Unregister() +} + +func (b *BLP) Processed(ctx context.Context, n int64) { + if b.processed.Enabled(ctx) { + b.processed.Add(ctx, n, b.processedOpts...) + } +} + +func (b *BLP) ProcessedQueueFull(ctx context.Context, n int64) { + if b.processed.Enabled(ctx) { + b.processed.Add(ctx, n, b.processedQueueFullOpts...) 
+ } +} diff --git a/sdk/log/internal/observ/batch_log_processor_test.go b/sdk/log/internal/observ/batch_log_processor_test.go new file mode 100644 index 00000000000..ef34b8ba1b8 --- /dev/null +++ b/sdk/log/internal/observ/batch_log_processor_test.go @@ -0,0 +1,206 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package observ_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/sdk/log/internal/observ" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + semconv "go.opentelemetry.io/otel/semconv/v1.40.0" + "go.opentelemetry.io/otel/semconv/v1.40.0/otelconv" +) + +const id = 0 + +func TestBLPComponentName(t *testing.T) { + got := observ.BLPComponentName(42) + want := semconv.OTelComponentName("batching_log_processor/42") + assert.Equal(t, want, got) +} + +func TestNewBLPDisabled(t *testing.T) { + blp, err := observ.NewBLP(id, nil, 0) + assert.NoError(t, err) + assert.Nil(t, blp) +} + +func TestNewBLPErrors(t *testing.T) { + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + orig := otel.GetMeterProvider() + t.Cleanup(func() { otel.SetMeterProvider(orig) }) + + mp := &errMeterProvider{err: assert.AnError} + otel.SetMeterProvider(mp) + + _, err := observ.NewBLP(id, nil, 0) + require.ErrorIs(t, err, assert.AnError, "new instrument errors") + + assert.ErrorContains(t, err, "create BLP queue capacity metric") + assert.ErrorContains(t, err, "create BLP queue size metric") + assert.ErrorContains(t, err, "register BLP queue size/capacity callback") + assert.ErrorContains(t, err, "create BLP processed logs metric") +} + +func blpSet(attrs ...attribute.KeyValue) attribute.Set { + return attribute.NewSet(append([]attribute.KeyValue{ + semconv.OTelComponentTypeBatchingLogProcessor, + 
observ.BLPComponentName(id), + }, attrs...)...) +} + +func qCap(v int64) metricdata.Metrics { + return metricdata.Metrics{ + Name: otelconv.SDKProcessorLogQueueCapacity{}.Name(), + Description: otelconv.SDKProcessorLogQueueCapacity{}.Description(), + Unit: otelconv.SDKProcessorLogQueueCapacity{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: blpSet(), Value: v}, + }, + }, + } +} + +func qSize(v int64) metricdata.Metrics { + return metricdata.Metrics{ + Name: otelconv.SDKProcessorLogQueueSize{}.Name(), + Description: otelconv.SDKProcessorLogQueueSize{}.Description(), + Unit: otelconv.SDKProcessorLogQueueSize{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: blpSet(), Value: v}, + }, + }, + } +} + +func TestBLPCallback(t *testing.T) { + collect := setup(t) + + var n int64 = 3 + blp, err := observ.NewBLP(id, func() int64 { return n }, 5) + require.NoError(t, err) + require.NotNil(t, blp) + + check(t, collect(), qSize(n), qCap(5)) + + n = 4 + check(t, collect(), qSize(n), qCap(5)) + + require.NoError(t, blp.Shutdown()) + got := collect() + assert.Empty(t, got.Metrics, "no metrics after shutdown") +} + +func processed(dPts ...metricdata.DataPoint[int64]) metricdata.Metrics { + return metricdata.Metrics{ + Name: otelconv.SDKProcessorLogProcessed{}.Name(), + Description: otelconv.SDKProcessorLogProcessed{}.Description(), + Unit: otelconv.SDKProcessorLogProcessed{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dPts, + }, + } +} + +func TestBLPProcessed(t *testing.T) { + collect := setup(t) + + blp, err := observ.NewBLP(id, nil, 0) + require.NoError(t, err) + require.NotNil(t, blp) + require.NoError(t, blp.Shutdown()) // Unregister callback. 
+ + ctx := t.Context() + const p0 int64 = 10 + blp.Processed(ctx, p0) + const e0 int64 = 1 + blp.ProcessedQueueFull(ctx, e0) + check(t, collect(), processed( + dPt(blpSet(), p0), + dPt(blpSet(observ.ErrQueueFull), e0), + )) + + const p1 int64 = 20 + blp.Processed(ctx, p1) + const e1 int64 = 2 + blp.ProcessedQueueFull(ctx, e1) + check(t, collect(), processed( + dPt(blpSet(), p0+p1), + dPt(blpSet(observ.ErrQueueFull), e0+e1), + )) +} + +func BenchmarkBLP(b *testing.B) { + b.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + orig := otel.GetMeterProvider() + b.Cleanup(func() { otel.SetMeterProvider(orig) }) + + newBLP := func(b *testing.B) *observ.BLP { + b.Helper() + blp, err := observ.NewBLP(id, func() int64 { return 3 }, 5) + require.NoError(b, err) + require.NotNil(b, blp) + b.Cleanup(func() { + if err := blp.Shutdown(); err != nil { + b.Errorf("Shutdown: %v", err) + } + }) + return blp + } + ctx := b.Context() + + for _, tt := range []struct { + name string + fn func(*observ.BLP, context.Context) + }{ + {"Processed", func(blp *observ.BLP, ctx context.Context) { blp.Processed(ctx, 10) }}, + {"ProcessedQueueFull", func(blp *observ.BLP, ctx context.Context) { blp.ProcessedQueueFull(ctx, 1) }}, + } { + b.Run(tt.name, func(b *testing.B) { + otel.SetMeterProvider(noop.NewMeterProvider()) + blp := newBLP(b) + b.ResetTimer() + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + tt.fn(blp, ctx) + } + }) + }) + } + + b.Run("Callback", func(b *testing.B) { + reader := metric.NewManualReader() + mp := metric.NewMeterProvider(metric.WithReader(reader)) + otel.SetMeterProvider(mp) + + blp := newBLP(b) + var got metricdata.ResourceMetrics + + b.ResetTimer() + b.ReportAllocs() + for b.Loop() { + _ = reader.Collect(ctx, &got) + } + _ = blp + }) +} diff --git a/sdk/log/internal/observ/observ_test.go b/sdk/log/internal/observ/observ_test.go new file mode 100644 index 00000000000..7726dff31f9 --- /dev/null +++ b/sdk/log/internal/observ/observ_test.go @@ -0,0 
+1,104 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package observ_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + mapi "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/log/internal/observ" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" +) + +func setup(t *testing.T) func() metricdata.ScopeMetrics { + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + orig := otel.GetMeterProvider() + t.Cleanup(func() { otel.SetMeterProvider(orig) }) + + reader := metric.NewManualReader() + mp := metric.NewMeterProvider(metric.WithReader(reader)) + otel.SetMeterProvider(mp) + + return func() metricdata.ScopeMetrics { + var got metricdata.ResourceMetrics + require.NoError(t, reader.Collect(t.Context(), &got)) + for _, sm := range got.ScopeMetrics { + if sm.Scope.Name == observ.ScopeName { + return sm + } + } + return metricdata.ScopeMetrics{} + } +} + +func scopeMetrics(metrics ...metricdata.Metrics) metricdata.ScopeMetrics { + return metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: observ.ScopeName, + Version: sdk.Version(), + SchemaURL: observ.SchemaURL, + }, + Metrics: metrics, + } +} + +func check(t *testing.T, got metricdata.ScopeMetrics, want ...metricdata.Metrics) { + o := []metricdatatest.Option{ + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreExemplars(), + } + metricdatatest.AssertEqual(t, scopeMetrics(want...), got, o...) 
+} + +func dPt(set attribute.Set, value int64) metricdata.DataPoint[int64] { + return metricdata.DataPoint[int64]{Attributes: set, Value: value} +} + +type errMeterProvider struct { + mapi.MeterProvider + + err error +} + +func (m *errMeterProvider) Meter(string, ...mapi.MeterOption) mapi.Meter { + return &errMeter{err: m.err} +} + +type errMeter struct { + mapi.Meter + + err error +} + +func (m *errMeter) Int64UpDownCounter(string, ...mapi.Int64UpDownCounterOption) (mapi.Int64UpDownCounter, error) { + return nil, m.err +} + +func (m *errMeter) Int64Counter(string, ...mapi.Int64CounterOption) (mapi.Int64Counter, error) { + return nil, m.err +} + +func (m *errMeter) Int64ObservableUpDownCounter( + string, + ...mapi.Int64ObservableUpDownCounterOption, +) (mapi.Int64ObservableUpDownCounter, error) { + return nil, m.err +} + +func (m *errMeter) Float64Histogram(string, ...mapi.Float64HistogramOption) (mapi.Float64Histogram, error) { + return nil, m.err +} + +func (m *errMeter) RegisterCallback(mapi.Callback, ...mapi.Observable) (mapi.Registration, error) { + return nil, m.err +} diff --git a/sdk/log/internal/x/README.md b/sdk/log/internal/x/README.md index 33176f78e30..5c06e2723ae 100644 --- a/sdk/log/internal/x/README.md +++ b/sdk/log/internal/x/README.md @@ -19,6 +19,9 @@ To opt-in, set the environment variable `OTEL_GO_X_OBSERVABILITY` to `true`. When enabled, the SDK will create the following metrics using the global `MeterProvider`: - `otel.sdk.log.created` +- `otel.sdk.processor.log.queue.capacity` +- `otel.sdk.processor.log.queue.size` +- `otel.sdk.processor.log.processed` Please see the [Semantic conventions for OpenTelemetry SDK metrics] documentation for more details on these metrics. 
diff --git a/sdk/log/logger_bench_test.go b/sdk/log/logger_bench_test.go index 40174d5e25d..ba3200832fc 100644 --- a/sdk/log/logger_bench_test.go +++ b/sdk/log/logger_bench_test.go @@ -10,6 +10,8 @@ import ( "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/sdk/log/internal/observ" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/log" "go.opentelemetry.io/otel/sdk/instrumentation" @@ -73,7 +75,13 @@ func BenchmarkLoggerEmitObservability(b *testing.B) { orig := otel.GetMeterProvider() b.Cleanup(func() { otel.SetMeterProvider(orig) }) reader := metric.NewManualReader() - mp := metric.NewMeterProvider(metric.WithReader(reader)) + dropBLPMetrics := metric.NewView( + metric.Instrument{ + Scope: instrumentation.Scope{Name: observ.ScopeName}, + }, + metric.Stream{Aggregation: metric.AggregationDrop{}}, + ) + mp := metric.NewMeterProvider(metric.WithReader(reader), metric.WithView(dropBLPMetrics)) otel.SetMeterProvider(mp) run := func(logger *logger) func(b *testing.B) {