Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/log/internal/observ/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package observ provides observability instrumentation for the OTel log SDK
// package.
package observ // import "go.opentelemetry.io/otel/sdk/log/internal/observ"
107 changes: 107 additions & 0 deletions sdk/log/internal/observ/simple_log_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package observ // import "go.opentelemetry.io/otel/sdk/log/internal/observ"

import (
"context"
"fmt"
"sync"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/log/internal/x"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)

const (
// ScopeName is the name of the instrumentation scope.
ScopeName = "go.opentelemetry.io/otel/sdk/log/internal/observ"
)

var measureAttrsPool = sync.Pool{
New: func() any {
// "component.name" + "component.type" + "error.type"
const n = 1 + 1 + 1
s := make([]attribute.KeyValue, 0, n)
// Return a pointer to a slice instead of a slice itself
// to avoid allocations on every call.
return &s
},
}

// GetSLPComponentName returns the component name attribute for a
// SimpleLogProcessor with the given ID.
func GetSLPComponentName(id int64) attribute.KeyValue {
t := otelconv.ComponentTypeSimpleLogProcessor
name := fmt.Sprintf("%s/%d", t, id)
return semconv.OTelComponentName(name)
}

// SLP is the instrumentation for an OTel SDK SimpleLogProcessor.
type SLP struct {
processed metric.Int64Counter
attrs []attribute.KeyValue
addOpts []metric.AddOption
}

// NewSLP returns instrumentation for an OTel SDK SimpleLogProcessor with the
// provided ID.
//
// If the experimental observability is disabled, nil is returned.
func NewSLP(id int64) (*SLP, error) {
if !x.Observability.Enabled() {
return nil, nil
}

meter := otel.GetMeterProvider()
mt := meter.Meter(
ScopeName,
metric.WithInstrumentationVersion(sdk.Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)

p, err := otelconv.NewSDKProcessorLogProcessed(mt)
if err != nil {
err = fmt.Errorf("failed to create a processed log metric: %w", err)
return nil, err
}

name := GetSLPComponentName(id)
componentType := p.AttrComponentType(otelconv.ComponentTypeSimpleLogProcessor)
attrs := []attribute.KeyValue{name, componentType}
addOpts := []metric.AddOption{metric.WithAttributeSet(attribute.NewSet(attrs...))}

return &SLP{
processed: p.Inst(),
attrs: attrs,
addOpts: addOpts,
}, nil
}

// LogProcessed records that a log has been processed by the SimpleLogProcessor.
// If err is non-nil, it records the processing error as an attribute.
func (slp *SLP) LogProcessed(ctx context.Context, err error) {
slp.processed.Add(ctx, 1, slp.addOption(err)...)
}

func (slp *SLP) addOption(err error) []metric.AddOption {
if err == nil {
return slp.addOpts
}
attrs := measureAttrsPool.Get().(*[]attribute.KeyValue)
defer func() {
*attrs = (*attrs)[:0] // reset the slice
measureAttrsPool.Put(attrs)
}()

*attrs = append(*attrs, slp.attrs...)
*attrs = append(*attrs, semconv.ErrorType(err))

// Do not inefficiently make a copy of attrs by using
// WithAttributes instead of WithAttributeSet.
return []metric.AddOption{metric.WithAttributeSet(attribute.NewSet(*attrs...))}
}
204 changes: 204 additions & 0 deletions sdk/log/internal/observ/simple_log_processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package observ

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
mapi "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)

type errMeterProvider struct {
mapi.MeterProvider
err error
}

func (m *errMeterProvider) Meter(string, ...mapi.MeterOption) mapi.Meter {
return &errMeter{err: m.err}
}

type errMeter struct {
mapi.Meter
err error
}

func (m *errMeter) Int64Counter(string, ...mapi.Int64CounterOption) (mapi.Int64Counter, error) {
return nil, m.err
}

const slpComponentID = 0

func TestNewSLPError(t *testing.T) {
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
orig := otel.GetMeterProvider()
t.Cleanup(func() { otel.SetMeterProvider(orig) })

errMp := &errMeterProvider{err: assert.AnError}
otel.SetMeterProvider(errMp)

_, err := NewSLP(slpComponentID)
require.ErrorIs(t, err, assert.AnError)
assert.ErrorContains(t, err, "failed to create a processed log metric")
}

func TestNewSLPDisabled(t *testing.T) {
// Do not set OTEL_GO_X_OBSERVABILITY
bsp, err := NewSLP(slpComponentID)
assert.NoError(t, err)
assert.Nil(t, bsp)
}

func setup(t *testing.T) (*SLP, func() metricdata.ScopeMetrics) {
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")

orig := otel.GetMeterProvider()
t.Cleanup(func() {
otel.SetMeterProvider(orig)
})

reader := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(reader))
otel.SetMeterProvider(mp)

slp, err := NewSLP(slpComponentID)
require.NoError(t, err)
require.NotNil(t, slp)

return slp, func() metricdata.ScopeMetrics {
var got metricdata.ResourceMetrics
require.NoError(t, reader.Collect(t.Context(), &got))
require.Len(t, got.ScopeMetrics, 1)
return got.ScopeMetrics[0]
}
}

func processedMetric(err error) metricdata.Metrics {
processed := &otelconv.SDKProcessorLogProcessed{}

attrs := []attribute.KeyValue{
GetSLPComponentName(slpComponentID),
processed.AttrComponentType(otelconv.ComponentTypeSimpleLogProcessor),
}

if err != nil {
attrs = append(attrs, semconv.ErrorType(err))
}

dp := []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attrs...),
Value: 1,
},
}

return metricdata.Metrics{
Name: processed.Name(),
Description: processed.Description(),
Unit: processed.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: dp,
},
}
}

var Scope = instrumentation.Scope{
Name: ScopeName,
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
}

func assertMetric(t *testing.T, got metricdata.ScopeMetrics, err error) {
t.Helper()
assert.Equal(t, Scope, got.Scope, "unexpected scope")
m := got.Metrics
require.Len(t, m, 1, "expected 1 metrics")

o := metricdatatest.IgnoreTimestamp()
want := processedMetric(err)

metricdatatest.AssertEqual(t, want, m[0], o)
}

func TestSLP(t *testing.T) {
t.Run("NoError", func(t *testing.T) {
slp, collect := setup(t)
slp.LogProcessed(t.Context(), nil)
assertMetric(t, collect(), nil)
})

t.Run("Error", func(t *testing.T) {
processErr := errors.New("error processing log")
slp, collect := setup(t)
slp.LogProcessed(t.Context(), processErr)
assertMetric(t, collect(), processErr)
})
}

func BenchmarkSLP(b *testing.B) {
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")

newSLP := func(b *testing.B) *SLP {
b.Helper()
slp, err := NewSLP(slpComponentID)
require.NoError(b, err)
require.NotNil(b, slp)
return slp
}

b.Run("LogProcessed", func(b *testing.B) {
orig := otel.GetMeterProvider()
b.Cleanup(func() {
otel.SetMeterProvider(orig)
})

otel.SetMeterProvider(noop.NewMeterProvider())

ssp := newSLP(b)
ctx := b.Context()

b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
ssp.LogProcessed(ctx, nil)
}
})
})

b.Run("LogProcessedWithError", func(b *testing.B) {
orig := otel.GetMeterProvider()
b.Cleanup(func() {
otel.SetMeterProvider(orig)
})
otel.SetMeterProvider(noop.NewMeterProvider())
slp := newSLP(b)
ctx := b.Context()

processErr := errors.New("error processing log")

b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
slp.LogProcessed(ctx, processErr)
}
})
})
}