diff --git a/sdk/log/internal/observ/doc.go b/sdk/log/internal/observ/doc.go new file mode 100644 index 00000000000..6879567cbf4 --- /dev/null +++ b/sdk/log/internal/observ/doc.go @@ -0,0 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package observ provides observability instrumentation for the OTel log SDK +// package. +package observ // import "go.opentelemetry.io/otel/sdk/log/internal/observ" diff --git a/sdk/log/internal/observ/simple_log_processor.go b/sdk/log/internal/observ/simple_log_processor.go new file mode 100644 index 00000000000..f69bc5f1d38 --- /dev/null +++ b/sdk/log/internal/observ/simple_log_processor.go @@ -0,0 +1,107 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package observ // import "go.opentelemetry.io/otel/sdk/log/internal/observ" + +import ( + "context" + "fmt" + "sync" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk" + "go.opentelemetry.io/otel/sdk/log/internal/x" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" +) + +const ( + // ScopeName is the name of the instrumentation scope. + ScopeName = "go.opentelemetry.io/otel/sdk/log/internal/observ" +) + +var measureAttrsPool = sync.Pool{ + New: func() any { + // "component.name" + "component.type" + "error.type" + const n = 1 + 1 + 1 + s := make([]attribute.KeyValue, 0, n) + // Return a pointer to a slice instead of a slice itself + // to avoid allocations on every call. + return &s + }, +} + +// GetSLPComponentName returns the component name attribute for a +// SimpleLogProcessor with the given ID. +func GetSLPComponentName(id int64) attribute.KeyValue { + t := otelconv.ComponentTypeSimpleLogProcessor + name := fmt.Sprintf("%s/%d", t, id) + return semconv.OTelComponentName(name) +} + +// SLP is the instrumentation for an OTel SDK SimpleLogProcessor. +type SLP struct { + processed metric.Int64Counter + attrs []attribute.KeyValue + addOpts []metric.AddOption +} + +// NewSLP returns instrumentation for an OTel SDK SimpleLogProcessor with the +// provided ID. +// +// If the experimental observability is disabled, nil is returned. +func NewSLP(id int64) (*SLP, error) { + if !x.Observability.Enabled() { + return nil, nil + } + + meter := otel.GetMeterProvider() + mt := meter.Meter( + ScopeName, + metric.WithInstrumentationVersion(sdk.Version()), + metric.WithSchemaURL(semconv.SchemaURL), + ) + + p, err := otelconv.NewSDKProcessorLogProcessed(mt) + if err != nil { + err = fmt.Errorf("failed to create a processed log metric: %w", err) + return nil, err + } + + name := GetSLPComponentName(id) + componentType := p.AttrComponentType(otelconv.ComponentTypeSimpleLogProcessor) + attrs := []attribute.KeyValue{name, componentType} + addOpts := []metric.AddOption{metric.WithAttributeSet(attribute.NewSet(attrs...))} + + return &SLP{ + processed: p.Inst(), + attrs: attrs, + addOpts: addOpts, + }, nil +} + +// LogProcessed records that a log has been processed by the SimpleLogProcessor. +// If err is non-nil, it records the processing error as an attribute. +func (slp *SLP) LogProcessed(ctx context.Context, err error) { + slp.processed.Add(ctx, 1, slp.addOption(err)...) +} + +func (slp *SLP) addOption(err error) []metric.AddOption { + if err == nil { + return slp.addOpts + } + attrs := measureAttrsPool.Get().(*[]attribute.KeyValue) + defer func() { + *attrs = (*attrs)[:0] // reset the slice + measureAttrsPool.Put(attrs) + }() + + *attrs = append(*attrs, slp.attrs...) + *attrs = append(*attrs, semconv.ErrorType(err)) + + // Do not inefficiently make a copy of attrs by using + // WithAttributes instead of WithAttributeSet. + return []metric.AddOption{metric.WithAttributeSet(attribute.NewSet(*attrs...))} +} diff --git a/sdk/log/internal/observ/simple_log_processor_test.go b/sdk/log/internal/observ/simple_log_processor_test.go new file mode 100644 index 00000000000..1d63c4d6a94 --- /dev/null +++ b/sdk/log/internal/observ/simple_log_processor_test.go @@ -0,0 +1,204 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package observ + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + mapi "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/sdk" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" +) + +type errMeterProvider struct { + mapi.MeterProvider + err error +} + +func (m *errMeterProvider) Meter(string, ...mapi.MeterOption) mapi.Meter { + return &errMeter{err: m.err} +} + +type errMeter struct { + mapi.Meter + err error +} + +func (m *errMeter) Int64Counter(string, ...mapi.Int64CounterOption) (mapi.Int64Counter, error) { + return nil, m.err +} + +const slpComponentID = 0 + +func TestNewSLPError(t *testing.T) { + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + orig := otel.GetMeterProvider() + t.Cleanup(func() { otel.SetMeterProvider(orig) }) + + errMp := &errMeterProvider{err: assert.AnError} + otel.SetMeterProvider(errMp) + + _, err := NewSLP(slpComponentID) + require.ErrorIs(t, err, assert.AnError) + assert.ErrorContains(t, err, "failed to create a processed log metric") +} + +func TestNewSLPDisabled(t *testing.T) { + // Do not set OTEL_GO_X_OBSERVABILITY + bsp, err := NewSLP(slpComponentID) + assert.NoError(t, err) + assert.Nil(t, bsp) +} + +func setup(t *testing.T) (*SLP, func() metricdata.ScopeMetrics) { + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + orig := otel.GetMeterProvider() + t.Cleanup(func() { + otel.SetMeterProvider(orig) + }) + + reader := metric.NewManualReader() + mp := metric.NewMeterProvider(metric.WithReader(reader)) + otel.SetMeterProvider(mp) + + slp, err := NewSLP(slpComponentID) + require.NoError(t, err) + require.NotNil(t, slp) + + return slp, func() metricdata.ScopeMetrics { + var got metricdata.ResourceMetrics + require.NoError(t, reader.Collect(t.Context(), &got)) + require.Len(t, got.ScopeMetrics, 1) + return got.ScopeMetrics[0] + } +} + +func processedMetric(err error) metricdata.Metrics { + processed := &otelconv.SDKProcessorLogProcessed{} + + attrs := []attribute.KeyValue{ + GetSLPComponentName(slpComponentID), + processed.AttrComponentType(otelconv.ComponentTypeSimpleLogProcessor), + } + + if err != nil { + attrs = append(attrs, semconv.ErrorType(err)) + } + + dp := []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attrs...), + Value: 1, + }, + } + + return metricdata.Metrics{ + Name: processed.Name(), + Description: processed.Description(), + Unit: processed.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dp, + }, + } +} + +var Scope = instrumentation.Scope{ + Name: ScopeName, + Version: sdk.Version(), + SchemaURL: semconv.SchemaURL, +} + +func assertMetric(t *testing.T, got metricdata.ScopeMetrics, err error) { + t.Helper() + assert.Equal(t, Scope, got.Scope, "unexpected scope") + m := got.Metrics + require.Len(t, m, 1, "expected 1 metrics") + + o := metricdatatest.IgnoreTimestamp() + want := processedMetric(err) + + metricdatatest.AssertEqual(t, want, m[0], o) +} + +func TestSLP(t *testing.T) { + t.Run("NoError", func(t *testing.T) { + slp, collect := setup(t) + slp.LogProcessed(t.Context(), nil) + assertMetric(t, collect(), nil) + }) + + t.Run("Error", func(t *testing.T) { + processErr := errors.New("error processing log") + slp, collect := setup(t) + slp.LogProcessed(t.Context(), processErr) + assertMetric(t, collect(), processErr) + }) +} + +func BenchmarkSLP(b *testing.B) { + b.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + newSLP := func(b *testing.B) *SLP { + b.Helper() + slp, err := NewSLP(slpComponentID) + require.NoError(b, err) + require.NotNil(b, slp) + return slp + } + + b.Run("LogProcessed", func(b *testing.B) { + orig := otel.GetMeterProvider() + b.Cleanup(func() { + otel.SetMeterProvider(orig) + }) + + otel.SetMeterProvider(noop.NewMeterProvider()) + + ssp := newSLP(b) + ctx := b.Context() + + b.ResetTimer() + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + ssp.LogProcessed(ctx, nil) + } + }) + }) + + b.Run("LogProcessedWithError", func(b *testing.B) { + orig := otel.GetMeterProvider() + b.Cleanup(func() { + otel.SetMeterProvider(orig) + }) + otel.SetMeterProvider(noop.NewMeterProvider()) + slp := newSLP(b) + ctx := b.Context() + + processErr := errors.New("error processing log") + + b.ResetTimer() + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + slp.LogProcessed(ctx, processErr) + } + }) + }) +}