From c339cd50910f2f1ab51f129d4f85c473ca854d00 Mon Sep 17 00:00:00 2001 From: Siim Kallas Date: Wed, 30 Nov 2022 13:59:49 +0200 Subject: [PATCH 1/8] feat(sdk-trace-base): eager exporting for batch span processor --- .../src/export/BatchSpanProcessorBase.ts | 88 +++++++++---------- .../export/BatchSpanProcessorBase.test.ts | 29 +++--- 2 files changed, 60 insertions(+), 57 deletions(-) diff --git a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts index 8978fc7d79..5fdaae84f0 100644 --- a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts +++ b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts @@ -68,6 +68,15 @@ export abstract class BatchSpanProcessorBase implements diag.warn('BatchSpanProcessor: maxExportBatchSize must be smaller or equal to maxQueueSize, setting maxExportBatchSize to match maxQueueSize'); this._maxExportBatchSize = this._maxQueueSize; } + + this._timer = setTimeout(async () => { + try { + await this._flushOneBatch(); + } catch (e) { + globalErrorHandler(e); + } + }, this._scheduledDelayMillis); + unrefTimer(this._timer); } forceFlush(): Promise { @@ -96,17 +105,11 @@ export abstract class BatchSpanProcessorBase implements return this._shutdownOnce.call(); } - private _shutdown() { - return Promise.resolve() - .then(() => { - return this.onShutdown(); - }) - .then(() => { - return this._flushAll(); - }) - .then(() => { - return this._exporter.shutdown(); - }); + private async _shutdown() { + this.onShutdown(); + this._clearTimer(); + await this._flushAll(); + await this._exporter.shutdown(); } /** Add a span in the buffer. */ @@ -116,7 +119,7 @@ export abstract class BatchSpanProcessorBase implements return; } this._finishedSpans.push(span); - this._maybeStartTimer(); + this._maybeSend(); } /** @@ -124,29 +127,24 @@ export abstract class BatchSpanProcessorBase implements * This function is used only on forceFlush or shutdown, * for all other cases _flush should be used * */ - private _flushAll(): Promise { - return new Promise((resolve, reject) => { - const promises = []; - // calculate number of batches - const count = Math.ceil( - this._finishedSpans.length / this._maxExportBatchSize - ); - for (let i = 0, j = count; i < j; i++) { - promises.push(this._flushOneBatch()); - } - Promise.all(promises) - .then(() => { - resolve(); - }) - .catch(reject); - }); + private async _flushAll(): Promise { + const promises = []; + // calculate number of batches + const count = Math.ceil( + this._finishedSpans.length / this._maxExportBatchSize + ); + for (let i = 0, j = count; i < j; i++) { + promises.push(this._flushOneBatch()); + } + + await Promise.all(promises); } - private _flushOneBatch(): Promise { - this._clearTimer(); + private async _flushOneBatch(): Promise { if (this._finishedSpans.length === 0) { - return Promise.resolve(); + return; } + return new Promise((resolve, reject) => { const timer = setTimeout(() => { // don't wait anymore for export, this way the next batch can start @@ -175,21 +173,19 @@ export abstract class BatchSpanProcessorBase implements }); } - private _maybeStartTimer() { - if (this._timer !== undefined) return; - this._timer = setTimeout(() => { - this._flushOneBatch() - .then(() => { - if (this._finishedSpans.length > 0) { - this._clearTimer(); - this._maybeStartTimer(); - } - }) - .catch(e => { - globalErrorHandler(e); - }); - }, this._scheduledDelayMillis); - unrefTimer(this._timer); + private _maybeSend() { + if (this._finishedSpans.length < this._maxExportBatchSize) { + return; + } + + setImmediate(async () => { + try { + await this._flushOneBatch(); + } catch (e) { + globalErrorHandler(e); + } + this._maybeSend(); + }); } private _clearTimer() { diff --git a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts index 229e2bdb3c..5c8e6309b1 100644 --- a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts +++ b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts @@ -168,7 +168,7 @@ describe('BatchSpanProcessorBase', () => { assert.strictEqual(spy.args.length, 0); }); - it('should export the sampled spans with buffer size reached', done => { + it('should export the sampled spans with buffer size or timeout reached', done => { const clock = sinon.useFakeTimers(); const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { @@ -179,18 +179,25 @@ describe('BatchSpanProcessorBase', () => { processor.onEnd(span); assert.strictEqual(exporter.getFinishedSpans().length, 0); } - const span = createSampledSpan(`${name}_6`); - processor.onStart(span, ROOT_CONTEXT); - processor.onEnd(span); - setTimeout(async () => { + setTimeout(() => { assert.strictEqual(exporter.getFinishedSpans().length, 5); - await processor.shutdown(); - assert.strictEqual(exporter.getFinishedSpans().length, 0); - done(); - }, defaultBufferConfig.scheduledDelayMillis + 1000); - clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000); - clock.restore(); + + const span = createSampledSpan(`${name}_6`); + processor.onStart(span, ROOT_CONTEXT); + processor.onEnd(span); + + setTimeout(async () => { + assert.strictEqual(exporter.getFinishedSpans().length, 6); + await processor.shutdown(); + assert.strictEqual(exporter.getFinishedSpans().length, 0); + done(); + }, defaultBufferConfig.scheduledDelayMillis + 1000); + clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000); + clock.restore(); + }, 0); + + clock.tick(10); }); it('should force flush when timeout exceeded', done => { From 48fab8def48f6397a6eb905cac191736025f96f4 Mon Sep 17 00:00:00 2001 From: Siim Kallas Date: Wed, 30 Nov 2022 14:30:34 +0200 Subject: [PATCH 2/8] fix: use interval not timeout --- .../src/export/BatchSpanProcessorBase.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts index 5fdaae84f0..d71125f1bf 100644 --- a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts +++ b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts @@ -69,7 +69,7 @@ export abstract class BatchSpanProcessorBase implements this._maxExportBatchSize = this._maxQueueSize; } - this._timer = setTimeout(async () => { + this._timer = setInterval(async () => { try { await this._flushOneBatch(); } catch (e) { @@ -190,7 +190,7 @@ export abstract class BatchSpanProcessorBase implements private _clearTimer() { if (this._timer !== undefined) { - clearTimeout(this._timer); + clearInterval(this._timer); this._timer = undefined; } } From c12124b5f38f671f796a38f43b1b79aaee7ed891 Mon Sep 17 00:00:00 2001 From: Siim Kallas Date: Wed, 30 Nov 2022 20:01:49 +0200 Subject: [PATCH 3/8] chore: cleanup, add test for periodic exports --- .../src/export/BatchSpanProcessorBase.ts | 30 +++++++++---------- .../export/BatchSpanProcessorBase.test.ts | 18 +++++++++++ 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts index d71125f1bf..682fced855 100644 --- a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts +++ b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts @@ -70,11 +70,7 @@ export abstract class BatchSpanProcessorBase implements } this._timer = setInterval(async () => { - try { - await this._flushOneBatch(); - } catch (e) { - globalErrorHandler(e); - } + await this._tryExportOneBatch(); }, this._scheduledDelayMillis); unrefTimer(this._timer); } @@ -119,7 +115,7 @@ export abstract class BatchSpanProcessorBase implements return; } this._finishedSpans.push(span); - this._maybeSend(); + this._exportCompleteBatches(); } /** @@ -134,13 +130,13 @@ export abstract class BatchSpanProcessorBase implements this._finishedSpans.length / this._maxExportBatchSize ); for (let i = 0, j = count; i < j; i++) { - promises.push(this._flushOneBatch()); + promises.push(this._exportOneBatch()); } await Promise.all(promises); } - private async _flushOneBatch(): Promise { + private async _exportOneBatch(): Promise { if (this._finishedSpans.length === 0) { return; } @@ -173,18 +169,22 @@ export abstract class BatchSpanProcessorBase implements }); } - private _maybeSend() { + private async _tryExportOneBatch(): Promise { + try { + await this._exportOneBatch(); + } catch (e) { + globalErrorHandler(e); + } + } + + private _exportCompleteBatches() { if (this._finishedSpans.length < this._maxExportBatchSize) { return; } setImmediate(async () => { - try { - await this._flushOneBatch(); - } catch (e) { - globalErrorHandler(e); - } - this._maybeSend(); + await this._tryExportOneBatch(); + this._exportCompleteBatches(); }); } diff --git a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts index 5c8e6309b1..871fc3986c 100644 --- a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts +++ b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts @@ -391,6 +391,24 @@ describe('BatchSpanProcessorBase', () => { done(); }); }); + + it('should periodically export spans', () => { + const clock = sinon.useFakeTimers(); + const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); + const batchSize = Math.round(defaultBufferConfig.maxExportBatchSize / 2); + + for (let cycle = 0; cycle < 2; cycle++) { + for (let i = 0; i < batchSize; i++) { + const span = createSampledSpan(`${name}_${cycle}_${i}`); + processor.onStart(span, ROOT_CONTEXT); + processor.onEnd(span); + } + clock.tick(defaultBufferConfig.scheduledDelayMillis + 10); + assert.strictEqual(exporter.getFinishedSpans().length, batchSize); + assert(exporter.getFinishedSpans().every(s => s.name.startsWith(`${name}_${cycle}`))); + exporter.reset(); + } + }); }); describe('flushing spans with exporter triggering instrumentation', () => { From e101c00dee973073f7ab2d48489ef2cb2df40963 Mon Sep 17 00:00:00 2001 From: Siim Kallas Date: Wed, 30 Nov 2022 20:10:45 +0200 Subject: [PATCH 4/8] chore: update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ec5fb321e..daa9d4d442 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/ * `telemetry.sdk.name` * `telemetry.sdk.language` * `telemetry.sdk.version` +* fix(sdk-trace-base): eager exporting for batch span processor [#3458](https://github.com/open-telemetry/opentelemetry-js/pull/3458) @seemk ### :books: (Refine Doc) From d79ba672b55652d12ef635033c1d88cc120cc9a8 Mon Sep 17 00:00:00 2001 From: Siim Kallas Date: Tue, 6 Dec 2022 17:24:40 +0200 Subject: [PATCH 5/8] fix: use setTimeout for browser compatibility --- .../src/export/BatchSpanProcessorBase.ts | 41 +++++++++++----- .../export/BatchSpanProcessorBase.test.ts | 49 ++++--------------- 2 files changed, 40 insertions(+), 50 deletions(-) diff --git a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts index 682fced855..a4df72a1b8 100644 --- a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts +++ b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts @@ -41,6 +41,7 @@ export abstract class BatchSpanProcessorBase implements private _finishedSpans: ReadableSpan[] = []; private _timer: NodeJS.Timeout | undefined; + private _nextExport: number = 0; private _shutdownOnce: BindOnceFuture; constructor(private readonly _exporter: SpanExporter, config?: T) { @@ -69,10 +70,7 @@ export abstract class BatchSpanProcessorBase implements this._maxExportBatchSize = this._maxQueueSize; } - this._timer = setInterval(async () => { - await this._tryExportOneBatch(); - }, this._scheduledDelayMillis); - unrefTimer(this._timer); + this._resetTimer(this._scheduledDelayMillis); } forceFlush(): Promise { @@ -115,7 +113,7 @@ export abstract class BatchSpanProcessorBase implements return; } this._finishedSpans.push(span); - this._exportCompleteBatches(); + this._exportCompleteBatch(); } /** @@ -177,20 +175,41 @@ export abstract class BatchSpanProcessorBase implements } } - private _exportCompleteBatches() { + private async _export() { + await this._tryExportOneBatch(); + + if (this._finishedSpans.length >= this._maxExportBatchSize) { + this._resetTimer(0); + } else { + this._resetTimer(this._scheduledDelayMillis); + } + } + + private _exportCompleteBatch() { if (this._finishedSpans.length < this._maxExportBatchSize) { return; } - setImmediate(async () => { - await this._tryExportOneBatch(); - this._exportCompleteBatches(); - }); + if (this._nextExport === 0) { + return; + } + + this._resetTimer(0); + } + + private _resetTimer(timeout: number) { + this._nextExport = timeout; + this._clearTimer(); + + this._timer = setTimeout(async () => { + await this._export(); + }, timeout); + unrefTimer(this._timer); } private _clearTimer() { if (this._timer !== undefined) { - clearInterval(this._timer); + clearTimeout(this._timer); this._timer = undefined; } } diff --git a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts index 871fc3986c..71a055bb39 100644 --- a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts +++ b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts @@ -168,7 +168,7 @@ describe('BatchSpanProcessorBase', () => { assert.strictEqual(spy.args.length, 0); }); - it('should export the sampled spans with buffer size or timeout reached', done => { + it('should export the sampled spans with buffer size reached', async () => { const clock = sinon.useFakeTimers(); const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { @@ -179,25 +179,15 @@ describe('BatchSpanProcessorBase', () => { processor.onEnd(span); assert.strictEqual(exporter.getFinishedSpans().length, 0); } + const span = createSampledSpan(`${name}_6`); + processor.onStart(span, ROOT_CONTEXT); + processor.onEnd(span); - setTimeout(() => { - assert.strictEqual(exporter.getFinishedSpans().length, 5); - - const span = createSampledSpan(`${name}_6`); - processor.onStart(span, ROOT_CONTEXT); - processor.onEnd(span); - - setTimeout(async () => { - assert.strictEqual(exporter.getFinishedSpans().length, 6); - await processor.shutdown(); - assert.strictEqual(exporter.getFinishedSpans().length, 0); - done(); - }, defaultBufferConfig.scheduledDelayMillis + 1000); - clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000); - clock.restore(); - }, 0); - - clock.tick(10); + clock.tick(0); + assert.strictEqual(exporter.getFinishedSpans().length, 5); + clock.restore(); + await processor.shutdown(); + assert.strictEqual(exporter.getFinishedSpans().length, 0); }); it('should force flush when timeout exceeded', done => { @@ -329,7 +319,6 @@ describe('BatchSpanProcessorBase', () => { beforeEach(() => { processor = new BatchSpanProcessor(exporter, defaultBufferConfig); }); - it('should call an async callback when flushing is complete', done => { const span = createSampledSpan('test'); processor.onStart(span, ROOT_CONTEXT); @@ -391,26 +380,7 @@ describe('BatchSpanProcessorBase', () => { done(); }); }); - - it('should periodically export spans', () => { - const clock = sinon.useFakeTimers(); - const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); - const batchSize = Math.round(defaultBufferConfig.maxExportBatchSize / 2); - - for (let cycle = 0; cycle < 2; cycle++) { - for (let i = 0; i < batchSize; i++) { - const span = createSampledSpan(`${name}_${cycle}_${i}`); - processor.onStart(span, ROOT_CONTEXT); - processor.onEnd(span); - } - clock.tick(defaultBufferConfig.scheduledDelayMillis + 10); - assert.strictEqual(exporter.getFinishedSpans().length, batchSize); - assert(exporter.getFinishedSpans().every(s => s.name.startsWith(`${name}_${cycle}`))); - exporter.reset(); - } - }); }); - describe('flushing spans with exporter triggering instrumentation', () => { beforeEach(() => { const contextManager = new TestStackContextManager().enable(); @@ -438,6 +408,7 @@ describe('BatchSpanProcessorBase', () => { }); }); }); + describe('maxQueueSize', () => { let processor: BatchSpanProcessor; From faffbd299058296e4c3c565d4cab0aef68609cde Mon Sep 17 00:00:00 2001 From: Siim Kallas Date: Tue, 6 Dec 2022 17:28:09 +0200 Subject: [PATCH 6/8] refactor: spacing --- .../test/common/export/BatchSpanProcessorBase.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts index 71a055bb39..ab3bb79e33 100644 --- a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts +++ b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts @@ -319,6 +319,7 @@ describe('BatchSpanProcessorBase', () => { beforeEach(() => { processor = new BatchSpanProcessor(exporter, defaultBufferConfig); }); + it('should call an async callback when flushing is complete', done => { const span = createSampledSpan('test'); processor.onStart(span, ROOT_CONTEXT); @@ -381,6 +382,7 @@ describe('BatchSpanProcessorBase', () => { }); }); }); + describe('flushing spans with exporter triggering instrumentation', () => { beforeEach(() => { const contextManager = new TestStackContextManager().enable(); @@ -408,7 +410,6 @@ describe('BatchSpanProcessorBase', () => { }); }); }); - describe('maxQueueSize', () => { let processor: BatchSpanProcessor; From 4e011116bd1706f9a22481036631bb83a7c1be49 Mon Sep 17 00:00:00 2001 From: Siim Kallas Date: Wed, 7 Dec 2022 11:53:50 +0200 Subject: [PATCH 7/8] fix: unref timeout timer --- .../src/export/BatchSpanProcessorBase.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts index 7668e30668..550a3336e4 100644 --- a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts +++ b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts @@ -148,6 +148,7 @@ export abstract class BatchSpanProcessorBase // don't wait anymore for export, this way the next batch can start reject(new Error('Timeout')); }, this._exportTimeoutMillis); + unrefTimer(timer); // prevent downstream exporter calls from generating spans context.with(suppressTracing(context.active()), () => { // Reset the finished spans buffer here because the next invocations of the _flush method From d5a1020b8965086ffc6c859d0275262202f004b8 Mon Sep 17 00:00:00 2001 From: Daniel Dyla Date: Mon, 24 Apr 2023 17:26:13 +0200 Subject: [PATCH 8/8] Remove duplicate changelog entry --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b10fa132e..5c287dd25e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,7 +43,6 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/ * fix(sdk-metrics): merge uncollected delta accumulations [#3667](https://github.com/open-telemetry/opentelemetry-js/pull/3667) @legendecas * fix(sdk-trace-web): make `parseUrl()` respect document.baseURI [#3670](https://github.com/open-telemetry/opentelemetry-js/pull/3670) @domasx2 -* fix(resource): make properties for async resource resolution optional [#3677](https://github.com/open-telemetry/opentelemetry-js/pull/3677) @pichlermarc ### :books: (Refine Doc)