@@ -32,8 +32,10 @@ import { SpanExporter } from './SpanExporter';
32
32
* the SDK then pushes them to the exporter pipeline.
33
33
*/
34
34
export class BatchSpanProcessor implements SpanProcessor {
35
- private readonly _bufferSize : number ;
36
- private readonly _bufferTimeout : number ;
35
+ private readonly _maxExportBatchSize : number ;
36
+ private readonly _maxQueueSize : number ;
37
+ private readonly _scheduledDelayMillis : number ;
38
+ private readonly _exportTimeoutMillis : number ;
37
39
38
40
private _finishedSpans : ReadableSpan [ ] = [ ] ;
39
41
private _timer : NodeJS . Timeout | undefined ;
@@ -42,21 +44,29 @@ export class BatchSpanProcessor implements SpanProcessor {
42
44
43
45
constructor ( private readonly _exporter : SpanExporter , config ?: BufferConfig ) {
44
46
const env = getEnv ( ) ;
45
- this . _bufferSize =
46
- config && config . bufferSize
47
- ? config . bufferSize
48
- : env . OTEL_BSP_MAX_BATCH_SIZE ;
49
- this . _bufferTimeout =
50
- config && typeof config . bufferTimeout === 'number'
51
- ? config . bufferTimeout
52
- : env . OTEL_BSP_SCHEDULE_DELAY_MILLIS ;
47
+ this . _maxExportBatchSize =
48
+ typeof config ?. maxExportBatchSize === 'number'
49
+ ? config . maxExportBatchSize
50
+ : env . OTEL_BSP_MAX_EXPORT_BATCH_SIZE ;
51
+ this . _maxQueueSize =
52
+ typeof config ?. maxQueueSize === 'number'
53
+ ? config ?. maxQueueSize
54
+ : env . OTEL_BSP_MAX_QUEUE_SIZE ;
55
+ this . _scheduledDelayMillis =
56
+ typeof config ?. scheduledDelayMillis === 'number'
57
+ ? config ?. scheduledDelayMillis
58
+ : env . OTEL_BSP_SCHEDULE_DELAY ;
59
+ this . _exportTimeoutMillis =
60
+ typeof config ?. exportTimeoutMillis === 'number'
61
+ ? config ?. exportTimeoutMillis
62
+ : env . OTEL_BSP_EXPORT_TIMEOUT ;
53
63
}
54
64
55
65
forceFlush ( ) : Promise < void > {
56
66
if ( this . _isShutdown ) {
57
67
return this . _shuttingDownPromise ;
58
68
}
59
- return this . _flush ( ) ;
69
+ return this . _flushAll ( ) ;
60
70
}
61
71
62
72
// does nothing.
@@ -77,7 +87,7 @@ export class BatchSpanProcessor implements SpanProcessor {
77
87
this . _shuttingDownPromise = new Promise ( ( resolve , reject ) => {
78
88
Promise . resolve ( )
79
89
. then ( ( ) => {
80
- return this . _flush ( ) ;
90
+ return this . _flushAll ( ) ;
81
91
} )
82
92
. then ( ( ) => {
83
93
return this . _exporter . shutdown ( ) ;
@@ -92,49 +102,84 @@ export class BatchSpanProcessor implements SpanProcessor {
92
102
93
103
/** Add a span in the buffer. */
94
104
private _addToBuffer ( span : ReadableSpan ) {
105
+ if ( this . _finishedSpans . length >= this . _maxQueueSize ) {
106
+ // limit reached, drop span
107
+ return ;
108
+ }
95
109
this . _finishedSpans . push ( span ) ;
96
110
this . _maybeStartTimer ( ) ;
97
- if ( this . _finishedSpans . length > this . _bufferSize ) {
98
- this . _flush ( ) . catch ( e => {
99
- globalErrorHandler ( e ) ;
100
- } ) ;
101
- }
102
111
}
103
112
104
- /** Send the span data list to exporter */
105
- private _flush ( ) : Promise < void > {
113
+ /**
114
+ * Send all spans to the exporter respecting the batch size limit
115
+ * This function is used only on forceFlush or shutdown,
116
+ * for all other cases _flush should be used
117
+ * */
118
+ private _flushAll ( ) : Promise < void > {
119
+ return new Promise ( ( resolve , reject ) => {
120
+ const promises = [ ] ;
121
+ // calculate number of batches
122
+ const count = Math . ceil (
123
+ this . _finishedSpans . length / this . _maxExportBatchSize
124
+ ) ;
125
+ for ( let i = 0 , j = count ; i < j ; i ++ ) {
126
+ promises . push ( this . _flushOneBatch ( ) ) ;
127
+ }
128
+ Promise . all ( promises )
129
+ . then ( ( ) => {
130
+ resolve ( ) ;
131
+ } )
132
+ . catch ( reject ) ;
133
+ } ) ;
134
+ }
135
+
136
+ private _flushOneBatch ( ) : Promise < void > {
106
137
this . _clearTimer ( ) ;
107
138
if ( this . _finishedSpans . length === 0 ) {
108
139
return Promise . resolve ( ) ;
109
140
}
110
141
return new Promise ( ( resolve , reject ) => {
142
+ const timer = setTimeout ( ( ) => {
143
+ // don't wait anymore for export, this way the next batch can start
144
+ reject ( new Error ( 'Timeout' ) ) ;
145
+ } , this . _exportTimeoutMillis ) ;
111
146
// prevent downstream exporter calls from generating spans
112
147
context . with ( suppressInstrumentation ( context . active ( ) ) , ( ) => {
113
148
// Reset the finished spans buffer here because the next invocations of the _flush method
114
149
// could pass the same finished spans to the exporter if the buffer is cleared
115
150
// outside of the execution of this callback.
116
- this . _exporter . export ( this . _finishedSpans . splice ( 0 ) , result => {
117
- if ( result . code === ExportResultCode . SUCCESS ) {
118
- resolve ( ) ;
119
- } else {
120
- reject (
121
- result . error ??
122
- new Error ( 'BatchSpanProcessor: span export failed' )
123
- ) ;
151
+ this . _exporter . export (
152
+ this . _finishedSpans . splice ( 0 , this . _maxExportBatchSize ) ,
153
+ result => {
154
+ clearTimeout ( timer ) ;
155
+ if ( result . code === ExportResultCode . SUCCESS ) {
156
+ resolve ( ) ;
157
+ } else {
158
+ reject (
159
+ result . error ??
160
+ new Error ( 'BatchSpanProcessor: span export failed' )
161
+ ) ;
162
+ }
124
163
}
125
- } ) ;
164
+ ) ;
126
165
} ) ;
127
166
} ) ;
128
167
}
129
168
130
169
private _maybeStartTimer ( ) {
131
170
if ( this . _timer !== undefined ) return ;
132
-
133
171
this . _timer = setTimeout ( ( ) => {
134
- this . _flush ( ) . catch ( e => {
135
- globalErrorHandler ( e ) ;
136
- } ) ;
137
- } , this . _bufferTimeout ) ;
172
+ this . _flushOneBatch ( )
173
+ . catch ( e => {
174
+ globalErrorHandler ( e ) ;
175
+ } )
176
+ . then ( ( ) => {
177
+ if ( this . _finishedSpans . length > 0 ) {
178
+ this . _clearTimer ( ) ;
179
+ this . _maybeStartTimer ( ) ;
180
+ }
181
+ } ) ;
182
+ } , this . _scheduledDelayMillis ) ;
138
183
unrefTimer ( this . _timer ) ;
139
184
}
140
185
0 commit comments