Skip to content

Commit

Permalink
add OTEL_GO_X_SELF_OBSERVABILITY feature gate, and otel.sdk.batch_spa…
Browse files Browse the repository at this point in the history
…n_processor.queue_size metric
  • Loading branch information
dashpole committed Jan 10, 2025
1 parent 14b874e commit eaa4776
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 3 deletions.
7 changes: 7 additions & 0 deletions sdk/internal/x/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ All other values are ignored.
[OpenTelemetry resource semantic conventions]: https://opentelemetry.io/docs/specs/semconv/resource/
[resource detectors]: https://pkg.go.dev/go.opentelemetry.io/otel/sdk/resource#Detector

### SDK Self-Observability

To enable experimental metric and trace instrumentation in SDKs, set the `OTEL_GO_X_SELF_OBSERVABILITY` environment variable.
If enabled, this instrumentation uses the global `TracerProvider` and `MeterProvider`.
The value set must be the case-insensitive string of `"true"` to enable the feature.
All other values are ignored.

#### Examples

Enable experimental resource semantic conventions.
Expand Down
13 changes: 13 additions & 0 deletions sdk/internal/x/x.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,19 @@ var Resource = newFeature("RESOURCE", func(v string) (string, bool) {
return "", false
})

// SelfObservability is an experimental feature flag that determines if SDK
// self-observability metrics are enabled.
//
// To enable this feature set the OTEL_GO_X_SELF_OBSERVABILITY environment variable
// to the case-insensitive string value of "true" (i.e. "True" and "TRUE"
// will also enable this).
var SelfObservability = newFeature("SELF_OBSERVABILITY", func(v string) (string, bool) {
if strings.ToLower(v) == "true" {
return v, true
}
return "", false
})

// Feature is an experimental feature control flag. It provides a uniform way
// to interact with these feature flags and parse their values.
type Feature[T any] struct {
Expand Down
42 changes: 39 additions & 3 deletions sdk/trace/batch_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"

import (
"context"
"errors"
"sync"
"sync/atomic"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/sdk/internal/env"
"go.opentelemetry.io/otel/sdk/internal/x"
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -63,8 +67,10 @@ type batchSpanProcessor struct {
e SpanExporter
o BatchSpanProcessorOptions

queue chan ReadOnlySpan
dropped uint32
queue chan ReadOnlySpan
dropped uint32
processedCounter metric.Int64Counter
callbackRegistration metric.Registration

batch []ReadOnlySpan
batchMutex sync.Mutex
Expand Down Expand Up @@ -111,6 +117,8 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
stopCh: make(chan struct{}),
}

bsp.configureSelfObservability()

bsp.stopWait.Add(1)
go func() {
defer bsp.stopWait.Done()
Expand All @@ -121,6 +129,34 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
return bsp
}

func (bsp *batchSpanProcessor) configureSelfObservability() {
mp := otel.GetMeterProvider()
if !x.SelfObservability.Enabled() {
mp = metric.MeterProvider(noop.NewMeterProvider())
}
meter := mp.Meter(
selfObsScopeName,
metric.WithInstrumentationVersion(version()),
)

queueSizeCounter, err := meter.Int64ObservableUpDownCounter("otel.sdk.batch_span_processor.queue_size",
metric.WithUnit("{span}"),
metric.WithDescription("The number of ended spans currently enqueued by the processor."),
)
if err != nil {
otel.Handle(err)
}
bsp.callbackRegistration, err = meter.RegisterCallback(
func(ctx context.Context, o metric.Observer) error {
o.ObserveInt64(queueSizeCounter, int64(len(bsp.queue)))
return nil
},
queueSizeCounter)
if err != nil {
otel.Handle(err)
}
}

// OnStart method does nothing.
func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {}

Expand Down Expand Up @@ -162,7 +198,7 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
err = ctx.Err()
}
})
return err
return errors.Join(err, bsp.callbackRegistration.Unregister())
}

type forceFlushSpan struct {
Expand Down
1 change: 1 addition & 0 deletions sdk/trace/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

const (
defaultTracerName = "go.opentelemetry.io/otel/sdk/tracer"
selfObsScopeName = "go.opentelemetry.io/otel/sdk/trace"
)

// tracerProviderConfig.
Expand Down

0 comments on commit eaa4776

Please sign in to comment.