diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e8d56765ea..da0769199c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`. (#7353) - Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`. (#7459) - Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#7486) +- Add experimental observability metrics for simple span processor in `go.opentelemetry.io/otel/sdk/trace`. (#7374) ### Fixed diff --git a/sdk/trace/internal/observ/simple_span_processor.go b/sdk/trace/internal/observ/simple_span_processor.go new file mode 100644 index 00000000000..7d33870613a --- /dev/null +++ b/sdk/trace/internal/observ/simple_span_processor.go @@ -0,0 +1,97 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package observ // import "go.opentelemetry.io/otel/sdk/trace/internal/observ" + +import ( + "context" + "fmt" + "sync" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk" + "go.opentelemetry.io/otel/sdk/internal/x" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" +) + +var measureAttrsPool = sync.Pool{ + New: func() any { + // "component.name" + "component.type" + "error.type" + const n = 1 + 1 + 1 + s := make([]attribute.KeyValue, 0, n) + // Return a pointer to a slice instead of a slice itself + // to avoid allocations on every call. + return &s + }, +} + +// SSP is the instrumentation for an OTel SDK SimpleSpanProcessor. +type SSP struct { + spansProcessedCounter metric.Int64Counter + addOpts []metric.AddOption + attrs []attribute.KeyValue +} + +// SSPComponentName returns the component name attribute for a +// SimpleSpanProcessor with the given ID. +func SSPComponentName(id int64) attribute.KeyValue { + t := otelconv.ComponentTypeSimpleSpanProcessor + name := fmt.Sprintf("%s/%d", t, id) + return semconv.OTelComponentName(name) +} + +// NewSSP returns instrumentation for an OTel SDK SimpleSpanProcessor with the +// provided ID. +// +// If the experimental observability is disabled, nil is returned. +func NewSSP(id int64) (*SSP, error) { + if !x.Observability.Enabled() { + return nil, nil + } + + meter := otel.GetMeterProvider().Meter( + ScopeName, + metric.WithInstrumentationVersion(sdk.Version()), + metric.WithSchemaURL(SchemaURL), + ) + spansProcessedCounter, err := otelconv.NewSDKProcessorSpanProcessed(meter) + if err != nil { + err = fmt.Errorf("failed to create SSP processed spans metric: %w", err) + } + + componentName := SSPComponentName(id) + componentType := spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeSimpleSpanProcessor) + attrs := []attribute.KeyValue{componentName, componentType} + addOpts := []metric.AddOption{metric.WithAttributeSet(attribute.NewSet(attrs...))} + + return &SSP{ + spansProcessedCounter: spansProcessedCounter.Inst(), + addOpts: addOpts, + attrs: attrs, + }, err +} + +// SpanProcessed records that a span has been processed by the SimpleSpanProcessor. +// If err is non-nil, it records the processing error as an attribute. +func (ssp *SSP) SpanProcessed(ctx context.Context, err error) { + ssp.spansProcessedCounter.Add(ctx, 1, ssp.addOption(err)...) +} + +func (ssp *SSP) addOption(err error) []metric.AddOption { + if err == nil { + return ssp.addOpts + } + attrs := measureAttrsPool.Get().(*[]attribute.KeyValue) + defer func() { + *attrs = (*attrs)[:0] // reset the slice for reuse + measureAttrsPool.Put(attrs) + }() + *attrs = append(*attrs, ssp.attrs...) + *attrs = append(*attrs, semconv.ErrorType(err)) + // Do not inefficiently make a copy of attrs by using + // WithAttributes instead of WithAttributeSet. + return []metric.AddOption{metric.WithAttributeSet(attribute.NewSet(*attrs...))} +} diff --git a/sdk/trace/internal/observ/simple_span_processor_test.go b/sdk/trace/internal/observ/simple_span_processor_test.go new file mode 100644 index 00000000000..a6b0e359e22 --- /dev/null +++ b/sdk/trace/internal/observ/simple_span_processor_test.go @@ -0,0 +1,127 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +package observ_test + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/sdk/trace/internal/observ" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" +) + +const sspComponentID = 0 + +func TestSSPComponentName(t *testing.T) { + got := observ.SSPComponentName(10) + want := semconv.OTelComponentName("simple_span_processor/10") + assert.Equal(t, want, got) +} + +func TestNewSSPError(t *testing.T) { + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + orig := otel.GetMeterProvider() + t.Cleanup(func() { otel.SetMeterProvider(orig) }) + + mp := &errMeterProvider{err: assert.AnError} + otel.SetMeterProvider(mp) + + _, err := observ.NewSSP(sspComponentID) + require.ErrorIs(t, err, assert.AnError, "new instrument errors") + assert.ErrorContains(t, err, "create SSP processed spans metric") +} + +func TestNewSSPDisabled(t *testing.T) { + ssp, err := observ.NewSSP(sspComponentID) + assert.NoError(t, err) + assert.Nil(t, ssp) +} + +func TestSSPSpanProcessed(t *testing.T) { + ctx := t.Context() + collect := setup(t) + ssp, err := observ.NewSSP(sspComponentID) + assert.NoError(t, err) + + ssp.SpanProcessed(ctx, nil) + check(t, collect(), processed(dPt(sspSet(), 1))) + ssp.SpanProcessed(ctx, nil) + ssp.SpanProcessed(ctx, nil) + check(t, collect(), processed(dPt(sspSet(), 3))) + + processErr := errors.New("error processing span") + ssp.SpanProcessed(ctx, processErr) + check(t, collect(), processed( + dPt(sspSet(), 3), + dPt(sspSet(semconv.ErrorType(processErr)), 1), + )) +} + +func BenchmarkSSP(b *testing.B) { + b.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + newSSP := func(b *testing.B) *observ.SSP { + b.Helper() + ssp, err := observ.NewSSP(sspComponentID) + require.NoError(b, err) + require.NotNil(b, ssp) + return ssp + } + + b.Run("SpanProcessed", func(b *testing.B) { + orig := otel.GetMeterProvider() + b.Cleanup(func() { + otel.SetMeterProvider(orig) + }) + + // Ensure deterministic benchmark by using noop meter. + otel.SetMeterProvider(noop.NewMeterProvider()) + + ssp := newSSP(b) + ctx := b.Context() + + b.ResetTimer() + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + ssp.SpanProcessed(ctx, nil) + } + }) + }) + + b.Run("SpanProcessedWithError", func(b *testing.B) { + orig := otel.GetMeterProvider() + b.Cleanup(func() { + otel.SetMeterProvider(orig) + }) + + // Ensure deterministic benchmark by using noop meter. + otel.SetMeterProvider(noop.NewMeterProvider()) + + ssp := newSSP(b) + ctx := b.Context() + processErr := errors.New("error processing span") + + b.ResetTimer() + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + ssp.SpanProcessed(ctx, processErr) + } + }) + }) +} + +func sspSet(attrs ...attribute.KeyValue) attribute.Set { + return attribute.NewSet(append([]attribute.KeyValue{ + semconv.OTelComponentTypeSimpleSpanProcessor, + observ.SSPComponentName(sspComponentID), + }, attrs...)...) +} diff --git a/sdk/trace/simple_span_processor.go b/sdk/trace/simple_span_processor.go index 411d9ccdd78..771e427a4c5 100644 --- a/sdk/trace/simple_span_processor.go +++ b/sdk/trace/simple_span_processor.go @@ -6,9 +6,12 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace" import ( "context" "sync" + "sync/atomic" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/internal/global" + "go.opentelemetry.io/otel/sdk/trace/internal/observ" + "go.opentelemetry.io/otel/trace" ) // simpleSpanProcessor is a SpanProcessor that synchronously sends all @@ -17,6 +20,8 @@ type simpleSpanProcessor struct { exporterMu sync.Mutex exporter SpanExporter stopOnce sync.Once + + inst *observ.SSP } var _ SpanProcessor = (*simpleSpanProcessor)(nil) @@ -33,11 +38,26 @@ func NewSimpleSpanProcessor(exporter SpanExporter) SpanProcessor { ssp := &simpleSpanProcessor{ exporter: exporter, } + + var err error + ssp.inst, err = observ.NewSSP(nextSimpleProcessorID()) + if err != nil { + otel.Handle(err) + } + global.Warn("SimpleSpanProcessor is not recommended for production use, consider using BatchSpanProcessor instead.") return ssp } +var simpleProcessorIDCounter atomic.Int64 + +// nextSimpleProcessorID returns an identifier for this simple span processor, +// starting with 0 and incrementing by 1 each time it is called. +func nextSimpleProcessorID() int64 { + return simpleProcessorIDCounter.Add(1) - 1 +} + // OnStart does nothing. func (*simpleSpanProcessor) OnStart(context.Context, ReadWriteSpan) {} @@ -46,11 +66,20 @@ func (ssp *simpleSpanProcessor) OnEnd(s ReadOnlySpan) { ssp.exporterMu.Lock() defer ssp.exporterMu.Unlock() + var err error if ssp.exporter != nil && s.SpanContext().TraceFlags().IsSampled() { - if err := ssp.exporter.ExportSpans(context.Background(), []ReadOnlySpan{s}); err != nil { + err = ssp.exporter.ExportSpans(context.Background(), []ReadOnlySpan{s}) + if err != nil { otel.Handle(err) } } + + if ssp.inst != nil { + // Add the span to the context to ensure the metric is recorded + // with the correct span context. + ctx := trace.ContextWithSpanContext(context.Background(), s.SpanContext()) + ssp.inst.SpanProcessed(ctx, err) + } } // Shutdown shuts down the exporter this SimpleSpanProcessor exports to. diff --git a/sdk/trace/simple_span_processor_test.go b/sdk/trace/simple_span_processor_test.go index 62139a6c0e0..be01ec2fc14 100644 --- a/sdk/trace/simple_span_processor_test.go +++ b/sdk/trace/simple_span_processor_test.go @@ -6,11 +6,23 @@ package trace import ( "context" "errors" + "strconv" "sync" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" ) type simpleTestExporter struct { @@ -34,6 +46,17 @@ func (t *simpleTestExporter) Shutdown(ctx context.Context) error { } } +var _ SpanExporter = (*failingTestExporter)(nil) + +type failingTestExporter struct { + simpleTestExporter +} + +func (f *failingTestExporter) ExportSpans(ctx context.Context, spans []ReadOnlySpan) error { + _ = f.simpleTestExporter.ExportSpans(ctx, spans) + return errors.New("failed to export spans") +} + var _ SpanExporter = (*simpleTestExporter)(nil) func TestNewSimpleSpanProcessor(t *testing.T) { @@ -168,3 +191,140 @@ func TestSimpleSpanProcessorShutdownHonorsContextCancel(t *testing.T) { t.Errorf("SimpleSpanProcessor.Shutdown did not return %v, got %v", want, got) } } + +func TestSimpleSpanProcessorObservability(t *testing.T) { + tests := []struct { + name string + enabled bool + exporter SpanExporter + assertMetrics func(t *testing.T, rm metricdata.ResourceMetrics) + }{ + { + name: "Disabled", + enabled: false, + exporter: &simpleTestExporter{}, + assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) { + assert.Empty(t, rm.ScopeMetrics) + }, + }, + { + name: "Enabled", + enabled: true, + exporter: &simpleTestExporter{}, + assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) { + assert.Len(t, rm.ScopeMetrics, 1) + sm := rm.ScopeMetrics[0] + + want := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: "go.opentelemetry.io/otel/sdk/trace/internal/observ", + Version: sdk.Version(), + SchemaURL: semconv.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: otelconv.SDKProcessorSpanProcessed{}.Name(), + Description: otelconv.SDKProcessorSpanProcessed{}.Description(), + Unit: otelconv.SDKProcessorSpanProcessed{}.Unit(), + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + semconv.OTelComponentName("simple_span_processor/0"), + semconv.OTelComponentTypeKey.String("simple_span_processor"), + ), + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + }, + } + + metricdatatest.AssertEqual( + t, + want, + sm, + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreExemplars(), + ) + }, + }, + { + name: "Enabled, Exporter error", + enabled: true, + exporter: &failingTestExporter{ + simpleTestExporter: simpleTestExporter{}, + }, + assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) { + assert.Len(t, rm.ScopeMetrics, 1) + sm := rm.ScopeMetrics[0] + + want := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: "go.opentelemetry.io/otel/sdk/trace/internal/observ", + Version: sdk.Version(), + SchemaURL: semconv.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: otelconv.SDKProcessorSpanProcessed{}.Name(), + Description: otelconv.SDKProcessorSpanProcessed{}.Description(), + Unit: otelconv.SDKProcessorSpanProcessed{}.Unit(), + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + semconv.OTelComponentName("simple_span_processor/0"), + semconv.OTelComponentTypeKey.String("simple_span_processor"), + semconv.ErrorTypeKey.String("*errors.errorString"), + ), + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + }, + } + + metricdatatest.AssertEqual( + t, + want, + sm, + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreExemplars(), + ) + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + t.Setenv("OTEL_GO_X_OBSERVABILITY", strconv.FormatBool(test.enabled)) + + original := otel.GetMeterProvider() + t.Cleanup(func() { otel.SetMeterProvider(original) }) + + r := metric.NewManualReader() + mp := metric.NewMeterProvider( + metric.WithReader(r), + metric.WithView(dropSpanMetricsView), + ) + otel.SetMeterProvider(mp) + + ssp := NewSimpleSpanProcessor(test.exporter) + tp := basicTracerProvider(t) + tp.RegisterSpanProcessor(ssp) + startSpan(tp, test.name).End() + + var rm metricdata.ResourceMetrics + require.NoError(t, r.Collect(t.Context(), &rm)) + test.assertMetrics(t, rm) + simpleProcessorIDCounter.Store(0) // reset simpleProcessorIDCounter + }) + } +}