Skip to content

Commit

Permalink
Merge 68247b3 into 2499708
Browse files Browse the repository at this point in the history
  • Loading branch information
seemk authored Sep 27, 2023
2 parents 2499708 + 68247b3 commit f2b8d51
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 53 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/

* fix(http-instrumentation): stop listening to `request`'s `close` event once it has emitted `response` [#3625](https://github.com/open-telemetry/opentelemetry-js/pull/3625) @SimenB
* fix(sdk-node): fix initialization in bundled environments by not loading @opentelemetry/exporter-jaeger [#3739](https://github.com/open-telemetry/opentelemetry-js/pull/3739) @pichlermarc
* fix(sdk-trace-base): eager exporting for batch span processor [#3458](https://github.com/open-telemetry/opentelemetry-js/pull/3458) @seemk

## 1.12.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>

private _finishedSpans: ReadableSpan[] = [];
private _timer: NodeJS.Timeout | undefined;
private _nextExport: number = 0;
private _shutdownOnce: BindOnceFuture<void>;
private _droppedSpansCount: number = 0;

Expand Down Expand Up @@ -76,6 +77,8 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
);
this._maxExportBatchSize = this._maxQueueSize;
}

this._resetTimer(this._scheduledDelayMillis);
}

forceFlush(): Promise<void> {
Expand Down Expand Up @@ -104,17 +107,11 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
return this._shutdownOnce.call();
}

private _shutdown() {
return Promise.resolve()
.then(() => {
return this.onShutdown();
})
.then(() => {
return this._flushAll();
})
.then(() => {
return this._exporter.shutdown();
});
private async _shutdown() {
this.onShutdown();
this._clearTimer();
await this._flushAll();
await this._exporter.shutdown();
}

/** Add a span in the buffer. */
Expand All @@ -139,42 +136,38 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
}

this._finishedSpans.push(span);
this._maybeStartTimer();
this._exportCompleteBatch();
}

/**
* Send all spans to the exporter respecting the batch size limit
* This function is used only on forceFlush or shutdown,
* for all other cases _flush should be used
* */
private _flushAll(): Promise<void> {
return new Promise((resolve, reject) => {
const promises = [];
// calculate number of batches
const count = Math.ceil(
this._finishedSpans.length / this._maxExportBatchSize
);
for (let i = 0, j = count; i < j; i++) {
promises.push(this._flushOneBatch());
}
Promise.all(promises)
.then(() => {
resolve();
})
.catch(reject);
});
private async _flushAll(): Promise<void> {
const promises = [];
// calculate number of batches
const count = Math.ceil(
this._finishedSpans.length / this._maxExportBatchSize
);
for (let i = 0, j = count; i < j; i++) {
promises.push(this._exportOneBatch());
}

await Promise.all(promises);
}

private _flushOneBatch(): Promise<void> {
this._clearTimer();
private async _exportOneBatch(): Promise<void> {
if (this._finishedSpans.length === 0) {
return Promise.resolve();
return;
}

return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
// don't wait anymore for export, this way the next batch can start
reject(new Error('Timeout'));
}, this._exportTimeoutMillis);
unrefTimer(timer);
// prevent downstream exporter calls from generating spans
context.with(suppressTracing(context.active()), () => {
// Reset the finished spans buffer here because the next invocations of the _flush method
Expand Down Expand Up @@ -215,20 +208,43 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
});
}

private _maybeStartTimer() {
if (this._timer !== undefined) return;
this._timer = setTimeout(() => {
this._flushOneBatch()
.then(() => {
if (this._finishedSpans.length > 0) {
this._clearTimer();
this._maybeStartTimer();
}
})
.catch(e => {
globalErrorHandler(e);
});
}, this._scheduledDelayMillis);
private async _tryExportOneBatch(): Promise<void> {
try {
await this._exportOneBatch();
} catch (e) {
globalErrorHandler(e);
}
}

private async _export() {
await this._tryExportOneBatch();

if (this._finishedSpans.length >= this._maxExportBatchSize) {
this._resetTimer(0);
} else {
this._resetTimer(this._scheduledDelayMillis);
}
}

private _exportCompleteBatch() {
if (this._finishedSpans.length < this._maxExportBatchSize) {
return;
}

if (this._nextExport === 0) {
return;
}

this._resetTimer(0);
}

private _resetTimer(timeout: number) {
this._nextExport = timeout;
this._clearTimer();

this._timer = setTimeout(async () => {
await this._export();
}, timeout);
unrefTimer(this._timer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ describe('BatchSpanProcessorBase', () => {
assert.strictEqual(spy.args.length, 0);
});

it('should export the sampled spans with buffer size reached', done => {
it('should export the sampled spans with buffer size reached', async () => {
const clock = sinon.useFakeTimers();
const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) {
Expand All @@ -190,14 +190,11 @@ describe('BatchSpanProcessorBase', () => {
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);

setTimeout(async () => {
assert.strictEqual(exporter.getFinishedSpans().length, 5);
await processor.shutdown();
assert.strictEqual(exporter.getFinishedSpans().length, 0);
done();
}, defaultBufferConfig.scheduledDelayMillis + 1000);
clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000);
clock.tick(0);
assert.strictEqual(exporter.getFinishedSpans().length, 5);
clock.restore();
await processor.shutdown();
assert.strictEqual(exporter.getFinishedSpans().length, 0);
});

it('should force flush when timeout exceeded', done => {
Expand Down

0 comments on commit f2b8d51

Please sign in to comment.