diff --git a/CHANGELOG.md b/CHANGELOG.md index ad5c7809834..e48b7c919ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -89,6 +89,7 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/ * fix(http-instrumentation): stop listening to `request`'s `close` event once it has emitted `response` [#3625](https://github.com/open-telemetry/opentelemetry-js/pull/3625) @SimenB * fix(sdk-node): fix initialization in bundled environments by not loading @opentelemetry/exporter-jaeger [#3739](https://github.com/open-telemetry/opentelemetry-js/pull/3739) @pichlermarc +* fix(sdk-trace-base): eager exporting for batch span processor [#3458](https://github.com/open-telemetry/opentelemetry-js/pull/3458) @seemk ## 1.12.0 diff --git a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts index 7d84e0c7349..c09706c87b7 100644 --- a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts +++ b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts @@ -43,6 +43,7 @@ export abstract class BatchSpanProcessorBase private _finishedSpans: ReadableSpan[] = []; private _timer: NodeJS.Timeout | undefined; + private _nextExport: number = 0; private _shutdownOnce: BindOnceFuture; private _droppedSpansCount: number = 0; @@ -76,6 +77,8 @@ export abstract class BatchSpanProcessorBase ); this._maxExportBatchSize = this._maxQueueSize; } + + this._resetTimer(this._scheduledDelayMillis); } forceFlush(): Promise { @@ -104,17 +107,11 @@ export abstract class BatchSpanProcessorBase return this._shutdownOnce.call(); } - private _shutdown() { - return Promise.resolve() - .then(() => { - return this.onShutdown(); - }) - .then(() => { - return this._flushAll(); - }) - .then(() => { - return this._exporter.shutdown(); - }); + private async _shutdown() { + this.onShutdown(); + this._clearTimer(); + await this._flushAll(); + await this._exporter.shutdown(); } /** Add a span in the buffer. */ @@ -139,7 +136,7 @@ export abstract class BatchSpanProcessorBase } this._finishedSpans.push(span); - this._maybeStartTimer(); + this._exportCompleteBatch(); } /** @@ -147,34 +144,30 @@ export abstract class BatchSpanProcessorBase * This function is used only on forceFlush or shutdown, * for all other cases _flush should be used * */ - private _flushAll(): Promise { - return new Promise((resolve, reject) => { - const promises = []; - // calculate number of batches - const count = Math.ceil( - this._finishedSpans.length / this._maxExportBatchSize - ); - for (let i = 0, j = count; i < j; i++) { - promises.push(this._flushOneBatch()); - } - Promise.all(promises) - .then(() => { - resolve(); - }) - .catch(reject); - }); + private async _flushAll(): Promise { + const promises = []; + // calculate number of batches + const count = Math.ceil( + this._finishedSpans.length / this._maxExportBatchSize + ); + for (let i = 0, j = count; i < j; i++) { + promises.push(this._exportOneBatch()); + } + + await Promise.all(promises); } - private _flushOneBatch(): Promise { - this._clearTimer(); + private async _exportOneBatch(): Promise { if (this._finishedSpans.length === 0) { - return Promise.resolve(); + return; } + return new Promise((resolve, reject) => { const timer = setTimeout(() => { // don't wait anymore for export, this way the next batch can start reject(new Error('Timeout')); }, this._exportTimeoutMillis); + unrefTimer(timer); // prevent downstream exporter calls from generating spans context.with(suppressTracing(context.active()), () => { // Reset the finished spans buffer here because the next invocations of the _flush method @@ -215,20 +208,43 @@ export abstract class BatchSpanProcessorBase }); } - private _maybeStartTimer() { - if (this._timer !== undefined) return; - this._timer = setTimeout(() => { - this._flushOneBatch() - .then(() => { - if (this._finishedSpans.length > 0) { - this._clearTimer(); - this._maybeStartTimer(); - } - }) - .catch(e => { - globalErrorHandler(e); - }); - }, this._scheduledDelayMillis); + private async _tryExportOneBatch(): Promise { + try { + await this._exportOneBatch(); + } catch (e) { + globalErrorHandler(e); + } + } + + private async _export() { + await this._tryExportOneBatch(); + + if (this._finishedSpans.length >= this._maxExportBatchSize) { + this._resetTimer(0); + } else { + this._resetTimer(this._scheduledDelayMillis); + } + } + + private _exportCompleteBatch() { + if (this._finishedSpans.length < this._maxExportBatchSize) { + return; + } + + if (this._nextExport === 0) { + return; + } + + this._resetTimer(0); + } + + private _resetTimer(timeout: number) { + this._nextExport = timeout; + this._clearTimer(); + + this._timer = setTimeout(async () => { + await this._export(); + }, timeout); unrefTimer(this._timer); } diff --git a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts index 069287fc599..088cccaec0c 100644 --- a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts +++ b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts @@ -175,7 +175,7 @@ describe('BatchSpanProcessorBase', () => { assert.strictEqual(spy.args.length, 0); }); - it('should export the sampled spans with buffer size reached', done => { + it('should export the sampled spans with buffer size reached', async () => { const clock = sinon.useFakeTimers(); const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { @@ -190,14 +190,11 @@ describe('BatchSpanProcessorBase', () => { processor.onStart(span, ROOT_CONTEXT); processor.onEnd(span); - setTimeout(async () => { - assert.strictEqual(exporter.getFinishedSpans().length, 5); - await processor.shutdown(); - assert.strictEqual(exporter.getFinishedSpans().length, 0); - done(); - }, defaultBufferConfig.scheduledDelayMillis + 1000); - clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000); + clock.tick(0); + assert.strictEqual(exporter.getFinishedSpans().length, 5); clock.restore(); + await processor.shutdown(); + assert.strictEqual(exporter.getFinishedSpans().length, 0); }); it('should force flush when timeout exceeded', done => {