@@ -69,9 +69,14 @@ type batchSpanProcessor struct {
69
69
e SpanExporter
70
70
o BatchSpanProcessorOptions
71
71
72
- queue chan ReadOnlySpan
73
- dropped uint32
74
- callbackRegistration metric.Registration
72
+ queue chan ReadOnlySpan
73
+ dropped uint32
74
+
75
+ callbackRegistration metric.Registration
76
+ spansProcessedCounter metric.Int64Counter
77
+ successAttributes metric.MeasurementOption
78
+ alreadyShutdownAttributes metric.MeasurementOption
79
+ queueFullAttributes metric.MeasurementOption
75
80
76
81
batch []ReadOnlySpan
77
82
batchMutex sync.Mutex
@@ -130,6 +135,15 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
130
135
return bsp
131
136
}
132
137
138
+ var processorID atomic.Uint64
139
+
140
+ // nextProcessorID returns an identifier for this batch span processor,
141
+ // starting with 0 and incrementing by 1 each time it is called.
142
+ func nextProcessorID () int64 {
143
+ return int64 (processorID .Add (1 ) - 1 )
144
+ }
145
+
146
+ // configureSelfObservability configures metrics for the batch span processor.
133
147
func (bsp * batchSpanProcessor ) configureSelfObservability () {
134
148
mp := otel .GetMeterProvider ()
135
149
if ! x .SelfObservability .Enabled () {
@@ -140,23 +154,42 @@ func (bsp *batchSpanProcessor) configureSelfObservability() {
140
154
metric .WithInstrumentationVersion (version ()),
141
155
)
142
156
143
- queueSizeCounter , err := meter .Int64ObservableUpDownCounter ("otel.sdk.span.processor.queue_size" ,
157
+ queueCapacityUpDownCounter , err := meter .Int64ObservableUpDownCounter ("otel.sdk.span.processor.queue_capacity" ,
158
+ metric .WithUnit ("{span}" ),
159
+ metric .WithDescription ("The maximum number of spans the queue of a given instance of an SDK span processor can hold." ),
160
+ )
161
+ if err != nil {
162
+ otel .Handle (err )
163
+ }
164
+ queueSizeUpDownCounter , err := meter .Int64ObservableUpDownCounter ("otel.sdk.span.processor.queue_size" ,
144
165
metric .WithUnit ("{span}" ),
145
166
metric .WithDescription ("The number of spans in the queue of a given instance of an SDK span processor." ),
146
167
)
147
168
if err != nil {
148
169
otel .Handle (err )
149
170
}
150
-
151
- attrsOpt := metric .WithAttributes (
152
- attribute . String ( "otel.sdk.component.name" , fmt . Sprintf ( "batching_span_processor/%p" , bsp ) ),
171
+ bsp . spansProcessedCounter , err = meter . Int64Counter ( "otel.sdk.span.processor.spans_processed" ,
172
+ metric .WithUnit ( "{span}" ),
173
+ metric . WithDescription ( "The number of spans for which the processing has finished, either successful or failed." ),
153
174
)
175
+ if err != nil {
176
+ otel .Handle (err )
177
+ }
178
+
179
+ componentTypeAttr := attribute .String ("otel.sdk.component.type" , "batching_span_processor" )
180
+ componentNameAttr := attribute .String ("otel.sdk.component.name" , fmt .Sprintf ("batching_span_processor/%d" , nextProcessorID ()))
181
+ bsp .successAttributes = metric .WithAttributes (componentNameAttr , componentTypeAttr , attribute .String ("error.type" , "" ))
182
+ bsp .alreadyShutdownAttributes = metric .WithAttributes (componentNameAttr , componentTypeAttr , attribute .String ("error.type" , "already_shutdown" ))
183
+ bsp .queueFullAttributes = metric .WithAttributes (componentNameAttr , componentTypeAttr , attribute .String ("error.type" , "queue_full" ))
184
+ callabckAttributesOpt := metric .WithAttributes (componentNameAttr , componentTypeAttr )
154
185
bsp .callbackRegistration , err = meter .RegisterCallback (
155
186
func (ctx context.Context , o metric.Observer ) error {
156
- o .ObserveInt64 (queueSizeCounter , int64 (len (bsp .queue )), attrsOpt )
187
+ o .ObserveInt64 (queueSizeUpDownCounter , int64 (len (bsp .queue )), callabckAttributesOpt )
188
+ o .ObserveInt64 (queueCapacityUpDownCounter , int64 (bsp .o .MaxQueueSize ), callabckAttributesOpt )
189
+ // TODO: can we track the number of spans batched, but not exported?
157
190
return nil
158
191
},
159
- queueSizeCounter )
192
+ queueSizeUpDownCounter , queueCapacityUpDownCounter )
160
193
if err != nil {
161
194
otel .Handle (err )
162
195
}
@@ -167,8 +200,10 @@ func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan)
167
200
168
201
// OnEnd method enqueues a ReadOnlySpan for later processing.
169
202
func (bsp * batchSpanProcessor ) OnEnd (s ReadOnlySpan ) {
203
+ ctx := context .Background ()
170
204
// Do not enqueue spans after Shutdown.
171
205
if bsp .stopped .Load () {
206
+ bsp .spansProcessedCounter .Add (ctx , 1 , bsp .alreadyShutdownAttributes )
172
207
return
173
208
}
174
209
@@ -315,6 +350,7 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
315
350
316
351
if l := len (bsp .batch ); l > 0 {
317
352
global .Debug ("exporting spans" , "count" , len (bsp .batch ), "total_dropped" , atomic .LoadUint32 (& bsp .dropped ))
353
+ bsp .spansProcessedCounter .Add (ctx , int64 (len (bsp .batch )), bsp .successAttributes )
318
354
err := bsp .e .ExportSpans (ctx , bsp .batch )
319
355
320
356
// A new batch is always created after exporting, even if the batch failed to be exported.
@@ -419,15 +455,17 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R
419
455
return false
420
456
}
421
457
458
+ // TODO: Can we track the number of spans blocking on the queue?
422
459
select {
423
460
case bsp .queue <- sd :
424
461
return true
425
462
case <- ctx .Done ():
463
+ bsp .spansProcessedCounter .Add (ctx , 1 , bsp .queueFullAttributes )
426
464
return false
427
465
}
428
466
}
429
467
430
- func (bsp * batchSpanProcessor ) enqueueDrop (_ context.Context , sd ReadOnlySpan ) bool {
468
+ func (bsp * batchSpanProcessor ) enqueueDrop (ctx context.Context , sd ReadOnlySpan ) bool {
431
469
if ! sd .SpanContext ().IsSampled () {
432
470
return false
433
471
}
@@ -437,6 +475,7 @@ func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) b
437
475
return true
438
476
default :
439
477
atomic .AddUint32 (& bsp .dropped , 1 )
478
+ bsp .spansProcessedCounter .Add (ctx , 1 , bsp .queueFullAttributes )
440
479
}
441
480
return false
442
481
}
0 commit comments