@@ -5,13 +5,19 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
5
5
6
6
import (
7
7
"context"
8
+ "errors"
9
+ "fmt"
8
10
"sync"
9
11
"sync/atomic"
10
12
"time"
11
13
12
14
"go.opentelemetry.io/otel"
15
+ "go.opentelemetry.io/otel/attribute"
13
16
"go.opentelemetry.io/otel/internal/global"
17
+ "go.opentelemetry.io/otel/metric"
18
+ "go.opentelemetry.io/otel/metric/noop"
14
19
"go.opentelemetry.io/otel/sdk/internal/env"
20
+ "go.opentelemetry.io/otel/sdk/internal/x"
15
21
"go.opentelemetry.io/otel/trace"
16
22
)
17
23
@@ -63,8 +69,9 @@ type batchSpanProcessor struct {
63
69
e SpanExporter
64
70
o BatchSpanProcessorOptions
65
71
66
- queue chan ReadOnlySpan
67
- dropped uint32
72
+ queue chan ReadOnlySpan
73
+ dropped uint32
74
+ callbackRegistration metric.Registration
68
75
69
76
batch []ReadOnlySpan
70
77
batchMutex sync.Mutex
@@ -111,6 +118,8 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
111
118
stopCh : make (chan struct {}),
112
119
}
113
120
121
+ bsp .configureSelfObservability ()
122
+
114
123
bsp .stopWait .Add (1 )
115
124
go func () {
116
125
defer bsp .stopWait .Done ()
@@ -121,6 +130,38 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
121
130
return bsp
122
131
}
123
132
133
+ func (bsp * batchSpanProcessor ) configureSelfObservability () {
134
+ mp := otel .GetMeterProvider ()
135
+ if ! x .SelfObservability .Enabled () {
136
+ mp = metric .MeterProvider (noop .NewMeterProvider ())
137
+ }
138
+ meter := mp .Meter (
139
+ selfObsScopeName ,
140
+ metric .WithInstrumentationVersion (version ()),
141
+ )
142
+
143
+ queueSizeCounter , err := meter .Int64ObservableUpDownCounter ("otel.sdk.span.processor.queue_size" ,
144
+ metric .WithUnit ("{span}" ),
145
+ metric .WithDescription ("The number of spans in the queue of a given instance of an SDK span processor." ),
146
+ )
147
+ if err != nil {
148
+ otel .Handle (err )
149
+ }
150
+
151
+ attrsOpt := metric .WithAttributes (
152
+ attribute .String ("otel.sdk.component.name" , fmt .Sprintf ("batching_span_processor/%p" , bsp )),
153
+ )
154
+ bsp .callbackRegistration , err = meter .RegisterCallback (
155
+ func (ctx context.Context , o metric.Observer ) error {
156
+ o .ObserveInt64 (queueSizeCounter , int64 (len (bsp .queue )), attrsOpt )
157
+ return nil
158
+ },
159
+ queueSizeCounter )
160
+ if err != nil {
161
+ otel .Handle (err )
162
+ }
163
+ }
164
+
124
165
// OnStart method does nothing.
125
166
func (bsp * batchSpanProcessor ) OnStart (parent context.Context , s ReadWriteSpan ) {}
126
167
@@ -162,7 +203,7 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
162
203
err = ctx .Err ()
163
204
}
164
205
})
165
- return err
206
+ return errors . Join ( err , bsp . callbackRegistration . Unregister ())
166
207
}
167
208
168
209
type forceFlushSpan struct {
0 commit comments