Skip to content

Commit

Permalink
implement otel.sdk.span.processor.queue_capacity and otel.sdk.span.pr…
Browse files Browse the repository at this point in the history
…ocessor.spans_processed self-observability metrics
  • Loading branch information
dashpole committed Jan 28, 2025
1 parent f7f8890 commit 21e5323
Showing 1 changed file with 56 additions and 10 deletions.
66 changes: 56 additions & 10 deletions sdk/trace/batch_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,16 @@ type batchSpanProcessor struct {
e SpanExporter
o BatchSpanProcessorOptions

queue chan ReadOnlySpan
dropped uint32
callbackRegistration metric.Registration
queue chan ReadOnlySpan
dropped uint32

callbackRegistration metric.Registration
spansProcessedCounter metric.Int64Counter
successAttributes metric.MeasurementOption
alreadyShutdownAttributes metric.MeasurementOption
noExporterAttributes metric.MeasurementOption
notSampledAttributes metric.MeasurementOption
queueFullAttributes metric.MeasurementOption

batch []ReadOnlySpan
batchMutex sync.Mutex
Expand Down Expand Up @@ -130,6 +137,15 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
return bsp
}

// processorID is the shared counter from which batch span processor
// identifiers are drawn.
var processorID atomic.Uint64

// nextProcessorID hands out the next batch span processor identifier.
// The first call yields 0 and every later call yields a value one greater
// than the call before it.
func nextProcessorID() int64 {
	// Add returns the counter value after the increment, so step back by one
	// to obtain the identifier reserved by this call.
	issued := processorID.Add(1)
	return int64(issued - 1)
}

// configureSelfObservability configures metrics for the batch span processor.
func (bsp *batchSpanProcessor) configureSelfObservability() {
mp := otel.GetMeterProvider()
if !x.SelfObservability.Enabled() {
Expand All @@ -140,23 +156,44 @@ func (bsp *batchSpanProcessor) configureSelfObservability() {
metric.WithInstrumentationVersion(version()),
)

queueSizeCounter, err := meter.Int64ObservableUpDownCounter("otel.sdk.span.processor.queue_size",
queueCapacityUpDownCounter, err := meter.Int64ObservableUpDownCounter("otel.sdk.span.processor.queue_capacity",
metric.WithUnit("{span}"),
metric.WithDescription("The maximum number of spans the queue of a given instance of an SDK span processor can hold."),
)
if err != nil {
otel.Handle(err)
}
queueSizeUpDownCounter, err := meter.Int64ObservableUpDownCounter("otel.sdk.span.processor.queue_size",
metric.WithUnit("{span}"),
metric.WithDescription("The number of spans in the queue of a given instance of an SDK span processor."),
)
if err != nil {
otel.Handle(err)
}

attrsOpt := metric.WithAttributes(
attribute.String("otel.sdk.component.name", fmt.Sprintf("batching_span_processor/%p", bsp)),
bsp.spansProcessedCounter, err = meter.Int64Counter("otel.sdk.span.processor.spans_processed",
metric.WithUnit("{span}"),
metric.WithDescription("The number of spans for which the processing has finished, either successful or failed."),
)
if err != nil {
otel.Handle(err)
}

componentTypeAttr := attribute.String("otel.sdk.component.type", "batching_span_processor")
componentNameAttr := attribute.String("otel.sdk.component.name", fmt.Sprintf("batching_span_processor/%d", nextProcessorID()))
bsp.successAttributes = metric.WithAttributes(componentNameAttr, componentTypeAttr, attribute.String("error.type", ""))
bsp.alreadyShutdownAttributes = metric.WithAttributes(componentNameAttr, componentTypeAttr, attribute.String("error.type", "already_shutdown"))
bsp.noExporterAttributes = metric.WithAttributes(componentNameAttr, componentTypeAttr, attribute.String("error.type", "no_exporter"))
bsp.notSampledAttributes = metric.WithAttributes(componentNameAttr, componentTypeAttr, attribute.String("error.type", "not_sampled"))
bsp.queueFullAttributes = metric.WithAttributes(componentNameAttr, componentTypeAttr, attribute.String("error.type", "queue_full"))
callabckAttributesOpt := metric.WithAttributes(componentNameAttr, componentTypeAttr)
bsp.callbackRegistration, err = meter.RegisterCallback(
func(ctx context.Context, o metric.Observer) error {
o.ObserveInt64(queueSizeCounter, int64(len(bsp.queue)), attrsOpt)
o.ObserveInt64(queueSizeUpDownCounter, int64(len(bsp.queue)), callabckAttributesOpt)
o.ObserveInt64(queueCapacityUpDownCounter, int64(bsp.o.MaxQueueSize), callabckAttributesOpt)
// TODO: can we track the number of spans batched, but not exported?
return nil
},
queueSizeCounter)
queueSizeUpDownCounter, queueCapacityUpDownCounter)
if err != nil {
otel.Handle(err)
}
Expand All @@ -167,13 +204,16 @@ func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan)

// OnEnd method enqueues a ReadOnlySpan for later processing.
func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
ctx := context.Background()
// Do not enqueue spans after Shutdown.
if bsp.stopped.Load() {
bsp.spansProcessedCounter.Add(ctx, 1, bsp.alreadyShutdownAttributes)
return
}

// Do not enqueue spans if we are just going to drop them.
if bsp.e == nil {
bsp.spansProcessedCounter.Add(ctx, 1, bsp.noExporterAttributes)
return
}
bsp.enqueue(s)
Expand Down Expand Up @@ -315,6 +355,7 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {

if l := len(bsp.batch); l > 0 {
global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
bsp.spansProcessedCounter.Add(ctx, int64(len(bsp.batch)), bsp.successAttributes)
err := bsp.e.ExportSpans(ctx, bsp.batch)

// A new batch is always created after exporting, even if the batch failed to be exported.
Expand Down Expand Up @@ -416,19 +457,23 @@ func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {

// enqueueBlockOnQueueFull tries to hand sd to the processor queue, blocking
// until the send succeeds or ctx is done. It reports whether the span was
// queued.
//
// A span whose context is not sampled is never queued; it is counted on
// spansProcessedCounter with notSampledAttributes. If ctx is done before the
// span can be queued, it is counted with queueFullAttributes instead.
//
// NOTE(review): assumes bsp.spansProcessedCounter is non-nil whenever this
// runs — confirm it is also set when self-observability is disabled.
func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan) bool {
	if sampled := sd.SpanContext().IsSampled(); !sampled {
		bsp.spansProcessedCounter.Add(ctx, 1, bsp.notSampledAttributes)
		return false
	}

	// TODO: Can we track the number of spans blocking on the queue?
	queued := false
	select {
	case <-ctx.Done():
		// The caller gave up waiting for room in the queue.
		bsp.spansProcessedCounter.Add(ctx, 1, bsp.queueFullAttributes)
	case bsp.queue <- sd:
		queued = true
	}
	return queued
}

func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) bool {
func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan) bool {
if !sd.SpanContext().IsSampled() {
bsp.spansProcessedCounter.Add(ctx, 1, bsp.notSampledAttributes)
return false
}

Expand All @@ -437,6 +482,7 @@ func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) b
return true
default:
atomic.AddUint32(&bsp.dropped, 1)
bsp.spansProcessedCounter.Add(ctx, 1, bsp.queueFullAttributes)
}
return false
}
Expand Down

0 comments on commit 21e5323

Please sign in to comment.