@@ -5,13 +5,17 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
5
5
6
6
import (
7
7
"context"
8
+ "errors"
8
9
"sync"
9
10
"sync/atomic"
10
11
"time"
11
12
12
13
"go.opentelemetry.io/otel"
13
14
"go.opentelemetry.io/otel/internal/global"
15
+ "go.opentelemetry.io/otel/metric"
16
+ "go.opentelemetry.io/otel/metric/noop"
14
17
"go.opentelemetry.io/otel/sdk/internal/env"
18
+ "go.opentelemetry.io/otel/sdk/internal/x"
15
19
"go.opentelemetry.io/otel/trace"
16
20
)
17
21
@@ -63,8 +67,10 @@ type batchSpanProcessor struct {
63
67
e SpanExporter
64
68
o BatchSpanProcessorOptions
65
69
66
- queue chan ReadOnlySpan
67
- dropped uint32
70
+ queue chan ReadOnlySpan
71
+ dropped uint32
72
+ processedCounter metric.Int64Counter
73
+ callbackRegistration metric.Registration
68
74
69
75
batch []ReadOnlySpan
70
76
batchMutex sync.Mutex
@@ -111,6 +117,8 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
111
117
stopCh : make (chan struct {}),
112
118
}
113
119
120
+ bsp .configureSelfObservability ()
121
+
114
122
bsp .stopWait .Add (1 )
115
123
go func () {
116
124
defer bsp .stopWait .Done ()
@@ -121,6 +129,34 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
121
129
return bsp
122
130
}
123
131
132
+ func (bsp * batchSpanProcessor ) configureSelfObservability () {
133
+ mp := otel .GetMeterProvider ()
134
+ if ! x .SelfObservability .Enabled () {
135
+ mp = metric .MeterProvider (noop .NewMeterProvider ())
136
+ }
137
+ meter := mp .Meter (
138
+ selfObsScopeName ,
139
+ metric .WithInstrumentationVersion (version ()),
140
+ )
141
+
142
+ queueSizeCounter , err := meter .Int64ObservableUpDownCounter ("otel.sdk.batch_span_processor.queue_size" ,
143
+ metric .WithUnit ("{span}" ),
144
+ metric .WithDescription ("The number of ended spans currently enqueued by the processor." ),
145
+ )
146
+ if err != nil {
147
+ otel .Handle (err )
148
+ }
149
+ bsp .callbackRegistration , err = meter .RegisterCallback (
150
+ func (ctx context.Context , o metric.Observer ) error {
151
+ o .ObserveInt64 (queueSizeCounter , int64 (len (bsp .queue )))
152
+ return nil
153
+ },
154
+ queueSizeCounter )
155
+ if err != nil {
156
+ otel .Handle (err )
157
+ }
158
+ }
159
+
124
160
// OnStart method does nothing.
125
161
func (bsp * batchSpanProcessor ) OnStart (parent context.Context , s ReadWriteSpan ) {}
126
162
@@ -162,7 +198,7 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
162
198
err = ctx .Err ()
163
199
}
164
200
})
165
- return err
201
+ return errors . Join ( err , bsp . callbackRegistration . Unregister ())
166
202
}
167
203
168
204
type forceFlushSpan struct {
0 commit comments